diff --git a/SSU.cpp b/SSU.cpp index a036d1ff..d033ec36 100644 --- a/SSU.cpp +++ b/SSU.cpp @@ -72,21 +72,17 @@ namespace ssu // session confirmed ProcessSessionConfirmed (buf, len); break; - case eSessionRelayRequestSent: + case eSessionStateRelayRequestSent: // relay response - ProcessRelayResponse (buf,len); + ProcessRelayResponse (buf, len); + m_Server.DeleteSession (this); break; - case eSessionRelayResponseReceived: + case eSessionStateIntroduced: // HolePunch received LogPrint ("SSU HolePuch of ", len, " bytes received"); - m_State = eSessionStateEstablished; - Established (); - break; - case eSessionRelayRequestReceived: - // HolePunch m_State = eSessionStateUnknown; Connect (); - break; + break; default: LogPrint ("SSU state not implemented yet"); } @@ -114,6 +110,10 @@ namespace ssu m_Server.DeleteSession (this); // delete this break; } + case PAYLOAD_TYPE_RELAY_RESPONSE: + LogPrint ("SSU relay response received though established session"); + // Ignore it for now + break; case PAYLOAD_TYPE_RELAY_INTRO: LogPrint ("SSU relay intro received"); ProcessRelayIntro (buf + sizeof (SSUHeader), len - sizeof (SSUHeader)); @@ -151,7 +151,7 @@ namespace ssu if (!Validate (buf, len, introKey)) { LogPrint ("MAC verification intro key failed"); - Failed (); + m_Server.DeleteSession (this); return; } @@ -273,7 +273,7 @@ namespace ssu m_Server.Send (buf, 304, m_RemoteEndpoint); } - void SSUSession::SendRelayRequest (const i2p::data::RouterInfo::Introducer& introducer) + void SSUSession::SendRelayRequest (uint32_t iTag, const uint8_t * iKey) { auto address = i2p::context.GetRouterInfo ().GetSSUAddress (); if (!address) @@ -284,7 +284,7 @@ namespace ssu uint8_t buf[96 + 18]; uint8_t * payload = buf + sizeof (SSUHeader); - *(uint32_t *)payload = htobe32 (introducer.iTag); + *(uint32_t *)payload = htobe32 (iTag); payload += 4; *payload = 0; // no address payload++; @@ -299,8 +299,13 @@ namespace ssu uint8_t iv[16]; rnd.GenerateBlock (iv, 16); // random iv - FillHeaderAndEncrypt (PAYLOAD_TYPE_RELAY_REQUEST, buf, 96, introducer.iKey, iv, introducer.iKey); - m_State = eSessionRelayRequestSent; + if (m_State == eSessionStateEstablished) + FillHeaderAndEncrypt (PAYLOAD_TYPE_RELAY_REQUEST, buf, 96, m_SessionKey, iv, m_MacKey); + else + { + FillHeaderAndEncrypt (PAYLOAD_TYPE_RELAY_REQUEST, buf, 96, iKey, iv, iKey); + m_State = eSessionStateRelayRequestSent; + } m_Server.Send (buf, 96, m_RemoteEndpoint); } @@ -404,27 +409,22 @@ namespace ssu { Decrypt (buf, len, address->key); SSUHeader * header = (SSUHeader *)buf; - if ((header->flag >> 4) == PAYLOAD_TYPE_RELAY_RESPONSE) + if (header->GetPayloadType () == PAYLOAD_TYPE_RELAY_RESPONSE) { LogPrint ("Relay response received"); - m_State = eSessionRelayRequestReceived; uint8_t * payload = buf + sizeof (SSUHeader); - payload++; - boost::asio::ip::address_v4 remoteIP (be32toh (*(uint32_t* )(payload))); - payload += 4; - uint16_t remotePort = be16toh (*(uint16_t *)(payload)); - payload += 2; - boost::asio::ip::udp::endpoint newRemoteEndpoint(remoteIP, remotePort); - m_Server.ReassignSession (m_RemoteEndpoint, newRemoteEndpoint); - m_RemoteEndpoint = newRemoteEndpoint; - payload++; + payload++; // remote size + //boost::asio::ip::address_v4 remoteIP (be32toh (*(uint32_t* )(payload))); + payload += 4; // remote address + //uint16_t remotePort = be16toh (*(uint16_t *)(payload)); + payload += 2; // remote port + payload++; // our size boost::asio::ip::address_v4 ourIP (be32toh (*(uint32_t* )(payload))); - payload += 4; + payload += 4; // our address uint16_t ourPort = be16toh (*(uint16_t *)(payload)); - payload += 2; + payload += 2; // our port LogPrint ("Our external address is ", ourIP.to_string (), ":", ourPort); i2p::context.UpdateAddress (ourIP.to_string ().c_str ()); - m_State= eSessionRelayResponseReceived; } else LogPrint ("Unexpected payload type ", (int)(header->flag >> 4)); @@ -524,16 +524,25 @@ namespace ssu } } - void SSUSession::ConnectThroughIntroducer (const i2p::data::RouterInfo::Introducer& introducer) + void SSUSession::Introduce (uint32_t iTag, const uint8_t * iKey) { if (m_State == eSessionStateUnknown) { // set connect timer m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); m_Timer.async_wait (boost::bind (&SSUSession::HandleConnectTimer, - this, boost::asio::placeholders::error)); - SendRelayRequest (introducer); - } + this, boost::asio::placeholders::error)); + } + SendRelayRequest (iTag, iKey); + } + + void SSUSession::WaitForIntroduction () + { + m_State = eSessionStateIntroduced; + // set connect timer + m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); + m_Timer.async_wait (boost::bind (&SSUSession::HandleConnectTimer, + this, boost::asio::placeholders::error)); } void SSUSession::Close () @@ -556,7 +565,7 @@ namespace ssu Send (it); m_DelayedMessages.clear (); } - if (m_PeerTest) + if (m_PeerTest && (m_RemoteRouter && m_RemoteRouter->IsPeerTesting ())) SendPeerTest (); ScheduleTermination (); } @@ -723,19 +732,46 @@ namespace ssu uint16_t port = *(uint16_t *)buf; // use it as is buf += 2; // port uint8_t * introKey = buf; - if (port) + if (port && !address) { - LogPrint ("SSU peer test. We are Charlie"); - Send (PAYLOAD_TYPE_PEER_TEST, buf1, len); // back to Bob - if (address) - SendPeerTest (nonce, be32toh (*(uint32_t *)address), be16toh (port), introKey); // to Alice + LogPrint ("Address of ", size, " bytes not supported"); + return; + } + if (m_PeerTestNonces.count (nonce) > 0) + { + // existing test + if (m_PeerTest) + { + LogPrint ("SSU peer test from Bob. We are Alice"); + m_PeerTestNonces.erase (nonce); + m_PeerTest = false; + } + else if (port) + { + LogPrint ("SSU peer test from Charlie. We are Bob"); + // TODO: back to Alice + } else - LogPrint ("Address of ", size, " bytes not supported"); + { + LogPrint ("SSU peer test from Alice. We are Charlie"); + //SendPeerTest (nonce, be32toh (*(uint32_t *)address), be16toh (port), introKey); // to Alice + } } else { - LogPrint ("SSU peer test. We are Bob"); - // TODO: + // new test + m_PeerTestNonces.insert (nonce); + if (port) + { + LogPrint ("SSU peer test from Bob. We are Charlie"); + Send (PAYLOAD_TYPE_PEER_TEST, buf1, len); // back to Bob + SendPeerTest (nonce, be32toh (*(uint32_t *)address), be16toh (port), introKey); // to Alice + } + else + { + LogPrint ("SSU peer test from Alice. We are Bob"); + // TODO: find Charlie + } } } @@ -777,6 +813,7 @@ namespace ssu CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator (); uint32_t nonce = 0; rnd.GenerateWord32 (nonce); + m_PeerTestNonces.insert (nonce); *(uint32_t *)payload = htobe32 (nonce); payload += 4; // nonce *payload = 4; @@ -813,11 +850,11 @@ namespace ssu void SSUSession::SendSesionDestroyed () { - uint8_t buf[48 + 18], iv[16]; - CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator (); - rnd.GenerateBlock (iv, 16); // random iv - if (m_State == eSessionStateEstablished) + if (HasSessionKey ()) { + uint8_t buf[48 + 18], iv[16]; + CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator (); + rnd.GenerateBlock (iv, 16); // random iv // encrypt message with session key FillHeaderAndEncrypt (PAYLOAD_TYPE_SESSION_DESTROYED, buf, 48, m_SessionKey, iv, m_MacKey); m_Server.Send (buf, 48, m_RemoteEndpoint); @@ -977,11 +1014,12 @@ namespace ssu else { // otherwise create new session + session = new SSUSession (*this, remoteEndpoint, router, peerTest); + m_Sessions[remoteEndpoint] = session; + if (!router->UsesIntroducer ()) { - // connect directly - session = new SSUSession (*this, remoteEndpoint, router, peerTest); - m_Sessions[remoteEndpoint] = session; + // connect directly LogPrint ("Creating new SSU session to [", router->GetIdentHashAbbreviation (), "] ", remoteEndpoint.address ().to_string (), ":", remoteEndpoint.port ()); session->Connect (); @@ -989,24 +1027,27 @@ namespace ssu else { // connect through introducer + session->WaitForIntroduction (); if (address->introducers.size () > 0) { auto& introducer = address->introducers[0]; // TODO: boost::asio::ip::udp::endpoint introducerEndpoint (introducer.iHost, introducer.iPort); - it = m_Sessions.find (introducerEndpoint); - if (it == m_Sessions.end ()) - { - session = new SSUSession (*this, introducerEndpoint, router); - m_Sessions[introducerEndpoint] = session; - LogPrint ("Creating new SSU session to [", router->GetIdentHashAbbreviation (), + LogPrint ("Creating new SSU session to [", router->GetIdentHashAbbreviation (), "] through introducer ", introducerEndpoint.address ().to_string (), ":", introducerEndpoint.port ()); - session->ConnectThroughIntroducer (introducer); - } - else + it = m_Sessions.find (introducerEndpoint); + SSUSession * introducerSession = nullptr; + if (it != m_Sessions.end ()) { LogPrint ("Session to introducer already exists"); - // TODO: + introducerSession = it->second; } + else + { + LogPrint ("New session to introducer created"); + introducerSession = new SSUSession (*this, introducerEndpoint, router); + m_Sessions[introducerEndpoint] = introducerSession; + } + introducerSession->Introduce (introducer.iTag, introducer.iKey); } else LogPrint ("Router is unreachable, but not introducers presentd. Ignored"); @@ -1038,19 +1079,6 @@ namespace ssu } m_Sessions.clear (); } - - void SSUServer::ReassignSession (const boost::asio::ip::udp::endpoint& oldEndpoint, const boost::asio::ip::udp::endpoint& newEndpoint) - { - auto it = m_Sessions.find (oldEndpoint); - if (it != m_Sessions.end ()) - { - auto session = it->second; - m_Sessions.erase (it); - m_Sessions[newEndpoint] = session; - LogPrint ("SSU session reassigned from ", oldEndpoint.address ().to_string (), ":", oldEndpoint.port (), - " to ", newEndpoint.address ().to_string (), ":", newEndpoint.port ()); - } - } } } diff --git a/SSU.h b/SSU.h index ab60c71b..3f7ac96b 100644 --- a/SSU.h +++ b/SSU.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -60,9 +61,9 @@ namespace ssu eSessionStateCreatedReceived, eSessionStateConfirmedSent, eSessionStateConfirmedReceived, - eSessionRelayRequestSent, - eSessionRelayRequestReceived, - eSessionRelayResponseReceived, + eSessionStateRelayRequestSent, + eSessionStateRelayRequestReceived, + eSessionStateIntroduced, eSessionStateEstablished, eSessionStateFailed }; @@ -78,7 +79,8 @@ namespace ssu ~SSUSession (); void Connect (); - void ConnectThroughIntroducer (const i2p::data::RouterInfo::Introducer& introducer); + void Introduce (uint32_t iTag, const uint8_t * iKey); + void WaitForIntroduction (); void Close (); boost::asio::ip::udp::endpoint& GetRemoteEndpoint () { return m_RemoteEndpoint; }; const i2p::data::RouterInfo * GetRemoteRouter () const { return m_RemoteRouter; }; @@ -94,7 +96,7 @@ namespace ssu void ProcessSessionRequest (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); void SendSessionRequest (); - void SendRelayRequest (const i2p::data::RouterInfo::Introducer& introducer); + void SendRelayRequest (uint32_t iTag, const uint8_t * iKey); void ProcessSessionCreated (uint8_t * buf, size_t len); void SendSessionCreated (const uint8_t * x); void ProcessSessionConfirmed (uint8_t * buf, size_t len); @@ -116,6 +118,7 @@ namespace ssu void Decrypt (uint8_t * buf, size_t len, const uint8_t * aesKey); bool Validate (uint8_t * buf, size_t len, const uint8_t * macKey); const uint8_t * GetIntroKey () const; + bool HasSessionKey () const { return m_State == eSessionStateCreatedReceived || m_State == eSessionStateRequestReceived; }; void ScheduleTermination (); void HandleTerminationTimer (const boost::system::error_code& ecode); @@ -130,6 +133,7 @@ namespace ssu bool m_PeerTest; SessionState m_State; uint32_t m_RelayTag; + std::set m_PeerTestNonces; CryptoPP::CBC_Mode::Encryption m_Encryption; CryptoPP::CBC_Mode::Decryption m_Decryption; uint8_t m_SessionKey[32], m_MacKey[32]; @@ -153,7 +157,6 @@ namespace ssu boost::asio::io_service& GetService () { return m_Socket.get_io_service(); }; const boost::asio::ip::udp::endpoint& GetEndpoint () const { return m_Endpoint; }; void Send (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& to); - void ReassignSession (const boost::asio::ip::udp::endpoint& oldEndpoint, const boost::asio::ip::udp::endpoint& newEndpoint); private: diff --git a/Streaming.cpp b/Streaming.cpp index 090a0d04..f8c63378 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -242,20 +242,6 @@ namespace stream m_ReceiveQueue.WakeUp (); } } - - size_t Stream::Receive (uint8_t * buf, size_t len, int timeout) - { - if (!m_IsOpen) return 0; - if (m_ReceiveQueue.IsEmpty ()) - { - if (!timeout) return 0; - if (!m_ReceiveQueue.Wait (timeout, 0)) - return 0; - } - - // either non-empty or we have received something - return ConcatenatePackets (buf, len); - } size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len) { diff --git a/Streaming.h b/Streaming.h index b1706e4a..4d597345 100644 --- a/Streaming.h +++ b/Streaming.h @@ -80,7 +80,7 @@ namespace stream void HandleNextPacket (Packet * packet); size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds - size_t Receive (uint8_t * buf, size_t len, int timeout = 0); // returns 0 if timeout expired + template void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0);