diff --git a/Crypto.h b/Crypto.h index 00255a37..3f99f2b6 100644 --- a/Crypto.h +++ b/Crypto.h @@ -325,6 +325,10 @@ inline void DH_get0_key(const DH *dh, const BIGNUM **pub_key, const BIGNUM **pri inline RSA *EVP_PKEY_get0_RSA(EVP_PKEY *pkey) { return pkey->pkey.rsa; } + +// ssl +#define TLS_method TLSv1_method + #endif #endif diff --git a/DaemonLinux.cpp b/DaemonLinux.cpp index 07e27e9d..416b6d2c 100644 --- a/DaemonLinux.cpp +++ b/DaemonLinux.cpp @@ -21,20 +21,23 @@ void handle_signal(int sig) switch (sig) { case SIGHUP: - LogPrint(eLogInfo, "Daemon: Got SIGHUP, reopening logs and tunnel configuration..."); - i2p::log::Logger().Reopen (); + LogPrint(eLogInfo, "Daemon: Got SIGHUP, reopening tunnel configuration..."); i2p::client::context.ReloadConfig(); break; + case SIGUSR1: + LogPrint(eLogInfo, "Daemon: Got SIGUSR1, reopening logs..."); + i2p::log::Logger().Reopen (); + break; case SIGINT: if (i2p::context.AcceptsTunnels () && !Daemon.gracefulShutdownInterval) - { + { i2p::context.SetAcceptsTunnels (false); Daemon.gracefulShutdownInterval = 10*60; // 10 minutes LogPrint(eLogInfo, "Graceful shutdown after ", Daemon.gracefulShutdownInterval, " seconds"); - } + } else - Daemon.running = 0; - break; + Daemon.running = 0; + break; case SIGABRT: case SIGTERM: Daemon.running = 0; // Exit loop @@ -77,7 +80,7 @@ namespace i2p } // point std{in,out,err} descriptors to /dev/null - freopen("/dev/null", "r", stdin); + freopen("/dev/null", "r", stdin); freopen("/dev/null", "w", stdout); freopen("/dev/null", "w", stderr); } @@ -101,8 +104,8 @@ namespace i2p } uint32_t cfsize; i2p::config::GetOption("limits.coresize", cfsize); if (cfsize) // core file size set - { - cfsize *= 1024; + { + cfsize *= 1024; getrlimit(RLIMIT_CORE, &limit); if (cfsize <= limit.rlim_max) { limit.rlim_cur = cfsize; @@ -116,7 +119,7 @@ namespace i2p } else { LogPrint(eLogError, "Daemon: limits.coresize exceeds system limit: ", limit.rlim_max); } - } + } // Pidfile // this code is c-styled and a bit ugly, but we need fd for locking pidfile @@ -153,6 +156,7 @@ namespace i2p sigemptyset(&sa.sa_mask); sa.sa_flags = SA_RESTART; sigaction(SIGHUP, &sa, 0); + sigaction(SIGUSR1, &sa, 0); sigaction(SIGABRT, &sa, 0); sigaction(SIGTERM, &sa, 0); sigaction(SIGINT, &sa, 0); @@ -164,7 +168,7 @@ namespace i2p { i2p::fs::Remove(pidfile); - return Daemon_Singleton::stop(); + return Daemon_Singleton::stop(); } void DaemonLinux::run () @@ -175,12 +179,12 @@ namespace i2p if (gracefulShutdownInterval) { gracefulShutdownInterval--; // - 1 second - if (gracefulShutdownInterval <= 0) - { + if (gracefulShutdownInterval <= 0) + { LogPrint(eLogInfo, "Graceful shutdown"); return; } - } + } } } } diff --git a/Family.cpp b/Family.cpp index ce995a4a..fb3b19a6 100644 --- a/Family.cpp +++ b/Family.cpp @@ -20,7 +20,7 @@ namespace data void Families::LoadCertificate (const std::string& filename) { - SSL_CTX * ctx = SSL_CTX_new (TLSv1_method ()); + SSL_CTX * ctx = SSL_CTX_new (TLS_method ()); int ret = SSL_CTX_use_certificate_file (ctx, filename.c_str (), SSL_FILETYPE_PEM); if (ret) { @@ -135,7 +135,7 @@ namespace data { auto filename = i2p::fs::DataDirPath("family", (family + ".key")); std::string sig; - SSL_CTX * ctx = SSL_CTX_new (TLSv1_method ()); + SSL_CTX * ctx = SSL_CTX_new (TLS_method ()); int ret = SSL_CTX_use_PrivateKey_file (ctx, filename.c_str (), SSL_FILETYPE_PEM); if (ret) { diff --git a/Gzip.cpp b/Gzip.cpp index da9f06b1..db991283 100644 --- a/Gzip.cpp +++ b/Gzip.cpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2016, The PurpleI2P Project +* Copyright (c) 2013-2017, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -9,11 +9,13 @@ #include #include /* memset */ #include - +#include "Log.h" #include "Gzip.h" -namespace i2p { -namespace data { +namespace i2p +{ +namespace data +{ const size_t GZIP_CHUNK_SIZE = 16384; GzipInflator::GzipInflator (): m_IsDirty (false) @@ -36,9 +38,10 @@ namespace data { m_Inflator.next_out = out; m_Inflator.avail_out = outLen; int err; - if ((err = inflate (&m_Inflator, Z_NO_FLUSH)) == Z_STREAM_END) { + if ((err = inflate (&m_Inflator, Z_NO_FLUSH)) == Z_STREAM_END) return outLen - m_Inflator.avail_out; - } + // else + LogPrint (eLogError, "Gzip: Inflate error ", err); return 0; } @@ -49,17 +52,20 @@ namespace data { m_Inflator.next_in = const_cast(in); m_Inflator.avail_in = inLen; int ret; - do { + do + { m_Inflator.next_out = out; m_Inflator.avail_out = GZIP_CHUNK_SIZE; ret = inflate (&m_Inflator, Z_NO_FLUSH); - if (ret < 0) { + if (ret < 0) + { inflateEnd (&m_Inflator); os.setstate(std::ios_base::failbit); break; } os.write ((char *)out, GZIP_CHUNK_SIZE - m_Inflator.avail_out); - } while (!m_Inflator.avail_out); // more data to read + } + while (!m_Inflator.avail_out); // more data to read delete[] out; } @@ -99,9 +105,10 @@ namespace data { m_Deflator.next_out = out; m_Deflator.avail_out = outLen; int err; - if ((err = deflate (&m_Deflator, Z_FINISH)) == Z_STREAM_END) { + if ((err = deflate (&m_Deflator, Z_FINISH)) == Z_STREAM_END) return outLen - m_Deflator.avail_out; - } /* else */ + // else + LogPrint (eLogError, "Gzip: Deflate error ", err); return 0; } } // data diff --git a/I2PControl.cpp b/I2PControl.cpp index 523f10e1..3a65a11c 100644 --- a/I2PControl.cpp +++ b/I2PControl.cpp @@ -25,6 +25,7 @@ #include "Transports.h" #include "version.h" #include "util.h" +#include "ClientContext.h" #include "I2PControl.h" namespace i2p @@ -78,6 +79,8 @@ namespace client m_RouterInfoHandlers["i2p.router.net.bw.outbound.1s"] = &I2PControlService::OutboundBandwidth1S; m_RouterInfoHandlers["i2p.router.net.status"] = &I2PControlService::NetStatusHandler; m_RouterInfoHandlers["i2p.router.net.tunnels.participating"] = &I2PControlService::TunnelsParticipatingHandler; + m_RouterInfoHandlers["i2p.router.net.tunnels.successrate"] = +&I2PControlService::TunnelsSuccessRateHandler; m_RouterInfoHandlers["i2p.router.net.total.received.bytes"] = &I2PControlService::NetTotalReceivedBytes; m_RouterInfoHandlers["i2p.router.net.total.sent.bytes"] = &I2PControlService::NetTotalSentBytes; @@ -187,13 +190,16 @@ namespace client size_t bytes_transferred, std::shared_ptr socket, std::shared_ptr buf) { - if (ecode) { + if (ecode) + { LogPrint (eLogError, "I2PControl: read error: ", ecode.message ()); return; - } else { + } + else + { + bool isHtml = !memcmp (buf->data (), "POST", 4); try { - bool isHtml = !memcmp (buf->data (), "POST", 4); std::stringstream ss; ss.write (buf->data (), bytes_transferred); if (isHtml) @@ -237,7 +243,9 @@ namespace client response << "{\"id\":" << id << ",\"result\":{"; (this->*(it->second))(pt.get_child ("params"), response); response << "},\"jsonrpc\":\"2.0\"}"; - } else { + } + else + { LogPrint (eLogWarning, "I2PControl: unknown method ", method); response << "{\"id\":null,\"error\":"; response << "{\"code\":-32601,\"message\":\"Method not found\"},"; @@ -249,6 +257,11 @@ namespace client catch (std::exception& ex) { LogPrint (eLogError, "I2PControl: exception when handle request: ", ex.what ()); + std::ostringstream response; + response << "{\"id\":null,\"error\":"; + response << "{\"code\":-32700,\"message\":\"" << ex.what () << "\"},"; + response << "\"jsonrpc\":\"2.0\"}"; + SendResponse (socket, buf, response, isHtml); } catch (...) { @@ -391,7 +404,8 @@ namespace client void I2PControlService::StatusHandler (std::ostringstream& results) { - InsertParam (results, "i2p.router.status", "???"); // TODO: + auto dest = i2p::client::context.GetSharedLocalDestination (); + InsertParam (results, "i2p.router.status", (dest && dest->IsReady ()) ? "1" : "0"); } void I2PControlService::NetDbKnownPeersHandler (std::ostringstream& results) @@ -415,6 +429,12 @@ namespace client InsertParam (results, "i2p.router.net.tunnels.participating", transit); } + void I2PControlService::TunnelsSuccessRateHandler (std::ostringstream& results) + { + int rate = i2p::tunnel::tunnels.GetTunnelCreationSuccessRate (); + InsertParam (results, "i2p.router.net.tunnels.successrate", rate); + } + void I2PControlService::InboundBandwidth1S (std::ostringstream& results) { double bw = i2p::transport::transports.GetInBandwidth (); diff --git a/I2PControl.h b/I2PControl.h index 047c2fe2..5d81c8f6 100644 --- a/I2PControl.h +++ b/I2PControl.h @@ -81,6 +81,7 @@ namespace client void NetDbActivePeersHandler (std::ostringstream& results); void NetStatusHandler (std::ostringstream& results); void TunnelsParticipatingHandler (std::ostringstream& results); + void TunnelsSuccessRateHandler (std::ostringstream& results); void InboundBandwidth1S (std::ostringstream& results); void OutboundBandwidth1S (std::ostringstream& results); void NetTotalReceivedBytes (std::ostringstream& results); diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index 843d129f..6a9b3ffd 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -302,7 +302,7 @@ namespace client if (m_NeedsWebIrc) { m_NeedsWebIrc = false; - m_OutPacket << "WEBIRC " << m_WebircPass << " cgiirc " << context.GetAddressBook ().ToAddress (m_From->GetIdentHash ()) << " 127.0.0.1\n"; + m_OutPacket << "WEBIRC " << m_WebircPass << " cgiirc " << context.GetAddressBook ().ToAddress (m_From->GetIdentHash ()) << " " << GetSocket ()->local_endpoint ().address () << std::endl; } m_InPacket.clear (); diff --git a/I2PTunnel.h b/I2PTunnel.h index 4b9b2c9b..68f46e43 100644 --- a/I2PTunnel.h +++ b/I2PTunnel.h @@ -53,6 +53,8 @@ namespace client void HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleConnect (const boost::system::error_code& ecode); + std::shared_ptr GetSocket () const { return m_Socket; }; + private: uint8_t m_Buffer[I2P_TUNNEL_CONNECTION_BUFFER_SIZE], m_StreamBuffer[I2P_TUNNEL_CONNECTION_BUFFER_SIZE]; diff --git a/Makefile.linux b/Makefile.linux index 02f099d2..af23f955 100644 --- a/Makefile.linux +++ b/Makefile.linux @@ -1,5 +1,5 @@ # set defaults instead redefine -CXXFLAGS ?= -g -Wall -Wextra -Wno-unused-parameter -pedantic +CXXFLAGS ?= -g -Wall -Wextra -Wno-unused-parameter -pedantic -Wno-misleading-indentation INCFLAGS ?= ## NOTE: The NEEDED_CXXFLAGS are here so that custom CXXFLAGS can be specified at build time diff --git a/Makefile.osx b/Makefile.osx index c8a7de2a..f9372c8f 100644 --- a/Makefile.osx +++ b/Makefile.osx @@ -1,11 +1,11 @@ CXX = clang++ -CXXFLAGS = -g -Wall -std=c++11 -DMAC_OSX +CXXFLAGS = -Os -Wall -std=c++11 -DMAC_OSX #CXXFLAGS = -g -O2 -Wall -std=c++11 -INCFLAGS = -I/usr/local/include -I/usr/local/ssl/include -LDFLAGS = -Wl,-rpath,/usr/local/lib -L/usr/local/lib -L/usr/local/ssl/lib +INCFLAGS = -I/usr/local/include +LDFLAGS = -Wl,-rpath,/usr/local/lib -L/usr/local/lib ifeq ($(USE_STATIC),yes) -LDLIBS = -lz -lcrypto -lssl /usr/local/lib/libboost_system.a /usr/local/lib/libboost_date_time.a /usr/local/lib/libboost_filesystem.a /usr/local/lib/libboost_program_options.a -lpthread +LDLIBS = -lz /usr/local/lib/libcrypto.a /usr/local/lib/libssl.a /usr/local/lib/libboost_system.a /usr/local/lib/libboost_date_time.a /usr/local/lib/libboost_filesystem.a /usr/local/lib/libboost_program_options.a -lpthread else LDLIBS = -lz -lcrypto -lssl -lboost_system -lboost_date_time -lboost_filesystem -lboost_program_options -lpthread endif @@ -15,11 +15,13 @@ ifeq ($(USE_UPNP),yes) CXXFLAGS += -DUSE_UPNP endif -ifeq ($(USE_AESNI),yes) +ifeq ($(USE_AESNI),1) CXXFLAGS += -maes -DAESNI +else + CXXFLAGS += -msse endif -ifeq ($(USE_AVX),yes) +ifeq ($(USE_AVX),1) CXXFLAGS += -mavx endif diff --git a/NTCPSession.cpp b/NTCPSession.cpp index 8f31e246..883f0a11 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "I2PEndian.h" #include "Base.h" @@ -35,12 +36,12 @@ namespace transport delete m_Establisher; } - void NTCPSession::CreateAESKey (uint8_t * pubKey, i2p::crypto::AESKey& key) + void NTCPSession::CreateAESKey (uint8_t * pubKey) { uint8_t sharedKey[256]; - m_DHKeysPair->Agree (pubKey, sharedKey); + m_DHKeysPair->Agree (pubKey, sharedKey); // time consuming operation - uint8_t * aesKey = key; + i2p::crypto::AESKey aesKey; if (sharedKey[0] & 0x80) { aesKey[0] = 0; @@ -63,6 +64,9 @@ namespace transport } memcpy (aesKey, nonZero, 32); } + + m_Decryption.SetKey (aesKey); + m_Encryption.SetKey (aesKey); } void NTCPSession::Done () @@ -163,15 +167,25 @@ namespace transport return; } } - - SendPhase2 (); + + // TODO: check for number of pending keys + auto s = shared_from_this (); + auto keyCreated = std::async (std::launch::async, [s] () + { + if (!s->m_DHKeysPair) + s->m_DHKeysPair = transports.GetNextDHKeysPair (); + s->CreateAESKey (s->m_Establisher->phase1.pubKey); + }).share (); + m_Server.GetService ().post ([s, keyCreated]() + { + keyCreated.get (); + s->SendPhase2 (); + }); } } void NTCPSession::SendPhase2 () { - if (!m_DHKeysPair) - m_DHKeysPair = transports.GetNextDHKeysPair (); const uint8_t * y = m_DHKeysPair->GetPublicKey (); memcpy (m_Establisher->phase2.pubKey, y, 256); uint8_t xy[512]; @@ -182,11 +196,7 @@ namespace transport memcpy (m_Establisher->phase2.encrypted.timestamp, &tsB, 4); RAND_bytes (m_Establisher->phase2.encrypted.filler, 12); - i2p::crypto::AESKey aesKey; - CreateAESKey (m_Establisher->phase1.pubKey, aesKey); - m_Encryption.SetKey (aesKey); m_Encryption.SetIV (y + 240); - m_Decryption.SetKey (aesKey); m_Decryption.SetIV (m_Establisher->phase1.HXxorHI + 16); m_Encryption.Encrypt ((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); @@ -229,32 +239,44 @@ namespace transport } else { - i2p::crypto::AESKey aesKey; - CreateAESKey (m_Establisher->phase2.pubKey, aesKey); - m_Decryption.SetKey (aesKey); - m_Decryption.SetIV (m_Establisher->phase2.pubKey + 240); - m_Encryption.SetKey (aesKey); - m_Encryption.SetIV (m_Establisher->phase1.HXxorHI + 16); - - m_Decryption.Decrypt((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); - // verify - uint8_t xy[512]; - memcpy (xy, m_DHKeysPair->GetPublicKey (), 256); - memcpy (xy + 256, m_Establisher->phase2.pubKey, 256); - uint8_t digest[32]; - SHA256 (xy, 512, digest); - if (memcmp(m_Establisher->phase2.encrypted.hxy, digest, 32)) - { - LogPrint (eLogError, "NTCP: Phase 2 process error: incorrect hash"); - transports.ReuseDHKeysPair (m_DHKeysPair); - m_DHKeysPair = nullptr; - Terminate (); - return ; - } - SendPhase3 (); + auto s = shared_from_this (); + // create AES key in separate thread + auto keyCreated = std::async (std::launch::async, [s] () + { + s->CreateAESKey (s->m_Establisher->phase2.pubKey); + }).share (); // TODO: use move capture in C++ 14 instead shared_future + // let other operations execute while a key gets created + m_Server.GetService ().post ([s, keyCreated]() + { + keyCreated.get (); // we might wait if no more pending operations + s->HandlePhase2 (); + }); } } + void NTCPSession::HandlePhase2 () + { + m_Decryption.SetIV (m_Establisher->phase2.pubKey + 240); + m_Encryption.SetIV (m_Establisher->phase1.HXxorHI + 16); + + m_Decryption.Decrypt((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted); + // verify + uint8_t xy[512]; + memcpy (xy, m_DHKeysPair->GetPublicKey (), 256); + memcpy (xy + 256, m_Establisher->phase2.pubKey, 256); + uint8_t digest[32]; + SHA256 (xy, 512, digest); + if (memcmp(m_Establisher->phase2.encrypted.hxy, digest, 32)) + { + LogPrint (eLogError, "NTCP: Phase 2 process error: incorrect hash"); + transports.ReuseDHKeysPair (m_DHKeysPair); + m_DHKeysPair = nullptr; + Terminate (); + return ; + } + SendPhase3 (); + } + void NTCPSession::SendPhase3 () { auto& keys = i2p::context.GetPrivateKeys (); diff --git a/NTCPSession.h b/NTCPSession.h index a5d6f99f..d9acd5ce 100644 --- a/NTCPSession.h +++ b/NTCPSession.h @@ -67,12 +67,13 @@ namespace transport void SendTimeSyncMessage (); void SetIsEstablished (bool isEstablished) { m_IsEstablished = isEstablished; } - void CreateAESKey (uint8_t * pubKey, i2p::crypto::AESKey& key); + void CreateAESKey (uint8_t * pubKey); // client void SendPhase3 (); void HandlePhase1Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandlePhase2Received (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void HandlePhase2 (); void HandlePhase3Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsA); void HandlePhase4Received (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsA); diff --git a/NetDb.cpp b/NetDb.cpp index 88d5483f..8b4ec88a 100644 --- a/NetDb.cpp +++ b/NetDb.cpp @@ -583,7 +583,7 @@ namespace data IdentHash ident (buf + DATABASE_STORE_KEY_OFFSET); if (ident.IsZero ()) { - LogPrint (eLogError, "NetDb: database store with zero ident, dropped"); + LogPrint (eLogDebug, "NetDb: database store with zero ident, dropped"); return; } uint32_t replyToken = bufbe32toh (buf + DATABASE_STORE_REPLY_TOKEN_OFFSET); @@ -602,14 +602,14 @@ namespace data if (outbound) outbound->SendTunnelDataMsg (buf + offset, tunnelID, deliveryStatus); else - LogPrint (eLogError, "NetDb: no outbound tunnels for DatabaseStore reply found"); + LogPrint (eLogWarning, "NetDb: no outbound tunnels for DatabaseStore reply found"); } offset += 32; } // we must send reply back before this check if (ident == i2p::context.GetIdentHash ()) { - LogPrint (eLogError, "NetDb: database store with own RouterInfo received, dropped"); + LogPrint (eLogDebug, "NetDb: database store with own RouterInfo received, dropped"); return; } size_t payloadOffset = offset; @@ -636,7 +636,7 @@ namespace data updated = AddRouterInfo (ident, uncompressed, uncompressedSize); else { - LogPrint (eLogError, "NetDb: decompression failed ", uncompressedSize); + LogPrint (eLogInfo, "NetDb: decompression failed ", uncompressedSize); return; } } diff --git a/Reseed.cpp b/Reseed.cpp index b4eaa76c..9387d2ae 100644 --- a/Reseed.cpp +++ b/Reseed.cpp @@ -83,10 +83,24 @@ namespace data } } + int Reseeder::ProcessZIPFile (const char * filename) + { + std::ifstream s(filename, std::ifstream::binary); + if (s.is_open ()) + { + s.seekg (0, std::ios::end); + auto len = s.tellg (); + s.seekg (0, std::ios::beg); + return ProcessZIPStream (s, len); + } + else + { + LogPrint (eLogError, "Reseed: Can't open file ", filename); + return 0; + } + } + const char SU3_MAGIC_NUMBER[]="I2Psu3"; - const uint32_t ZIP_HEADER_SIGNATURE = 0x04034B50; - const uint32_t ZIP_CENTRAL_DIRECTORY_HEADER_SIGNATURE = 0x02014B50; - const uint16_t ZIP_BIT_FLAG_DATA_DESCRIPTOR = 0x0008; int Reseeder::ProcessSU3Stream (std::istream& s) { char magicNumber[7]; @@ -194,6 +208,14 @@ namespace data } // handle content + return ProcessZIPStream (s, contentLength); + } + + const uint32_t ZIP_HEADER_SIGNATURE = 0x04034B50; + const uint32_t ZIP_CENTRAL_DIRECTORY_HEADER_SIGNATURE = 0x02014B50; + const uint16_t ZIP_BIT_FLAG_DATA_DESCRIPTOR = 0x0008; + int Reseeder::ProcessZIPStream (std::istream& s, uint64_t contentLength) + { int numFiles = 0; size_t contentPos = s.tellg (); while (!s.eof ()) @@ -362,7 +384,7 @@ namespace data void Reseeder::LoadCertificate (const std::string& filename) { - SSL_CTX * ctx = SSL_CTX_new (TLSv1_method ()); + SSL_CTX * ctx = SSL_CTX_new (TLS_method ()); int ret = SSL_CTX_use_certificate_file (ctx, filename.c_str (), SSL_FILETYPE_PEM); if (ret) { diff --git a/Reseed.h b/Reseed.h index de47c0b1..3ef8767c 100644 --- a/Reseed.h +++ b/Reseed.h @@ -31,8 +31,10 @@ namespace data int ReseedFromSU3 (const std::string& url); int ProcessSU3File (const char * filename); + int ProcessZIPFile (const char * filename); int ProcessSU3Stream (std::istream& s); - + int ProcessZIPStream (std::istream& s, uint64_t contentLength); + bool FindZipDataDescriptor (std::istream& s); std::string HttpsRequest (const std::string& address); diff --git a/RouterContext.cpp b/RouterContext.cpp index 966da6ac..b558e050 100644 --- a/RouterContext.cpp +++ b/RouterContext.cpp @@ -205,7 +205,7 @@ namespace i2p void RouterContext::SetBandwidth (char L) { uint16_t limit = 0; - enum { low, high, extra } type = high; + enum { low, high, extra, unlim } type = high; /* detect parameters */ switch (L) { @@ -215,7 +215,7 @@ namespace i2p case i2p::data::CAPS_FLAG_HIGH_BANDWIDTH2 : limit = 128; type = high; break; case i2p::data::CAPS_FLAG_HIGH_BANDWIDTH3 : limit = 256; type = high; break; case i2p::data::CAPS_FLAG_EXTRA_BANDWIDTH1 : limit = 2048; type = extra; break; - case i2p::data::CAPS_FLAG_EXTRA_BANDWIDTH2 : limit = 9999; type = extra; break; + case i2p::data::CAPS_FLAG_EXTRA_BANDWIDTH2 : limit = 9999; type = unlim; break; default: limit = 48; type = low; } @@ -226,7 +226,8 @@ namespace i2p switch (type) { case low : /* not set */; break; - case extra : caps |= i2p::data::RouterInfo::eExtraBandwidth; // no break here + case extra : caps |= i2p::data::RouterInfo::eExtraBandwidth; break; // 'P' + case unlim : caps |= i2p::data::RouterInfo::eExtraBandwidth; // no break here, extra + high means 'X' case high : caps |= i2p::data::RouterInfo::eHighBandwidth; break; } m_RouterInfo.SetCaps (caps); @@ -253,7 +254,12 @@ namespace i2p void RouterContext::SetUnreachable () { // set caps - m_RouterInfo.SetCaps (i2p::data::RouterInfo::eUnreachable | i2p::data::RouterInfo::eSSUTesting); // LU, B + uint8_t caps = m_RouterInfo.GetCaps (); + caps &= ~i2p::data::RouterInfo::eReachable; + caps |= i2p::data::RouterInfo::eUnreachable; + caps &= ~i2p::data::RouterInfo::eFloodfill; // can't be floodfill + caps &= ~i2p::data::RouterInfo::eSSUIntroducer; // can't be introducer + m_RouterInfo.SetCaps (caps); // remove NTCP address auto& addresses = m_RouterInfo.GetAddresses (); for (auto it = addresses.begin (); it != addresses.end (); ++it) diff --git a/RouterInfo.cpp b/RouterInfo.cpp index 08d46ecb..205262e5 100644 --- a/RouterInfo.cpp +++ b/RouterInfo.cpp @@ -376,14 +376,21 @@ namespace data std::string caps; if (m_Caps & eFloodfill) { - if (m_Caps & eExtraBandwidth) caps += CAPS_FLAG_EXTRA_BANDWIDTH1; // 'P' + if (m_Caps & eExtraBandwidth) caps += (m_Caps & eHighBandwidth) ? + CAPS_FLAG_EXTRA_BANDWIDTH2 : // 'X' + CAPS_FLAG_EXTRA_BANDWIDTH1; // 'P' caps += CAPS_FLAG_HIGH_BANDWIDTH3; // 'O' caps += CAPS_FLAG_FLOODFILL; // floodfill } else { - if (m_Caps & eExtraBandwidth) caps += CAPS_FLAG_EXTRA_BANDWIDTH1; // 'P' - caps += (m_Caps & eHighBandwidth) ? CAPS_FLAG_HIGH_BANDWIDTH3 /* 'O' */: CAPS_FLAG_LOW_BANDWIDTH2 /* 'L' */; // bandwidth + if (m_Caps & eExtraBandwidth) + { + caps += (m_Caps & eHighBandwidth) ? CAPS_FLAG_EXTRA_BANDWIDTH2 /* 'X' */ : CAPS_FLAG_EXTRA_BANDWIDTH1; /*'P' */ + caps += CAPS_FLAG_HIGH_BANDWIDTH3; // 'O' + } + else + caps += (m_Caps & eHighBandwidth) ? CAPS_FLAG_HIGH_BANDWIDTH3 /* 'O' */: CAPS_FLAG_LOW_BANDWIDTH2 /* 'L' */; // bandwidth } if (m_Caps & eHidden) caps += CAPS_FLAG_HIDDEN; // hidden if (m_Caps & eReachable) caps += CAPS_FLAG_REACHABLE; // reachable diff --git a/SAM.cpp b/SAM.cpp index 929b144d..d89750f2 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -15,9 +15,9 @@ namespace i2p { namespace client { - SAMSocket::SAMSocket (SAMBridge& owner): + SAMSocket::SAMSocket (SAMBridge& owner): m_Owner (owner), m_Socket (m_Owner.GetService ()), m_Timer (m_Owner.GetService ()), - m_BufferOffset (0), m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false), + m_BufferOffset (0), m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false), m_Stream (nullptr), m_Session (nullptr) { } @@ -25,21 +25,21 @@ namespace client SAMSocket::~SAMSocket () { Terminate (); - } + } void SAMSocket::CloseStream () { if (m_Stream) - { + { m_Stream->Close (); m_Stream.reset (); - } - } - + } + } + void SAMSocket::Terminate () { CloseStream (); - + switch (m_SocketType) { case eSAMSocketTypeSession: @@ -47,14 +47,14 @@ namespace client break; case eSAMSocketTypeStream: { - if (m_Session) + if (m_Session) m_Session->DelSocket (shared_from_this ()); break; } case eSAMSocketTypeAcceptor: { if (m_Session) - { + { m_Session->DelSocket (shared_from_this ()); if (m_Session->localDestination) m_Session->localDestination->StopAcceptingStreams (); @@ -71,8 +71,8 @@ namespace client void SAMSocket::ReceiveHandshake () { - m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), - std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (), + m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), + std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } @@ -85,7 +85,7 @@ namespace client Terminate (); } else - { + { m_Buffer[bytes_transferred] = 0; char * eol = (char *)memchr (m_Buffer, '\n', bytes_transferred); if (eol) @@ -94,8 +94,8 @@ namespace client char * separator = strchr (m_Buffer, ' '); if (separator) { - separator = strchr (separator + 1, ' '); - if (separator) + separator = strchr (separator + 1, ' '); + if (separator) *separator = 0; } @@ -117,13 +117,13 @@ namespace client { #ifdef _MSC_VER size_t l = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ()); -#else +#else size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ()); #endif boost::asio::async_write (m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (), - std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (), + std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - } + } else SendMessageReply (SAM_HANDSHAKE_I2P_ERROR, strlen (SAM_HANDSHAKE_I2P_ERROR), true); } @@ -145,25 +145,25 @@ namespace client } else { - m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), - std::bind(&SAMSocket::HandleMessage, shared_from_this (), - std::placeholders::_1, std::placeholders::_2)); - } + m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), + std::bind(&SAMSocket::HandleMessage, shared_from_this (), + std::placeholders::_1, std::placeholders::_2)); + } } void SAMSocket::SendMessageReply (const char * msg, size_t len, bool close) { - if (!m_IsSilent) + if (!m_IsSilent) boost::asio::async_write (m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (), - std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (), + std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, close)); else { if (close) Terminate (); else - Receive (); - } + Receive (); + } } void SAMSocket::HandleMessageReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred, bool close) @@ -179,8 +179,8 @@ namespace client if (close) Terminate (); else - Receive (); - } + Receive (); + } } void SAMSocket::HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred) @@ -205,8 +205,8 @@ namespace client char * separator = strchr (m_Buffer, ' '); if (separator) { - separator = strchr (separator + 1, ' '); - if (separator) + separator = strchr (separator + 1, ' '); + if (separator) *separator = 0; else separator = eol; @@ -236,12 +236,12 @@ namespace client *separator = ' '; *eol = '\n'; } - } + } // since it's SAM v1 reply is not expected Receive (); } - else - { + else + { LogPrint (eLogError, "SAM: unexpected message ", m_Buffer); Terminate (); } @@ -252,8 +252,9 @@ namespace client Terminate (); } } + else - { + { LogPrint (eLogWarning, "SAM: incomplete message ", bytes_transferred); m_BufferOffset = bytes_transferred; // try to receive remaining message @@ -267,10 +268,10 @@ namespace client LogPrint (eLogDebug, "SAM: session create: ", buf); std::map params; ExtractParams (buf, params); - std::string& style = params[SAM_PARAM_STYLE]; + std::string& style = params[SAM_PARAM_STYLE]; std::string& id = params[SAM_PARAM_ID]; std::string& destination = params[SAM_PARAM_DESTINATION]; - m_ID = id; + m_ID = id; if (m_Owner.FindSession (id)) { // session exists @@ -278,15 +279,39 @@ namespace client return; } - // create destination - m_Session = m_Owner.CreateSession (id, destination == SAM_VALUE_TRANSIENT ? "" : destination, ¶ms); + std::shared_ptr forward = nullptr; + if (style == SAM_VALUE_DATAGRAM && params.find(SAM_VALUE_HOST) != params.end() && params.find(SAM_VALUE_PORT) != params.end()) + { + // udp forward selected + boost::system::error_code e; + // TODO: support hostnames in udp forward + auto addr = boost::asio::ip::address::from_string(params[SAM_VALUE_HOST], e); + if (e) + { + // not an ip address + SendI2PError("Invalid IP Address in HOST"); + return; + } + + auto port = std::stoi(params[SAM_VALUE_PORT]); + if (port == -1) + { + SendI2PError("Invalid port"); + return; + } + forward = std::make_shared(addr, port); + } + + // create destination + m_Session = m_Owner.CreateSession (id, destination == SAM_VALUE_TRANSIENT ? "" : destination, ¶ms); if (m_Session) { m_SocketType = eSAMSocketTypeSession; if (style == SAM_VALUE_DATAGRAM) { + m_Session->UDPEndpoint = forward; auto dest = m_Session->localDestination->CreateDatagramDestination (); - dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (), + dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); } @@ -296,7 +321,7 @@ namespace client { m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL)); m_Timer.async_wait (std::bind (&SAMSocket::HandleSessionReadinessCheckTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); } } else @@ -314,7 +339,7 @@ namespace client m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL)); m_Timer.async_wait (std::bind (&SAMSocket::HandleSessionReadinessCheckTimer, shared_from_this (), std::placeholders::_1)); - } + } } } @@ -327,7 +352,7 @@ namespace client priv[l1] = 0; #ifdef _MSC_VER size_t l2 = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv); -#else +#else size_t l2 = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv); #endif SendMessageReply (m_Buffer, l2, false); @@ -341,7 +366,7 @@ namespace client std::string& id = params[SAM_PARAM_ID]; std::string& destination = params[SAM_PARAM_DESTINATION]; std::string& silent = params[SAM_PARAM_SILENT]; - if (silent == SAM_VALUE_TRUE) m_IsSilent = true; + if (silent == SAM_VALUE_TRUE) m_IsSilent = true; m_ID = id; m_Session = m_Owner.FindSession (id); if (m_Session) @@ -365,7 +390,7 @@ namespace client SendMessageReply(SAM_SESSION_STATUS_INVALID_KEY, strlen(SAM_SESSION_STATUS_INVALID_KEY), true); } else - SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); + SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); } void SAMSocket::Connect (std::shared_ptr remote) @@ -374,7 +399,7 @@ namespace client m_Session->AddSocket (shared_from_this ()); m_Stream = m_Session->localDestination->CreateStream (remote); m_Stream->Send ((uint8_t *)m_Buffer, 0); // connect - I2PReceive (); + I2PReceive (); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); } @@ -396,17 +421,17 @@ namespace client ExtractParams (buf, params); std::string& id = params[SAM_PARAM_ID]; std::string& silent = params[SAM_PARAM_SILENT]; - if (silent == SAM_VALUE_TRUE) m_IsSilent = true; + if (silent == SAM_VALUE_TRUE) m_IsSilent = true; m_ID = id; m_Session = m_Owner.FindSession (id); if (m_Session) - { + { m_SocketType = eSAMSocketTypeAcceptor; m_Session->AddSocket (shared_from_this ()); if (!m_Session->localDestination->IsAcceptingStreams ()) m_Session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); - } + } else SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); } @@ -418,9 +443,9 @@ namespace client ExtractParams (buf, params); size_t size = std::stoi(params[SAM_PARAM_SIZE]), offset = data - buf; if (offset + size <= len) - { + { if (m_Session) - { + { auto d = m_Session->localDestination->GetDatagramDestination (); if (d) { @@ -433,24 +458,24 @@ namespace client } else LogPrint (eLogError, "SAM: session is not created from DATAGRAM SEND"); - } + } else { LogPrint (eLogWarning, "SAM: sent datagram size ", size, " exceeds buffer ", len - offset); return 0; // try to receive more - } + } return offset + size; - } - + } + void SAMSocket::ProcessDestGenerate () { LogPrint (eLogDebug, "SAM: dest generate"); auto keys = i2p::data::PrivateKeys::CreateRandomKeys (); #ifdef _MSC_VER - size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY, + size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY, keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ()); -#else - size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY, +#else + size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY, keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ()); #endif SendMessageReply (m_Buffer, len, false); @@ -479,45 +504,56 @@ namespace client std::bind (&SAMSocket::HandleNamingLookupLeaseSetRequestComplete, shared_from_this (), std::placeholders::_1, ident)); } - else + else { LogPrint (eLogError, "SAM: naming failed, unknown address ", name); #ifdef _MSC_VER size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY, name.c_str()); -#else +#else size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY, name.c_str()); #endif SendMessageReply (m_Buffer, len, false); } - } + } - void SAMSocket::HandleNamingLookupLeaseSetRequestComplete (std::shared_ptr leaseSet, i2p::data::IdentHash ident) + void SAMSocket::SendI2PError(const std::string & msg) + { + LogPrint (eLogError, "SAM: i2p error ", msg); +#ifdef _MSC_VER + size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_STATUS_I2P_ERROR, msg.c_str()); +#else + size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_STATUS_I2P_ERROR, msg.c_str()); +#endif + SendMessageReply (m_Buffer, len, true); + } + + void SAMSocket::HandleNamingLookupLeaseSetRequestComplete (std::shared_ptr leaseSet, i2p::data::IdentHash ident) { if (leaseSet) - { + { context.GetAddressBook ().InsertAddress (leaseSet->GetIdentity ()); SendNamingLookupReply (leaseSet->GetIdentity ()); - } + } else { LogPrint (eLogError, "SAM: naming lookup failed. LeaseSet for ", ident.ToBase32 (), " not found"); #ifdef _MSC_VER - size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY, + size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY, context.GetAddressBook ().ToAddress (ident).c_str()); -#else +#else size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY, context.GetAddressBook ().ToAddress (ident).c_str()); #endif SendMessageReply (m_Buffer, len, false); } - } - + } + void SAMSocket::SendNamingLookupReply (std::shared_ptr identity) { auto base64 = identity->ToBase64 (); #ifdef _MSC_VER - size_t l = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY, base64.c_str ()); -#else + size_t l = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY, base64.c_str ()); +#else size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY, base64.c_str ()); #endif SendMessageReply (m_Buffer, l, false); @@ -525,7 +561,7 @@ namespace client void SAMSocket::ExtractParams (char * buf, std::map& params) { - char * separator; + char * separator; do { separator = strchr (buf, ' '); @@ -536,11 +572,11 @@ namespace client *value = 0; value++; params[buf] = value; - } + } buf = separator + 1; } while (separator); - } + } void SAMSocket::Receive () { @@ -550,7 +586,7 @@ namespace client Terminate (); return; } - m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset), + m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset), std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } @@ -566,17 +602,17 @@ namespace client else { if (m_Stream) - { + { auto s = shared_from_this (); m_Stream->AsyncSend ((uint8_t *)m_Buffer, bytes_transferred, [s](const boost::system::error_code& ecode) { if (!ecode) s->Receive (); - else + else s->m_Owner.GetService ().post ([s] { s->Terminate (); }); }); - } + } } } @@ -586,7 +622,7 @@ namespace client { if (m_Stream->GetStatus () == i2p::stream::eStreamStatusNew || m_Stream->GetStatus () == i2p::stream::eStreamStatusOpen) // regular - { + { m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE), std::bind (&SAMSocket::HandleI2PReceive, shared_from_this (), std::placeholders::_1, std::placeholders::_2), @@ -602,10 +638,10 @@ namespace client std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); } else // no more data - Terminate (); - } + Terminate (); + } } - } + } void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) { @@ -617,17 +653,17 @@ namespace client if (bytes_transferred > 0) boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred), std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); // postpone termination - else - { + else + { auto s = shared_from_this (); m_Owner.GetService ().post ([s] { s->Terminate (); }); } } - else - { + else + { auto s = shared_from_this (); m_Owner.GetService ().post ([s] { s->Terminate (); }); - } + } } else { @@ -658,7 +694,7 @@ namespace client context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ()); auto session = m_Owner.FindSession (m_ID); if (session) - { + { // find more pending acceptors for (auto it: session->ListSockets ()) if (it->m_SocketType == eSAMSocketTypeAcceptor) @@ -674,44 +710,66 @@ namespace client const size_t ident_len = ident_ptr->GetFullLen(); uint8_t* ident = new uint8_t[ident_len]; - // send remote peer address as base64 + // send remote peer address as base64 const size_t l = ident_ptr->ToBuffer (ident, ident_len); const size_t l1 = i2p::data::ByteStreamToBase64 (ident, l, (char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE); delete[] ident; m_StreamBuffer[l1] = '\n'; HandleI2PReceive (boost::system::error_code (), l1 +1); // we send identity like it has been received from stream - } + } else I2PReceive (); } else LogPrint (eLogWarning, "SAM: I2P acceptor has been reset"); - } + } void SAMSocket::HandleI2PDatagramReceive (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) { LogPrint (eLogDebug, "SAM: datagram received ", len); auto base64 = from.ToBase64 (); -#ifdef _MSC_VER - size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len); -#else - size_t l = snprintf ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len); -#endif - if (len < SAM_SOCKET_BUFFER_SIZE - l) - { - memcpy (m_StreamBuffer + l, buf, len); - boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, len + l), - std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); + auto ep = m_Session->UDPEndpoint; + if (ep) + { + // udp forward enabled + size_t bsz = base64.size(); + size_t sz = bsz + 1 + len; + // build datagram body + uint8_t * data = new uint8_t[sz]; + // Destination + memcpy(data, base64.c_str(), bsz); + // linefeed + data[bsz] = '\n'; + // Payload + memcpy(data+bsz+1, buf, len); + // send to remote endpoint + m_Owner.SendTo(data, sz, ep); + delete [] data; } else - LogPrint (eLogWarning, "SAM: received datagram size ", len," exceeds buffer"); + { +#ifdef _MSC_VER + size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len); +#else + size_t l = snprintf ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len); +#endif + if (len < SAM_SOCKET_BUFFER_SIZE - l) + { + memcpy (m_StreamBuffer + l, buf, len); + boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, len + l), + std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); + } + else + LogPrint (eLogWarning, "SAM: received datagram size ", len," exceeds buffer"); + } } SAMSession::SAMSession (std::shared_ptr dest): - localDestination (dest) + localDestination (dest), + UDPEndpoint(nullptr) { } - + SAMSession::~SAMSession () { CloseStreams(); @@ -741,7 +799,7 @@ namespace client { if (m_IsRunning) Stop (); - } + } void SAMBridge::Start () { @@ -760,26 +818,26 @@ namespace client m_Sessions.clear (); m_Service.stop (); if (m_Thread) - { - m_Thread->join (); + { + m_Thread->join (); delete m_Thread; m_Thread = nullptr; - } + } } - void SAMBridge::Run () - { + void SAMBridge::Run () + { while (m_IsRunning) { try - { + { m_Service.run (); } catch (std::exception& ex) { LogPrint (eLogError, "SAM: runtime exception: ", ex.what ()); - } - } + } + } } void SAMBridge::Accept () @@ -796,7 +854,7 @@ namespace client boost::system::error_code ec; auto ep = socket->GetSocket ().remote_endpoint (ec); if (!ec) - { + { LogPrint (eLogDebug, "SAM: new connection from ", ep); socket->ReceiveHandshake (); } @@ -810,10 +868,10 @@ namespace client Accept (); } - std::shared_ptr SAMBridge::CreateSession (const std::string& id, const std::string& destination, + std::shared_ptr SAMBridge::CreateSession (const std::string& id, const std::string& destination, const std::map * params) { - std::shared_ptr localDestination = nullptr; + std::shared_ptr localDestination = nullptr; if (destination != "") { i2p::data::PrivateKeys keys; @@ -828,10 +886,10 @@ namespace client { auto it = params->find (SAM_PARAM_SIGNATURE_TYPE); if (it != params->end ()) - // TODO: extract string values + // TODO: extract string values signatureType = std::stoi(it->second); } - localDestination = i2p::client::context.CreateNewLocalDestination (true, signatureType, params); + localDestination = i2p::client::context.CreateNewLocalDestination (true, signatureType, params); } if (localDestination) { @@ -852,13 +910,13 @@ namespace client std::unique_lock l(m_SessionsMutex); auto it = m_Sessions.find (id); if (it != m_Sessions.end ()) - { + { session = it->second; m_Sessions.erase (it); - } - } + } + } if (session) - { + { session->localDestination->StopAcceptingStreams (); session->CloseStreams (); } @@ -873,12 +931,20 @@ namespace client return nullptr; } + void SAMBridge::SendTo(const uint8_t * buf, size_t len, std::shared_ptr remote) + { + if(remote) + { + m_DatagramSocket.send_to(boost::asio::buffer(buf, len), *remote); + } + } + void SAMBridge::ReceiveDatagram () { m_DatagramSocket.async_receive_from ( - boost::asio::buffer (m_DatagramReceiveBuffer, i2p::datagram::MAX_DATAGRAM_SIZE), + boost::asio::buffer (m_DatagramReceiveBuffer, i2p::datagram::MAX_DATAGRAM_SIZE), m_SenderEndpoint, - std::bind (&SAMBridge::HandleReceivedDatagram, this, std::placeholders::_1, std::placeholders::_2)); + std::bind (&SAMBridge::HandleReceivedDatagram, this, std::placeholders::_1, std::placeholders::_2)); } void SAMBridge::HandleReceivedDatagram (const boost::system::error_code& ecode, std::size_t bytes_transferred) @@ -888,7 +954,7 @@ namespace client m_DatagramReceiveBuffer[bytes_transferred] = 0; char * eol = strchr ((char *)m_DatagramReceiveBuffer, '\n'); *eol = 0; eol++; - size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer); + size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer); LogPrint (eLogDebug, "SAM: datagram received ", m_DatagramReceiveBuffer," size=", payloadLen); char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' '); if (sessionID) @@ -900,12 +966,12 @@ namespace client *destination = 0; destination++; auto session = FindSession (sessionID); if (session) - { + { i2p::data::IdentityEx dest; dest.FromBase64 (destination); session->localDestination->GetDatagramDestination ()-> SendDatagramTo ((uint8_t *)eol, payloadLen, dest.GetIdentHash ()); - } + } else LogPrint (eLogError, "SAM: Session ", sessionID, " not found"); } diff --git a/SAM.h b/SAM.h index db08c5a0..db6991f5 100644 --- a/SAM.h +++ b/SAM.h @@ -20,45 +20,48 @@ namespace client { const size_t SAM_SOCKET_BUFFER_SIZE = 8192; const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds - const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds + const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds const char SAM_HANDSHAKE[] = "HELLO VERSION"; const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n"; - const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n"; + const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n"; const char SAM_SESSION_CREATE[] = "SESSION CREATE"; const char SAM_SESSION_CREATE_REPLY_OK[] = "SESSION STATUS RESULT=OK DESTINATION=%s\n"; const char SAM_SESSION_CREATE_DUPLICATED_ID[] = "SESSION STATUS RESULT=DUPLICATED_ID\n"; - const char SAM_SESSION_CREATE_DUPLICATED_DEST[] = "SESSION STATUS RESULT=DUPLICATED_DEST\n"; + const char SAM_SESSION_CREATE_DUPLICATED_DEST[] = "SESSION STATUS RESULT=DUPLICATED_DEST\n"; const char SAM_SESSION_STATUS_INVALID_KEY[] = "SESSION STATUS RESULT=INVALID_KEY\n"; + const char SAM_SESSION_STATUS_I2P_ERROR[] = "SESSION STATUS RESULT=I2P_ERROR MESSAGE=%s\n"; const char SAM_STREAM_CONNECT[] = "STREAM CONNECT"; const char SAM_STREAM_STATUS_OK[] = "STREAM STATUS RESULT=OK\n"; const char SAM_STREAM_STATUS_INVALID_ID[] = "STREAM STATUS RESULT=INVALID_ID\n"; const char SAM_STREAM_STATUS_CANT_REACH_PEER[] = "STREAM STATUS RESULT=CANT_REACH_PEER\n"; const char SAM_STREAM_STATUS_I2P_ERROR[] = "STREAM STATUS RESULT=I2P_ERROR\n"; - const char SAM_STREAM_ACCEPT[] = "STREAM ACCEPT"; + const char SAM_STREAM_ACCEPT[] = "STREAM ACCEPT"; const char SAM_DATAGRAM_SEND[] = "DATAGRAM SEND"; const char SAM_DEST_GENERATE[] = "DEST GENERATE"; - const char SAM_DEST_REPLY[] = "DEST REPLY PUB=%s PRIV=%s\n"; + const char SAM_DEST_REPLY[] = "DEST REPLY PUB=%s PRIV=%s\n"; const char SAM_DEST_REPLY_I2P_ERROR[] = "DEST REPLY RESULT=I2P_ERROR\n"; const char SAM_NAMING_LOOKUP[] = "NAMING LOOKUP"; const char SAM_NAMING_REPLY[] = "NAMING REPLY RESULT=OK NAME=ME VALUE=%s\n"; const char SAM_DATAGRAM_RECEIVED[] = "DATAGRAM RECEIVED DESTINATION=%s SIZE=%lu\n"; const char SAM_NAMING_REPLY_INVALID_KEY[] = "NAMING REPLY RESULT=INVALID_KEY NAME=%s\n"; const char SAM_NAMING_REPLY_KEY_NOT_FOUND[] = "NAMING REPLY RESULT=INVALID_KEY_NOT_FOUND NAME=%s\n"; - const char SAM_PARAM_MIN[] = "MIN"; - const char SAM_PARAM_MAX[] = "MAX"; - const char SAM_PARAM_STYLE[] = "STYLE"; - const char SAM_PARAM_ID[] = "ID"; + const char SAM_PARAM_MIN[] = "MIN"; + const char SAM_PARAM_MAX[] = "MAX"; + const char SAM_PARAM_STYLE[] = "STYLE"; + const char SAM_PARAM_ID[] = "ID"; const char SAM_PARAM_SILENT[] = "SILENT"; - const char SAM_PARAM_DESTINATION[] = "DESTINATION"; + const char SAM_PARAM_DESTINATION[] = "DESTINATION"; const char SAM_PARAM_NAME[] = "NAME"; - const char SAM_PARAM_SIGNATURE_TYPE[] = "SIGNATURE_TYPE"; + const char SAM_PARAM_SIGNATURE_TYPE[] = "SIGNATURE_TYPE"; const char SAM_PARAM_SIZE[] = "SIZE"; - const char SAM_VALUE_TRANSIENT[] = "TRANSIENT"; + const char SAM_VALUE_TRANSIENT[] = "TRANSIENT"; const char SAM_VALUE_STREAM[] = "STREAM"; const char SAM_VALUE_DATAGRAM[] = "DATAGRAM"; - const char SAM_VALUE_RAW[] = "RAW"; - const char SAM_VALUE_TRUE[] = "true"; - const char SAM_VALUE_FALSE[] = "false"; + const char SAM_VALUE_RAW[] = "RAW"; + const char SAM_VALUE_TRUE[] = "true"; + const char SAM_VALUE_FALSE[] = "false"; + const char SAM_VALUE_HOST[] = "HOST"; + const char SAM_VALUE_PORT[] = "PORT"; enum SAMSocketType { @@ -76,8 +79,8 @@ namespace client public: SAMSocket (SAMBridge& owner); - ~SAMSocket (); - void CloseStream (); // TODO: implement it better + ~SAMSocket (); + void CloseStream (); // TODO: implement it better boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; }; void ReceiveHandshake (); @@ -86,16 +89,16 @@ namespace client private: - void Terminate (); + void Terminate (); void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred); - void SendMessageReply (const char * msg, size_t len, bool close); + void SendMessageReply (const char * msg, size_t len, bool close); void HandleMessageReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred, bool close); void Receive (); void HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); - void I2PReceive (); + void I2PReceive (); void HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleI2PAccept (std::shared_ptr stream); void HandleWriteI2PData (const boost::system::error_code& ecode); @@ -106,7 +109,8 @@ namespace client void ProcessStreamAccept (char * buf, size_t len); void ProcessDestGenerate (); void ProcessNamingLookup (char * buf, size_t len); - size_t ProcessDatagramSend (char * buf, size_t len, const char * data); // from SAM 1.0 + void SendI2PError(const std::string & msg); + size_t ProcessDatagramSend (char * buf, size_t len, const char * data); // from SAM 1.0 void ExtractParams (char * buf, std::map& params); void Connect (std::shared_ptr remote); @@ -129,12 +133,13 @@ namespace client bool m_IsSilent; std::shared_ptr m_Stream; std::shared_ptr m_Session; - }; + }; struct SAMSession { std::shared_ptr localDestination; std::list > m_Sockets; + std::shared_ptr UDPEndpoint; std::mutex m_SocketsMutex; /** safely add a socket to this session */ @@ -158,8 +163,8 @@ namespace client } return l; } - - SAMSession (std::shared_ptr dest); + + SAMSession (std::shared_ptr dest); ~SAMSession (); void CloseStreams (); @@ -174,13 +179,16 @@ namespace client void Start (); void Stop (); - + boost::asio::io_service& GetService () { return m_Service; }; std::shared_ptr CreateSession (const std::string& id, const std::string& destination, // empty string means transient const std::map * params); void CloseSession (const std::string& id); std::shared_ptr FindSession (const std::string& id) const; + /** send raw data to remote endpoint from our UDP Socket */ + void SendTo(const uint8_t * buf, size_t len, std::shared_ptr remote); + private: void Run (); @@ -194,7 +202,7 @@ namespace client private: bool m_IsRunning; - std::thread * m_Thread; + std::thread * m_Thread; boost::asio::io_service m_Service; boost::asio::ip::tcp::acceptor m_Acceptor; boost::asio::ip::udp::endpoint m_DatagramEndpoint, m_SenderEndpoint; @@ -207,9 +215,8 @@ namespace client // for HTTP const decltype(m_Sessions)& GetSessions () const { return m_Sessions; }; - }; + }; } } #endif - diff --git a/Streaming.cpp b/Streaming.cpp index 343b9b38..551b1ff6 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -11,72 +11,78 @@ namespace i2p { namespace stream { - Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, + Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, std::shared_ptr remote, int port): m_Service (service), - m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), - m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), - m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), - m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), + m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), + m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), + m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), + m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0) { RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); m_RemoteIdentity = remote->GetIdentity (); - } + } Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): - m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), + m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), - m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), - m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE), + m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), + m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0) { RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); } Stream::~Stream () - { - while (!m_ReceiveQueue.empty ()) - { - auto packet = m_ReceiveQueue.front (); - m_ReceiveQueue.pop (); - m_LocalDestination.DeletePacket (packet); - } - - for (auto it: m_SentPackets) - m_LocalDestination.DeletePacket (it); - m_SentPackets.clear (); - - for (auto it: m_SavedPackets) - m_LocalDestination.DeletePacket (it); - m_SavedPackets.clear (); - + { + CleanUp (); LogPrint (eLogDebug, "Streaming: Stream deleted"); - } + } void Stream::Terminate () { m_AckSendTimer.cancel (); m_ReceiveTimer.cancel (); m_ResendTimer.cancel (); - if (m_SendHandler) + if (m_SendHandler) { auto handler = m_SendHandler; m_SendHandler = nullptr; handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted)); } - m_LocalDestination.DeleteStream (shared_from_this ()); - } - + //CleanUp (); /* Need to recheck - broke working on windows */ + m_LocalDestination.DeleteStream (shared_from_this ()); + } + + void Stream::CleanUp () + { + m_SendBuffer.str (""); + while (!m_ReceiveQueue.empty ()) + { + auto packet = m_ReceiveQueue.front (); + m_ReceiveQueue.pop (); + m_LocalDestination.DeletePacket (packet); + } + + for (auto it: m_SentPackets) + m_LocalDestination.DeletePacket (it); + m_SentPackets.clear (); + + for (auto it: m_SavedPackets) + m_LocalDestination.DeletePacket (it); + m_SavedPackets.clear (); + } + void Stream::HandleNextPacket (Packet * packet) { m_NumReceivedBytes += packet->GetLength (); - if (!m_SendStreamID) - m_SendStreamID = packet->GetReceiveStreamID (); + if (!m_SendStreamID) + m_SendStreamID = packet->GetReceiveStreamID (); if (!packet->IsNoAck ()) // ack received ProcessAck (packet); - + int32_t receivedSeqn = packet->GetSeqn (); bool isSyn = packet->IsSYN (); if (!receivedSeqn && !isSyn) @@ -89,13 +95,13 @@ namespace stream LogPrint (eLogDebug, "Streaming: Received seqn=", receivedSeqn, " on sSID=", m_SendStreamID); if (receivedSeqn == m_LastReceivedSequenceNumber + 1) - { + { // we have received next in sequence message ProcessPacket (packet); - + // we should also try stored messages if any for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();) - { + { if ((*it)->GetSeqn () == (uint32_t)(m_LastReceivedSequenceNumber + 1)) { Packet * savedPacket = *it; @@ -119,35 +125,35 @@ namespace stream m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer, shared_from_this (), std::placeholders::_1)); } - } + } else if (isSyn) // we have to send SYN back to incoming connection - SendBuffer (); // also sets m_IsOpen - } - else - { + SendBuffer (); // also sets m_IsOpen + } + else + { if (receivedSeqn <= m_LastReceivedSequenceNumber) { // we have received duplicate LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID); SendQuickAck (); // resend ack for previous message again m_LocalDestination.DeletePacket (packet); // packet dropped - } + } else { LogPrint (eLogWarning, "Streaming: Missing messages on sSID=", m_SendStreamID, ": from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); // save message and wait for missing message again SavePacket (packet); if (m_LastReceivedSequenceNumber >= 0) - { + { // send NACKs for missing messages ASAP if (m_IsAckSendScheduled) { - m_IsAckSendScheduled = false; + m_IsAckSendScheduled = false; m_AckSendTimer.cancel (); } SendQuickAck (); - } + } else { // wait for SYN @@ -155,16 +161,16 @@ namespace stream m_AckSendTimer.expires_from_now (boost::posix_time::milliseconds(ACK_SEND_TIMEOUT)); m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer, shared_from_this (), std::placeholders::_1)); - } - } - } - } + } + } + } + } void Stream::SavePacket (Packet * packet) { if (!m_SavedPackets.insert (packet).second) m_LocalDestination.DeletePacket (packet); - } + } void Stream::ProcessPacket (Packet * packet) { @@ -172,52 +178,52 @@ namespace stream uint32_t receivedSeqn = packet->GetSeqn (); uint16_t flags = packet->GetFlags (); LogPrint (eLogDebug, "Streaming: Process seqn=", receivedSeqn, ", flags=", flags); - + const uint8_t * optionData = packet->GetOptionData (); if (flags & PACKET_FLAG_DELAY_REQUESTED) optionData += 2; - + if (flags & PACKET_FLAG_FROM_INCLUDED) { m_RemoteIdentity = std::make_shared(optionData, packet->GetOptionSize ()); optionData += m_RemoteIdentity->GetFullLen (); if (!m_RemoteLeaseSet) LogPrint (eLogDebug, "Streaming: Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), ", sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID); - } + } if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED) { uint16_t maxPacketSize = bufbe16toh (optionData); LogPrint (eLogDebug, "Streaming: Max packet size ", maxPacketSize); optionData += 2; - } - + } + if (flags & PACKET_FLAG_SIGNATURE_INCLUDED) { - uint8_t signature[256]; + uint8_t signature[256]; auto signatureLen = m_RemoteIdentity->GetSignatureLen (); memcpy (signature, optionData, signatureLen); memset (const_cast(optionData), 0, signatureLen); if (!m_RemoteIdentity->Verify (packet->GetBuffer (), packet->GetLength (), signature)) - { + { LogPrint (eLogError, "Streaming: Signature verification failed, sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID); Close (); flags |= PACKET_FLAG_CLOSE; - } + } memcpy (const_cast(optionData), signature, signatureLen); optionData += signatureLen; - } + } packet->offset = packet->GetPayload () - packet->buf; if (packet->GetLength () > 0) - { + { m_ReceiveQueue.push (packet); m_ReceiveTimer.cancel (); - } + } else m_LocalDestination.DeletePacket (packet); - + m_LastReceivedSequenceNumber = receivedSeqn; if (flags & PACKET_FLAG_RESET) @@ -231,9 +237,9 @@ namespace stream if (m_Status != eStreamStatusClosed) SendClose (); m_Status = eStreamStatusClosed; - Terminate (); + Terminate (); } - } + } void Stream::ProcessAck (Packet * packet) { @@ -244,7 +250,7 @@ namespace stream { LogPrint (eLogError, "Streaming: Unexpected ackThrough=", ackThrough, " > seqn=", m_SequenceNumber); return; - } + } int nackCount = packet->GetNACKCount (); for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();) { @@ -265,7 +271,7 @@ namespace stream LogPrint (eLogDebug, "Streaming: Packet ", seqn, " NACK"); ++it; continue; - } + } } auto sentPacket = *it; uint64_t rtt = ts - sentPacket->sendTime; @@ -278,7 +284,7 @@ namespace stream m_RTO = m_RTT*1.5; // TODO: implement it better LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); m_SentPackets.erase (it++); - m_LocalDestination.DeletePacket (sentPacket); + m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; if (m_WindowSize < WINDOW_SIZE) m_WindowSize++; // slow start @@ -306,13 +312,13 @@ namespace stream { m_NumResendAttempts = 0; SendBuffer (); - } + } if (m_Status == eStreamStatusClosed) Terminate (); else if (m_Status == eStreamStatusClosing) Close (); // check is all outgoing messages have been sent and we can send close - } - + } + size_t Stream::Send (const uint8_t * buf, size_t len) { if (len > 0 && buf) @@ -320,14 +326,14 @@ namespace stream std::unique_lock l(m_SendBufferMutex); m_SendBuffer.clear (); m_SendBuffer.write ((const char *)buf, len); - } + } m_Service.post (std::bind (&Stream::SendBuffer, shared_from_this ())); return len; - } + } void Stream::AsyncSend (const uint8_t * buf, size_t len, SendHandler handler) { - if (m_SendHandler) + if (m_SendHandler) handler (boost::asio::error::make_error_code (boost::asio::error::in_progress)); else m_SendHandler = handler; @@ -335,10 +341,10 @@ namespace stream } void Stream::SendBuffer () - { + { int numMsgs = m_WindowSize - m_SentPackets.size (); - if (numMsgs <= 0) return; // window is full - + if (numMsgs <= 0) return; // window is full + bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet std::vector packets; { @@ -355,20 +361,20 @@ namespace stream size += 4; // receiveStreamID htobe32buf (packet + size, m_SequenceNumber++); size += 4; // sequenceNum - if (isNoAck) + if (isNoAck) htobuf32 (packet + size, 0); else htobe32buf (packet + size, m_LastReceivedSequenceNumber); size += 4; // ack Through - packet[size] = 0; + packet[size] = 0; size++; // NACK count packet[size] = m_RTO/1000; size++; // resend delay if (m_Status == eStreamStatusNew) - { + { // initial packet m_Status = eStreamStatusOpen; - uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED | + uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED; if (isNoAck) flags |= PACKET_FLAG_NO_ACK; htobe16buf (packet + size, flags); @@ -377,7 +383,7 @@ namespace stream size_t signatureLen = m_LocalDestination.GetOwner ()->GetIdentity ()->GetSignatureLen (); htobe16buf (packet + size, identityLen + signatureLen + 2); // identity + signature + packet size size += 2; // options size - m_LocalDestination.GetOwner ()->GetIdentity ()->ToBuffer (packet + size, identityLen); + m_LocalDestination.GetOwner ()->GetIdentity ()->ToBuffer (packet + size, identityLen); size += identityLen; // from htobe16buf (packet + size, STREAMING_MTU); size += 2; // max packet size @@ -387,7 +393,7 @@ namespace stream m_SendBuffer.read ((char *)(packet + size), STREAMING_MTU - size); size += m_SendBuffer.gcount (); // payload m_LocalDestination.GetOwner ()->Sign (packet, size, signature); - } + } else { // follow on packet @@ -395,9 +401,9 @@ namespace stream size += 2; // flags htobuf16 (packet + size, 0); // no options size += 2; // options size - m_SendBuffer.read((char *)(packet + size), STREAMING_MTU - size); + m_SendBuffer.read((char *)(packet + size), STREAMING_MTU - size); size += m_SendBuffer.gcount (); // payload - } + } p->len = size; packets.push_back (p); numMsgs--; @@ -407,14 +413,14 @@ namespace stream m_SendHandler (boost::system::error_code ()); m_SendHandler = nullptr; } - } + } if (packets.size () > 0) { if (m_SavedPackets.empty ()) // no NACKS - { - m_IsAckSendScheduled = false; + { + m_IsAckSendScheduled = false; m_AckSendTimer.cancel (); - } + } bool isEmpty = m_SentPackets.empty (); auto ts = i2p::util::GetMillisecondsSinceEpoch (); for (auto& it: packets) @@ -427,9 +433,9 @@ namespace stream SendClose (); if (isEmpty) ScheduleResend (); - } + } } - + void Stream::SendQuickAck () { int32_t lastReceivedSeqn = m_LastReceivedSequenceNumber; @@ -437,15 +443,15 @@ namespace stream { int32_t seqn = (*m_SavedPackets.rbegin ())->GetSeqn (); if (seqn > lastReceivedSeqn) lastReceivedSeqn = seqn; - } - if (lastReceivedSeqn < 0) - { + } + if (lastReceivedSeqn < 0) + { LogPrint (eLogError, "Streaming: No packets have been received yet"); return; } - + Packet p; - uint8_t * packet = p.GetBuffer (); + uint8_t * packet = p.GetBuffer (); size_t size = 0; htobe32buf (packet + size, m_SendStreamID); size += 4; // sendStreamID @@ -456,8 +462,8 @@ namespace stream htobe32buf (packet + size, lastReceivedSeqn); size += 4; // ack Through uint8_t numNacks = 0; - if (lastReceivedSeqn > m_LastReceivedSequenceNumber) - { + if (lastReceivedSeqn > m_LastReceivedSequenceNumber) + { // fill NACKs uint8_t * nacks = packet + size + 1; auto nextSeqn = m_LastReceivedSequenceNumber + 1; @@ -469,35 +475,35 @@ namespace stream LogPrint (eLogError, "Streaming: Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn); htobe32buf (packet + 12, nextSeqn); // change ack Through break; - } + } for (uint32_t i = nextSeqn; i < seqn; i++) { htobe32buf (nacks, i); nacks += 4; numNacks++; - } + } nextSeqn = seqn + 1; } - packet[size] = numNacks; - size++; // NACK count + packet[size] = numNacks; + size++; // NACK count size += numNacks*4; // NACKs - } + } else { // No NACKs - packet[size] = 0; - size++; // NACK count - } + packet[size] = 0; + size++; // NACK count + } size++; // resend delay htobuf16 (packet + size, 0); // nof flags set size += 2; // flags htobuf16 (packet + size, 0); // no options size += 2; // options size - p.len = size; + p.len = size; SendPackets (std::vector { &p }); LogPrint (eLogDebug, "Streaming: Quick Ack sent. ", (int)numNacks, " NACKs"); - } + } void Stream::Close () { @@ -512,7 +518,7 @@ namespace stream break; case eStreamStatusReset: // TODO: send reset - Terminate (); + Terminate (); break; case eStreamStatusClosing: if (m_SentPackets.empty () && m_SendBuffer.eof ()) // nothing to send @@ -524,10 +530,10 @@ namespace stream case eStreamStatusClosed: // already closed Terminate (); - break; + break; default: LogPrint (eLogWarning, "Streaming: Unexpected stream status ", (int)m_Status, "sSID=", m_SendStreamID); - }; + }; } void Stream::SendClose () @@ -543,7 +549,7 @@ namespace stream size += 4; // sequenceNum htobe32buf (packet + size, m_LastReceivedSequenceNumber >= 0 ? m_LastReceivedSequenceNumber : 0); size += 4; // ack Through - packet[size] = 0; + packet[size] = 0; size++; // NACK count size++; // resend delay htobe16buf (packet + size, PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED); @@ -555,12 +561,12 @@ namespace stream memset (packet + size, 0, signatureLen); size += signatureLen; // signature m_LocalDestination.GetOwner ()->Sign (packet, size, signature); - + p->len = size; m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p)); LogPrint (eLogDebug, "Streaming: FIN sent, sSID=", m_SendStreamID); - } - + } + size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len) { size_t pos = 0; @@ -575,18 +581,18 @@ namespace stream { m_ReceiveQueue.pop (); m_LocalDestination.DeletePacket (packet); - } - } - return pos; + } + } + return pos; } bool Stream::SendPacket (Packet * packet) { if (packet) - { + { if (m_IsAckSendScheduled) { - m_IsAckSendScheduled = false; + m_IsAckSendScheduled = false; m_AckSendTimer.cancel (); } SendPackets (std::vector { packet }); @@ -594,17 +600,17 @@ namespace stream m_SentPackets.insert (packet); if (isEmpty) ScheduleResend (); - return true; - } + return true; + } else return false; - } - + } + void Stream::SendPackets (const std::vector& packets) { if (!m_RemoteLeaseSet) { - UpdateCurrentRemoteLease (); + UpdateCurrentRemoteLease (); if (!m_RemoteLeaseSet) { LogPrint (eLogError, "Streaming: Can't send packets, missing remote LeaseSet, sSID=", m_SendStreamID); @@ -612,7 +618,7 @@ namespace stream } } if (!m_RoutingSession || !m_RoutingSession->GetOwner ()) // expired and detached - m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true); + m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true); if (!m_CurrentOutboundTunnel && m_RoutingSession) // first message to send { // try to get shared path first @@ -624,7 +630,7 @@ namespace stream m_RTT = routingPath->rtt; m_RTO = m_RTT*1.5; // TODO: implement it better } - } + } if (!m_CurrentOutboundTunnel || !m_CurrentOutboundTunnel->IsEstablished ()) m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel); if (!m_CurrentOutboundTunnel) @@ -633,30 +639,30 @@ namespace stream return; } - auto ts = i2p::util::GetMillisecondsSinceEpoch (); + auto ts = i2p::util::GetMillisecondsSinceEpoch (); if (!m_CurrentRemoteLease || !m_CurrentRemoteLease->endDate || // excluded from LeaseSet ts >= m_CurrentRemoteLease->endDate - i2p::data::LEASE_ENDDATE_THRESHOLD) UpdateCurrentRemoteLease (true); if (m_CurrentRemoteLease && ts < m_CurrentRemoteLease->endDate + i2p::data::LEASE_ENDDATE_THRESHOLD) - { + { std::vector msgs; for (auto it: packets) - { + { auto msg = m_RoutingSession->WrapSingleMessage (m_LocalDestination.CreateDataMessage (it->GetBuffer (), it->GetLength (), m_Port)); - msgs.push_back (i2p::tunnel::TunnelMessageBlock - { + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { i2p::tunnel::eDeliveryTypeTunnel, m_CurrentRemoteLease->tunnelGateway, m_CurrentRemoteLease->tunnelID, msg - }); + }); m_NumSentBytes += it->GetLength (); } m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs); - } + } else - { + { LogPrint (eLogWarning, "Streaming: Remote lease is not available, sSID=", m_SendStreamID); - if (m_RoutingSession) + if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); // invalidate routing path } } @@ -664,8 +670,8 @@ namespace stream void Stream::SendUpdatedLeaseSet () { if (m_RoutingSession) - { - if (m_RoutingSession->IsLeaseSetNonConfirmed ()) + { + if (m_RoutingSession->IsLeaseSetNonConfirmed ()) { auto ts = i2p::util::GetMillisecondsSinceEpoch (); if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASET_CONFIRMATION_TIMEOUT) @@ -673,19 +679,19 @@ namespace stream // LeaseSet was not confirmed, should try other tunnels LogPrint (eLogWarning, "Streaming: LeaseSet was not confrimed in ", i2p::garlic::LEASET_CONFIRMATION_TIMEOUT, " milliseconds. Trying to resubmit"); m_RoutingSession->SetSharedRoutingPath (nullptr); - m_CurrentOutboundTunnel = nullptr; + m_CurrentOutboundTunnel = nullptr; m_CurrentRemoteLease = nullptr; SendQuickAck (); - } - } + } + } else if (m_RoutingSession->IsLeaseSetUpdated ()) - { + { LogPrint (eLogDebug, "Streaming: sending updated LeaseSet"); SendQuickAck (); - } - } - } - + } + } + } + void Stream::ScheduleResend () { m_ResendTimer.cancel (); @@ -695,11 +701,11 @@ namespace stream m_ResendTimer.async_wait (std::bind (&Stream::HandleResendTimer, shared_from_this (), std::placeholders::_1)); } - + void Stream::HandleResendTimer (const boost::system::error_code& ecode) { - if (ecode != boost::asio::error::operation_aborted) - { + if (ecode != boost::asio::error::operation_aborted) + { // check for resend attempts if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS) { @@ -707,7 +713,7 @@ namespace stream m_Status = eStreamStatusReset; Close (); return; - } + } // collect packets to resend auto ts = i2p::util::GetMillisecondsSinceEpoch (); @@ -718,8 +724,8 @@ namespace stream { it->sendTime = ts; packets.push_back (it); - } - } + } + } // select tunnels if necessary and send if (packets.size () > 0) @@ -727,7 +733,7 @@ namespace stream m_NumResendAttempts++; m_RTO *= 2; switch (m_NumResendAttempts) - { + { case 1: // congesion avoidance m_WindowSize /= 2; if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; @@ -739,21 +745,21 @@ namespace stream if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); UpdateCurrentRemoteLease (); // pick another lease LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream with rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID); - break; + break; case 3: - // pick another outbound tunnel + // pick another outbound tunnel if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); - m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); + m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream with sSID=", m_SendStreamID); break; - default: ; - } + default: ; + } SendPackets (packets); - } + } ScheduleResend (); - } - } - + } + } + void Stream::HandleAckSendTimer (const boost::system::error_code& ecode) { if (m_IsAckSendScheduled) @@ -764,10 +770,10 @@ namespace stream m_Status = eStreamStatusReset; Close (); return; - } + } if (m_Status == eStreamStatusOpen) { - if (m_RoutingSession && m_RoutingSession->IsLeaseSetNonConfirmed ()) + if (m_RoutingSession && m_RoutingSession->IsLeaseSetNonConfirmed ()) { // seems something went wrong and we should re-select tunnels m_CurrentOutboundTunnel = nullptr; @@ -776,19 +782,19 @@ namespace stream SendQuickAck (); } m_IsAckSendScheduled = false; - } + } } void Stream::UpdateCurrentRemoteLease (bool expired) { - if (!m_RemoteLeaseSet || m_RemoteLeaseSet->IsExpired ()) + if (!m_RemoteLeaseSet || m_RemoteLeaseSet->IsExpired ()) { m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ()); - if (!m_RemoteLeaseSet) - { + if (!m_RemoteLeaseSet) + { LogPrint (eLogWarning, "Streaming: LeaseSet ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), " not found"); m_LocalDestination.GetOwner ()->RequestDestination (m_RemoteIdentity->GetIdentHash ()); // try to request for a next attempt - } + } } if (m_RemoteLeaseSet) { @@ -799,11 +805,11 @@ namespace stream { expired = false; m_LocalDestination.GetOwner ()->RequestDestination (m_RemoteIdentity->GetIdentHash ()); // time to request - leases = m_RemoteLeaseSet->GetNonExpiredLeases (true); // then with threshold + leases = m_RemoteLeaseSet->GetNonExpiredLeases (true); // then with threshold } if (!leases.empty ()) - { - bool updated = false; + { + bool updated = false; if (expired && m_CurrentRemoteLease) { for (const auto& it: leases) @@ -812,34 +818,34 @@ namespace stream m_CurrentRemoteLease = it; updated = true; break; - } + } } if (!updated) { uint32_t i = rand () % leases.size (); if (m_CurrentRemoteLease && leases[i]->tunnelID == m_CurrentRemoteLease->tunnelID) - // make sure we don't select previous + // make sure we don't select previous i = (i + 1) % leases.size (); // if so, pick next - m_CurrentRemoteLease = leases[i]; + m_CurrentRemoteLease = leases[i]; } - } + } else - { + { LogPrint (eLogWarning, "Streaming: All remote leases are expired"); m_RemoteLeaseSet = nullptr; m_CurrentRemoteLease = nullptr; // we have requested expired before, no need to do it twice - } + } } else { LogPrint (eLogWarning, "Streaming: Remote LeaseSet not found"); m_CurrentRemoteLease = nullptr; - } - } + } + } - StreamingDestination::StreamingDestination (std::shared_ptr owner, uint16_t localPort, bool gzip): - m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip), + StreamingDestination::StreamingDestination (std::shared_ptr owner, uint16_t localPort, bool gzip): + m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip), m_LastIncomingReceiveStreamID (0), m_PendingIncomingTimer (m_Owner->GetService ()), m_ConnTrackTimer(m_Owner->GetService()), @@ -847,13 +853,13 @@ namespace stream m_LastBanClear(i2p::util::GetMillisecondsSinceEpoch()) { } - + StreamingDestination::~StreamingDestination () { for (auto& it: m_SavedPackets) { for (auto it1: it.second) DeletePacket (it1); - it.second.clear (); + it.second.clear (); } m_SavedPackets.clear (); } @@ -862,9 +868,9 @@ namespace stream { ScheduleConnTrack(); } - + void StreamingDestination::Stop () - { + { ResetAcceptor (); m_PendingIncomingTimer.cancel (); m_PendingIncomingStreams.clear (); @@ -877,34 +883,34 @@ namespace stream std::unique_lock l(m_ConnsMutex); m_Conns.clear (); } - } - + } + void StreamingDestination::HandleNextPacket (Packet * packet) { uint32_t sendStreamID = packet->GetSendStreamID (); if (sendStreamID) - { + { auto it = m_Streams.find (sendStreamID); if (it != m_Streams.end ()) it->second->HandleNextPacket (packet); else - { - LogPrint (eLogError, "Streaming: Unknown stream sSID=", sendStreamID); + { + LogPrint (eLogInfo, "Streaming: Unknown stream sSID=", sendStreamID); DeletePacket (packet); } - } - else + } + else { if (packet->IsSYN () && !packet->GetSeqn ()) // new incoming stream - { + { uint32_t receiveStreamID = packet->GetReceiveStreamID (); - if (receiveStreamID == m_LastIncomingReceiveStreamID) + if (receiveStreamID == m_LastIncomingReceiveStreamID) { // already pending LogPrint(eLogWarning, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists"); DeletePacket (packet); // drop it, because previous should be connected return; - } + } auto incomingStream = CreateNewIncomingStream (); incomingStream->HandleNextPacket (packet); // SYN auto ident = incomingStream->GetRemoteIdentity(); @@ -920,7 +926,7 @@ namespace stream } } m_LastIncomingReceiveStreamID = receiveStreamID; - + // handle saved packets if any { auto it = m_SavedPackets.find (receiveStreamID); @@ -930,7 +936,7 @@ namespace stream for (auto it1: it->second) incomingStream->HandleNextPacket (it1); m_SavedPackets.erase (it); - } + } } // accept if (m_Acceptor != nullptr) @@ -943,17 +949,17 @@ namespace stream m_PendingIncomingStreams.push_back (incomingStream); m_PendingIncomingTimer.cancel (); m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); - m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, + m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, shared_from_this (), std::placeholders::_1)); LogPrint (eLogDebug, "Streaming: Pending incoming stream added, rSID=", receiveStreamID); } else - { + { LogPrint (eLogWarning, "Streaming: Pending incoming streams backlog exceeds ", MAX_PENDING_INCOMING_BACKLOG); incomingStream->Close (); - } - } - } + } + } + } else // follow on packet without SYN { uint32_t receiveStreamID = packet->GetReceiveStreamID (); @@ -988,17 +994,17 @@ namespace stream } }); } - } - } - } - + } + } + } + std::shared_ptr StreamingDestination::CreateNewOutgoingStream (std::shared_ptr remote, int port) { auto s = std::make_shared (m_Owner->GetService (), *this, remote, port); std::unique_lock l(m_StreamsMutex); m_Streams[s->GetRecvStreamID ()] = s; return s; - } + } std::shared_ptr StreamingDestination::CreateNewIncomingStream () { @@ -1011,39 +1017,39 @@ namespace stream void StreamingDestination::DeleteStream (std::shared_ptr stream) { if (stream) - { + { std::unique_lock l(m_StreamsMutex); auto it = m_Streams.find (stream->GetRecvStreamID ()); if (it != m_Streams.end ()) m_Streams.erase (it); - } - } + } + } - void StreamingDestination::SetAcceptor (const Acceptor& acceptor) - { + void StreamingDestination::SetAcceptor (const Acceptor& acceptor) + { m_Acceptor = acceptor; // we must set it immediately for IsAcceptorSet auto s = shared_from_this (); m_Owner->GetService ().post([s](void) - { + { // take care about incoming queue for (auto& it: s->m_PendingIncomingStreams) if (it->GetStatus () == eStreamStatusOpen) // still open? s->m_Acceptor (it); s->m_PendingIncomingStreams.clear (); s->m_PendingIncomingTimer.cancel (); - }); + }); } - void StreamingDestination::ResetAcceptor () - { - if (m_Acceptor) m_Acceptor (nullptr); - m_Acceptor = nullptr; + void StreamingDestination::ResetAcceptor () + { + if (m_Acceptor) m_Acceptor (nullptr); + m_Acceptor = nullptr; } void StreamingDestination::AcceptOnce (const Acceptor& acceptor) { m_Owner->GetService ().post([acceptor, this](void) - { + { if (!m_PendingIncomingStreams.empty ()) { acceptor (m_PendingIncomingStreams.front ()); @@ -1053,14 +1059,14 @@ namespace stream } else // we must save old acceptor and set it back { - auto oldAcceptor = m_Acceptor; + auto oldAcceptor = m_Acceptor; m_Acceptor = [acceptor, oldAcceptor, this](std::shared_ptr stream) { acceptor (stream); m_Acceptor = oldAcceptor; }; } - }); + }); } void StreamingDestination::HandlePendingIncomingTimer (const boost::system::error_code& ecode) @@ -1071,9 +1077,9 @@ namespace stream for (auto& it: m_PendingIncomingStreams) it->Close (); m_PendingIncomingStreams.clear (); - } - } - + } + } + void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len) { // unzip it @@ -1081,7 +1087,7 @@ namespace stream uncompressed->offset = 0; uncompressed->len = m_Inflator.Inflate (buf, len, uncompressed->buf, MAX_PACKET_SIZE); if (uncompressed->len) - HandleNextPacket (uncompressed); + HandleNextPacket (uncompressed); else DeletePacket (uncompressed); } @@ -1101,11 +1107,11 @@ namespace stream { htobe32buf (msg->GetPayload (), size); // length htobe16buf (buf + 4, m_LocalPort); // source port - htobe16buf (buf + 6, toPort); // destination port + htobe16buf (buf + 6, toPort); // destination port buf[9] = i2p::client::PROTOCOL_TYPE_STREAMING; // streaming protocol - msg->len += size; + msg->len += size; msg->FillI2NPMessageHeader (eI2NPData); - } + } else msg = nullptr; return msg; @@ -1158,15 +1164,15 @@ namespace stream } // reschedule timer ScheduleConnTrack(); - } + } } void StreamingDestination::ScheduleConnTrack() { m_ConnTrackTimer.expires_from_now (boost::posix_time::seconds(60)); m_ConnTrackTimer.async_wait ( - std::bind (&StreamingDestination::HandleConnTrack, + std::bind (&StreamingDestination::HandleConnTrack, shared_from_this (), std::placeholders::_1)); } -} -} +} +} diff --git a/Streaming.h b/Streaming.h index 828fccc7..bfea74ed 100644 --- a/Streaming.h +++ b/Streaming.h @@ -158,6 +158,8 @@ namespace stream private: + void CleanUp (); + void SendBuffer (); void SendQuickAck (); void SendClose (); diff --git a/TransitTunnel.h b/TransitTunnel.h index 2a1908df..eec244ce 100644 --- a/TransitTunnel.h +++ b/TransitTunnel.h @@ -85,6 +85,8 @@ namespace tunnel TransitTunnel (receiveTunnelID, nextIdent, nextTunnelID, layerKey, ivKey), m_Endpoint (false) {}; // transit endpoint is always outbound + void Cleanup () { m_Endpoint.Cleanup (); } + void HandleTunnelDataMsg (std::shared_ptr tunnelMsg); size_t GetNumTransmittedBytes () const { return m_Endpoint.GetNumReceivedBytes (); } diff --git a/Transports.cpp b/Transports.cpp index 1659eeed..b0a19379 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -101,8 +101,9 @@ namespace transport void DHKeysPairSupplier::Return (std::shared_ptr pair) { - std::unique_lock l(m_AcquiredMutex); - m_Queue.push (pair); + std::unique_lockl(m_AcquiredMutex); + if ((int)m_Queue.size () < 2*m_QueueSize) + m_Queue.push (pair); } Transports transports; @@ -417,7 +418,7 @@ namespace transport } else { - LogPrint (eLogError, "Transports: RouterInfo not found, Failed to send messages"); + LogPrint (eLogWarning, "Transports: RouterInfo not found, Failed to send messages"); std::unique_lock l(m_PeersMutex); m_Peers.erase (it); } diff --git a/android/jni/Android.mk b/android/jni/Android.mk index 5eaa983e..5b77c66e 100755 --- a/android/jni/Android.mk +++ b/android/jni/Android.mk @@ -4,10 +4,10 @@ LOCAL_MODULE := i2pd LOCAL_CPP_FEATURES := rtti exceptions LOCAL_C_INCLUDES += $(IFADDRS_PATH) ../.. LOCAL_STATIC_LIBRARIES := \ - boost_system-gcc-mt-1_53 \ - boost_date_time-gcc-mt-1_53 \ - boost_filesystem-gcc-mt-1_53 \ - boost_program_options-gcc-mt-1_53 \ + boost_system \ + boost_date_time \ + boost_filesystem \ + boost_program_options \ crypto ssl \ miniupnpc LOCAL_LDLIBS := -lz @@ -68,44 +68,44 @@ include $(BUILD_SHARED_LIBRARY) LOCAL_PATH := $(call my-dir) include $(CLEAR_VARS) -LOCAL_MODULE := boost_system-gcc-mt-1_53 -LOCAL_SRC_FILES := $(BOOST_PATH)/boost_1_53_0/$(TARGET_ARCH_ABI)/lib/libboost_system-gcc-mt-1_53.a -LOCAL_EXPORT_C_INCLUDES := $(BOOST_PATH)/boost_1_53_0/include +LOCAL_MODULE := boost_system +LOCAL_SRC_FILES := $(BOOST_PATH)/boost_1_62_0/$(TARGET_ARCH_ABI)/lib/libboost_system.a +LOCAL_EXPORT_C_INCLUDES := $(BOOST_PATH)/boost_1_62_0/include include $(PREBUILT_STATIC_LIBRARY) LOCAL_PATH := $(call my-dir) include $(CLEAR_VARS) -LOCAL_MODULE := boost_date_time-gcc-mt-1_53 -LOCAL_SRC_FILES := $(BOOST_PATH)/boost_1_53_0/$(TARGET_ARCH_ABI)/lib/libboost_date_time-gcc-mt-1_53.a -LOCAL_EXPORT_C_INCLUDES := $(BOOST_PATH)/boost_1_53_0/include +LOCAL_MODULE := boost_date_time +LOCAL_SRC_FILES := $(BOOST_PATH)/boost_1_62_0/$(TARGET_ARCH_ABI)/lib/libboost_date_time.a +LOCAL_EXPORT_C_INCLUDES := $(BOOST_PATH)/boost_1_62_0/include include $(PREBUILT_STATIC_LIBRARY) LOCAL_PATH := $(call my-dir) include $(CLEAR_VARS) -LOCAL_MODULE := boost_filesystem-gcc-mt-1_53 -LOCAL_SRC_FILES := $(BOOST_PATH)/boost_1_53_0/$(TARGET_ARCH_ABI)/lib/libboost_filesystem-gcc-mt-1_53.a -LOCAL_EXPORT_C_INCLUDES := $(BOOST_PATH)/boost_1_53_0/include +LOCAL_MODULE := boost_filesystem +LOCAL_SRC_FILES := $(BOOST_PATH)/boost_1_62_0/$(TARGET_ARCH_ABI)/lib/libboost_filesystem.a +LOCAL_EXPORT_C_INCLUDES := $(BOOST_PATH)/boost_1_62_0/include include $(PREBUILT_STATIC_LIBRARY) LOCAL_PATH := $(call my-dir) include $(CLEAR_VARS) -LOCAL_MODULE := boost_program_options-gcc-mt-1_53 -LOCAL_SRC_FILES := $(BOOST_PATH)/boost_1_53_0/$(TARGET_ARCH_ABI)/lib/libboost_program_options-gcc-mt-1_53.a -LOCAL_EXPORT_C_INCLUDES := $(BOOST_PATH)/boost_1_53_0/include +LOCAL_MODULE := boost_program_options +LOCAL_SRC_FILES := $(BOOST_PATH)/boost_1_62_0/$(TARGET_ARCH_ABI)/lib/libboost_program_options.a +LOCAL_EXPORT_C_INCLUDES := $(BOOST_PATH)/boost_1_62_0/include include $(PREBUILT_STATIC_LIBRARY) LOCAL_PATH := $(call my-dir) include $(CLEAR_VARS) LOCAL_MODULE := crypto -LOCAL_SRC_FILES := $(OPENSSL_PATH)/openssl-1.0.2/$(TARGET_ARCH_ABI)/lib/libcrypto.a -LOCAL_EXPORT_C_INCLUDES := $(OPENSSL_PATH)/openssl-1.0.2/include +LOCAL_SRC_FILES := $(OPENSSL_PATH)/openssl-1.1.0/$(TARGET_ARCH_ABI)/lib/libcrypto.a +LOCAL_EXPORT_C_INCLUDES := $(OPENSSL_PATH)/openssl-1.1.0/include include $(PREBUILT_STATIC_LIBRARY) LOCAL_PATH := $(call my-dir) include $(CLEAR_VARS) LOCAL_MODULE := ssl -LOCAL_SRC_FILES := $(OPENSSL_PATH)/openssl-1.0.2/$(TARGET_ARCH_ABI)/lib/libssl.a -LOCAL_EXPORT_C_INCLUDES := $(OPENSSL_PATH)/openssl-1.0.2/include +LOCAL_SRC_FILES := $(OPENSSL_PATH)/openssl-1.1.0/$(TARGET_ARCH_ABI)/lib/libssl.a +LOCAL_EXPORT_C_INCLUDES := $(OPENSSL_PATH)/openssl-1.1.0/include LOCAL_STATIC_LIBRARIES := crypto include $(PREBUILT_STATIC_LIBRARY) diff --git a/android/res/drawable/itoopie_notification_icon.png b/android/res/drawable/itoopie_notification_icon.png index d055bc8f..fa99e7fc 100644 Binary files a/android/res/drawable/itoopie_notification_icon.png and b/android/res/drawable/itoopie_notification_icon.png differ diff --git a/contrib/.gitignore b/contrib/.gitignore new file mode 100644 index 00000000..d736e035 --- /dev/null +++ b/contrib/.gitignore @@ -0,0 +1,2 @@ +i2pd*.zip +build*.log \ No newline at end of file diff --git a/contrib/build_mingw.cmd b/contrib/build_mingw.cmd new file mode 100644 index 00000000..b5ac0d69 --- /dev/null +++ b/contrib/build_mingw.cmd @@ -0,0 +1,22 @@ +@echo off +title ઠ i2pd + +set "WD=C:\msys64" +set CHERE_INVOKING=enabled_from_arguments +set MSYSCON=mintty.exe + +echo ઠ i2pd win32. Enter ᫥ 砭 樨... +set "MSYSTEM=MINGW32" +set "CONTITLE=MinGW x32" +start "%CONTITLE%" /WAIT C:\msys64\usr\bin\mintty.exe -i /msys2.ico /usr/bin/bash --login build_mingw.sh +pause + +echo ઠ i2pd win64. Enter ᫥ 砭 樨... +set "MSYSTEM=MINGW64" +set "CONTITLE=MinGW x64" +start "%CONTITLE%" /WAIT C:\msys64\usr\bin\mintty.exe -i /msys2.ico /usr/bin/bash --login build_mingw.sh +pause + +echo ઠ 襭... +pause +exit /b 0 \ No newline at end of file diff --git a/contrib/build_mingw.sh b/contrib/build_mingw.sh new file mode 100644 index 00000000..ec4b975b --- /dev/null +++ b/contrib/build_mingw.sh @@ -0,0 +1,84 @@ +#!/bin/sh + +# Определяем архитектуру. +if [ $MSYSTEM == MINGW64 ]; then + export arch="win64" +elif [ $MSYSTEM == MINGW32 ]; then + export arch="win32" +else + echo "Не могу понять, какая у вас архитектура, используемая для сборки."; + echo "Вы точно запустили скрипт в оболочке MSYS2 MinGW [64/32]-bit ?"; + echo "Обычно её можно запустить выполнив c:\msys64\mingw64.exe или c:\msys64\mingw32.exe"; + exit 1; +fi; + +# Задаём переменной contrib текущий путь и переходим на уровень выше. +export contrib=$PWD +cd .. + +# Очистка от предыдущей сборки (на всякий случай =) ). +make clean >> /dev/null + +# Обновляем репозиторий, и получаем хеш последнего коммита. +echo "Получаем обновления из репозитория."; +git pull +if [ "$?" != 0 ]; then + echo "Не удалось обновить локальный репозиторий."; + echo "Вы точно запустили скрипт в папке репозитория?"; + exit 1; +fi; + +export commit=$(git rev-parse --verify HEAD | cut -c -7) +if [ -z commit ]; then + echo "Не удалось получить хеш последнего коммита."; + echo "Вы точно запустили скрипт в папке репозитория?"; + exit 1; +fi; + +# Получаем версию приложения +export version=$(grep -E "I2PD_VERSION_(MAJOR|MINOR|MICRO)\ " version.h | grep -oE '[^ ]+$' | tr '\n' '.'|head -c -1) + +# Получаем количество ядер, и уменьшаем количество потоков на 1 от количества ядер (если их больше чем 1). +if [ $NUMBER_OF_PROCESSORS -ge 2 ]; then + export threads=$(( $NUMBER_OF_PROCESSORS - 1 )) +else + export threads=$NUMBER_OF_PROCESSORS +fi; + +echo "Собираем i2pd ${version} (коммит ${commit}) для ${arch}."; + +# Собираем приложение с разными параметрами, и архивируем в zip архивы. +make USE_UPNP=yes USE_AVX=1 USE_AESNI=1 -j ${threads} > ${contrib}/build_avx_aesni.log 2>&1 +if [ "$?" != 0 ]; then + echo "Сборка не удалась. Смотрите в build_avx_aesni.log"; + exit 1; +fi; +zip -9 ${contrib}/i2pd_${version}_${commit}_${arch}_mingw_avx_aesni.zip i2pd.exe >> /dev/null +make clean >> /dev/null + +make USE_UPNP=yes USE_AVX=1 -j ${threads} > ${contrib}/build_avx.log 2>&1 +if [ "$?" != 0 ]; then + echo "Сборка не удалась. Смотрите в build_avx.log."; + exit 1; +fi; +zip -9 ${contrib}/i2pd_${version}_${commit}_${arch}_mingw_avx.zip i2pd.exe >> /dev/null +make clean >> /dev/null + +make USE_UPNP=yes USE_AESNI=1 -j ${threads} > ${contrib}/build_aesni.log 2>&1 +if [ "$?" != 0 ]; then + echo "Сборка не удалась. Смотрите в build_aesni.log"; + exit 1; +fi; +zip -9 ${contrib}/i2pd_${version}_${commit}_${arch}_mingw_aesni.zip i2pd.exe >> /dev/null +make clean >> /dev/null + +make USE_UPNP=yes -j ${threads} > ${contrib}/build.log 2>&1 +if [ "$?" != 0 ]; then + echo "Сборка не удалась. Смотрите в build.log"; + exit 1; +fi; +zip -9 ${contrib}/i2pd_${version}_${commit}_${arch}_mingw.zip i2pd.exe >> /dev/null +make clean >> /dev/null + +echo "Сборка i2pd ${version} для ${arch} завершена."; +exit 0; diff --git a/contrib/debian/README b/contrib/debian/README index 077d7b96..cccbc4de 100644 --- a/contrib/debian/README +++ b/contrib/debian/README @@ -1,2 +1,2 @@ This forder contain systemd unit files. -To use systemd daemon control, place files from this directory to debian folder. +To use systemd daemon control, place files from this directory to debian folder before building package. diff --git a/debian/i2pd.init b/debian/i2pd.init index ca403598..e4ed01e1 100644 --- a/debian/i2pd.init +++ b/debian/i2pd.init @@ -53,7 +53,7 @@ do_start() || return 1 start-stop-daemon --start --quiet --pidfile $PIDFILE --exec $DAEMON --chuid "$USER" -- \ --service --daemon --log=file --logfile=$LOGFILE --conf=$I2PCONF --tunconf=$TUNCONF \ - $DAEMON_OPTS > /dev/null 2>&1 \ + --pidfile=$PIDFILE $DAEMON_OPTS > /dev/null 2>&1 \ || return 2 return $? } diff --git a/qt/i2pd_qt/android/res/drawable-hdpi/icon.png b/qt/i2pd_qt/android/res/drawable-hdpi/icon.png index a5dc7b68..9a2f7404 100644 Binary files a/qt/i2pd_qt/android/res/drawable-hdpi/icon.png and b/qt/i2pd_qt/android/res/drawable-hdpi/icon.png differ diff --git a/qt/i2pd_qt/android/res/drawable/itoopie_notification_icon.png b/qt/i2pd_qt/android/res/drawable/itoopie_notification_icon.png index 8fbe2468..fa99e7fc 100644 Binary files a/qt/i2pd_qt/android/res/drawable/itoopie_notification_icon.png and b/qt/i2pd_qt/android/res/drawable/itoopie_notification_icon.png differ diff --git a/qt/i2pd_qt/i2pd_qt.pro b/qt/i2pd_qt/i2pd_qt.pro index e51eb963..36883399 100644 --- a/qt/i2pd_qt/i2pd_qt.pro +++ b/qt/i2pd_qt/i2pd_qt.pro @@ -36,7 +36,7 @@ SOURCES += DaemonQT.cpp mainwindow.cpp \ ../../SSUData.cpp ../../SSUSession.cpp ../../Streaming.cpp ../../TransitTunnel.cpp \ ../../Transports.cpp ../../Tunnel.cpp ../../TunnelEndpoint.cpp ../../TunnelGateway.cpp \ ../../TunnelPool.cpp ../../UPnP.cpp ../../Gzip.cpp ../../Timestamp.cpp ../../util.cpp \ - ../../Event.cpp ../../BloomFiler.cpp ../../i2pd.cpp + ../../Event.cpp ../../BloomFilter.cpp ../../WebSocks.cpp ../../i2pd.cpp HEADERS += DaemonQT.h mainwindow.h \ ../../HTTPServer.h ../../I2PControl.h ../../UPnP.h ../../Daemon.h ../../Config.h \ @@ -51,7 +51,7 @@ HEADERS += DaemonQT.h mainwindow.h \ ../../TransportSession.h ../../Tunnel.h ../../TunnelBase.h ../../TunnelConfig.h \ ../../TunnelEndpoint.h ../../TunnelGateway.h ../../TunnelPool.h ../../UPnP.h \ ../../util.h ../../version.h ../../Gzip.h ../../Tag.h \ - ../../BloomFiler.h ../../Event.h + ../../BloomFilter.h ../../Event.h ../../WebSocks.h FORMS += mainwindow.ui