diff --git a/Destination.cpp b/Destination.cpp index 13431b33..da877f53 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -9,9 +9,9 @@ namespace i2p { -namespace stream +namespace client { - StreamingDestination::StreamingDestination (bool isPublic, i2p::data::SigningKeyType sigType): + ClientDestination::ClientDestination (bool isPublic, i2p::data::SigningKeyType sigType): m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) { @@ -23,7 +23,7 @@ namespace stream LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created"); } - StreamingDestination::StreamingDestination (const std::string& fullPath, bool isPublic): + ClientDestination::ClientDestination (const std::string& fullPath, bool isPublic): m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) { @@ -58,7 +58,7 @@ namespace stream m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel } - StreamingDestination::StreamingDestination (const i2p::data::PrivateKeys& keys, bool isPublic): + ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic): m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr), m_Keys (keys), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) { @@ -69,7 +69,7 @@ namespace stream LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created"); } - StreamingDestination::~StreamingDestination () + ClientDestination::~ClientDestination () { Stop (); for (auto it: m_RemoteLeaseSets) @@ -81,30 +81,23 @@ namespace stream delete m_Service; } - void StreamingDestination::Run () + void ClientDestination::Run () { if (m_Service) m_Service->run (); } - void StreamingDestination::Start () + void ClientDestination::Start () { m_Service = new boost::asio::io_service; m_Work = new boost::asio::io_service::work (*m_Service); m_Pool->SetActive (true); m_IsRunning = true; - m_Thread = new std::thread (std::bind (&StreamingDestination::Run, this)); + m_Thread = new std::thread (std::bind (&ClientDestination::Run, this)); } - void StreamingDestination::Stop () + void ClientDestination::Stop () { - ResetAcceptor (); - { - std::unique_lock l(m_StreamsMutex); - for (auto it: m_Streams) - delete it.second; - m_Streams.clear (); - } if (m_Pool) i2p::tunnel::tunnels.StopTunnelPool (m_Pool); m_IsRunning = false; @@ -120,7 +113,54 @@ namespace stream delete m_Service; m_Service = nullptr; } - void StreamingDestination::SendTunnelDataMsgs (const std::vector& msgs) + const i2p::data::LeaseSet * ClientDestination::FindLeaseSet (const i2p::data::IdentHash& ident) + { + auto it = m_RemoteLeaseSets.find (ident); + if (it != m_RemoteLeaseSets.end ()) + { + if (it->second->HasNonExpiredLeases ()) + return it->second; + else + { + LogPrint ("All leases of remote LeaseSet expired. Request it"); + i2p::data::netdb.RequestDestination (ident, true, m_Pool); + } + } + else + { + auto ls = i2p::data::netdb.FindLeaseSet (ident); + if (ls) + { + ls = new i2p::data::LeaseSet (*ls); + m_RemoteLeaseSets[ident] = ls; + return ls; + } + } + return nullptr; + } + + const i2p::data::LeaseSet * ClientDestination::GetLeaseSet () + { + if (!m_Pool) return nullptr; + if (!m_LeaseSet) + UpdateLeaseSet (); + return m_LeaseSet; + } + + void ClientDestination::UpdateLeaseSet () + { + auto newLeaseSet = new i2p::data::LeaseSet (*m_Pool); + if (!m_LeaseSet) + m_LeaseSet = newLeaseSet; + else + { + // TODO: implement it better + *m_LeaseSet = *newLeaseSet; + delete newLeaseSet; + } + } + + void ClientDestination::SendTunnelDataMsgs (const std::vector& msgs) { m_CurrentOutboundTunnel = m_Pool->GetNextOutboundTunnel (m_CurrentOutboundTunnel); if (m_CurrentOutboundTunnel) @@ -133,6 +173,141 @@ namespace stream } } + void ClientDestination::ProcessGarlicMessage (I2NPMessage * msg) + { + m_Service->post (boost::bind (&ClientDestination::HandleGarlicMessage, this, msg)); + } + + void ClientDestination::ProcessDeliveryStatusMessage (I2NPMessage * msg) + { + m_Service->post (boost::bind (&ClientDestination::HandleDeliveryStatusMessage, this, msg)); + } + + void ClientDestination::HandleI2NPMessage (const uint8_t * buf, size_t len, i2p::tunnel::InboundTunnel * from) + { + I2NPHeader * header = (I2NPHeader *)buf; + switch (header->typeID) + { + case eI2NPData: + HandleDataMessage (buf + sizeof (I2NPHeader), be16toh (header->size)); + break; + case eI2NPDatabaseStore: + HandleDatabaseStoreMessage (buf + sizeof (I2NPHeader), be16toh (header->size)); + i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); // TODO: remove + break; + default: + i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); + } + } + + void ClientDestination::HandleDatabaseStoreMessage (const uint8_t * buf, size_t len) + { + I2NPDatabaseStoreMsg * msg = (I2NPDatabaseStoreMsg *)buf; + size_t offset = sizeof (I2NPDatabaseStoreMsg); + if (msg->replyToken) // TODO: + offset += 36; + if (msg->type == 1) // LeaseSet + { + LogPrint ("Remote LeaseSet"); + auto it = m_RemoteLeaseSets.find (msg->key); + if (it != m_RemoteLeaseSets.end ()) + { + it->second->Update (buf + offset, len - offset); + LogPrint ("Remote LeaseSet updated"); + } + else + { + LogPrint ("New remote LeaseSet added"); + m_RemoteLeaseSets[msg->key] = new i2p::data::LeaseSet (buf + offset, len - offset); + } + } + else + LogPrint ("Unexpected client's DatabaseStore type ", msg->type, ". Dropped"); + } + + void ClientDestination::SetLeaseSetUpdated () + { + i2p::garlic::GarlicDestination::SetLeaseSetUpdated (); + UpdateLeaseSet (); + if (m_IsPublic) + i2p::data::netdb.PublishLeaseSet (m_LeaseSet, m_Pool); + } + + void ClientDestination::HandleDataMessage (const uint8_t * buf, size_t len) + { + uint32_t length = be32toh (*(uint32_t *)buf); + buf += 4; + // we assume I2CP payload + if (buf[9] == 6) // streaming protocol + { + // unzip it + CryptoPP::Gunzip decompressor; + decompressor.Put (buf, length); + decompressor.MessageEnd(); + i2p::stream::Packet * uncompressed = new i2p::stream::Packet; + uncompressed->offset = 0; + uncompressed->len = decompressor.MaxRetrievable (); + if (uncompressed->len <= i2p::stream::MAX_PACKET_SIZE) + { + decompressor.Get (uncompressed->buf, uncompressed->len); + HandleNextPacket (uncompressed); + } + else + { + LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size. Skipped"); + decompressor.Skip (); + delete uncompressed; + } + } + else + LogPrint ("Data: unexpected protocol ", buf[9]); + } + + I2NPMessage * ClientDestination::CreateDataMessage (const uint8_t * payload, size_t len) + { + I2NPMessage * msg = NewI2NPShortMessage (); + CryptoPP::Gzip compressor; + if (len <= i2p::stream::COMPRESSION_THRESHOLD_SIZE) + compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL); + else + compressor.SetDeflateLevel (CryptoPP::Gzip::DEFAULT_DEFLATE_LEVEL); + compressor.Put (payload, len); + compressor.MessageEnd(); + int size = compressor.MaxRetrievable (); + uint8_t * buf = msg->GetPayload (); + *(uint32_t *)buf = htobe32 (size); // length + buf += 4; + compressor.Get (buf, size); + memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later + buf[9] = 6; // streaming protocol + msg->len += size + 4; + FillI2NPMessageHeader (msg, eI2NPData); + + return msg; + } +} + +namespace stream +{ + + void StreamingDestination::Start () + { + ClientDestination::Start (); + } + + void StreamingDestination::Stop () + { + ResetAcceptor (); + { + std::unique_lock l(m_StreamsMutex); + for (auto it: m_Streams) + delete it.second; + m_Streams.clear (); + } + ClientDestination::Stop (); + } + + void StreamingDestination::HandleNextPacket (Packet * packet) { uint32_t sendStreamID = packet->GetSendStreamID (); @@ -163,7 +338,7 @@ namespace stream Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote) { - Stream * s = new Stream (*m_Service, *this, remote); + Stream * s = new Stream (*GetService (), *this, remote); std::unique_lock l(m_StreamsMutex); m_Streams[s->GetRecvStreamID ()] = s; return s; @@ -171,7 +346,7 @@ namespace stream Stream * StreamingDestination::CreateNewIncomingStream () { - Stream * s = new Stream (*m_Service, *this); + Stream * s = new Stream (*GetService (), *this); std::unique_lock l(m_StreamsMutex); m_Streams[s->GetRecvStreamID ()] = s; return s; @@ -186,172 +361,12 @@ namespace stream if (it != m_Streams.end ()) { m_Streams.erase (it); - if (m_Service) - m_Service->post ([stream](void) { delete stream; }); + if (GetService ()) + GetService ()->post ([stream](void) { delete stream; }); else delete stream; } } } - - const i2p::data::LeaseSet * StreamingDestination::GetLeaseSet () - { - if (!m_Pool) return nullptr; - if (!m_LeaseSet) - UpdateLeaseSet (); - return m_LeaseSet; - } - - void StreamingDestination::UpdateLeaseSet () - { - auto newLeaseSet = new i2p::data::LeaseSet (*m_Pool); - if (!m_LeaseSet) - m_LeaseSet = newLeaseSet; - else - { - // TODO: implement it better - *m_LeaseSet = *newLeaseSet; - delete newLeaseSet; - } - } - - void StreamingDestination::HandleDataMessage (const uint8_t * buf, size_t len) - { - uint32_t length = be32toh (*(uint32_t *)buf); - buf += 4; - // we assume I2CP payload - if (buf[9] == 6) // streaming protocol - { - // unzip it - CryptoPP::Gunzip decompressor; - decompressor.Put (buf, length); - decompressor.MessageEnd(); - Packet * uncompressed = new Packet; - uncompressed->offset = 0; - uncompressed->len = decompressor.MaxRetrievable (); - if (uncompressed->len <= MAX_PACKET_SIZE) - { - decompressor.Get (uncompressed->buf, uncompressed->len); - HandleNextPacket (uncompressed); - } - else - { - LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size. Skipped"); - decompressor.Skip (); - delete uncompressed; - } - } - else - LogPrint ("Data: unexpected protocol ", buf[9]); - } - - I2NPMessage * StreamingDestination::CreateDataMessage (const uint8_t * payload, size_t len) - { - I2NPMessage * msg = NewI2NPShortMessage (); - CryptoPP::Gzip compressor; - if (len <= COMPRESSION_THRESHOLD_SIZE) - compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL); - else - compressor.SetDeflateLevel (CryptoPP::Gzip::DEFAULT_DEFLATE_LEVEL); - compressor.Put (payload, len); - compressor.MessageEnd(); - int size = compressor.MaxRetrievable (); - uint8_t * buf = msg->GetPayload (); - *(uint32_t *)buf = htobe32 (size); // length - buf += 4; - compressor.Get (buf, size); - memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later - buf[9] = 6; // streaming protocol - msg->len += size + 4; - FillI2NPMessageHeader (msg, eI2NPData); - - return msg; - } - - void StreamingDestination::SetLeaseSetUpdated () - { - i2p::garlic::GarlicDestination::SetLeaseSetUpdated (); - UpdateLeaseSet (); - if (m_IsPublic) - i2p::data::netdb.PublishLeaseSet (m_LeaseSet, m_Pool); - } - - void StreamingDestination::ProcessGarlicMessage (I2NPMessage * msg) - { - m_Service->post (boost::bind (&StreamingDestination::HandleGarlicMessage, this, msg)); - } - - void StreamingDestination::ProcessDeliveryStatusMessage (I2NPMessage * msg) - { - m_Service->post (boost::bind (&StreamingDestination::HandleDeliveryStatusMessage, this, msg)); - } - - void StreamingDestination::HandleI2NPMessage (const uint8_t * buf, size_t len, i2p::tunnel::InboundTunnel * from) - { - I2NPHeader * header = (I2NPHeader *)buf; - switch (header->typeID) - { - case eI2NPData: - HandleDataMessage (buf + sizeof (I2NPHeader), be16toh (header->size)); - break; - case eI2NPDatabaseStore: - HandleDatabaseStoreMessage (buf + sizeof (I2NPHeader), be16toh (header->size)); - i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); // TODO: remove - break; - default: - i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); - } - } - - void StreamingDestination::HandleDatabaseStoreMessage (const uint8_t * buf, size_t len) - { - I2NPDatabaseStoreMsg * msg = (I2NPDatabaseStoreMsg *)buf; - size_t offset = sizeof (I2NPDatabaseStoreMsg); - if (msg->replyToken) // TODO: - offset += 36; - if (msg->type == 1) // LeaseSet - { - LogPrint ("Remote LeaseSet"); - auto it = m_RemoteLeaseSets.find (msg->key); - if (it != m_RemoteLeaseSets.end ()) - { - it->second->Update (buf + offset, len - offset); - LogPrint ("Remote LeaseSet updated"); - } - else - { - LogPrint ("New remote LeaseSet added"); - m_RemoteLeaseSets[msg->key] = new i2p::data::LeaseSet (buf + offset, len - offset); - } - } - else - LogPrint ("Unexpected client's DatabaseStore type ", msg->type, ". Dropped"); - } - - const i2p::data::LeaseSet * StreamingDestination::FindLeaseSet (const i2p::data::IdentHash& ident) - { - auto it = m_RemoteLeaseSets.find (ident); - if (it != m_RemoteLeaseSets.end ()) - { - if (it->second->HasNonExpiredLeases ()) - return it->second; - else - { - LogPrint ("All leases of remote LeaseSet expired. Request it"); - i2p::data::netdb.RequestDestination (ident, true, m_Pool); - } - } - else - { - auto ls = i2p::data::netdb.FindLeaseSet (ident); - if (ls) - { - ls = new i2p::data::LeaseSet (*ls); - m_RemoteLeaseSets[ident] = ls; - return ls; - } - } - return nullptr; - } } } diff --git a/Destination.h b/Destination.h index 809856a5..23fd6864 100644 --- a/Destination.h +++ b/Destination.h @@ -12,41 +12,32 @@ namespace i2p { -namespace stream +namespace client { - class StreamingDestination: public i2p::garlic::GarlicDestination + class ClientDestination: public i2p::garlic::GarlicDestination { public: - StreamingDestination (bool isPublic, i2p::data::SigningKeyType sigType); - StreamingDestination (const std::string& fullPath, bool isPublic); - StreamingDestination (const i2p::data::PrivateKeys& keys, bool isPublic); - ~StreamingDestination (); + ClientDestination (bool isPublic, i2p::data::SigningKeyType sigType); + ClientDestination (const std::string& fullPath, bool isPublic); + ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic); + ~ClientDestination (); - void Start (); - void Stop (); + virtual void Start (); + virtual void Stop (); bool IsRunning () const { return m_IsRunning; }; - - i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; }; + boost::asio::io_service * GetService () { return m_Service; }; + i2p::tunnel::TunnelPool * GetTunnelPool () { return m_Pool; }; - Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote); - void DeleteStream (Stream * stream); - void SetAcceptor (const std::function& acceptor) { m_Acceptor = acceptor; }; - void ResetAcceptor () { m_Acceptor = nullptr; }; - bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; - void HandleNextPacket (Packet * packet); - void SendTunnelDataMsgs (const std::vector& msgs); void ResetCurrentOutboundTunnel () { m_CurrentOutboundTunnel = nullptr; }; const i2p::data::LeaseSet * FindLeaseSet (const i2p::data::IdentHash& ident); - // I2CP - void HandleDataMessage (const uint8_t * buf, size_t len); - I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len); + void SendTunnelDataMsgs (const std::vector& msgs); // implements LocalDestination const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; }; const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; }; const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionPublicKey; }; - + // implements GarlicDestination const i2p::data::LeaseSet * GetLeaseSet (); void HandleI2NPMessage (const uint8_t * buf, size_t len, i2p::tunnel::InboundTunnel * from); @@ -56,12 +47,19 @@ namespace stream void ProcessDeliveryStatusMessage (I2NPMessage * msg); void SetLeaseSetUpdated (); - private: - - void Run (); - Stream * CreateNewIncomingStream (); + // I2CP + void HandleDataMessage (const uint8_t * buf, size_t len); + I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len); + + protected: + + virtual void HandleNextPacket (i2p::stream::Packet * packet) = 0; // TODO + + private: + + void Run (); void UpdateLeaseSet (); - void HandleDatabaseStoreMessage (const uint8_t * buf, size_t len); + void HandleDatabaseStoreMessage (const uint8_t * buf, size_t len); private: @@ -69,24 +67,61 @@ namespace stream std::thread * m_Thread; boost::asio::io_service * m_Service; boost::asio::io_service::work * m_Work; - std::map m_RemoteLeaseSets; - - std::mutex m_StreamsMutex; - std::map m_Streams; i2p::data::PrivateKeys m_Keys; uint8_t m_EncryptionPublicKey[256], m_EncryptionPrivateKey[256]; - + std::map m_RemoteLeaseSets; + i2p::tunnel::TunnelPool * m_Pool; i2p::tunnel::OutboundTunnel * m_CurrentOutboundTunnel; i2p::data::LeaseSet * m_LeaseSet; - bool m_IsPublic; + bool m_IsPublic; + + public: + // for HTTP only + int GetNumRemoteLeaseSets () const { return m_RemoteLeaseSets.size (); }; + }; +} + +namespace stream +{ + class StreamingDestination: public i2p::client::ClientDestination + { + public: + + StreamingDestination (bool isPublic, i2p::data::SigningKeyType sigType): + ClientDestination (isPublic, sigType) {}; + StreamingDestination (const std::string& fullPath, bool isPublic): + ClientDestination (fullPath, isPublic) {}; + StreamingDestination (const i2p::data::PrivateKeys& keys, bool isPublic): + ClientDestination (keys, isPublic) {}; + ~StreamingDestination () {}; + + void Start (); + void Stop (); + + Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote); + void DeleteStream (Stream * stream); + void SetAcceptor (const std::function& acceptor) { m_Acceptor = acceptor; }; + void ResetAcceptor () { m_Acceptor = nullptr; }; + bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; + + // ClientDestination + void HandleNextPacket (Packet * packet); + + private: + + Stream * CreateNewIncomingStream (); + + private: + + std::mutex m_StreamsMutex; + std::map m_Streams; std::function m_Acceptor; public: // for HTTP only - int GetNumRemoteLeaseSets () const { return m_RemoteLeaseSets.size (); }; const decltype(m_Streams)& GetStreams () const { return m_Streams; }; }; }