diff --git a/.github/workflows/build-windows.yml b/.github/workflows/build-windows.yml index d587ba05..ed7147ff 100644 --- a/.github/workflows/build-windows.yml +++ b/.github/workflows/build-windows.yml @@ -231,7 +231,7 @@ jobs: cd MINGW-packages/mingw-w64-boost MINGW_ARCH=mingw32 makepkg-mingw -sCLf --noconfirm --nocheck - name: Install boost package - run: pacman --noconfirm -U MINGW-packages/mingw-w64-boost/mingw-w64-i686-*-any.pkg.tar.zst + run: pacman --noconfirm -U --overwrite MINGW-packages/mingw-w64-boost/mingw-w64-i686-*-any.pkg.tar.zst # Building i2pd - name: Build application diff --git a/Makefile.linux b/Makefile.linux index aa67626a..9c3e6895 100644 --- a/Makefile.linux +++ b/Makefile.linux @@ -13,12 +13,9 @@ LDFLAGS ?= ${LD_DEBUG} CXXVER := $(shell $(CXX) -dumpversion) ifeq ($(shell expr match $(CXX) 'clang'),5) NEEDED_CXXFLAGS += -std=c++17 -else ifeq ($(shell expr match ${CXXVER} "7"),1) # gcc 7 - NEEDED_CXXFLAGS += -std=c++17 - LDLIBS = -lboost_filesystem else ifeq ($(shell expr match ${CXXVER} "[8-9]"),1) # gcc 8 - 9 NEEDED_CXXFLAGS += -std=c++17 - LDLIBS = -lstdc++fs + LDLIBS = -lboost_system -lstdc++fs else ifeq ($(shell expr match ${CXXVER} "1[0-2]"),2) # gcc 10 - 12 NEEDED_CXXFLAGS += -std=c++17 else ifeq ($(shell expr match ${CXXVER} "1[3-9]"),2) # gcc 13+ @@ -34,7 +31,6 @@ ifeq ($(USE_STATIC),yes) # Using 'getaddrinfo' in statically linked applications requires at runtime # the shared libraries from the glibc version used for linking LIBDIR := /usr/lib/$(SYS) - LDLIBS += $(LIBDIR)/libboost_system.a LDLIBS += $(LIBDIR)/libboost_program_options.a LDLIBS += $(LIBDIR)/libssl.a LDLIBS += $(LIBDIR)/libcrypto.a @@ -44,7 +40,7 @@ ifeq ($(USE_UPNP),yes) endif LDLIBS += -lpthread -ldl else - LDLIBS += -lcrypto -lssl -lz -lboost_system -lboost_program_options -lpthread -latomic + LDLIBS += -lcrypto -lssl -lz -lboost_program_options -lpthread -latomic ifeq ($(USE_UPNP),yes) LDLIBS += -lminiupnpc endif diff --git a/daemon/HTTPServer.cpp b/daemon/HTTPServer.cpp index 21c7b6c6..e32bd459 100644 --- a/daemon/HTTPServer.cpp +++ b/daemon/HTTPServer.cpp @@ -701,6 +701,7 @@ namespace http { { s << "" << tr("Tunnels") << ":
\r\n"; s << "" << tr("Queue size") << ": " << i2p::tunnel::tunnels.GetQueueSize () << "
\r\n
\r\n"; + s << "" << tr("TBM Queue size") << ": " << i2p::tunnel::tunnels.GetTBMQueueSize () << "
\r\n
\r\n"; auto ExplPool = i2p::tunnel::tunnels.GetExploratoryPool (); @@ -1485,8 +1486,8 @@ namespace http { } HTTPServer::HTTPServer (const std::string& address, int port): - m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), - m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint (boost::asio::ip::address::from_string(address), port)), + m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service.get_executor ()), + m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint (boost::asio::ip::make_address(address), port)), m_Hostname(address) { } diff --git a/daemon/HTTPServer.h b/daemon/HTTPServer.h index f751c5a8..38b790d4 100644 --- a/daemon/HTTPServer.h +++ b/daemon/HTTPServer.h @@ -83,8 +83,8 @@ namespace http bool m_IsRunning; std::unique_ptr m_Thread; - boost::asio::io_service m_Service; - boost::asio::io_service::work m_Work; + boost::asio::io_context m_Service; + boost::asio::executor_work_guard m_Work; boost::asio::ip::tcp::acceptor m_Acceptor; std::string m_Hostname; }; diff --git a/daemon/I2PControl.cpp b/daemon/I2PControl.cpp index 43c74199..1ea75bbb 100644 --- a/daemon/I2PControl.cpp +++ b/daemon/I2PControl.cpp @@ -31,7 +31,7 @@ namespace client { I2PControlService::I2PControlService (const std::string& address, int port): m_IsRunning (false), m_Thread (nullptr), - m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(address), port)), + m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::make_address(address), port)), m_SSLContext (boost::asio::ssl::context::sslv23), m_ShutdownTimer (m_Service) { diff --git a/daemon/I2PControl.h b/daemon/I2PControl.h index af152631..ff32c131 100644 --- a/daemon/I2PControl.h +++ b/daemon/I2PControl.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2022, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -88,7 +88,7 @@ namespace client bool m_IsRunning; std::thread * m_Thread; - boost::asio::io_service m_Service; + boost::asio::io_context m_Service; boost::asio::ip::tcp::acceptor m_Acceptor; boost::asio::ssl::context m_SSLContext; boost::asio::deadline_timer m_ShutdownTimer; diff --git a/daemon/UPnP.h b/daemon/UPnP.h index d865df40..2a5fe9f3 100644 --- a/daemon/UPnP.h +++ b/daemon/UPnP.h @@ -67,7 +67,7 @@ namespace transport std::unique_ptr m_Thread; std::condition_variable m_Started; std::mutex m_StartedMutex; - boost::asio::io_service m_Service; + boost::asio::io_context m_Service; boost::asio::deadline_timer m_Timer; bool m_upnpUrlsInitialized = false; struct UPNPUrls m_upnpUrls; diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index 08acc0ed..2d1aa2c8 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -23,7 +23,7 @@ namespace i2p { namespace client { - LeaseSetDestination::LeaseSetDestination (boost::asio::io_service& service, + LeaseSetDestination::LeaseSetDestination (boost::asio::io_context& service, bool isPublic, const std::map * params): m_Service (service), m_IsPublic (isPublic), m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service), @@ -294,7 +294,7 @@ namespace client if (m_IsPublic) { auto s = shared_from_this (); - m_Service.post ([s](void) + boost::asio::post (m_Service, [s](void) { s->m_PublishVerificationTimer.cancel (); s->Publish (); @@ -322,7 +322,7 @@ namespace client memcpy (data.k, key, 32); memcpy (data.t, tag, 32); auto s = shared_from_this (); - m_Service.post ([s,data](void) + boost::asio::post (m_Service, [s,data](void) { s->AddSessionKey (data.k, data.t); }); @@ -339,7 +339,7 @@ namespace client memcpy (data.k, key, 32); data.t = tag; auto s = shared_from_this (); - m_Service.post ([s,data](void) + boost::asio::post (m_Service, [s,data](void) { s->AddECIESx25519Key (data.k, data.t); }); @@ -355,7 +355,7 @@ namespace client m_IncomingMsgsQueue.push_back (msg); } if (empty) - m_Service.post([s = shared_from_this ()]() + boost::asio::post (m_Service, [s = shared_from_this ()]() { std::list > receivedMsgs; { @@ -370,7 +370,7 @@ namespace client void LeaseSetDestination::ProcessDeliveryStatusMessage (std::shared_ptr msg) { uint32_t msgID = bufbe32toh (msg->GetPayload () + DELIVERY_STATUS_MSGID_OFFSET); - m_Service.post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msgID)); + boost::asio::post (m_Service, std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msgID)); } void LeaseSetDestination::HandleI2NPMessage (const uint8_t * buf, size_t len) @@ -608,7 +608,7 @@ namespace client void LeaseSetDestination::SetLeaseSetUpdated (bool post) { if (post) - m_Service.post([s = shared_from_this ()]() { s->UpdateLeaseSet (); }); + boost::asio::post (m_Service, [s = shared_from_this ()]() { s->UpdateLeaseSet (); }); else UpdateLeaseSet (); } @@ -690,7 +690,7 @@ namespace client auto s = shared_from_this (); msg->onDrop = [s]() { - s->GetService ().post([s]() + boost::asio::post (s->GetService (), [s]() { s->m_PublishConfirmationTimer.cancel (); s->HandlePublishConfirmationTimer (boost::system::error_code()); @@ -775,10 +775,10 @@ namespace client if (!m_Pool || !IsReady ()) { if (requestComplete) - m_Service.post ([requestComplete](void){requestComplete (nullptr);}); + boost::asio::post (m_Service, [requestComplete](void){requestComplete (nullptr);}); return false; } - m_Service.post (std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), dest, requestComplete, nullptr)); + boost::asio::post (m_Service, std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), dest, requestComplete, nullptr)); return true; } @@ -787,7 +787,7 @@ namespace client if (!dest || !m_Pool || !IsReady ()) { if (requestComplete) - m_Service.post ([requestComplete](void){requestComplete (nullptr);}); + boost::asio::post (m_Service, [requestComplete](void){requestComplete (nullptr);}); return false; } auto storeHash = dest->GetStoreHash (); @@ -795,17 +795,17 @@ namespace client if (leaseSet) { if (requestComplete) - m_Service.post ([requestComplete, leaseSet](void){requestComplete (leaseSet);}); + boost::asio::post (m_Service, [requestComplete, leaseSet](void){requestComplete (leaseSet);}); return true; } - m_Service.post (std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), storeHash, requestComplete, dest)); + boost::asio::post (m_Service, std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), storeHash, requestComplete, dest)); return true; } void LeaseSetDestination::CancelDestinationRequest (const i2p::data::IdentHash& dest, bool notify) { auto s = shared_from_this (); - m_Service.post ([dest, notify, s](void) + boost::asio::post (m_Service, [dest, notify, s](void) { auto it = s->m_LeaseSetRequests.find (dest); if (it != s->m_LeaseSetRequests.end ()) @@ -903,7 +903,7 @@ namespace client auto s = shared_from_this (); msg->onDrop = [s, dest, request]() { - s->GetService ().post([s, dest, request]() + boost::asio::post (s->GetService (), [s, dest, request]() { s->SendNextLeaseSetRequest (dest, request); }); @@ -1000,7 +1000,7 @@ namespace client return i2p::data::CRYPTO_KEY_TYPE_ELGAMAL; } - ClientDestination::ClientDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, + ClientDestination::ClientDestination (boost::asio::io_context& service, const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params): LeaseSetDestination (service, isPublic, params), m_Keys (keys), m_StreamingAckDelay (DEFAULT_INITIAL_ACK_DELAY), @@ -1215,7 +1215,7 @@ namespace client if (leaseSet) { auto stream = CreateStream (leaseSet, port); - GetService ().post ([streamRequestComplete, stream]() + boost::asio::post (GetService (), [streamRequestComplete, stream]() { streamRequestComplete(stream); }); diff --git a/libi2pd/Destination.h b/libi2pd/Destination.h index 8d49ef82..7516c90f 100644 --- a/libi2pd/Destination.h +++ b/libi2pd/Destination.h @@ -107,7 +107,7 @@ namespace client // leaseSet = nullptr means not found struct LeaseSetRequest { - LeaseSetRequest (boost::asio::io_service& service): requestTime (0), requestTimeoutTimer (service) {}; + LeaseSetRequest (boost::asio::io_context& service): requestTime (0), requestTimeoutTimer (service) {}; std::unordered_set excluded; uint64_t requestTime; boost::asio::deadline_timer requestTimeoutTimer; @@ -125,10 +125,10 @@ namespace client public: - LeaseSetDestination (boost::asio::io_service& service, bool isPublic, const std::map * params = nullptr); + LeaseSetDestination (boost::asio::io_context& service, bool isPublic, const std::map * params = nullptr); ~LeaseSetDestination (); const std::string& GetNickname () const { return m_Nickname; }; - boost::asio::io_service& GetService () { return m_Service; }; + auto& GetService () { return m_Service; }; virtual void Start (); virtual void Stop (); @@ -195,7 +195,7 @@ namespace client private: - boost::asio::io_service& m_Service; + boost::asio::io_context& m_Service; mutable std::mutex m_RemoteLeaseSetsMutex; std::unordered_map > m_RemoteLeaseSets; std::unordered_map > m_LeaseSetRequests; @@ -241,7 +241,7 @@ namespace client public: - ClientDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, + ClientDestination (boost::asio::io_context& service, const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params = nullptr); ~ClientDestination (); diff --git a/libi2pd/NTCP2.cpp b/libi2pd/NTCP2.cpp index 33c33596..cbfbc84e 100644 --- a/libi2pd/NTCP2.cpp +++ b/libi2pd/NTCP2.cpp @@ -42,28 +42,29 @@ namespace transport delete[] m_SessionConfirmedBuffer; } - void NTCP2Establisher::KeyDerivationFunction1 (const uint8_t * pub, i2p::crypto::X25519Keys& priv, const uint8_t * rs, const uint8_t * epub) + bool NTCP2Establisher::KeyDerivationFunction1 (const uint8_t * pub, i2p::crypto::X25519Keys& priv, const uint8_t * rs, const uint8_t * epub) { i2p::crypto::InitNoiseXKState (*this, rs); // h = SHA256(h || epub) MixHash (epub, 32); // x25519 between pub and priv uint8_t inputKeyMaterial[32]; - priv.Agree (pub, inputKeyMaterial); + if (!priv.Agree (pub, inputKeyMaterial)) return false; MixKey (inputKeyMaterial); + return true; } - void NTCP2Establisher::KDF1Alice () + bool NTCP2Establisher::KDF1Alice () { - KeyDerivationFunction1 (m_RemoteStaticKey, *m_EphemeralKeys, m_RemoteStaticKey, GetPub ()); + return KeyDerivationFunction1 (m_RemoteStaticKey, *m_EphemeralKeys, m_RemoteStaticKey, GetPub ()); } - void NTCP2Establisher::KDF1Bob () + bool NTCP2Establisher::KDF1Bob () { - KeyDerivationFunction1 (GetRemotePub (), i2p::context.GetNTCP2StaticKeys (), i2p::context.GetNTCP2StaticPublicKey (), GetRemotePub ()); + return KeyDerivationFunction1 (GetRemotePub (), i2p::context.GetNTCP2StaticKeys (), i2p::context.GetNTCP2StaticPublicKey (), GetRemotePub ()); } - void NTCP2Establisher::KeyDerivationFunction2 (const uint8_t * sessionRequest, size_t sessionRequestLen, const uint8_t * epub) + bool NTCP2Establisher::KeyDerivationFunction2 (const uint8_t * sessionRequest, size_t sessionRequestLen, const uint8_t * epub) { MixHash (sessionRequest + 32, 32); // encrypted payload @@ -74,33 +75,35 @@ namespace transport // x25519 between remote pub and ephemaral priv uint8_t inputKeyMaterial[32]; - m_EphemeralKeys->Agree (GetRemotePub (), inputKeyMaterial); - + if (!m_EphemeralKeys->Agree (GetRemotePub (), inputKeyMaterial)) return false; MixKey (inputKeyMaterial); + return true; } - void NTCP2Establisher::KDF2Alice () + bool NTCP2Establisher::KDF2Alice () { - KeyDerivationFunction2 (m_SessionRequestBuffer, m_SessionRequestBufferLen, GetRemotePub ()); + return KeyDerivationFunction2 (m_SessionRequestBuffer, m_SessionRequestBufferLen, GetRemotePub ()); } - void NTCP2Establisher::KDF2Bob () + bool NTCP2Establisher::KDF2Bob () { - KeyDerivationFunction2 (m_SessionRequestBuffer, m_SessionRequestBufferLen, GetPub ()); + return KeyDerivationFunction2 (m_SessionRequestBuffer, m_SessionRequestBufferLen, GetPub ()); } - void NTCP2Establisher::KDF3Alice () + bool NTCP2Establisher::KDF3Alice () { uint8_t inputKeyMaterial[32]; - i2p::context.GetNTCP2StaticKeys ().Agree (GetRemotePub (), inputKeyMaterial); + if (!i2p::context.GetNTCP2StaticKeys ().Agree (GetRemotePub (), inputKeyMaterial)) return false; MixKey (inputKeyMaterial); + return true; } - void NTCP2Establisher::KDF3Bob () + bool NTCP2Establisher::KDF3Bob () { uint8_t inputKeyMaterial[32]; - m_EphemeralKeys->Agree (m_RemoteStaticKey, inputKeyMaterial); + if (!m_EphemeralKeys->Agree (m_RemoteStaticKey, inputKeyMaterial)) return false; MixKey (inputKeyMaterial); + return true; } void NTCP2Establisher::CreateEphemeralKey () @@ -108,7 +111,7 @@ namespace transport m_EphemeralKeys = i2p::transport::transports.GetNextX25519KeysPair (); } - void NTCP2Establisher::CreateSessionRequestMessage (std::mt19937& rng) + bool NTCP2Establisher::CreateSessionRequestMessage (std::mt19937& rng) { // create buffer and fill padding auto paddingLength = rng () % (NTCP2_SESSION_REQUEST_MAX_SIZE - 64); // message length doesn't exceed 287 bytes @@ -121,7 +124,7 @@ namespace transport encryption.Encrypt (GetPub (), 32, m_SessionRequestBuffer); // X encryption.GetIV (m_IV); // save IV for SessionCreated // encryption key for next block - KDF1Alice (); + if (!KDF1Alice ()) return false; // fill options uint8_t options[32]; // actual options size is 16 bytes memset (options, 0, 16); @@ -147,9 +150,10 @@ namespace transport uint8_t nonce[12]; memset (nonce, 0, 12); // set nonce to zero i2p::crypto::AEADChaCha20Poly1305 (options, 16, GetH (), 32, GetK (), nonce, m_SessionRequestBuffer + 32, 32, true); // encrypt + return true; } - void NTCP2Establisher::CreateSessionCreatedMessage (std::mt19937& rng) + bool NTCP2Establisher::CreateSessionCreatedMessage (std::mt19937& rng) { auto paddingLen = rng () % (NTCP2_SESSION_CREATED_MAX_SIZE - 64); m_SessionCreatedBufferLen = paddingLen + 64; @@ -160,7 +164,7 @@ namespace transport encryption.SetIV (m_IV); encryption.Encrypt (GetPub (), 32, m_SessionCreatedBuffer); // Y // encryption key for next block (m_K) - KDF2Bob (); + if (!KDF2Bob ()) return false; uint8_t options[16]; memset (options, 0, 16); htobe16buf (options + 2, paddingLen); // padLen @@ -169,7 +173,7 @@ namespace transport uint8_t nonce[12]; memset (nonce, 0, 12); // set nonce to zero i2p::crypto::AEADChaCha20Poly1305 (options, 16, GetH (), 32, GetK (), nonce, m_SessionCreatedBuffer + 32, 32, true); // encrypt - + return true; } void NTCP2Establisher::CreateSessionConfirmedMessagePart1 (const uint8_t * nonce) @@ -184,17 +188,18 @@ namespace transport i2p::crypto::AEADChaCha20Poly1305 (i2p::context.GetNTCP2StaticPublicKey (), 32, GetH (), 32, GetK (), nonce, m_SessionConfirmedBuffer, 48, true); // encrypt } - void NTCP2Establisher::CreateSessionConfirmedMessagePart2 (const uint8_t * nonce) + bool NTCP2Establisher::CreateSessionConfirmedMessagePart2 (const uint8_t * nonce) { // part 2 // update AD again MixHash (m_SessionConfirmedBuffer, 48); // encrypt m3p2, it must be filled in SessionRequest - KDF3Alice (); + if (!KDF3Alice ()) return false; uint8_t * m3p2 = m_SessionConfirmedBuffer + 48; i2p::crypto::AEADChaCha20Poly1305 (m3p2, m3p2Len - 16, GetH (), 32, GetK (), nonce, m3p2, m3p2Len, true); // encrypt // update h again MixHash (m3p2, m3p2Len); //h = SHA256(h || ciphertext) + return true; } bool NTCP2Establisher::ProcessSessionRequestMessage (uint16_t& paddingLen, bool& clockSkew) @@ -207,7 +212,11 @@ namespace transport decryption.Decrypt (m_SessionRequestBuffer, 32, GetRemotePub ()); decryption.GetIV (m_IV); // save IV for SessionCreated // decryption key for next block - KDF1Bob (); + if (!KDF1Bob ()) + { + LogPrint (eLogWarning, "NTCP2: SessionRequest KDF failed"); + return false; + } // verify MAC and decrypt options block (32 bytes), use m_H as AD uint8_t nonce[12], options[16]; memset (nonce, 0, 12); // set nonce to zero @@ -262,7 +271,11 @@ namespace transport decryption.SetIV (m_IV); decryption.Decrypt (m_SessionCreatedBuffer, 32, GetRemotePub ()); // decryption key for next block (m_K) - KDF2Alice (); + if (!KDF2Alice ()) + { + LogPrint (eLogWarning, "NTCP2: SessionCreated KDF failed"); + return false; + } // decrypt and verify MAC uint8_t payload[16]; uint8_t nonce[12]; @@ -309,7 +322,11 @@ namespace transport // update AD again MixHash (m_SessionConfirmedBuffer, 48); - KDF3Bob (); + if (!KDF3Bob ()) + { + LogPrint (eLogWarning, "NTCP2: SessionConfirmed Part2 KDF failed"); + return false; + } if (i2p::crypto::AEADChaCha20Poly1305 (m_SessionConfirmedBuffer + 48, m3p2Len - 16, GetH (), 32, GetK (), nonce, m3p2Buf, m3p2Len - 16, false)) // decrypt // calculate new h again for KDF data MixHash (m_SessionConfirmedBuffer + 48, m3p2Len); // h = SHA256(h || ciphertext) @@ -327,6 +344,7 @@ namespace transport m_Server (server), m_Socket (m_Server.GetService ()), m_IsEstablished (false), m_IsTerminated (false), m_Establisher (new NTCP2Establisher), + m_SendKey (nullptr), m_ReceiveKey (nullptr), #if OPENSSL_SIPHASH m_SendMDCtx(nullptr), m_ReceiveMDCtx (nullptr), #else @@ -406,7 +424,7 @@ namespace transport void NTCP2Session::Done () { - m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); } void NTCP2Session::Established () @@ -466,7 +484,12 @@ namespace transport void NTCP2Session::SendSessionRequest () { - m_Establisher->CreateSessionRequestMessage (m_Server.GetRng ()); + if (!m_Establisher->CreateSessionRequestMessage (m_Server.GetRng ())) + { + LogPrint (eLogWarning, "NTCP2: Send SessionRequest KDF failed"); + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); + return; + } // send message m_HandshakeInterval = i2p::util::GetMillisecondsSinceEpoch (); boost::asio::async_write (m_Socket, boost::asio::buffer (m_Establisher->m_SessionRequestBuffer, m_Establisher->m_SessionRequestBufferLen), boost::asio::transfer_all (), @@ -491,7 +514,6 @@ namespace transport void NTCP2Session::HandleSessionRequestReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) { - (void) bytes_transferred; if (ecode) { LogPrint (eLogWarning, "NTCP2: SessionRequest read error: ", ecode.message ()); @@ -499,38 +521,47 @@ namespace transport } else { - LogPrint (eLogDebug, "NTCP2: SessionRequest received ", bytes_transferred); - uint16_t paddingLen = 0; - bool clockSkew = false; - if (m_Establisher->ProcessSessionRequestMessage (paddingLen, clockSkew)) - { - if (clockSkew) + boost::asio::post (m_Server.GetEstablisherService (), + [s = shared_from_this (), bytes_transferred] () { - // we don't care about padding, send SessionCreated and close session - SendSessionCreated (); - m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); - } - else if (paddingLen > 0) - { - if (paddingLen <= NTCP2_SESSION_REQUEST_MAX_SIZE - 64) // session request is 287 bytes max - { - boost::asio::async_read (m_Socket, boost::asio::buffer(m_Establisher->m_SessionRequestBuffer + 64, paddingLen), boost::asio::transfer_all (), - std::bind(&NTCP2Session::HandleSessionRequestPaddingReceived, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - } - else - { - LogPrint (eLogWarning, "NTCP2: SessionRequest padding length ", (int)paddingLen, " is too long"); - Terminate (); - } - } - else - SendSessionCreated (); - } - else - Terminate (); + s->ProcessSessionRequest (bytes_transferred);; + }); } } + void NTCP2Session::ProcessSessionRequest (size_t len) + { + LogPrint (eLogDebug, "NTCP2: SessionRequest received ", len); + uint16_t paddingLen = 0; + bool clockSkew = false; + if (m_Establisher->ProcessSessionRequestMessage (paddingLen, clockSkew)) + { + if (clockSkew) + { + // we don't care about padding, send SessionCreated and close session + SendSessionCreated (); + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); + } + else if (paddingLen > 0) + { + if (paddingLen <= NTCP2_SESSION_REQUEST_MAX_SIZE - 64) // session request is 287 bytes max + { + boost::asio::async_read (m_Socket, boost::asio::buffer(m_Establisher->m_SessionRequestBuffer + 64, paddingLen), boost::asio::transfer_all (), + std::bind(&NTCP2Session::HandleSessionRequestPaddingReceived, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); + } + else + { + LogPrint (eLogWarning, "NTCP2: SessionRequest padding length ", (int)paddingLen, " is too long"); + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); + } + } + else + SendSessionCreated (); + } + else + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); + } + void NTCP2Session::HandleSessionRequestPaddingReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (ecode) @@ -539,12 +570,23 @@ namespace transport Terminate (); } else - SendSessionCreated (); + { + boost::asio::post (m_Server.GetEstablisherService (), + [s = shared_from_this ()] () + { + s->SendSessionCreated (); + }); + } } void NTCP2Session::SendSessionCreated () { - m_Establisher->CreateSessionCreatedMessage (m_Server.GetRng ()); + if (!m_Establisher->CreateSessionCreatedMessage (m_Server.GetRng ())) + { + LogPrint (eLogWarning, "NTCP2: Send SessionCreated KDF failed"); + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); + return; + } // send message m_HandshakeInterval = i2p::util::GetMillisecondsSinceEpoch (); boost::asio::async_write (m_Socket, boost::asio::buffer (m_Establisher->m_SessionCreatedBuffer, m_Establisher->m_SessionCreatedBufferLen), boost::asio::transfer_all (), @@ -561,35 +603,44 @@ namespace transport else { m_HandshakeInterval = i2p::util::GetMillisecondsSinceEpoch () - m_HandshakeInterval; - LogPrint (eLogDebug, "NTCP2: SessionCreated received ", bytes_transferred); - uint16_t paddingLen = 0; - if (m_Establisher->ProcessSessionCreatedMessage (paddingLen)) - { - if (paddingLen > 0) + boost::asio::post (m_Server.GetEstablisherService (), + [s = shared_from_this (), bytes_transferred] () { - if (paddingLen <= NTCP2_SESSION_CREATED_MAX_SIZE - 64) // session created is 287 bytes max - { - boost::asio::async_read (m_Socket, boost::asio::buffer(m_Establisher->m_SessionCreatedBuffer + 64, paddingLen), boost::asio::transfer_all (), - std::bind(&NTCP2Session::HandleSessionCreatedPaddingReceived, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - } - else - { - LogPrint (eLogWarning, "NTCP2: SessionCreated padding length ", (int)paddingLen, " is too long"); - Terminate (); - } - } - else - SendSessionConfirmed (); - } - else - { - if (GetRemoteIdentity ()) - i2p::data::netdb.SetUnreachable (GetRemoteIdentity ()->GetIdentHash (), true); // assume wrong s key - Terminate (); - } + s->ProcessSessionCreated (bytes_transferred); + }); } } + void NTCP2Session::ProcessSessionCreated (size_t len) + { + LogPrint (eLogDebug, "NTCP2: SessionCreated received ", len); + uint16_t paddingLen = 0; + if (m_Establisher->ProcessSessionCreatedMessage (paddingLen)) + { + if (paddingLen > 0) + { + if (paddingLen <= NTCP2_SESSION_CREATED_MAX_SIZE - 64) // session created is 287 bytes max + { + boost::asio::async_read (m_Socket, boost::asio::buffer(m_Establisher->m_SessionCreatedBuffer + 64, paddingLen), boost::asio::transfer_all (), + std::bind(&NTCP2Session::HandleSessionCreatedPaddingReceived, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); + } + else + { + LogPrint (eLogWarning, "NTCP2: SessionCreated padding length ", (int)paddingLen, " is too long"); + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); + } + } + else + SendSessionConfirmed (); + } + else + { + if (GetRemoteIdentity ()) + i2p::data::netdb.SetUnreachable (GetRemoteIdentity ()->GetIdentHash (), true); // assume wrong s key + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); + } + } + void NTCP2Session::HandleSessionCreatedPaddingReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (ecode) @@ -600,7 +651,11 @@ namespace transport else { m_Establisher->m_SessionCreatedBufferLen += bytes_transferred; - SendSessionConfirmed (); + boost::asio::post (m_Server.GetEstablisherService (), + [s = shared_from_this ()] () + { + s->SendSessionConfirmed (); + }); } } @@ -610,7 +665,12 @@ namespace transport CreateNonce (1, nonce); // set nonce to 1 m_Establisher->CreateSessionConfirmedMessagePart1 (nonce); memset (nonce, 0, 12); // set nonce back to 0 - m_Establisher->CreateSessionConfirmedMessagePart2 (nonce); + if (!m_Establisher->CreateSessionConfirmedMessagePart2 (nonce)) + { + LogPrint (eLogWarning, "NTCP2: Send SessionConfirmed Part2 KDF failed"); + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); + return; + } // send message boost::asio::async_write (m_Socket, boost::asio::buffer (m_Establisher->m_SessionConfirmedBuffer, m_Establisher->m3p2Len + 48), boost::asio::transfer_all (), std::bind(&NTCP2Session::HandleSessionConfirmedSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); @@ -662,6 +722,7 @@ namespace transport void NTCP2Session::HandleSessionConfirmedReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) { + (void) bytes_transferred; if (ecode) { LogPrint (eLogWarning, "NTCP2: SessionConfirmed read error: ", ecode.message ()); @@ -670,122 +731,143 @@ namespace transport else { m_HandshakeInterval = i2p::util::GetMillisecondsSinceEpoch () - m_HandshakeInterval; - LogPrint (eLogDebug, "NTCP2: SessionConfirmed received"); - // part 1 - uint8_t nonce[12]; - CreateNonce (1, nonce); - if (m_Establisher->ProcessSessionConfirmedMessagePart1 (nonce)) - { - // part 2 - std::vector buf(m_Establisher->m3p2Len - 16); // -MAC - memset (nonce, 0, 12); // set nonce to 0 again - if (m_Establisher->ProcessSessionConfirmedMessagePart2 (nonce, buf.data ())) + boost::asio::post (m_Server.GetEstablisherService (), + [s = shared_from_this ()] () { - KeyDerivationFunctionDataPhase (); - // Bob data phase keys - m_SendKey = m_Kba; - m_ReceiveKey = m_Kab; - SetSipKeys (m_Sipkeysba, m_Sipkeysab); - memcpy (m_ReceiveIV.buf, m_Sipkeysab + 16, 8); - memcpy (m_SendIV.buf, m_Sipkeysba + 16, 8); - // payload - // process RI - if (buf[0] != eNTCP2BlkRouterInfo) - { - LogPrint (eLogWarning, "NTCP2: Unexpected block ", (int)buf[0], " in SessionConfirmed"); - Terminate (); - return; - } - auto size = bufbe16toh (buf.data () + 1); - if (size > buf.size () - 3 || size > i2p::data::MAX_RI_BUFFER_SIZE + 1) - { - LogPrint (eLogError, "NTCP2: Unexpected RouterInfo size ", size, " in SessionConfirmed"); - Terminate (); - return; - } - // TODO: check flag - i2p::data::RouterInfo ri (buf.data () + 4, size - 1); // 1 byte block type + 2 bytes size + 1 byte flag - if (ri.IsUnreachable ()) - { - LogPrint (eLogError, "NTCP2: RouterInfo verification failed in SessionConfirmed from ", GetRemoteEndpoint ()); - SendTerminationAndTerminate (eNTCP2RouterInfoSignatureVerificationFail); - return; - } - LogPrint(eLogDebug, "NTCP2: SessionConfirmed from ", GetRemoteEndpoint (), - " (", i2p::data::GetIdentHashAbbreviation (ri.GetIdentHash ()), ")"); - auto ts = i2p::util::GetMillisecondsSinceEpoch (); - if (ts > ri.GetTimestamp () + i2p::data::NETDB_MIN_EXPIRATION_TIMEOUT*1000LL) // 90 minutes - { - LogPrint (eLogError, "NTCP2: RouterInfo is too old in SessionConfirmed for ", (ts - ri.GetTimestamp ())/1000LL, " seconds"); - SendTerminationAndTerminate (eNTCP2Message3Error); - return; - } - if (ts + i2p::data::NETDB_EXPIRATION_TIMEOUT_THRESHOLD*1000LL < ri.GetTimestamp ()) // 2 minutes - { - LogPrint (eLogError, "NTCP2: RouterInfo is from future for ", (ri.GetTimestamp () - ts)/1000LL, " seconds"); - SendTerminationAndTerminate (eNTCP2Message3Error); - return; - } - // update RouterInfo in netdb - auto ri1 = i2p::data::netdb.AddRouterInfo (ri.GetBuffer (), ri.GetBufferLen ()); // ri1 points to one from netdb now - if (!ri1) - { - LogPrint (eLogError, "NTCP2: Couldn't update RouterInfo from SessionConfirmed in netdb"); - Terminate (); - return; - } - std::shared_ptr profile; // not null if older - if (ri.GetTimestamp () + i2p::data::NETDB_EXPIRATION_TIMEOUT_THRESHOLD*1000LL < ri1->GetTimestamp ()) - { - // received RouterInfo is older than one in netdb - profile = i2p::data::GetRouterProfile (ri1->GetIdentHash ()); // retrieve profile - if (profile && profile->IsDuplicated ()) - { - SendTerminationAndTerminate (eNTCP2Banned); - return; - } - } - - auto addr = m_RemoteEndpoint.address ().is_v4 () ? ri1->GetNTCP2V4Address () : - (i2p::util::net::IsYggdrasilAddress (m_RemoteEndpoint.address ()) ? ri1->GetYggdrasilAddress () : ri1->GetNTCP2V6Address ()); - if (!addr || memcmp (m_Establisher->m_RemoteStaticKey, addr->s, 32)) - { - LogPrint (eLogError, "NTCP2: Wrong static key in SessionConfirmed"); - Terminate (); - return; - } - if (addr->IsPublishedNTCP2 () && m_RemoteEndpoint.address () != addr->host && - (!m_RemoteEndpoint.address ().is_v6 () || (i2p::util::net::IsYggdrasilAddress (m_RemoteEndpoint.address ()) ? - memcmp (m_RemoteEndpoint.address ().to_v6 ().to_bytes ().data () + 1, addr->host.to_v6 ().to_bytes ().data () + 1, 7) : // from the same yggdrasil subnet - memcmp (m_RemoteEndpoint.address ().to_v6 ().to_bytes ().data (), addr->host.to_v6 ().to_bytes ().data (), 8)))) // temporary address - { - if (profile) // older router? - profile->Duplicated (); // mark router as duplicated in profile - else - LogPrint (eLogInfo, "NTCP2: Host mismatch between published address ", addr->host, " and actual endpoint ", m_RemoteEndpoint.address ()); - SendTerminationAndTerminate (eNTCP2Banned); - return; - } - // TODO: process options - - // ready to communicate - SetRemoteIdentity (ri1->GetRouterIdentity ()); - if (m_Server.AddNTCP2Session (shared_from_this (), true)) - { - Established (); - ReceiveLength (); - } - else - Terminate (); - } - else - Terminate (); - } - else - Terminate (); + s->ProcessSessionConfirmed ();; + }); } } + void NTCP2Session::ProcessSessionConfirmed () + { + // run on establisher thread + LogPrint (eLogDebug, "NTCP2: SessionConfirmed received"); + // part 1 + uint8_t nonce[12]; + CreateNonce (1, nonce); + if (m_Establisher->ProcessSessionConfirmedMessagePart1 (nonce)) + { + // part 2 + auto buf = std::make_shared > (m_Establisher->m3p2Len - 16); // -MAC + memset (nonce, 0, 12); // set nonce to 0 again + if (m_Establisher->ProcessSessionConfirmedMessagePart2 (nonce, buf->data ())) // TODO:handle in establisher thread + { + // payload + // RI block must be first + if ((*buf)[0] != eNTCP2BlkRouterInfo) + { + LogPrint (eLogWarning, "NTCP2: Unexpected block ", (int)(*buf)[0], " in SessionConfirmed"); + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); + return; + } + auto size = bufbe16toh (buf->data () + 1); + if (size > buf->size () - 3 || size > i2p::data::MAX_RI_BUFFER_SIZE + 1) + { + LogPrint (eLogError, "NTCP2: Unexpected RouterInfo size ", size, " in SessionConfirmed"); + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); + return; + } + boost::asio::post (m_Server.GetService (), + [s = shared_from_this (), buf, size] () + { + s->EstablishSessionAfterSessionConfirmed (buf, size); + }); + } + else + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); + } + else + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); + } + + void NTCP2Session::EstablishSessionAfterSessionConfirmed (std::shared_ptr > buf, size_t size) + { + // run on main NTCP2 thread + KeyDerivationFunctionDataPhase (); + // Bob data phase keys + m_SendKey = m_Kba; + m_ReceiveKey = m_Kab; + SetSipKeys (m_Sipkeysba, m_Sipkeysab); + memcpy (m_ReceiveIV.buf, m_Sipkeysab + 16, 8); + memcpy (m_SendIV.buf, m_Sipkeysba + 16, 8); + // we need to set keys for SendTerminationAndTerminate + // TODO: check flag + i2p::data::RouterInfo ri (buf->data () + 4, size - 1); // 1 byte block type + 2 bytes size + 1 byte flag + if (ri.IsUnreachable ()) + { + LogPrint (eLogError, "NTCP2: RouterInfo verification failed in SessionConfirmed from ", GetRemoteEndpoint ()); + SendTerminationAndTerminate (eNTCP2RouterInfoSignatureVerificationFail); + return; + } + LogPrint(eLogDebug, "NTCP2: SessionConfirmed from ", GetRemoteEndpoint (), + " (", i2p::data::GetIdentHashAbbreviation (ri.GetIdentHash ()), ")"); + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + if (ts > ri.GetTimestamp () + i2p::data::NETDB_MIN_EXPIRATION_TIMEOUT*1000LL) // 90 minutes + { + LogPrint (eLogError, "NTCP2: RouterInfo is too old in SessionConfirmed for ", (ts - ri.GetTimestamp ())/1000LL, " seconds"); + SendTerminationAndTerminate (eNTCP2Message3Error); + return; + } + if (ts + i2p::data::NETDB_EXPIRATION_TIMEOUT_THRESHOLD*1000LL < ri.GetTimestamp ()) // 2 minutes + { + LogPrint (eLogError, "NTCP2: RouterInfo is from future for ", (ri.GetTimestamp () - ts)/1000LL, " seconds"); + SendTerminationAndTerminate (eNTCP2Message3Error); + return; + } + // update RouterInfo in netdb + auto ri1 = i2p::data::netdb.AddRouterInfo (ri.GetBuffer (), ri.GetBufferLen ()); // ri1 points to one from netdb now + if (!ri1) + { + LogPrint (eLogError, "NTCP2: Couldn't update RouterInfo from SessionConfirmed in netdb"); + Terminate (); + return; + } + std::shared_ptr profile; // not null if older + if (ri.GetTimestamp () + i2p::data::NETDB_EXPIRATION_TIMEOUT_THRESHOLD*1000LL < ri1->GetTimestamp ()) + { + // received RouterInfo is older than one in netdb + profile = i2p::data::GetRouterProfile (ri1->GetIdentHash ()); // retrieve profile + if (profile && profile->IsDuplicated ()) + { + SendTerminationAndTerminate (eNTCP2Banned); + return; + } + } + + auto addr = m_RemoteEndpoint.address ().is_v4 () ? ri1->GetNTCP2V4Address () : + (i2p::util::net::IsYggdrasilAddress (m_RemoteEndpoint.address ()) ? ri1->GetYggdrasilAddress () : ri1->GetNTCP2V6Address ()); + if (!addr || memcmp (m_Establisher->m_RemoteStaticKey, addr->s, 32)) + { + LogPrint (eLogError, "NTCP2: Wrong static key in SessionConfirmed"); + Terminate (); + return; + } + if (addr->IsPublishedNTCP2 () && m_RemoteEndpoint.address () != addr->host && + (!m_RemoteEndpoint.address ().is_v6 () || (i2p::util::net::IsYggdrasilAddress (m_RemoteEndpoint.address ()) ? + memcmp (m_RemoteEndpoint.address ().to_v6 ().to_bytes ().data () + 1, addr->host.to_v6 ().to_bytes ().data () + 1, 7) : // from the same yggdrasil subnet + memcmp (m_RemoteEndpoint.address ().to_v6 ().to_bytes ().data (), addr->host.to_v6 ().to_bytes ().data (), 8)))) // temporary address + { + if (profile) // older router? + profile->Duplicated (); // mark router as duplicated in profile + else + LogPrint (eLogInfo, "NTCP2: Host mismatch between published address ", addr->host, " and actual endpoint ", m_RemoteEndpoint.address ()); + SendTerminationAndTerminate (eNTCP2Banned); + return; + } + // TODO: process options block + + // ready to communicate + SetRemoteIdentity (ri1->GetRouterIdentity ()); + if (m_Server.AddNTCP2Session (shared_from_this (), true)) + { + Established (); + ReceiveLength (); + } + else + Terminate (); + } + void NTCP2Session::SetSipKeys (const uint8_t * sendSipKey, const uint8_t * receiveSipKey) { #if OPENSSL_SIPHASH @@ -811,7 +893,11 @@ namespace transport void NTCP2Session::ClientLogin () { m_Establisher->CreateEphemeralKey (); - SendSessionRequest (); + boost::asio::post (m_Server.GetEstablisherService (), + [s = shared_from_this ()] () + { + s->SendSessionRequest (); + }); } void NTCP2Session::ServerLogin () @@ -1296,7 +1382,7 @@ namespace transport void NTCP2Session::SendTerminationAndTerminate (NTCP2TerminationReason reason) { SendTermination (reason); - m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); // let termination message go + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::Terminate, shared_from_this ())); // let termination message go } void NTCP2Session::SendI2NPMessages (std::list >& msgs) @@ -1313,7 +1399,7 @@ namespace transport m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs); } if (empty) - m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this ())); + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this ())); } void NTCP2Session::PostI2NPMessages () @@ -1350,7 +1436,7 @@ namespace transport void NTCP2Session::SendLocalRouterInfo (bool update) { if (update || !IsOutgoing ()) // we send it in SessionConfirmed for outgoing session - m_Server.GetService ().post (std::bind (&NTCP2Session::SendRouterInfo, shared_from_this ())); + boost::asio::post (m_Server.GetService (), std::bind (&NTCP2Session::SendRouterInfo, shared_from_this ())); } NTCP2Server::NTCP2Server (): @@ -1367,6 +1453,7 @@ namespace transport void NTCP2Server::Start () { + m_EstablisherService.Start (); if (!IsRunning ()) { StartIOService (); @@ -1374,14 +1461,13 @@ namespace transport { LogPrint(eLogInfo, "NTCP2: Using proxy to connect to peers"); // TODO: resolve proxy until it is resolved - boost::asio::ip::tcp::resolver::query q(m_ProxyAddress, std::to_string(m_ProxyPort)); boost::system::error_code e; - auto itr = m_Resolver.resolve(q, e); + auto itr = m_Resolver.resolve(m_ProxyAddress, std::to_string(m_ProxyPort), e); if(e) LogPrint(eLogCritical, "NTCP2: Failed to resolve proxy ", e.message()); else { - m_ProxyEndpoint.reset (new boost::asio::ip::tcp::endpoint(*itr)); + m_ProxyEndpoint.reset (new boost::asio::ip::tcp::endpoint(*itr.begin ())); if (m_ProxyEndpoint) LogPrint(eLogDebug, "NTCP2: m_ProxyEndpoint ", *m_ProxyEndpoint); } @@ -1477,6 +1563,7 @@ namespace transport m_TerminationTimer.cancel (); m_ProxyEndpoint = nullptr; } + m_EstablisherService.Stop (); StopIOService (); } @@ -1541,7 +1628,7 @@ namespace transport } LogPrint (eLogDebug, "NTCP2: Connecting to ", conn->GetRemoteEndpoint (), " (", i2p::data::GetIdentHashAbbreviation (conn->GetRemoteIdentity ()->GetIdentHash ()), ")"); - GetService ().post([this, conn]() + boost::asio::post (GetService (), [this, conn]() { if (this->AddNTCP2Session (conn)) { @@ -1762,7 +1849,7 @@ namespace transport LogPrint (eLogError, "NTCP2: Can't connect to unspecified address"); return; } - GetService().post([this, conn]() + boost::asio::post (GetService(), [this, conn]() { if (this->AddNTCP2Session (conn)) { @@ -1860,7 +1947,7 @@ namespace transport { readbuff->commit(transferred); i2p::http::HTTPRes res; - if(res.parse(boost::asio::buffer_cast(readbuff->data()), readbuff->size()) > 0) + if(res.parse(std::string {boost::asio::buffers_begin(readbuff->data ()), boost::asio::buffers_begin(readbuff->data ()) + readbuff->size ()}) > 0) { if(res.code == 200) { diff --git a/libi2pd/NTCP2.h b/libi2pd/NTCP2.h index 27acb529..108c58f6 100644 --- a/libi2pd/NTCP2.h +++ b/libi2pd/NTCP2.h @@ -95,21 +95,21 @@ namespace transport const uint8_t * GetCK () const { return m_CK; }; const uint8_t * GetH () const { return m_H; }; - void KDF1Alice (); - void KDF1Bob (); - void KDF2Alice (); - void KDF2Bob (); - void KDF3Alice (); // for SessionConfirmed part 2 - void KDF3Bob (); + bool KDF1Alice (); + bool KDF1Bob (); + bool KDF2Alice (); + bool KDF2Bob (); + bool KDF3Alice (); // for SessionConfirmed part 2 + bool KDF3Bob (); - void KeyDerivationFunction1 (const uint8_t * pub, i2p::crypto::X25519Keys& priv, const uint8_t * rs, const uint8_t * epub); // for SessionRequest, (pub, priv) for DH - void KeyDerivationFunction2 (const uint8_t * sessionRequest, size_t sessionRequestLen, const uint8_t * epub); // for SessionCreate + bool KeyDerivationFunction1 (const uint8_t * pub, i2p::crypto::X25519Keys& priv, const uint8_t * rs, const uint8_t * epub); // for SessionRequest, (pub, priv) for DH + bool KeyDerivationFunction2 (const uint8_t * sessionRequest, size_t sessionRequestLen, const uint8_t * epub); // for SessionCreate void CreateEphemeralKey (); - void CreateSessionRequestMessage (std::mt19937& rng); - void CreateSessionCreatedMessage (std::mt19937& rng); + bool CreateSessionRequestMessage (std::mt19937& rng); + bool CreateSessionCreatedMessage (std::mt19937& rng); void CreateSessionConfirmedMessagePart1 (const uint8_t * nonce); - void CreateSessionConfirmedMessagePart2 (const uint8_t * nonce); + bool CreateSessionConfirmedMessagePart2 (const uint8_t * nonce); bool ProcessSessionRequestMessage (uint16_t& paddingLen, bool& clockSkew); bool ProcessSessionCreatedMessage (uint16_t& paddingLen); @@ -172,13 +172,17 @@ namespace transport void HandleSessionRequestSent (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleSessionRequestReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void ProcessSessionRequest (size_t len); void HandleSessionRequestPaddingReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleSessionCreatedSent (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleSessionCreatedReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void ProcessSessionCreated (size_t len); void HandleSessionCreatedPaddingReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleSessionConfirmedSent (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleSessionConfirmedReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); - + void ProcessSessionConfirmed (); + void EstablishSessionAfterSessionConfirmed (std::shared_ptr > buf, size_t size); + // data void ReceiveLength (); void HandleReceivedLength (const boost::system::error_code& ecode, std::size_t bytes_transferred); @@ -239,6 +243,18 @@ namespace transport class NTCP2Server: private i2p::util::RunnableServiceWithWork { + private: + + class EstablisherService: public i2p::util::RunnableServiceWithWork + { + public: + + EstablisherService (): RunnableServiceWithWork ("NTCP2e") {}; + auto& GetService () { return GetIOService (); }; + void Start () { StartIOService (); }; + void Stop () { StopIOService (); }; + }; + public: enum ProxyType @@ -247,13 +263,14 @@ namespace transport eSocksProxy, eHTTPProxy }; - + NTCP2Server (); ~NTCP2Server (); void Start (); void Stop (); - boost::asio::io_service& GetService () { return GetIOService (); }; + auto& GetService () { return GetIOService (); }; + auto& GetEstablisherService () { return m_EstablisherService.GetService (); }; std::mt19937& GetRng () { return m_Rng; }; bool AddNTCP2Session (std::shared_ptr session, bool incoming = false); @@ -294,7 +311,8 @@ namespace transport std::unique_ptr m_ProxyEndpoint; std::shared_ptr m_Address4, m_Address6, m_YggdrasilAddress; std::mt19937 m_Rng; - + EstablisherService m_EstablisherService; + public: // for HTTP/I2PControl diff --git a/libi2pd/NetDbRequests.cpp b/libi2pd/NetDbRequests.cpp index 0e632c26..63d161cd 100644 --- a/libi2pd/NetDbRequests.cpp +++ b/libi2pd/NetDbRequests.cpp @@ -179,7 +179,7 @@ namespace data void NetDbRequests::RequestComplete (const IdentHash& ident, std::shared_ptr r) { - GetIOService ().post ([this, ident, r]() + boost::asio::post (GetIOService (), [this, ident, r]() { std::shared_ptr request; auto it = m_RequestedDestinations.find (ident); @@ -267,7 +267,7 @@ namespace data { if (dest->IsActive ()) { - s->GetIOService ().post ([s, dest]() + boost::asio::post (s->GetIOService (), [s, dest]() { if (dest->IsActive ()) s->SendNextRequest (dest); }); @@ -345,7 +345,7 @@ namespace data void NetDbRequests::PostDatabaseSearchReplyMsg (std::shared_ptr msg) { - GetIOService ().post ([this, msg]() + boost::asio::post (GetIOService (), [this, msg]() { HandleDatabaseSearchReplyMsg (msg); }); @@ -431,7 +431,7 @@ namespace data void NetDbRequests::PostRequestDestination (const IdentHash& destination, const RequestedDestination::RequestComplete& requestComplete, bool direct) { - GetIOService ().post ([this, destination, requestComplete, direct]() + boost::asio::post (GetIOService (), [this, destination, requestComplete, direct]() { RequestDestination (destination, requestComplete, direct); }); diff --git a/libi2pd/Queue.h b/libi2pd/Queue.h index ec62bddf..0e3e4fde 100644 --- a/libi2pd/Queue.h +++ b/libi2pd/Queue.h @@ -84,7 +84,7 @@ namespace util return m_Queue.empty (); } - int GetSize () + int GetSize () const { std::unique_lock l(m_QueueMutex); return m_Queue.size (); @@ -134,7 +134,7 @@ namespace util private: std::list m_Queue; - std::mutex m_QueueMutex; + mutable std::mutex m_QueueMutex; std::condition_variable m_NonEmpty; }; } diff --git a/libi2pd/Reseed.cpp b/libi2pd/Reseed.cpp index f8307a56..e58e898b 100644 --- a/libi2pd/Reseed.cpp +++ b/libi2pd/Reseed.cpp @@ -552,7 +552,7 @@ namespace data if (!url.port) url.port = 443; - boost::asio::io_service service; + boost::asio::io_context service; boost::system::error_code ecode; boost::asio::ssl::context ctx(boost::asio::ssl::context::sslv23); @@ -562,11 +562,10 @@ namespace data if(proxyUrl.schema.size()) { // proxy connection - auto it = boost::asio::ip::tcp::resolver(service).resolve ( - boost::asio::ip::tcp::resolver::query (proxyUrl.host, std::to_string(proxyUrl.port)), ecode); + auto it = boost::asio::ip::tcp::resolver(service).resolve (proxyUrl.host, std::to_string(proxyUrl.port), ecode); if(!ecode) { - s.lowest_layer().connect(*it, ecode); + s.lowest_layer().connect(*it.begin (), ecode); if(!ecode) { auto & sock = s.next_layer(); @@ -599,7 +598,7 @@ namespace data LogPrint(eLogError, "Reseed: HTTP CONNECT read error: ", ecode.message()); return ""; } - if(proxyRes.parse(boost::asio::buffer_cast(readbuf.data()), readbuf.size()) <= 0) + if(proxyRes.parse(std::string {boost::asio::buffers_begin(readbuf.data ()), boost::asio::buffers_begin(readbuf.data ()) + readbuf.size ()}) <= 0) { sock.close(); LogPrint(eLogError, "Reseed: HTTP CONNECT malformed reply"); @@ -638,15 +637,13 @@ namespace data else { // direct connection - auto it = boost::asio::ip::tcp::resolver(service).resolve ( - boost::asio::ip::tcp::resolver::query (url.host, std::to_string(url.port)), ecode); + auto endpoints = boost::asio::ip::tcp::resolver(service).resolve (url.host, std::to_string(url.port), ecode); if (!ecode) { bool connected = false; - boost::asio::ip::tcp::resolver::iterator end; - while (it != end) + for (const auto& it: endpoints) { - boost::asio::ip::tcp::endpoint ep = *it; + boost::asio::ip::tcp::endpoint ep = it; bool supported = false; if (!ep.address ().is_unspecified ()) { @@ -666,7 +663,6 @@ namespace data break; } } - it++; } if (!connected) { @@ -746,19 +742,16 @@ namespace data if (!url.port) url.port = 80; boost::system::error_code ecode; - boost::asio::io_service service; + boost::asio::io_context service; boost::asio::ip::tcp::socket s(service, boost::asio::ip::tcp::v6()); - auto it = boost::asio::ip::tcp::resolver(service).resolve ( - boost::asio::ip::tcp::resolver::query (url.host, std::to_string(url.port)), ecode); - + auto endpoints = boost::asio::ip::tcp::resolver(service).resolve (url.host, std::to_string(url.port), ecode); if (!ecode) { bool connected = false; - boost::asio::ip::tcp::resolver::iterator end; - while (it != end) + for (const auto& it: endpoints) { - boost::asio::ip::tcp::endpoint ep = *it; + boost::asio::ip::tcp::endpoint ep = it; if ( i2p::util::net::IsYggdrasilAddress (ep.address ()) && i2p::context.SupportsMesh () @@ -772,7 +765,6 @@ namespace data break; } } - it++; } if (!connected) { diff --git a/libi2pd/RouterContext.cpp b/libi2pd/RouterContext.cpp index 558e9608..fdcdb145 100644 --- a/libi2pd/RouterContext.cpp +++ b/libi2pd/RouterContext.cpp @@ -140,7 +140,7 @@ namespace i2p { boost::asio::ip::address addr; if (!host.empty ()) - addr = boost::asio::ip::address::from_string (host); + addr = boost::asio::ip::make_address (host); if (!addr.is_v4()) addr = boost::asio::ip::address_v4 (); routerInfo.AddNTCP2Address (m_NTCP2Keys->staticPublicKey, m_NTCP2Keys->iv, addr, ntcp2Port); @@ -161,7 +161,7 @@ namespace i2p { boost::asio::ip::address addr; if (!host.empty ()) - addr = boost::asio::ip::address::from_string (host); + addr = boost::asio::ip::make_address (host); if (!addr.is_v4()) addr = boost::asio::ip::address_v4 (); routerInfo.AddSSU2Address (m_SSU2Keys->staticPublicKey, m_SSU2Keys->intro, addr, ssu2Port); @@ -192,7 +192,7 @@ namespace i2p ntcp2Host = host; boost::asio::ip::address addr; if (!ntcp2Host.empty ()) - addr = boost::asio::ip::address::from_string (ntcp2Host); + addr = boost::asio::ip::make_address (ntcp2Host); if (!addr.is_v6()) addr = boost::asio::ip::address_v6 (); routerInfo.AddNTCP2Address (m_NTCP2Keys->staticPublicKey, m_NTCP2Keys->iv, addr, ntcp2Port); @@ -211,7 +211,7 @@ namespace i2p { boost::asio::ip::address addr; if (!host.empty ()) - addr = boost::asio::ip::address::from_string (host); + addr = boost::asio::ip::make_address (host); if (!addr.is_v6()) addr = boost::asio::ip::address_v6 (); routerInfo.AddSSU2Address (m_SSU2Keys->staticPublicKey, m_SSU2Keys->intro, addr, ssu2Port); @@ -802,7 +802,7 @@ namespace i2p i2p::config::GetOption("host", ntcp2Host); if (!ntcp2Host.empty () && ntcp2Port) { - auto addr = boost::asio::ip::address::from_string (ntcp2Host); + auto addr = boost::asio::ip::make_address (ntcp2Host); if (addr.is_v6 ()) { m_RouterInfo.AddNTCP2Address (m_NTCP2Keys->staticPublicKey, m_NTCP2Keys->iv, addr, ntcp2Port); @@ -831,7 +831,7 @@ namespace i2p std::string host; i2p::config::GetOption("host", host); if (!host.empty ()) { - auto addr = boost::asio::ip::address::from_string (host); + auto addr = boost::asio::ip::make_address (host); if (addr.is_v6 ()) { m_RouterInfo.AddSSU2Address (m_SSU2Keys->staticPublicKey, m_SSU2Keys->intro, addr, ssu2Port); @@ -900,7 +900,7 @@ namespace i2p std::string host; i2p::config::GetOption("host", host); if (!host.empty ()) { - auto addr = boost::asio::ip::address::from_string (host); + auto addr = boost::asio::ip::make_address (host); if (addr.is_v4 ()) { m_RouterInfo.AddNTCP2Address (m_NTCP2Keys->staticPublicKey, m_NTCP2Keys->iv, addr, ntcp2Port); @@ -930,7 +930,7 @@ namespace i2p std::string host; i2p::config::GetOption("host", host); if (!host.empty ()) { - auto addr = boost::asio::ip::address::from_string (host); + auto addr = boost::asio::ip::make_address (host); if (addr.is_v4 ()) { m_RouterInfo.AddSSU2Address (m_SSU2Keys->staticPublicKey, m_SSU2Keys->intro, addr, ssu2Port); @@ -1185,7 +1185,7 @@ namespace i2p void RouterContext::ProcessGarlicMessage (std::shared_ptr msg) { if (m_Service) - m_Service->GetService ().post (std::bind (&RouterContext::PostGarlicMessage, this, msg)); + boost::asio::post (m_Service->GetService (), std::bind (&RouterContext::PostGarlicMessage, this, msg)); else LogPrint (eLogError, "Router: service is NULL"); } @@ -1213,7 +1213,7 @@ namespace i2p void RouterContext::ProcessDeliveryStatusMessage (std::shared_ptr msg) { if (m_Service) - m_Service->GetService ().post (std::bind (&RouterContext::PostDeliveryStatusMessage, this, msg)); + boost::asio::post (m_Service->GetService (), std::bind (&RouterContext::PostDeliveryStatusMessage, this, msg)); else LogPrint (eLogError, "Router: service is NULL"); } @@ -1242,7 +1242,7 @@ namespace i2p } data; memcpy (data.k, key, 32); data.t = tag; - m_Service->GetService ().post ([this,data](void) + boost::asio::post (m_Service->GetService (), [this,data](void) { AddECIESx25519Key (data.k, data.t); }); @@ -1408,7 +1408,7 @@ namespace i2p auto onDrop = [this]() { if (m_Service) - m_Service->GetService ().post ([this]() { HandlePublishResendTimer (boost::system::error_code ()); }); + boost::asio::post (m_Service->GetService (), [this]() { HandlePublishResendTimer (boost::system::error_code ()); }); }; if (i2p::transport::transports.IsConnected (floodfill->GetIdentHash ()) || // already connected (floodfill->IsReachableFrom (i2p::context.GetRouterInfo ()) && // are we able to connect diff --git a/libi2pd/RouterContext.h b/libi2pd/RouterContext.h index 0dfef254..5060866a 100644 --- a/libi2pd/RouterContext.h +++ b/libi2pd/RouterContext.h @@ -90,7 +90,7 @@ namespace garlic public: RouterService (): RunnableServiceWithWork ("Router") {}; - boost::asio::io_service& GetService () { return GetIOService (); }; + auto& GetService () { return GetIOService (); }; void Start () { StartIOService (); }; void Stop () { StopIOService (); }; }; diff --git a/libi2pd/RouterInfo.cpp b/libi2pd/RouterInfo.cpp index 2321a1e0..0d53f6cd 100644 --- a/libi2pd/RouterInfo.cpp +++ b/libi2pd/RouterInfo.cpp @@ -257,7 +257,7 @@ namespace data if (!strcmp (key, "host")) { boost::system::error_code ecode; - address->host = boost::asio::ip::address::from_string (value, ecode); + address->host = boost::asio::ip::make_address (value, ecode); if (!ecode && !address->host.is_unspecified ()) { if (!i2p::transport::transports.IsInReservedRange (address->host) || diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index a53c877c..982940c5 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -81,7 +81,7 @@ namespace transport found = true; LogPrint (eLogDebug, "SSU2: Opening IPv4 socket at Start"); OpenSocket (boost::asio::ip::udp::endpoint (m_AddressV4, port)); - m_ReceiveService.GetService ().post( + boost::asio::post (m_ReceiveService.GetService (), [this]() { Receive (m_SocketV4); @@ -93,8 +93,8 @@ namespace transport found = true; LogPrint (eLogDebug, "SSU2: Opening IPv6 socket at Start"); OpenSocket (boost::asio::ip::udp::endpoint (m_AddressV6, port)); - m_ReceiveService.GetService ().post( - [this]() + boost::asio::post (m_ReceiveService.GetService (), + [this]() { Receive (m_SocketV6); }); @@ -451,7 +451,7 @@ namespace transport m_ReceivedPacketsQueue.push_back (packet); } if (empty) - GetService ().post([this]() { HandleReceivedPacketsQueue (); }); + boost::asio::post (GetService (), [this]() { HandleReceivedPacketsQueue (); }); } void SSU2Server::InsertToReceivedPacketsQueue (std::list& packets) @@ -471,7 +471,7 @@ namespace transport } } if (!queueSize) - GetService ().post([this]() { HandleReceivedPacketsQueue (); }); + boost::asio::post (GetService (), [this]() { HandleReceivedPacketsQueue (); }); } void SSU2Server::HandleReceivedPacketsQueue () @@ -522,7 +522,7 @@ namespace transport void SSU2Server::RequestRemoveSession (uint64_t connID) { - GetService ().post ([connID, this]() { RemoveSession (connID); }); + boost::asio::post (GetService (), [connID, this]() { RemoveSession (connID); }); } void SSU2Server::AddSessionByRouterHash (std::shared_ptr session) @@ -550,7 +550,7 @@ namespace transport // move unsent msgs to new session oldSession->MoveSendQueue (session); // terminate existing - GetService ().post (std::bind (&SSU2Session::RequestTermination, oldSession, eSSU2TerminationReasonReplacedByNewSession)); + boost::asio::post (GetService (), std::bind (&SSU2Session::RequestTermination, oldSession, eSSU2TerminationReasonReplacedByNewSession)); } } } @@ -878,7 +878,7 @@ namespace transport { // session with router found, trying to send peer test if requested if (peerTest && existingSession->IsEstablished ()) - GetService ().post ([existingSession]() { existingSession->SendPeerTest (); }); + boost::asio::post (GetService (), [existingSession]() { existingSession->SendPeerTest (); }); return false; } // check is no pending session @@ -905,9 +905,9 @@ namespace transport session->SetOnEstablished ([session]() {session->SendPeerTest (); }); if (isValidEndpoint) // we know endpoint - GetService ().post ([session]() { session->Connect (); }); + boost::asio::post (GetService (), [session]() { session->Connect (); }); else if (address->UsesIntroducer ()) // we don't know endpoint yet - GetService ().post (std::bind (&SSU2Server::ConnectThroughIntroducer, this, session)); + boost::asio::post (GetService (), std::bind (&SSU2Server::ConnectThroughIntroducer, this, session)); else return false; } @@ -1050,7 +1050,7 @@ namespace transport if (!remoteAddr || !remoteAddr->IsPeerTesting () || (v4 && !remoteAddr->IsV4 ()) || (!v4 && !remoteAddr->IsV6 ())) return false; if (session->IsEstablished ()) - GetService ().post ([session]() { session->SendPeerTest (); }); + boost::asio::post (GetService (), [session]() { session->SendPeerTest (); }); else session->SetOnEstablished ([session]() { session->SendPeerTest (); }); return true; @@ -1764,7 +1764,7 @@ namespace transport bool SSU2Server::SetProxy (const std::string& address, uint16_t port) { boost::system::error_code ecode; - auto addr = boost::asio::ip::address::from_string (address, ecode); + auto addr = boost::asio::ip::make_address (address, ecode); if (!ecode && !addr.is_unspecified () && port) { m_IsThroughProxy = true; diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 95f053a9..715e72ab 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -61,7 +61,7 @@ namespace transport public: ReceiveService (const std::string& name): RunnableService (name) {}; - boost::asio::io_service& GetService () { return GetIOService (); }; + auto& GetService () { return GetIOService (); }; void Start () { StartIOService (); }; void Stop () { StopIOService (); }; }; @@ -73,7 +73,7 @@ namespace transport void Start (); void Stop (); - boost::asio::io_service& GetService () { return GetIOService (); }; + auto& GetService () { return GetIOService (); }; void SetLocalAddress (const boost::asio::ip::address& localAddress); bool SetProxy (const std::string& address, uint16_t port); bool UsesProxy () const { return m_IsThroughProxy; }; diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index ece2979d..7a0a39a4 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -66,14 +66,14 @@ namespace stream } } - Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, + Stream::Stream (boost::asio::io_context& service, StreamingDestination& local, std::shared_ptr remote, int port): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_DropWindowDelaySequenceNumber (0), m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed m_Status (eStreamStatusNew), m_IsIncoming (false), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false), m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (false), - m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_LocalDestination (local), + m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_DoubleWinIncCounter (false), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_RTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_SlowRTT2 (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_LastWindowDropSize (0), @@ -95,13 +95,13 @@ namespace stream m_PacketACKInterval = (1000000LL*STREAMING_MTU)/inboundSpeed; } - Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): + Stream::Stream (boost::asio::io_context& service, StreamingDestination& local): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_DropWindowDelaySequenceNumber (0), m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed m_Status (eStreamStatusNew), m_IsIncoming (true), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false), m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (false), - m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_LocalDestination (local), + m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_DoubleWinIncCounter (false), m_LocalDestination (local), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_SlowRTT2 (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_LastWindowDropSize (0), m_WindowDropTargetSize (0), m_WindowIncCounter (0), @@ -495,6 +495,7 @@ namespace stream return; } int rttSample = INT_MAX; + int incCounter = 0; m_IsNAcked = false; m_IsResendNeeded = false; int nackCount = packet->GetNACKCount (); @@ -537,8 +538,7 @@ namespace stream m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; if (m_WindowSize < MAX_WINDOW_SIZE && !m_IsFirstACK) - if (m_RTT < m_LocalDestination.GetRandom () % INITIAL_RTT) // dirty - m_WindowIncCounter++; + incCounter++; } else break; @@ -552,7 +552,7 @@ namespace stream m_SlowRTT2 = rttSample; m_PrevRTTSample = rttSample; m_Jitter = rttSample / 10; // 10% - m_Jitter += 5; // for low-latency connections + m_Jitter += 15; // for low-latency connections m_IsFirstRttSample = false; } else @@ -569,9 +569,23 @@ namespace stream jitter = m_PrevRTTSample - rttSample; else jitter = rttSample / 10; // 10% - jitter += 5; // for low-latency connections + jitter += 15; // for low-latency connections m_Jitter = (0.05 * jitter) + (1.0 - 0.05) * m_Jitter; } + if (rttSample > m_SlowRTT) + { + incCounter = 0; + m_DoubleWinIncCounter = 1; + } + else if (rttSample < m_SlowRTT) + { + if (m_DoubleWinIncCounter) + { + incCounter = incCounter * 2; + m_DoubleWinIncCounter = 0; + } + } + m_WindowIncCounter = m_WindowIncCounter + incCounter; // // delay-based CC if ((m_SlowRTT2 > m_SlowRTT + m_Jitter && rttSample > m_SlowRTT2 && rttSample > m_PrevRTTSample) && !m_IsWinDropped) // Drop window if RTT grows too fast, late detection @@ -667,7 +681,7 @@ namespace stream { // make sure that AsycReceive complete auto s = shared_from_this(); - m_Service.post ([s]() + boost::asio::post (m_Service, [s]() { s->m_ReceiveTimer.cancel (); }); @@ -695,7 +709,7 @@ namespace stream else if (handler) handler(boost::system::error_code ()); auto s = shared_from_this (); - m_Service.post ([s, buffer]() + boost::asio::post (m_Service, [s, buffer]() { if (buffer) s->m_SendBuffer.Add (buffer); @@ -1058,7 +1072,7 @@ namespace stream m_LocalDestination.GetOwner ()->Sign (packet, size, signature); p->len = size; - m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p)); + boost::asio::post (m_Service, std::bind (&Stream::SendPacket, shared_from_this (), p)); LogPrint (eLogDebug, "Streaming: FIN sent, sSID=", m_SendStreamID); } @@ -1143,7 +1157,7 @@ namespace stream CancelRemoteLeaseChange (); UpdateCurrentRemoteLease (true); } - if (m_RemoteLeaseChangeTime && m_IsRemoteLeaseChangeInProgress && ts > m_RemoteLeaseChangeTime + INITIAL_RTT) + if (m_RemoteLeaseChangeTime && m_IsRemoteLeaseChangeInProgress && ts > m_RemoteLeaseChangeTime + INITIAL_RTO) { CancelRemoteLeaseChange (); m_CurrentRemoteLease = m_NextRemoteLease; @@ -1297,8 +1311,6 @@ namespace stream else if (m_IsResendNeeded) // resend packets ResendPacket (); // delay-based CC - else if (!m_IsWinDropped && int(m_SentPackets.size ()) == m_WindowSize) // we sending packets too fast, early detection - ProcessWindowDrop (); else if (m_WindowSize > int(m_SentPackets.size ())) // send packets SendBuffer (); } @@ -1855,7 +1867,7 @@ namespace stream if (it == m_Streams.end ()) return false; auto s = it->second; - m_Owner->GetService ().post ([this, s] () + boost::asio::post (m_Owner->GetService (), [this, s] () { s->Close (); // try to send FIN s->Terminate (false); @@ -1868,7 +1880,7 @@ namespace stream { m_Acceptor = acceptor; // we must set it immediately for IsAcceptorSet auto s = shared_from_this (); - m_Owner->GetService ().post([s](void) + boost::asio::post (m_Owner->GetService (), [s](void) { // take care about incoming queue for (auto& it: s->m_PendingIncomingStreams) @@ -1887,7 +1899,7 @@ namespace stream void StreamingDestination::AcceptOnce (const Acceptor& acceptor) { - m_Owner->GetService ().post([acceptor, this](void) + boost::asio::post (m_Owner->GetService (), [acceptor, this](void) { if (!m_PendingIncomingStreams.empty ()) { diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 11fc04d1..36c13eff 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -19,6 +19,7 @@ #include #include #include "Base.h" +#include "Gzip.h" #include "I2PEndian.h" #include "Identity.h" #include "LeaseSet.h" @@ -54,13 +55,13 @@ namespace stream const size_t COMPRESSION_THRESHOLD_SIZE = 66; const int MAX_NUM_RESEND_ATTEMPTS = 10; const int INITIAL_WINDOW_SIZE = 10; - const int MIN_WINDOW_SIZE = 2; + const int MIN_WINDOW_SIZE = 3; const int MAX_WINDOW_SIZE = 512; const double RTT_EWMA_ALPHA = 0.25; const double SLOWRTT_EWMA_ALPHA = 0.05; const double PREV_SPEED_KEEP_TIME_COEFF = 0.35; // 0.1 - 1 // how long will the window size stay around the previous drop level, less is longer const int MIN_RTO = 20; // in milliseconds - const int INITIAL_RTT = 8000; // in milliseconds + const int INITIAL_RTT = 1500; // in milliseconds const int INITIAL_RTO = 9000; // in milliseconds const int INITIAL_PACING_TIME = 1000 * INITIAL_RTT / INITIAL_WINDOW_SIZE; // in microseconds const int MIN_SEND_ACK_TIMEOUT = 2; // in milliseconds @@ -175,9 +176,9 @@ namespace stream { public: - Stream (boost::asio::io_service& service, StreamingDestination& local, + Stream (boost::asio::io_context& service, StreamingDestination& local, std::shared_ptr remote, int port = 0); // outgoing - Stream (boost::asio::io_service& service, StreamingDestination& local); // incoming + Stream (boost::asio::io_context& service, StreamingDestination& local); // incoming ~Stream (); uint32_t GetSendStreamID () const { return m_SendStreamID; }; @@ -202,7 +203,7 @@ namespace stream size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); }; size_t Receive (uint8_t * buf, size_t len, int timeout); - void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); }; + void AsyncClose() { boost::asio::post(m_Service, std::bind(&Stream::Close, shared_from_this())); }; /** only call close from destination thread, use Stream::AsyncClose for other threads */ void Close (); @@ -238,7 +239,7 @@ namespace stream void UpdateCurrentRemoteLease (bool expired = false); template - void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout); + void HandleReceiveTimer (const boost::system::error_code& ecode, Buffer& buffer, ReceiveHandler handler, int remainingTimeout); void ScheduleSend (); void HandleSendTimer (const boost::system::error_code& ecode); @@ -255,7 +256,7 @@ namespace stream private: - boost::asio::io_service& m_Service; + boost::asio::io_context& m_Service; uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; uint32_t m_DropWindowDelaySequenceNumber; uint32_t m_TunnelsChangeSequenceNumber; @@ -274,6 +275,7 @@ namespace stream bool m_IsTimeOutResend; bool m_IsImmediateAckRequested; bool m_IsRemoteLeaseChangeInProgress; + bool m_DoubleWinIncCounter; StreamingDestination& m_LocalDestination; std::shared_ptr m_RemoteIdentity; std::shared_ptr m_TransientVerifier; // in case of offline key @@ -375,7 +377,7 @@ namespace stream void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout) { auto s = shared_from_this(); - m_Service.post ([s, buffer, handler, timeout](void) + boost::asio::post (m_Service, [s, buffer, handler, timeout](void) { if (!s->m_ReceiveQueue.empty () || s->m_Status == eStreamStatusReset) s->HandleReceiveTimer (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler, 0); @@ -394,9 +396,9 @@ namespace stream } template - void Stream::HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout) + void Stream::HandleReceiveTimer (const boost::system::error_code& ecode, Buffer& buffer, ReceiveHandler handler, int remainingTimeout) { - size_t received = ConcatenatePackets (boost::asio::buffer_cast(buffer), boost::asio::buffer_size(buffer)); + size_t received = ConcatenatePackets ((uint8_t *)buffer.data (), buffer.size ()); if (received > 0) handler (boost::system::error_code (), received); else if (ecode == boost::asio::error::operation_aborted) diff --git a/libi2pd/Timestamp.cpp b/libi2pd/Timestamp.cpp index c5c37cd7..a22e9bde 100644 --- a/libi2pd/Timestamp.cpp +++ b/libi2pd/Timestamp.cpp @@ -60,18 +60,16 @@ namespace util static void SyncTimeWithNTP (const std::string& address) { LogPrint (eLogInfo, "Timestamp: NTP request to ", address); - boost::asio::io_service service; + boost::asio::io_context service; boost::system::error_code ec; - auto it = boost::asio::ip::udp::resolver (service).resolve ( - boost::asio::ip::udp::resolver::query (address, "ntp"), ec); + auto endpoints = boost::asio::ip::udp::resolver (service).resolve (address, "ntp", ec); if (!ec) { bool found = false; - boost::asio::ip::udp::resolver::iterator end; boost::asio::ip::udp::endpoint ep; - while (it != end) + for (const auto& it: endpoints) { - ep = *it; + ep = it; if (!ep.address ().is_unspecified ()) { if (ep.address ().is_v4 ()) @@ -88,7 +86,6 @@ namespace util } } if (found) break; - it++; } if (!found) { @@ -154,7 +151,7 @@ namespace util { m_IsRunning = true; LogPrint(eLogInfo, "Timestamp: NTP time sync starting"); - m_Service.post (std::bind (&NTPTimeSync::Sync, this)); + boost::asio::post (m_Service, std::bind (&NTPTimeSync::Sync, this)); m_Thread.reset (new std::thread (std::bind (&NTPTimeSync::Run, this))); } else diff --git a/libi2pd/Timestamp.h b/libi2pd/Timestamp.h index d949d416..00c60433 100644 --- a/libi2pd/Timestamp.h +++ b/libi2pd/Timestamp.h @@ -52,7 +52,7 @@ namespace util bool m_IsRunning; std::unique_ptr m_Thread; - boost::asio::io_service m_Service; + boost::asio::io_context m_Service; boost::asio::deadline_timer m_Timer; int m_SyncInterval; std::vector m_NTPServersList; diff --git a/libi2pd/TransitTunnel.cpp b/libi2pd/TransitTunnel.cpp index edf96c31..8284dc14 100644 --- a/libi2pd/TransitTunnel.cpp +++ b/libi2pd/TransitTunnel.cpp @@ -122,14 +122,86 @@ namespace tunnel } } + TransitTunnels::TransitTunnels (): + m_IsRunning (false) + { + } + + TransitTunnels::~TransitTunnels () + { + Stop (); + } + void TransitTunnels::Start () { + m_IsRunning = true; + m_Thread.reset (new std::thread (std::bind (&TransitTunnels::Run, this))); } void TransitTunnels::Stop () { + m_IsRunning = false; + m_TunnelBuildMsgQueue.WakeUp (); + if (m_Thread) + { + m_Thread->join (); + m_Thread = nullptr; + } m_TransitTunnels.clear (); } + + void TransitTunnels::Run () + { + i2p::util::SetThreadName("TBM"); + uint64_t lastTs = 0; + std::list > msgs; + while (m_IsRunning) + { + try + { + if (m_TunnelBuildMsgQueue.Wait (TRANSIT_TUNNELS_QUEUE_WAIT_INTERVAL, 0)) + { + m_TunnelBuildMsgQueue.GetWholeQueue (msgs); + while (!msgs.empty ()) + { + auto msg = msgs.front (); msgs.pop_front (); + if (!msg) continue; + uint8_t typeID = msg->GetTypeID (); + switch (typeID) + { + case eI2NPShortTunnelBuild: + HandleShortTransitTunnelBuildMsg (std::move (msg)); + break; + case eI2NPVariableTunnelBuild: + HandleVariableTransitTunnelBuildMsg (std::move (msg)); + break; + default: + LogPrint (eLogWarning, "TransitTunnel: Unexpected message type ", (int) typeID); + } + if (!m_IsRunning) break; + } + } + if (m_IsRunning) + { + uint64_t ts = i2p::util::GetSecondsSinceEpoch (); + if (ts >= lastTs + TUNNEL_MANAGE_INTERVAL || ts + TUNNEL_MANAGE_INTERVAL < lastTs) + { + ManageTransitTunnels (ts); + lastTs = ts; + } + } + } + catch (std::exception& ex) + { + LogPrint (eLogError, "TransitTunnel: Runtime exception: ", ex.what ()); + } + } + } + + void TransitTunnels::PostTransitTunnelBuildMsg (std::shared_ptr&& msg) + { + if (msg) m_TunnelBuildMsgQueue.Put (msg); + } void TransitTunnels::HandleShortTransitTunnelBuildMsg (std::shared_ptr&& msg) { @@ -234,6 +306,7 @@ namespace tunnel { if (transitTunnel) { + LogPrint (eLogDebug, "TransitTunnel: Failed to send reply for transit tunnel ", transitTunnel->GetTunnelID ()); auto t = transitTunnel->GetCreationTime (); if (t > i2p::tunnel::TUNNEL_EXPIRATION_TIMEOUT) // make transit tunnel expired diff --git a/libi2pd/TransitTunnel.h b/libi2pd/TransitTunnel.h index fc8676d0..14f9b997 100644 --- a/libi2pd/TransitTunnel.h +++ b/libi2pd/TransitTunnel.h @@ -14,6 +14,7 @@ #include #include #include "Crypto.h" +#include "Queue.h" #include "I2NPProtocol.h" #include "TunnelEndpoint.h" #include "TunnelGateway.h" @@ -72,7 +73,7 @@ namespace tunnel const i2p::data::IdentHash& nextIdent, uint32_t nextTunnelID, const i2p::crypto::AESKey& layerKey, const i2p::crypto::AESKey& ivKey): TransitTunnel (receiveTunnelID, nextIdent, nextTunnelID, - layerKey, ivKey), m_Gateway(this) {}; + layerKey, ivKey), m_Gateway(*this) {}; void SendTunnelDataMsg (std::shared_ptr msg) override; void FlushTunnelDataMsgs () override; @@ -109,33 +110,46 @@ namespace tunnel const i2p::crypto::AESKey& layerKey, const i2p::crypto::AESKey& ivKey, bool isGateway, bool isEndpoint); + + const int TRANSIT_TUNNELS_QUEUE_WAIT_INTERVAL = 10; // in seconds + class TransitTunnels { public: + TransitTunnels (); + ~TransitTunnels (); + void Start (); void Stop (); - void ManageTransitTunnels (uint64_t ts); - + void PostTransitTunnelBuildMsg (std::shared_ptr&& msg); + size_t GetNumTransitTunnels () const { return m_TransitTunnels.size (); } int GetTransitTunnelsExpirationTimeout (); - - void HandleShortTransitTunnelBuildMsg (std::shared_ptr&& msg); - void HandleVariableTransitTunnelBuildMsg (std::shared_ptr&& msg); private: bool AddTransitTunnel (std::shared_ptr tunnel); + void ManageTransitTunnels (uint64_t ts); + + void HandleShortTransitTunnelBuildMsg (std::shared_ptr&& msg); + void HandleVariableTransitTunnelBuildMsg (std::shared_ptr&& msg); bool HandleBuildRequestRecords (int num, uint8_t * records, uint8_t * clearText); + void Run (); + private: + volatile bool m_IsRunning; + std::unique_ptr m_Thread; std::list > m_TransitTunnels; - + i2p::util::Queue > m_TunnelBuildMsgQueue; + public: // for HTTP only - auto& GetTransitTunnels () const { return m_TransitTunnels; }; + const auto& GetTransitTunnels () const { return m_TransitTunnels; }; + size_t GetTunnelBuildMsgQueueSize () const { return m_TunnelBuildMsgQueue.GetSize (); }; }; } } diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index b07b1b0b..03416137 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -174,8 +174,8 @@ namespace transport { if (!m_Service) { - m_Service = new boost::asio::io_service (); - m_Work = new boost::asio::io_service::work (*m_Service); + m_Service = new boost::asio::io_context (); + m_Work = new boost::asio::executor_work_guard (m_Service->get_executor ()); m_PeerCleanupTimer = new boost::asio::deadline_timer (*m_Service); m_PeerTestTimer = new boost::asio::deadline_timer (*m_Service); m_UpdateBandwidthTimer = new boost::asio::deadline_timer (*m_Service); @@ -249,7 +249,7 @@ namespace transport if (!address.empty ()) { boost::system::error_code ec; - auto addr = boost::asio::ip::address::from_string (address, ec); + auto addr = boost::asio::ip::make_address (address, ec); if (!ec) { if (m_NTCP2Server) m_NTCP2Server->SetLocalAddress (addr); @@ -275,7 +275,7 @@ namespace transport if (!address.empty ()) { boost::system::error_code ec; - auto addr = boost::asio::ip::address::from_string (address, ec); + auto addr = boost::asio::ip::make_address (address, ec); if (!ec) { if (m_NTCP2Server) m_NTCP2Server->SetLocalAddress (addr); @@ -302,7 +302,7 @@ namespace transport if (!address.empty ()) { boost::system::error_code ec; - auto addr = boost::asio::ip::address::from_string (address, ec); + auto addr = boost::asio::ip::make_address (address, ec); if (!ec && m_NTCP2Server && i2p::util::net::IsYggdrasilAddress (addr)) m_NTCP2Server->SetLocalAddress (addr); } @@ -447,28 +447,29 @@ namespace transport return std::max (bwCongestionLevel, tbwCongestionLevel); } - void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg) + std::future > Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg) { if (m_IsOnline) - SendMessages (ident, { msg }); + return SendMessages (ident, { msg }); + return {}; // invalid future } - void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >& msgs) + std::future > Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >& msgs) { std::list > msgs1; msgs.swap (msgs1); - SendMessages (ident, std::move (msgs1)); + return SendMessages (ident, std::move (msgs1)); } - void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs) + std::future > Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs) { - m_Service->post ([this, ident, msgs = std::move(msgs)] () mutable + return boost::asio::post (*m_Service, boost::asio::use_future ([this, ident, msgs = std::move(msgs)] () mutable { - PostMessages (ident, msgs); - }); + return PostMessages (ident, msgs); + })); } - void Transports::PostMessages (const i2p::data::IdentHash& ident, std::list >& msgs) + std::shared_ptr Transports::PostMessages (const i2p::data::IdentHash& ident, std::list >& msgs) { if (ident == i2p::context.GetRouterInfo ().GetIdentHash ()) { @@ -476,9 +477,9 @@ namespace transport for (auto& it: msgs) m_LoopbackHandler.PutNextMessage (std::move (it)); m_LoopbackHandler.Flush (); - return; + return nullptr; } - if(RoutesRestricted() && !IsRestrictedPeer(ident)) return; + if(RoutesRestricted() && !IsRestrictedPeer(ident)) return nullptr; std::shared_ptr peer; { std::lock_guard l(m_PeersMutex); @@ -489,13 +490,13 @@ namespace transport if (!peer) { // check if not banned - if (i2p::data::IsRouterBanned (ident)) return; // don't create peer to unreachable router + if (i2p::data::IsRouterBanned (ident)) return nullptr; // don't create peer to unreachable router // try to connect bool connected = false; try { auto r = netdb.FindRouter (ident); - if (r && (r->IsUnreachable () || !r->IsReachableFrom (i2p::context.GetRouterInfo ()))) return; // router found but non-reachable + if (r && (r->IsUnreachable () || !r->IsReachableFrom (i2p::context.GetRouterInfo ()))) return nullptr; // router found but non-reachable peer = std::make_shared(r, i2p::util::GetSecondsSinceEpoch ()); { @@ -509,12 +510,16 @@ namespace transport { LogPrint (eLogError, "Transports: PostMessages exception:", ex.what ()); } - if (!connected) return; + if (!connected) return nullptr; } - if (!peer) return; + if (!peer) return nullptr; if (peer->IsConnected ()) - peer->sessions.front ()->SendI2NPMessages (msgs); + { + auto session = peer->sessions.front (); + if (session) session->SendI2NPMessages (msgs); + return session; + } else { auto sz = peer->delayedMessages.size (); @@ -527,7 +532,7 @@ namespace transport LogPrint (eLogWarning, "Transports: Router ", ident.ToBase64 (), " is banned. Peer dropped"); std::lock_guard l(m_PeersMutex); m_Peers.erase (ident); - return; + return nullptr; } } if (sz > MAX_NUM_DELAYED_MESSAGES/2) @@ -549,6 +554,7 @@ namespace transport m_Peers.erase (ident); } } + return nullptr; } bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr peer) @@ -719,7 +725,7 @@ namespace transport void Transports::RequestComplete (std::shared_ptr r, const i2p::data::IdentHash& ident) { - m_Service->post (std::bind (&Transports::HandleRequestComplete, this, r, ident)); + boost::asio::post (*m_Service, std::bind (&Transports::HandleRequestComplete, this, r, ident)); } void Transports::HandleRequestComplete (std::shared_ptr r, i2p::data::IdentHash ident) @@ -856,7 +862,7 @@ namespace transport void Transports::PeerConnected (std::shared_ptr session) { - m_Service->post([session, this]() + boost::asio::post (*m_Service, [session, this]() { auto remoteIdentity = session->GetRemoteIdentity (); if (!remoteIdentity) return; @@ -928,7 +934,7 @@ namespace transport void Transports::PeerDisconnected (std::shared_ptr session) { - m_Service->post([session, this]() + boost::asio::post (*m_Service, [session, this]() { auto remoteIdentity = session->GetRemoteIdentity (); if (!remoteIdentity) return; @@ -1276,7 +1282,7 @@ namespace transport std::string yggaddress; i2p::config::GetOption ("meshnets.yggaddress", yggaddress); if (!yggaddress.empty ()) { - yggaddr = boost::asio::ip::address_v6::from_string (yggaddress); + yggaddr = boost::asio::ip::make_address (yggaddress).to_v6 (); if (yggaddr.is_unspecified () || !i2p::util::net::IsYggdrasilAddress (yggaddr) || !i2p::util::net::IsLocalAddress (yggaddr)) { @@ -1321,7 +1327,7 @@ namespace transport if (ipv6) { std::string ipv6Addr; i2p::config::GetOption("ntcp2.addressv6", ipv6Addr); - auto addr = boost::asio::ip::address_v6::from_string (ipv6Addr); + auto addr = boost::asio::ip::make_address (ipv6Addr).to_v6 (); if (!addr.is_unspecified () && addr != boost::asio::ip::address_v6::any ()) i2p::context.UpdateNTCP2V6Address (addr); // set ipv6 address if configured } diff --git a/libi2pd/Transports.h b/libi2pd/Transports.h index 8271108d..b72d5b70 100644 --- a/libi2pd/Transports.h +++ b/libi2pd/Transports.h @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -139,13 +140,13 @@ namespace transport bool IsOnline() const { return m_IsOnline; }; void SetOnline (bool online); - boost::asio::io_service& GetService () { return *m_Service; }; + auto& GetService () { return *m_Service; }; std::shared_ptr GetNextX25519KeysPair (); void ReuseX25519KeysPair (std::shared_ptr pair); - void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg); - void SendMessages (const i2p::data::IdentHash& ident, std::list >& msgs); - void SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs); + std::future > SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg); + std::future > SendMessages (const i2p::data::IdentHash& ident, std::list >& msgs); + std::future > SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs); void PeerConnected (std::shared_ptr session); void PeerDisconnected (std::shared_ptr session); @@ -189,7 +190,7 @@ namespace transport void Run (); void RequestComplete (std::shared_ptr r, const i2p::data::IdentHash& ident); void HandleRequestComplete (std::shared_ptr r, i2p::data::IdentHash ident); - void PostMessages (const i2p::data::IdentHash& ident, std::list >& msgs); + std::shared_ptr PostMessages (const i2p::data::IdentHash& ident, std::list >& msgs); bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr peer); void SetPriority (std::shared_ptr peer) const; void HandlePeerCleanupTimer (const boost::system::error_code& ecode); @@ -207,8 +208,8 @@ namespace transport volatile bool m_IsOnline; bool m_IsRunning, m_IsNAT, m_CheckReserved; std::thread * m_Thread; - boost::asio::io_service * m_Service; - boost::asio::io_service::work * m_Work; + boost::asio::io_context * m_Service; + boost::asio::executor_work_guard * m_Work; boost::asio::deadline_timer * m_PeerCleanupTimer, * m_PeerTestTimer, * m_UpdateBandwidthTimer; SSU2Server * m_SSU2Server; diff --git a/libi2pd/Tunnel.cpp b/libi2pd/Tunnel.cpp index d809e48a..e7443b6c 100644 --- a/libi2pd/Tunnel.cpp +++ b/libi2pd/Tunnel.cpp @@ -373,6 +373,7 @@ namespace tunnel std::shared_ptr Tunnels::GetTunnel (uint32_t tunnelID) { + std::lock_guard l(m_TunnelsMutex); auto it = m_Tunnels.find(tunnelID); if (it != m_Tunnels.end ()) return it->second; @@ -382,11 +383,13 @@ namespace tunnel bool Tunnels::AddTunnel (std::shared_ptr tunnel) { if (!tunnel) return false; + std::lock_guard l(m_TunnelsMutex); return m_Tunnels.emplace (tunnel->GetTunnelID (), tunnel).second; } void Tunnels::RemoveTunnel (uint32_t tunnelID) { + std::lock_guard l(m_TunnelsMutex); m_Tunnels.erase (tunnelID); } @@ -655,7 +658,7 @@ namespace tunnel return; } else - m_TransitTunnels.HandleShortTransitTunnelBuildMsg (std::move (msg)); + m_TransitTunnels.PostTransitTunnelBuildMsg (std::move (msg)); } void Tunnels::HandleVariableTunnelBuildMsg (std::shared_ptr msg) @@ -678,7 +681,7 @@ namespace tunnel } } else - m_TransitTunnels.HandleVariableTransitTunnelBuildMsg (std::move (msg)); + m_TransitTunnels.PostTransitTunnelBuildMsg (std::move (msg)); } void Tunnels::HandleTunnelBuildReplyMsg (std::shared_ptr msg, bool isShort) @@ -710,7 +713,6 @@ namespace tunnel ManagePendingTunnels (ts); ManageInboundTunnels (ts); ManageOutboundTunnels (ts); - m_TransitTunnels.ManageTransitTunnels (ts); } void Tunnels::ManagePendingTunnels (uint64_t ts) diff --git a/libi2pd/Tunnel.h b/libi2pd/Tunnel.h index fcccd236..e11a360d 100644 --- a/libi2pd/Tunnel.h +++ b/libi2pd/Tunnel.h @@ -139,7 +139,7 @@ namespace tunnel public: OutboundTunnel (std::shared_ptr config): - Tunnel (config), m_Gateway (this), m_EndpointIdentHash (config->GetLastIdentHash ()) {}; + Tunnel (config), m_Gateway (*this), m_EndpointIdentHash (config->GetLastIdentHash ()) {}; void SendTunnelDataMsgTo (const uint8_t * gwHash, uint32_t gwTunnel, std::shared_ptr msg); virtual void SendTunnelDataMsgs (const std::vector& msgs); // multiple messages @@ -300,8 +300,9 @@ namespace tunnel std::map > m_PendingOutboundTunnels; // by replyMsgID std::list > m_InboundTunnels; std::list > m_OutboundTunnels; + mutable std::mutex m_TunnelsMutex; std::unordered_map > m_Tunnels; // tunnelID->tunnel known by this id - std::mutex m_PoolsMutex; + mutable std::mutex m_PoolsMutex; std::list> m_Pools; std::shared_ptr m_ExploratoryPool; i2p::util::Queue > m_Queue; @@ -320,13 +321,14 @@ namespace tunnel // for HTTP only const decltype(m_OutboundTunnels)& GetOutboundTunnels () const { return m_OutboundTunnels; }; const decltype(m_InboundTunnels)& GetInboundTunnels () const { return m_InboundTunnels; }; - auto& GetTransitTunnels () const { return m_TransitTunnels.GetTransitTunnels (); }; + const auto& GetTransitTunnels () const { return m_TransitTunnels.GetTransitTunnels (); }; size_t CountTransitTunnels() const; size_t CountInboundTunnels() const; size_t CountOutboundTunnels() const; - int GetQueueSize () { return m_Queue.GetSize (); }; + size_t GetQueueSize () const { return m_Queue.GetSize (); }; + size_t GetTBMQueueSize () const { return m_TransitTunnels.GetTunnelBuildMsgQueueSize (); }; int GetTunnelCreationSuccessRate () const { return std::round(m_TunnelCreationSuccessRate * 100); } // in percents double GetPreciseTunnelCreationSuccessRate () const { return m_TunnelCreationSuccessRate * 100; } // in percents int GetTotalTunnelCreationSuccessRate () const // in percents diff --git a/libi2pd/TunnelGateway.cpp b/libi2pd/TunnelGateway.cpp index 78a63fc4..fabc71bb 100644 --- a/libi2pd/TunnelGateway.cpp +++ b/libi2pd/TunnelGateway.cpp @@ -220,21 +220,55 @@ namespace tunnel void TunnelGateway::SendBuffer () { + // create list or tunnel messages m_Buffer.CompleteCurrentTunnelDataMessage (); std::list > newTunnelMsgs; const auto& tunnelDataMsgs = m_Buffer.GetTunnelDataMsgs (); for (auto& tunnelMsg : tunnelDataMsgs) { auto newMsg = CreateEmptyTunnelDataMsg (false); - m_Tunnel->EncryptTunnelMsg (tunnelMsg, newMsg); - htobe32buf (newMsg->GetPayload (), m_Tunnel->GetNextTunnelID ()); + m_Tunnel.EncryptTunnelMsg (tunnelMsg, newMsg); + htobe32buf (newMsg->GetPayload (), m_Tunnel.GetNextTunnelID ()); newMsg->FillI2NPMessageHeader (eI2NPTunnelData); if (tunnelMsg->onDrop) newMsg->onDrop = tunnelMsg->onDrop; newTunnelMsgs.push_back (newMsg); m_NumSentBytes += TUNNEL_DATA_MSG_SIZE; } m_Buffer.ClearTunnelDataMsgs (); - i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), std::move (newTunnelMsgs)); + + // send + auto currentTransport = m_CurrentTransport.lock (); + if (!currentTransport) + { + // try to obtain transport from peding reequest or send thought transport is not complete + if (m_PendingTransport.valid ()) // pending request? + { + if (m_PendingTransport.wait_for(std::chrono::seconds(0)) == std::future_status::ready) + { + // pending request complete + currentTransport = m_PendingTransport.get (); // take tarnsports used in pending request + if (currentTransport) + { + if (currentTransport->IsEstablished ()) + m_CurrentTransport = currentTransport; + else + currentTransport = nullptr; + } + } + else // still pending + { + // send through transports, but don't update pedning transport + i2p::transport::transports.SendMessages (m_Tunnel.GetNextIdentHash (), std::move (newTunnelMsgs)); + return; + } + } + } + if (currentTransport) // session is good + // send to session directly + currentTransport->SendI2NPMessages (newTunnelMsgs); + else // no session yet + // send through transports + m_PendingTransport = i2p::transport::transports.SendMessages (m_Tunnel.GetNextIdentHash (), std::move (newTunnelMsgs)); } } } diff --git a/libi2pd/TunnelGateway.h b/libi2pd/TunnelGateway.h index 741bbe84..4cfd5308 100644 --- a/libi2pd/TunnelGateway.h +++ b/libi2pd/TunnelGateway.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2021, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -12,7 +12,9 @@ #include #include #include +#include #include "I2NPProtocol.h" +#include "TransportSession.h" #include "TunnelBase.h" namespace i2p @@ -45,7 +47,7 @@ namespace tunnel { public: - TunnelGateway (TunnelBase * tunnel): + TunnelGateway (TunnelBase& tunnel): m_Tunnel (tunnel), m_NumSentBytes (0) {}; void SendTunnelDataMsg (const TunnelMessageBlock& block); void PutTunnelDataMsg (const TunnelMessageBlock& block); @@ -54,9 +56,11 @@ namespace tunnel private: - TunnelBase * m_Tunnel; + TunnelBase& m_Tunnel; TunnelGatewayBuffer m_Buffer; size_t m_NumSentBytes; + std::weak_ptr m_CurrentTransport; + std::future > m_PendingTransport; }; } } diff --git a/libi2pd/util.cpp b/libi2pd/util.cpp index 47cecbbe..aff3d0e0 100644 --- a/libi2pd/util.cpp +++ b/libi2pd/util.cpp @@ -123,8 +123,8 @@ const char *inet_ntop_xp(int af, const void *src, char *dst, socklen_t size) #endif #endif -#define address_pair_v4(a,b) { boost::asio::ip::address_v4::from_string (a).to_ulong (), boost::asio::ip::address_v4::from_string (b).to_ulong () } -#define address_pair_v6(a,b) { boost::asio::ip::address_v6::from_string (a).to_bytes (), boost::asio::ip::address_v6::from_string (b).to_bytes () } +#define address_pair_v4(a,b) { boost::asio::ip::make_address (a).to_v4 ().to_uint (), boost::asio::ip::make_address(b).to_v4 ().to_uint () } +#define address_pair_v6(a,b) { boost::asio::ip::make_address (a).to_v6 ().to_bytes (), boost::asio::ip::make_address(b).to_v6 ().to_bytes () } namespace i2p { @@ -478,7 +478,7 @@ namespace net inet_ntop(af, &((sockaddr_in6 *)cur->ifa_addr)->sin6_addr, addr, INET6_ADDRSTRLEN); freeifaddrs(addrs); std::string cur_ifaddr(addr); - return boost::asio::ip::address::from_string(cur_ifaddr); + return boost::asio::ip::make_address(cur_ifaddr); } } } @@ -498,7 +498,7 @@ namespace net fallback = "127.0.0.1"; LogPrint(eLogWarning, "NetIface: Cannot find IPv4 address for interface ", ifname); } - return boost::asio::ip::address::from_string(fallback); + return boost::asio::ip::make_address(fallback); #endif } @@ -664,7 +664,7 @@ namespace net address_pair_v4("224.0.0.0", "255.255.255.255") }; - uint32_t ipv4_address = host.to_v4 ().to_ulong (); + uint32_t ipv4_address = host.to_v4 ().to_uint (); for (const auto& it : reservedIPv4Ranges) { if (ipv4_address >= it.first && ipv4_address <= it.second) return true; diff --git a/libi2pd/util.h b/libi2pd/util.h index 48bcb801..7bd35e67 100644 --- a/libi2pd/util.h +++ b/libi2pd/util.h @@ -177,7 +177,7 @@ namespace util RunnableService (const std::string& name): m_Name (name), m_IsRunning (false) {} virtual ~RunnableService () {} - boost::asio::io_service& GetIOService () { return m_Service; } + auto& GetIOService () { return m_Service; } bool IsRunning () const { return m_IsRunning; }; void StartIOService (); @@ -194,7 +194,7 @@ namespace util std::string m_Name; volatile bool m_IsRunning; std::unique_ptr m_Thread; - boost::asio::io_service m_Service; + boost::asio::io_context m_Service; }; class RunnableServiceWithWork: public RunnableService @@ -202,11 +202,11 @@ namespace util protected: RunnableServiceWithWork (const std::string& name): - RunnableService (name), m_Work (GetIOService ()) {} + RunnableService (name), m_Work (GetIOService ().get_executor ()) {} private: - boost::asio::io_service::work m_Work; + boost::asio::executor_work_guard m_Work; }; void SetThreadName (const char *name); diff --git a/libi2pd_client/BOB.cpp b/libi2pd_client/BOB.cpp index 23c3b72f..f2dab223 100644 --- a/libi2pd_client/BOB.cpp +++ b/libi2pd_client/BOB.cpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2023, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -129,7 +129,7 @@ namespace client BOBI2POutboundTunnel::BOBI2POutboundTunnel (const std::string& outhost, uint16_t port, std::shared_ptr localDestination, bool quiet): BOBI2PTunnel (localDestination), - m_Endpoint (boost::asio::ip::address::from_string (outhost), port), m_IsQuiet (quiet) + m_Endpoint (boost::asio::ip::make_address (outhost), port), m_IsQuiet (quiet) { } @@ -220,7 +220,7 @@ namespace client if (!inhost.empty ()) { boost::system::error_code ec; - auto addr = boost::asio::ip::address::from_string (inhost, ec); + auto addr = boost::asio::ip::make_address (inhost, ec); if (!ec) ep.address (addr); else @@ -425,7 +425,7 @@ namespace client { // TODO: FIXME: temporary validation, until hostname support is added boost::system::error_code ec; - boost::asio::ip::address::from_string(m_InHost, ec); + boost::asio::ip::make_address(m_InHost, ec); if (ec) { SendReplyError("inhost must be a valid IPv4 address."); @@ -436,7 +436,7 @@ namespace client { // TODO: FIXME: temporary validation, until hostname support is added boost::system::error_code ec; - boost::asio::ip::address::from_string(m_OutHost, ec); + boost::asio::ip::make_address(m_OutHost, ec); if (ec) { SendReplyError("outhost must be a IPv4 address."); @@ -828,7 +828,7 @@ namespace client BOBCommandChannel::BOBCommandChannel (const std::string& address, uint16_t port): RunnableService ("BOB"), - m_Acceptor (GetIOService (), boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(address), port)) + m_Acceptor (GetIOService (), boost::asio::ip::tcp::endpoint(boost::asio::ip::make_address(address), port)) { // command -> handler m_CommandHandlers[BOB_COMMAND_ZAP] = &BOBCommandSession::ZapCommandHandler; diff --git a/libi2pd_client/BOB.h b/libi2pd_client/BOB.h index 1f5fda5f..79066154 100644 --- a/libi2pd_client/BOB.h +++ b/libi2pd_client/BOB.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2023, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -254,7 +254,7 @@ namespace client void Start (); void Stop (); - boost::asio::io_service& GetService () { return GetIOService (); }; + auto& GetService () { return GetIOService (); }; void AddDestination (const std::string& name, std::shared_ptr dest); void DeleteDestination (const std::string& name); std::shared_ptr FindDestination (const std::string& name); diff --git a/libi2pd_client/ClientContext.cpp b/libi2pd_client/ClientContext.cpp index aa49f83e..8c8c2bfa 100644 --- a/libi2pd_client/ClientContext.cpp +++ b/libi2pd_client/ClientContext.cpp @@ -345,7 +345,7 @@ namespace client } std::shared_ptr ClientContext::CreateNewLocalDestination ( - boost::asio::io_service& service, bool isPublic, + boost::asio::io_context& service, bool isPublic, i2p::data::SigningKeyType sigType, i2p::data::CryptoKeyType cryptoType, const std::map * params) { @@ -399,7 +399,7 @@ namespace client return localDestination; } - std::shared_ptr ClientContext::CreateNewLocalDestination (boost::asio::io_service& service, + std::shared_ptr ClientContext::CreateNewLocalDestination (boost::asio::io_context& service, const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params) { auto it = m_Destinations.find (keys.GetPublic ()->GetIdentHash ()); @@ -631,7 +631,7 @@ namespace client if (type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) { // udp client // TODO: hostnames - boost::asio::ip::udp::endpoint end (boost::asio::ip::address::from_string(address), port); + boost::asio::ip::udp::endpoint end (boost::asio::ip::make_address(address), port); if (!localDestination) localDestination = m_SharedLocalDestination; @@ -787,7 +787,7 @@ namespace client { // udp server tunnel // TODO: hostnames - boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address::from_string(host), port); + boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::make_address(host), port); if (address.empty ()) { if (!endpoint.address ().is_unspecified () && endpoint.address ().is_v6 ()) @@ -795,7 +795,7 @@ namespace client else address = "127.0.0.1"; } - auto localAddress = boost::asio::ip::address::from_string(address); + auto localAddress = boost::asio::ip::make_address(address); auto serverTunnel = std::make_shared(name, localDestination, localAddress, endpoint, inPort, gzip); if(!isUniqueLocal) { diff --git a/libi2pd_client/ClientContext.h b/libi2pd_client/ClientContext.h index adec607a..febd5f16 100644 --- a/libi2pd_client/ClientContext.h +++ b/libi2pd_client/ClientContext.h @@ -79,13 +79,13 @@ namespace client i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519, i2p::data::CryptoKeyType cryptoType = i2p::data::CRYPTO_KEY_TYPE_ELGAMAL, const std::map * params = nullptr); // used by SAM only - std::shared_ptr CreateNewLocalDestination (boost::asio::io_service& service, + std::shared_ptr CreateNewLocalDestination (boost::asio::io_context& service, bool isPublic = false, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519, i2p::data::CryptoKeyType cryptoType = i2p::data::CRYPTO_KEY_TYPE_ELGAMAL, const std::map * params = nullptr); // same as previous but on external io_service std::shared_ptr CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true, const std::map * params = nullptr); - std::shared_ptr CreateNewLocalDestination (boost::asio::io_service& service, + std::shared_ptr CreateNewLocalDestination (boost::asio::io_context& service, const i2p::data::PrivateKeys& keys, bool isPublic = true, const std::map * params = nullptr); // same as previous but on external io_service std::shared_ptr CreateNewMatchedTunnelDestination(const i2p::data::PrivateKeys &keys, diff --git a/libi2pd_client/HTTPProxy.cpp b/libi2pd_client/HTTPProxy.cpp index dba65815..13e0f571 100644 --- a/libi2pd_client/HTTPProxy.cpp +++ b/libi2pd_client/HTTPProxy.cpp @@ -98,7 +98,7 @@ namespace proxy { typedef std::function ProxyResolvedHandler; - void HandleUpstreamProxyResolved(const boost::system::error_code & ecode, boost::asio::ip::tcp::resolver::iterator itr, ProxyResolvedHandler handler); + void HandleUpstreamProxyResolved(const boost::system::error_code & ecode, boost::asio::ip::tcp::resolver::results_type endpoints, ProxyResolvedHandler handler); void SocksProxySuccess(); void HandoverToUpstreamProxy(); @@ -584,20 +584,22 @@ namespace proxy { } else { - boost::asio::ip::tcp::resolver::query q(m_ProxyURL.host, std::to_string(m_ProxyURL.port)); - m_proxy_resolver.async_resolve(q, std::bind(&HTTPReqHandler::HandleUpstreamProxyResolved, this, std::placeholders::_1, std::placeholders::_2, [&](boost::asio::ip::tcp::endpoint ep) { - m_proxysock->async_connect(ep, std::bind(&HTTPReqHandler::HandleUpstreamHTTPProxyConnect, this, std::placeholders::_1)); - })); + m_proxy_resolver.async_resolve(m_ProxyURL.host, std::to_string(m_ProxyURL.port), std::bind(&HTTPReqHandler::HandleUpstreamProxyResolved, this, + std::placeholders::_1, std::placeholders::_2, [&](boost::asio::ip::tcp::endpoint ep) + { + m_proxysock->async_connect(ep, std::bind(&HTTPReqHandler::HandleUpstreamHTTPProxyConnect, this, std::placeholders::_1)); + })); } } else if (m_ProxyURL.schema == "socks") { /* handle upstream socks proxy */ if (!m_ProxyURL.port) m_ProxyURL.port = 9050; // default to tor default if not specified - boost::asio::ip::tcp::resolver::query q(m_ProxyURL.host, std::to_string(m_ProxyURL.port)); - m_proxy_resolver.async_resolve(q, std::bind(&HTTPReqHandler::HandleUpstreamProxyResolved, this, std::placeholders::_1, std::placeholders::_2, [&](boost::asio::ip::tcp::endpoint ep) { - m_proxysock->async_connect(ep, std::bind(&HTTPReqHandler::HandleUpstreamSocksProxyConnect, this, std::placeholders::_1)); - })); + m_proxy_resolver.async_resolve(m_ProxyURL.host, std::to_string(m_ProxyURL.port), std::bind(&HTTPReqHandler::HandleUpstreamProxyResolved, this, + std::placeholders::_1, std::placeholders::_2, [&](boost::asio::ip::tcp::endpoint ep) + { + m_proxysock->async_connect(ep, std::bind(&HTTPReqHandler::HandleUpstreamSocksProxyConnect, this, std::placeholders::_1)); + })); } else { @@ -606,10 +608,10 @@ namespace proxy { } } - void HTTPReqHandler::HandleUpstreamProxyResolved(const boost::system::error_code & ec, boost::asio::ip::tcp::resolver::iterator it, ProxyResolvedHandler handler) + void HTTPReqHandler::HandleUpstreamProxyResolved(const boost::system::error_code & ec, boost::asio::ip::tcp::resolver::results_type endpoints, ProxyResolvedHandler handler) { if(ec) GenericProxyError(tr("Cannot resolve upstream proxy"), ec.message()); - else handler(*it); + else handler(*endpoints.begin ()); } void HTTPReqHandler::HandleUpstreamSocksProxyConnect(const boost::system::error_code & ec) diff --git a/libi2pd_client/I2CP.cpp b/libi2pd_client/I2CP.cpp index fc6d1b40..1949746a 100644 --- a/libi2pd_client/I2CP.cpp +++ b/libi2pd_client/I2CP.cpp @@ -24,7 +24,7 @@ namespace i2p namespace client { - I2CPDestination::I2CPDestination (boost::asio::io_service& service, std::shared_ptr owner, + I2CPDestination::I2CPDestination (boost::asio::io_context& service, std::shared_ptr owner, std::shared_ptr identity, bool isPublic, bool isSameThread, const std::map& params): LeaseSetDestination (service, isPublic, ¶ms), @@ -88,7 +88,7 @@ namespace client void I2CPDestination::CreateNewLeaseSet (const std::vector >& tunnels) { - GetService ().post (std::bind (&I2CPDestination::PostCreateNewLeaseSet, this, tunnels)); + boost::asio::post (GetService (), std::bind (&I2CPDestination::PostCreateNewLeaseSet, this, tunnels)); } void I2CPDestination::PostCreateNewLeaseSet (std::vector > tunnels) @@ -170,7 +170,7 @@ namespace client { // send in destination's thread auto s = GetSharedFromThis (); - GetService ().post ( + boost::asio::post (GetService (), [s, msg, remote, nonce]() { bool sent = s->SendMsg (msg, remote); @@ -1079,7 +1079,7 @@ namespace client I2CPServer::I2CPServer (const std::string& interface, uint16_t port, bool isSingleThread): RunnableService ("I2CP"), m_IsSingleThread (isSingleThread), m_Acceptor (GetIOService (), - boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(interface), port)) + boost::asio::ip::tcp::endpoint(boost::asio::ip::make_address(interface), port)) { memset (m_MessagesHandlers, 0, sizeof (m_MessagesHandlers)); m_MessagesHandlers[I2CP_GET_DATE_MESSAGE] = &I2CPSession::GetDateMessageHandler; diff --git a/libi2pd_client/I2CP.h b/libi2pd_client/I2CP.h index 620ff52e..e929c1e1 100644 --- a/libi2pd_client/I2CP.h +++ b/libi2pd_client/I2CP.h @@ -81,7 +81,7 @@ namespace client { public: - I2CPDestination (boost::asio::io_service& service, std::shared_ptr owner, + I2CPDestination (boost::asio::io_context& service, std::shared_ptr owner, std::shared_ptr identity, bool isPublic, bool isSameThread, const std::map& params); ~I2CPDestination () {}; @@ -227,7 +227,7 @@ namespace client void Start (); void Stop (); - boost::asio::io_service& GetService () { return GetIOService (); }; + auto& GetService () { return GetIOService (); }; bool IsSingleThread () const { return m_IsSingleThread; }; bool InsertSession (std::shared_ptr session); diff --git a/libi2pd_client/I2PService.h b/libi2pd_client/I2PService.h index d35c954d..d3ed36fa 100644 --- a/libi2pd_client/I2PService.h +++ b/libi2pd_client/I2PService.h @@ -61,7 +61,7 @@ namespace client } void CreateStream (StreamRequestComplete streamRequestComplete, const std::string& dest, uint16_t port = 0); void CreateStream(StreamRequestComplete complete, std::shared_ptr address, uint16_t port); - inline boost::asio::io_service& GetService () { return m_LocalDestination->GetService (); } + auto& GetService () { return m_LocalDestination->GetService (); } virtual void Start () = 0; virtual void Stop () = 0; @@ -283,7 +283,7 @@ namespace client public: TCPIPAcceptor (const std::string& address, uint16_t port, std::shared_ptr localDestination = nullptr) : - ServiceAcceptor (boost::asio::ip::tcp::endpoint (boost::asio::ip::address::from_string(address), port), localDestination) {} + ServiceAcceptor (boost::asio::ip::tcp::endpoint (boost::asio::ip::make_address(address), port), localDestination) {} }; } } diff --git a/libi2pd_client/I2PTunnel.cpp b/libi2pd_client/I2PTunnel.cpp index 5ba83e1f..3a3c491e 100644 --- a/libi2pd_client/I2PTunnel.cpp +++ b/libi2pd_client/I2PTunnel.cpp @@ -375,10 +375,10 @@ namespace client } I2PServerTunnelConnectionHTTP::I2PServerTunnelConnectionHTTP (I2PService * owner, std::shared_ptr stream, - const boost::asio::ip::tcp::endpoint& target, const std::string& host, + const boost::asio::ip::tcp::endpoint& target, const std::string& host, const std::string& XI2P, std::shared_ptr sslCtx): - I2PTunnelConnection (owner, stream, target, true, sslCtx), m_Host (host), - m_HeaderSent (false), m_ResponseHeaderSent (false), m_From (stream->GetRemoteIdentity ()) + I2PTunnelConnection (owner, stream, target, true, sslCtx), m_Host (host), m_XI2P (XI2P), + m_HeaderSent (false), m_ResponseHeaderSent (false) { if (sslCtx) SSL_set_tlsext_host_name(GetSSL ()->native_handle(), host.c_str ()); @@ -448,17 +448,12 @@ namespace client if (!connection) m_OutHeader << "Connection: close\r\n"; // add X-I2P fields - if (m_From) - { - m_OutHeader << X_I2P_DEST_B32 << ": " << context.GetAddressBook ().ToAddress(m_From->GetIdentHash ()) << "\r\n"; - m_OutHeader << X_I2P_DEST_HASH << ": " << m_From->GetIdentHash ().ToBase64 () << "\r\n"; - m_OutHeader << X_I2P_DEST_B64 << ": " << m_From->ToBase64 () << "\r\n"; - } - - m_OutHeader << "\r\n"; // end of header + m_OutHeader << m_XI2P; + // end of header + m_OutHeader << "\r\n"; + m_OutHeader << m_InHeader.str ().substr (m_InHeader.tellg ()); // data right after header m_InHeader.str (""); - m_From = nullptr; m_HeaderSent = true; I2PTunnelConnection::Write ((uint8_t *)m_OutHeader.str ().c_str (), m_OutHeader.str ().length ()); } @@ -717,7 +712,7 @@ namespace client { m_Endpoint.port (m_Port); boost::system::error_code ec; - auto addr = boost::asio::ip::address::from_string (m_Address, ec); + auto addr = boost::asio::ip::make_address (m_Address, ec); if (!ec) { m_Endpoint.address (addr); @@ -726,7 +721,7 @@ namespace client else { auto resolver = std::make_shared(GetService ()); - resolver->async_resolve (boost::asio::ip::tcp::resolver::query (m_Address, ""), + resolver->async_resolve (m_Address, "", std::bind (&I2PServerTunnel::HandleResolve, this, std::placeholders::_1, std::placeholders::_2, resolver)); } @@ -743,7 +738,7 @@ namespace client ClearHandlers (); } - void I2PServerTunnel::HandleResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, + void I2PServerTunnel::HandleResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::results_type endpoints, std::shared_ptr resolver) { if (!ecode) @@ -752,10 +747,9 @@ namespace client boost::asio::ip::tcp::endpoint ep; if (m_LocalAddress) { - boost::asio::ip::tcp::resolver::iterator end; - while (it != end) + for (const auto& it: endpoints) { - ep = *it; + ep = it; if (!ep.address ().is_unspecified ()) { if (ep.address ().is_v4 ()) @@ -774,13 +768,12 @@ namespace client } } if (found) break; - it++; } } else { found = true; - ep = *it; // first available + ep = *endpoints.begin (); // first available } if (!found) { @@ -789,7 +782,7 @@ namespace client } auto addr = ep.address (); - LogPrint (eLogInfo, "I2PTunnel: Server tunnel ", (*it).host_name (), " has been resolved to ", addr); + LogPrint (eLogInfo, "I2PTunnel: Server tunnel ", (*endpoints.begin ()).host_name (), " has been resolved to ", addr); m_Endpoint.address (addr); Accept (); } @@ -806,7 +799,7 @@ namespace client void I2PServerTunnel::SetLocalAddress (const std::string& localAddress) { boost::system::error_code ec; - auto addr = boost::asio::ip::address::from_string(localAddress, ec); + auto addr = boost::asio::ip::make_address(localAddress, ec); if (!ec) m_LocalAddress.reset (new boost::asio::ip::address (addr)); else @@ -878,7 +871,17 @@ namespace client std::shared_ptr I2PServerTunnelHTTP::CreateI2PConnection (std::shared_ptr stream) { - return std::make_shared (this, stream, GetEndpoint (), m_Host, GetSSLCtx ()); + if (m_XI2P.empty () || stream->GetRemoteIdentity () != m_From.lock ()) + { + auto from = stream->GetRemoteIdentity (); + m_From = from; + std::stringstream ss; + ss << X_I2P_DEST_B32 << ": " << context.GetAddressBook ().ToAddress(from->GetIdentHash ()) << "\r\n"; + ss << X_I2P_DEST_HASH << ": " << from->GetIdentHash ().ToBase64 () << "\r\n"; + ss << X_I2P_DEST_B64 << ": " << from->ToBase64 () << "\r\n"; + m_XI2P = ss.str (); + } + return std::make_shared (this, stream, GetEndpoint (), m_Host, m_XI2P, GetSSLCtx ()); } I2PServerTunnelIRC::I2PServerTunnelIRC (const std::string& name, const std::string& address, diff --git a/libi2pd_client/I2PTunnel.h b/libi2pd_client/I2PTunnel.h index b94eb9e4..aa53f0b6 100644 --- a/libi2pd_client/I2PTunnel.h +++ b/libi2pd_client/I2PTunnel.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2023, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -31,9 +31,9 @@ namespace client const int I2P_TUNNEL_CONNECTION_MAX_IDLE = 3600; // in seconds const int I2P_TUNNEL_DESTINATION_REQUEST_TIMEOUT = 10; // in seconds // for HTTP tunnels - const char X_I2P_DEST_HASH[] = "X-I2P-DestHash"; // hash in base64 - const char X_I2P_DEST_B64[] = "X-I2P-DestB64"; // full address in base64 - const char X_I2P_DEST_B32[] = "X-I2P-DestB32"; // .b32.i2p address + constexpr char X_I2P_DEST_HASH[] = "X-I2P-DestHash"; // hash in base64 + constexpr char X_I2P_DEST_B64[] = "X-I2P-DestB64"; // full address in base64 + constexpr char X_I2P_DEST_B32[] = "X-I2P-DestB32"; // .b32.i2p address const int I2P_TUNNEL_HTTP_MAX_HEADER_SIZE = 8192; class I2PTunnelConnection: public I2PServiceHandler, public std::enable_shared_from_this @@ -107,7 +107,7 @@ namespace client public: I2PServerTunnelConnectionHTTP (I2PService * owner, std::shared_ptr stream, - const boost::asio::ip::tcp::endpoint& target, const std::string& host, + const boost::asio::ip::tcp::endpoint& target, const std::string& host, const std::string& XI2P, std::shared_ptr sslCtx = nullptr); protected: @@ -117,10 +117,9 @@ namespace client private: - std::string m_Host; + std::string m_Host, m_XI2P; std::stringstream m_InHeader, m_OutHeader; bool m_HeaderSent, m_ResponseHeaderSent; - std::shared_ptr m_From; }; class I2PTunnelConnectionIRC: public I2PTunnelConnection @@ -208,7 +207,7 @@ namespace client private: - void HandleResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, + void HandleResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::results_type endpoints, std::shared_ptr resolver); void Accept (); @@ -242,7 +241,8 @@ namespace client private: - std::string m_Host; + std::string m_Host, m_XI2P; + std::weak_ptr m_From; }; class I2PServerTunnelIRC: public I2PServerTunnel diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index e4cbae8a..c124be9b 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -371,7 +371,7 @@ namespace client // udp forward selected boost::system::error_code e; // TODO: support hostnames in udp forward - auto addr = boost::asio::ip::address::from_string(params[SAM_PARAM_HOST], e); + auto addr = boost::asio::ip::make_address(params[SAM_PARAM_HOST], e); if (e) { // not an ip address @@ -624,7 +624,7 @@ namespace client auto socket = session->acceptQueue.front ().first; session->acceptQueue.pop_front (); if (socket) - m_Owner.GetService ().post (std::bind(&SAMSocket::TerminateClose, socket)); + boost::asio::post (m_Owner.GetService (), std::bind(&SAMSocket::TerminateClose, socket)); } if (session->acceptQueue.size () < SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE) { @@ -1046,13 +1046,13 @@ namespace client else { auto s = shared_from_this (); - m_Owner.GetService ().post ([s] { s->Terminate ("stream read error"); }); + boost::asio::post (m_Owner.GetService (), [s] { s->Terminate ("stream read error"); }); } } else { auto s = shared_from_this (); - m_Owner.GetService ().post ([s] { s->Terminate ("stream read error (op aborted)"); }); + boost::asio::post (m_Owner.GetService (), [s] { s->Terminate ("stream read error (op aborted)"); }); } } else @@ -1102,7 +1102,7 @@ namespace client auto socket = session->acceptQueue.front ().first; session->acceptQueue.pop_front (); if (socket) - m_Owner.GetService ().post (std::bind(&SAMSocket::TerminateClose, socket)); + boost::asio::post (m_Owner.GetService (), std::bind(&SAMSocket::TerminateClose, socket)); } if (!session->acceptQueue.empty ()) { @@ -1236,7 +1236,7 @@ namespace client void SAMSocket::HandleStreamSend(const boost::system::error_code & ec) { - m_Owner.GetService ().post (std::bind( !ec ? &SAMSocket::Receive : &SAMSocket::TerminateClose, shared_from_this())); + boost::asio::post (m_Owner.GetService (), std::bind( !ec ? &SAMSocket::Receive : &SAMSocket::TerminateClose, shared_from_this())); } SAMSession::SAMSession (SAMBridge & parent, const std::string & id, SAMSessionType type): @@ -1310,8 +1310,8 @@ namespace client SAMBridge::SAMBridge (const std::string& address, uint16_t portTCP, uint16_t portUDP, bool singleThread): RunnableService ("SAM"), m_IsSingleThread (singleThread), - m_Acceptor (GetIOService (), boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(address), portTCP)), - m_DatagramEndpoint (boost::asio::ip::address::from_string(address), (!portUDP) ? portTCP-1 : portUDP), m_DatagramSocket (GetIOService (), m_DatagramEndpoint), + m_Acceptor (GetIOService (), boost::asio::ip::tcp::endpoint(boost::asio::ip::make_address(address), portTCP)), + m_DatagramEndpoint (boost::asio::ip::make_address(address), (!portUDP) ? portTCP-1 : portUDP), m_DatagramSocket (GetIOService (), m_DatagramEndpoint), m_SignatureTypes { {"DSA_SHA1", i2p::data::SIGNING_KEY_TYPE_DSA_SHA1}, diff --git a/libi2pd_client/SAM.h b/libi2pd_client/SAM.h index 3ed8f00c..10ef4957 100644 --- a/libi2pd_client/SAM.h +++ b/libi2pd_client/SAM.h @@ -246,7 +246,7 @@ namespace client void Start (); void Stop (); - boost::asio::io_service& GetService () { return GetIOService (); }; + auto& GetService () { return GetIOService (); }; std::shared_ptr CreateSession (const std::string& id, SAMSessionType type, const std::string& destination, // empty string means transient const std::map * params); bool AddSession (std::shared_ptr session); diff --git a/libi2pd_client/SOCKS.cpp b/libi2pd_client/SOCKS.cpp index 0961690b..27df33c8 100644 --- a/libi2pd_client/SOCKS.cpp +++ b/libi2pd_client/SOCKS.cpp @@ -126,9 +126,8 @@ namespace proxy void HandleSockRecv(const boost::system::error_code & ecode, std::size_t bytes_transfered); void Terminate(); void AsyncSockRead(); - boost::asio::const_buffers_1 GenerateSOCKS5SelectAuth(authMethods method); - boost::asio::const_buffers_1 GenerateSOCKS4Response(errTypes error, uint32_t ip, uint16_t port); - boost::asio::const_buffers_1 GenerateSOCKS5Response(errTypes error, addrTypes type, const address &addr, uint16_t port); + boost::asio::const_buffer GenerateSOCKS4Response(errTypes error, uint32_t ip, uint16_t port); + boost::asio::const_buffer GenerateSOCKS5Response(errTypes error, addrTypes type, const address &addr, uint16_t port); bool Socks5ChooseAuth(); void Socks5UserPasswdResponse (); void SocksRequestFailed(errTypes error); @@ -145,9 +144,9 @@ namespace proxy template void SendUpstreamRequest(std::shared_ptr& upstreamSock); void HandleUpstreamConnected(const boost::system::error_code & ecode, - boost::asio::ip::tcp::resolver::iterator itr); + const boost::asio::ip::tcp::endpoint& ep); void HandleUpstreamResolved(const boost::system::error_code & ecode, - boost::asio::ip::tcp::resolver::iterator itr); + boost::asio::ip::tcp::resolver::results_type endpoints); boost::asio::ip::tcp::resolver m_proxy_resolver; uint8_t m_sock_buff[socks_buffer_size]; @@ -233,17 +232,17 @@ namespace proxy Done(shared_from_this()); } - boost::asio::const_buffers_1 SOCKSHandler::GenerateSOCKS4Response(SOCKSHandler::errTypes error, uint32_t ip, uint16_t port) + boost::asio::const_buffer SOCKSHandler::GenerateSOCKS4Response(SOCKSHandler::errTypes error, uint32_t ip, uint16_t port) { assert(error >= SOCKS4_OK); m_response[0] = '\x00'; // version m_response[1] = error; // response code htobe16buf(m_response + 2, port); // port htobe32buf(m_response + 4, ip); // IP - return boost::asio::const_buffers_1(m_response,8); + return boost::asio::const_buffer (m_response,8); } - boost::asio::const_buffers_1 SOCKSHandler::GenerateSOCKS5Response(SOCKSHandler::errTypes error, SOCKSHandler::addrTypes type, const SOCKSHandler::address &addr, uint16_t port) + boost::asio::const_buffer SOCKSHandler::GenerateSOCKS5Response(SOCKSHandler::errTypes error, SOCKSHandler::addrTypes type, const SOCKSHandler::address &addr, uint16_t port) { size_t size = 6; // header + port assert(error <= SOCKS5_ADDR_UNSUP); @@ -280,14 +279,14 @@ namespace proxy } break; } - return boost::asio::const_buffers_1(m_response, size); + return boost::asio::const_buffer (m_response, size); } bool SOCKSHandler::Socks5ChooseAuth() { m_response[0] = '\x05'; // Version m_response[1] = m_authchosen; // Response code - boost::asio::const_buffers_1 response(m_response, 2); + boost::asio::const_buffer response(m_response, 2); if (m_authchosen == AUTH_UNACCEPTABLE) { LogPrint(eLogWarning, "SOCKS: v5 authentication negotiation failed"); @@ -307,14 +306,14 @@ namespace proxy m_response[0] = 1; // Version of the subnegotiation m_response[1] = 0; // Response code LogPrint(eLogDebug, "SOCKS: v5 user/password response"); - boost::asio::async_write(*m_sock, boost::asio::const_buffers_1(m_response, 2), + boost::asio::async_write(*m_sock, boost::asio::const_buffer(m_response, 2), std::bind(&SOCKSHandler::SentSocksResponse, shared_from_this(), std::placeholders::_1)); } /* All hope is lost beyond this point */ void SOCKSHandler::SocksRequestFailed(SOCKSHandler::errTypes error) { - boost::asio::const_buffers_1 response(nullptr,0); + boost::asio::const_buffer response(nullptr,0); assert(error != SOCKS4_OK && error != SOCKS5_OK); switch (m_socksv) { @@ -334,7 +333,7 @@ namespace proxy void SOCKSHandler::SocksRequestSuccess() { - boost::asio::const_buffers_1 response(nullptr,0); + boost::asio::const_buffer response(nullptr,0); // TODO: this should depend on things like the command type and callbacks may change switch (m_socksv) { @@ -691,9 +690,8 @@ namespace proxy if (m_UpstreamProxyPort) // TCP { EnterState(UPSTREAM_RESOLVE); - boost::asio::ip::tcp::resolver::query q(m_UpstreamProxyAddress, std::to_string(m_UpstreamProxyPort)); - m_proxy_resolver.async_resolve(q, std::bind(&SOCKSHandler::HandleUpstreamResolved, shared_from_this(), - std::placeholders::_1, std::placeholders::_2)); + m_proxy_resolver.async_resolve(m_UpstreamProxyAddress, std::to_string(m_UpstreamProxyPort), + std::bind(&SOCKSHandler::HandleUpstreamResolved, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } else if (!m_UpstreamProxyAddress.empty ())// local { @@ -729,7 +727,7 @@ namespace proxy void SOCKSHandler::SocksUpstreamSuccess(std::shared_ptr& upstreamSock) { LogPrint(eLogInfo, "SOCKS: Upstream success"); - boost::asio::const_buffers_1 response(nullptr, 0); + boost::asio::const_buffer response(nullptr, 0); switch (m_socksv) { case SOCKS4: @@ -775,7 +773,8 @@ namespace proxy LogPrint(eLogError, "SOCKS: No upstream socket to send handshake to"); } - void SOCKSHandler::HandleUpstreamConnected(const boost::system::error_code & ecode, boost::asio::ip::tcp::resolver::iterator itr) + void SOCKSHandler::HandleUpstreamConnected(const boost::system::error_code & ecode, + const boost::asio::ip::tcp::endpoint& ep) { if (ecode) { LogPrint(eLogWarning, "SOCKS: Could not connect to upstream proxy: ", ecode.message()); @@ -786,7 +785,8 @@ namespace proxy SendUpstreamRequest(m_upstreamSock); } - void SOCKSHandler::HandleUpstreamResolved(const boost::system::error_code & ecode, boost::asio::ip::tcp::resolver::iterator itr) + void SOCKSHandler::HandleUpstreamResolved(const boost::system::error_code & ecode, + boost::asio::ip::tcp::resolver::results_type endpoints) { if (ecode) { // error resolving @@ -798,7 +798,7 @@ namespace proxy EnterState(UPSTREAM_CONNECT); auto & service = GetOwner()->GetService(); m_upstreamSock = std::make_shared(service); - boost::asio::async_connect(*m_upstreamSock, itr, + boost::asio::async_connect(*m_upstreamSock, endpoints, std::bind(&SOCKSHandler::HandleUpstreamConnected, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); }