From 87ae9c4b742c2112c95343d132a87b8e775c96f8 Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 25 Oct 2024 18:40:51 -0400 Subject: [PATCH 01/20] call main thread as i2pd-daemon --- daemon/UnixDaemon.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemon/UnixDaemon.cpp b/daemon/UnixDaemon.cpp index 9414962d..66661e0f 100644 --- a/daemon/UnixDaemon.cpp +++ b/daemon/UnixDaemon.cpp @@ -221,7 +221,7 @@ namespace i2p void DaemonLinux::run () { - i2p::util::SetThreadName ("Daemon"); + i2p::util::SetThreadName ("i2pd-daemon"); while (running) { std::this_thread::sleep_for (std::chrono::seconds(1)); From f611136ea795868e43a357e60a239c75c6ec8210 Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 26 Oct 2024 15:30:48 -0400 Subject: [PATCH 02/20] resend relay reponnse if remote router >= 0.9.64 --- libi2pd/SSU2Session.cpp | 30 ++++++++++++++++++------------ libi2pd/SSU2Session.h | 3 +++ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 20f21ce5..8a1a44df 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -83,7 +83,7 @@ namespace transport std::shared_ptr addr, bool noise): TransportSession (in_RemoteRouter, SSU2_CONNECT_TIMEOUT), m_Server (server), m_Address (addr), m_RemoteTransports (0), m_RemotePeerTestTransports (0), - m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown), + m_RemoteVersion (0), m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown), m_SendPacketNum (0), m_ReceivePacketNum (0), m_LastDatetimeSentPacketNum (0), m_IsDataReceived (false), m_RTT (SSU2_UNKNOWN_RTT), m_MsgLocalExpirationTimeout (I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MAX), @@ -103,6 +103,7 @@ namespace transport InitNoiseXKState1 (*m_NoiseState, m_Address->s); m_RemoteEndpoint = boost::asio::ip::udp::endpoint (m_Address->host, m_Address->port); m_RemoteTransports = in_RemoteRouter->GetCompatibleTransports (false); + m_RemoteVersion = in_RemoteRouter->GetVersion (); if (in_RemoteRouter->IsSSU2PeerTesting (true)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V4; if (in_RemoteRouter->IsSSU2PeerTesting (false)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V6; RAND_bytes ((uint8_t *)&m_DestConnID, 8); @@ -1185,7 +1186,8 @@ namespace transport m_RemotePeerTestTransports = 0; if (ri->IsSSU2PeerTesting (true)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V4; if (ri->IsSSU2PeerTesting (false)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V6; - + m_RemoteVersion = ri->GetVersion (); + // handle other blocks HandlePayload (decryptedPayload.data () + riSize + 3, decryptedPayload.size () - riSize - 3); Established (); @@ -2037,11 +2039,13 @@ namespace transport holePunchSession->SendHolePunch (packet->payload, packet->payloadSize); // relay response block } packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize); - /*uint32_t packetNum = */SendData (packet->payload, packet->payloadSize); - // sometimes Bob doesn't ack this RelayResponse - // TODO: uncomment line below once the problem is resolved - //packet->sendTime = mts; - //m_SentPackets.emplace (packetNum, packet); + uint32_t packetNum = SendData (packet->payload, packet->payloadSize); + if (m_RemoteVersion >= SSU2_MIN_RELAY_RESPONSE_RESEND_VERSION) + { + // sometimes Bob doesn't ack this RelayResponse in older versions + packet->sendTime = i2p::util::GetMillisecondsSinceEpoch (); + m_SentPackets.emplace (packetNum, packet); + } } void SSU2Session::HandleRelayResponse (const uint8_t * buf, size_t len) @@ -2076,11 +2080,13 @@ namespace transport memcpy (payload + 3, buf, len); // forward to Alice as is packet->payloadSize = len + 3; packet->payloadSize += CreatePaddingBlock (payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize); - /*uint32_t packetNum = */it->second.first->SendData (packet->payload, packet->payloadSize); - // sometimes Alice doesn't ack this RelayResponse - // TODO: uncomment line below once the problem is resolved - //packet->sendTime = i2p::util::GetMillisecondsSinceEpoch (); - //it->second.first->m_SentPackets.emplace (packetNum, packet); + uint32_t packetNum = it->second.first->SendData (packet->payload, packet->payloadSize); + if (m_RemoteVersion >= SSU2_MIN_RELAY_RESPONSE_RESEND_VERSION) + { + // sometimes Alice doesn't ack this RelayResponse in older versions + packet->sendTime = i2p::util::GetMillisecondsSinceEpoch (); + it->second.first->m_SentPackets.emplace (packetNum, packet); + } } else { diff --git a/libi2pd/SSU2Session.h b/libi2pd/SSU2Session.h index e27671f8..d54731dc 100644 --- a/libi2pd/SSU2Session.h +++ b/libi2pd/SSU2Session.h @@ -15,6 +15,7 @@ #include #include #include +#include "version.h" #include "Crypto.h" #include "RouterInfo.h" #include "RouterContext.h" @@ -55,6 +56,7 @@ namespace transport const int SSU2_MAX_NUM_ACK_RANGES = 32; // to send const uint8_t SSU2_MAX_NUM_FRAGMENTS = 64; const int SSU2_SEND_DATETIME_NUM_PACKETS = 256; + const int SSU2_MIN_RELAY_RESPONSE_RESEND_VERSION = MAKE_VERSION_NUMBER(0, 9, 64); // 0.9.64 // flags const uint8_t SSU2_FLAG_IMMEDIATE_ACK_REQUESTED = 0x01; @@ -368,6 +370,7 @@ namespace transport std::shared_ptr m_Address; boost::asio::ip::udp::endpoint m_RemoteEndpoint; i2p::data::RouterInfo::CompatibleTransports m_RemoteTransports, m_RemotePeerTestTransports; + int m_RemoteVersion; uint64_t m_DestConnID, m_SourceConnID; SSU2SessionState m_State; uint8_t m_KeyDataSend[64], m_KeyDataReceive[64]; From 743126b2ad11f774363fd41e280cbeb2806aa675 Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 26 Oct 2024 19:05:08 -0400 Subject: [PATCH 03/20] better hole punch expiration intervals --- libi2pd/SSU2.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 0433033d..6f51f955 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -42,8 +42,8 @@ namespace transport const int SSU2_KEEP_ALIVE_INTERVAL = 15; // in seconds const int SSU2_KEEP_ALIVE_INTERVAL_VARIANCE = 4; // in seconds const int SSU2_PROXY_CONNECT_RETRY_TIMEOUT = 30; // in seconds - const int SSU2_MIN_HOLE_PUNCH_EXPIRATION = 45; // in seconds - const int SSU2_MAX_HOLE_PUNCH_EXPIRATION = 181; // in seconds + const int SSU2_MIN_HOLE_PUNCH_EXPIRATION = 30; // in seconds + const int SSU2_MAX_HOLE_PUNCH_EXPIRATION = 160; // in seconds const size_t SSU2_MAX_NUM_PACKETS_PER_BATCH = 64; class SSU2Server: private i2p::util::RunnableServiceWithWork From 7461b640e32fdd8cabe42557051f59fae166c3b5 Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 26 Oct 2024 19:26:25 -0400 Subject: [PATCH 04/20] reduce CPU usage --- libi2pd/Streaming.cpp | 16 +++++++++++++--- libi2pd/Streaming.h | 3 ++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index f22e724d..f9257974 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -1237,7 +1237,8 @@ namespace stream if (m_Status != eStreamStatusTerminated) { m_SendTimer.cancel (); - m_SendTimer.expires_from_now (boost::posix_time::microseconds(SEND_INTERVAL)); + m_SendTimer.expires_from_now (boost::posix_time::microseconds( + SEND_INTERVAL + m_LocalDestination.GetRandom () % SEND_INTERVAL_VARIANCE)); m_SendTimer.async_wait (std::bind (&Stream::HandleSendTimer, shared_from_this (), std::placeholders::_1)); } @@ -1250,8 +1251,17 @@ namespace stream auto ts = i2p::util::GetMillisecondsSinceEpoch (); if (m_LastSendTime && ts*1000 > m_LastSendTime*1000 + m_PacingTime) { - m_NumPacketsToSend = ((ts*1000 - m_LastSendTime*1000) + m_PacingTimeRem) / m_PacingTime; - m_PacingTimeRem = ((ts*1000 - m_LastSendTime*1000) + m_PacingTimeRem) - (m_NumPacketsToSend * m_PacingTime); + if (m_PacingTime) + { + auto numPackets = std::lldiv (m_PacingTimeRem + ts*1000 - m_LastSendTime*1000, m_PacingTime); + m_NumPacketsToSend = numPackets.quot; + m_PacingTimeRem = numPackets.rem; + } + else + { + LogPrint (eLogError, "Streaming: pacing time is zero"); + m_NumPacketsToSend = 1; m_PacingTimeRem = 0; + } m_IsSendTime = true; if (m_WindowIncCounter && m_WindowSize < MAX_WINDOW_SIZE && !m_SendBuffer.IsEmpty () && m_PacingTime > m_MinPacingTime) { diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 8183bdad..a686d71b 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -69,7 +69,8 @@ namespace stream const int PENDING_INCOMING_TIMEOUT = 10; // in seconds const int MAX_RECEIVE_TIMEOUT = 20; // in seconds const uint16_t DELAY_CHOKING = 60000; // in milliseconds - const uint64_t SEND_INTERVAL = 1000; // in microseconds + const uint64_t SEND_INTERVAL = 10000; // in microseconds + const uint64_t SEND_INTERVAL_VARIANCE = 2000; // in microseconds const uint64_t REQUEST_IMMEDIATE_ACK_INTERVAL = 7500; // in milliseconds const uint64_t REQUEST_IMMEDIATE_ACK_INTERVAL_VARIANCE = 3200; // in milliseconds const bool LOSS_BASED_CONTROL_ENABLED = 1; // 0/1 From 608056dcd2dcf999a4541ca66bdb78caade4b9b1 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 27 Oct 2024 11:55:10 -0400 Subject: [PATCH 05/20] don't handle RelayRequest and RelayIntro with same nonce twice --- libi2pd/SSU2.cpp | 4 +++- libi2pd/SSU2.h | 2 +- libi2pd/SSU2Session.cpp | 50 ++++++++++++++++++++++++----------------- 3 files changed, 33 insertions(+), 23 deletions(-) diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 01a3bb6c..c8f2909b 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -477,7 +477,7 @@ namespace transport HandleReceivedPackets (std::move (receivedPackets)); } - void SSU2Server::AddSession (std::shared_ptr session) + bool SSU2Server::AddSession (std::shared_ptr session) { if (session) { @@ -485,8 +485,10 @@ namespace transport { if (session->GetState () != eSSU2SessionStatePeerTest) AddSessionByRouterHash (session); + return true; } } + return false; } void SSU2Server::RemoveSession (uint64_t connID) diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 6f51f955..319a4780 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -85,7 +85,7 @@ namespace transport bool IsSyncClockFromPeers () const { return m_IsSyncClockFromPeers; }; void AdjustTimeOffset (int64_t offset, std::shared_ptr from); - void AddSession (std::shared_ptr session); + bool AddSession (std::shared_ptr session); void RemoveSession (uint64_t connID); void RequestRemoveSession (uint64_t connID); void AddSessionByRouterHash (std::shared_ptr session); diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 8a1a44df..15cabac9 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -1931,25 +1931,28 @@ namespace transport return; } auto mts = i2p::util::GetMillisecondsSinceEpoch (); - session->m_RelaySessions.emplace (bufbe32toh (buf + 1), // nonce - std::make_pair (shared_from_this (), mts/1000) ); + uint32_t nonce = bufbe32toh (buf + 1); + if (session->m_RelaySessions.emplace (nonce, std::make_pair (shared_from_this (), mts/1000)).second) + { + // send relay intro to Charlie + auto r = i2p::data::netdb.FindRouter (GetRemoteIdentity ()->GetIdentHash ()); // Alice's RI + if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr; + if (!r) LogPrint (eLogWarning, "SSU2: RelayRequest Alice's router info not found"); - // send relay intro to Charlie - auto r = i2p::data::netdb.FindRouter (GetRemoteIdentity ()->GetIdentHash ()); // Alice's RI - if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr; - if (!r) LogPrint (eLogWarning, "SSU2: RelayRequest Alice's router info not found"); - - auto packet = m_Server.GetSentPacketsPool ().AcquireShared (); - packet->payloadSize = r ? CreateRouterInfoBlock (packet->payload, m_MaxPayloadSize - len - 32, r) : 0; - if (!packet->payloadSize && r) - session->SendFragmentedMessage (CreateDatabaseStoreMsg (r)); - packet->payloadSize += CreateRelayIntroBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize, buf + 1, len -1); - if (packet->payloadSize < m_MaxPayloadSize) - packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize); - uint32_t packetNum = session->SendData (packet->payload, packet->payloadSize); - packet->sendTime = mts; - // Charlie always responds with RelayResponse - session->m_SentPackets.emplace (packetNum, packet); + auto packet = m_Server.GetSentPacketsPool ().AcquireShared (); + packet->payloadSize = r ? CreateRouterInfoBlock (packet->payload, m_MaxPayloadSize - len - 32, r) : 0; + if (!packet->payloadSize && r) + session->SendFragmentedMessage (CreateDatabaseStoreMsg (r)); + packet->payloadSize += CreateRelayIntroBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize, buf + 1, len -1); + if (packet->payloadSize < m_MaxPayloadSize) + packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize); + uint32_t packetNum = session->SendData (packet->payload, packet->payloadSize); + packet->sendTime = mts; + // Charlie always responds with RelayResponse + session->m_SentPackets.emplace (packetNum, packet); + } + else + LogPrint (eLogInfo, "SSU2: Relay request nonce ", nonce, " already exists. Ignore"); } void SSU2Session::HandleRelayIntro (const uint8_t * buf, size_t len, int attempts) @@ -2035,8 +2038,13 @@ namespace transport { // send HolePunch auto holePunchSession = std::make_shared(m_Server, nonce, ep, addr); - m_Server.AddSession (holePunchSession); - holePunchSession->SendHolePunch (packet->payload, packet->payloadSize); // relay response block + if (m_Server.AddSession (holePunchSession)) + holePunchSession->SendHolePunch (packet->payload, packet->payloadSize); // relay response block + else + { + LogPrint (eLogInfo, "SSU2: Relay intro nonce ", nonce, " already exists. Ignore"); + return; + } } packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize); uint32_t packetNum = SendData (packet->payload, packet->payloadSize); @@ -3043,7 +3051,7 @@ namespace transport { if (ts > it->second.second + SSU2_RELAY_NONCE_EXPIRATION_TIMEOUT) { - LogPrint (eLogWarning, "SSU2: Relay nonce ", it->first, " was not responded in ", SSU2_RELAY_NONCE_EXPIRATION_TIMEOUT, " seconds, deleted"); + LogPrint (eLogInfo, "SSU2: Relay nonce ", it->first, " was not responded in ", SSU2_RELAY_NONCE_EXPIRATION_TIMEOUT, " seconds, deleted"); it = m_RelaySessions.erase (it); } else From 79e8ccbb5b2f4306208f8fe776dd09566963fbea Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 27 Oct 2024 12:24:22 -0400 Subject: [PATCH 06/20] don't handle PeerTest 1 with same nonce twice --- libi2pd/SSU2Session.cpp | 42 ++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 15cabac9..3489a6ba 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -2162,29 +2162,33 @@ namespace transport GetRemoteIdentity ()->GetIdentHash ()); if (session) // session with Charlie { - m_Server.AddPeerTest (nonce, shared_from_this (), ts/1000); - auto packet = m_Server.GetSentPacketsPool ().AcquireShared (); - // Alice's RouterInfo - auto r = i2p::data::netdb.FindRouter (GetRemoteIdentity ()->GetIdentHash ()); - if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr; - packet->payloadSize = r ? CreateRouterInfoBlock (packet->payload, m_MaxPayloadSize - len - 32, r) : 0; - if (!packet->payloadSize && r) - session->SendFragmentedMessage (CreateDatabaseStoreMsg (r)); - if (packet->payloadSize + len + 48 > m_MaxPayloadSize) - { - // doesn't fit one message, send RouterInfo in separate message + if (m_Server.AddPeerTest (nonce, shared_from_this (), ts/1000)) + { + auto packet = m_Server.GetSentPacketsPool ().AcquireShared (); + // Alice's RouterInfo + auto r = i2p::data::netdb.FindRouter (GetRemoteIdentity ()->GetIdentHash ()); + if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr; + packet->payloadSize = r ? CreateRouterInfoBlock (packet->payload, m_MaxPayloadSize - len - 32, r) : 0; + if (!packet->payloadSize && r) + session->SendFragmentedMessage (CreateDatabaseStoreMsg (r)); + if (packet->payloadSize + len + 48 > m_MaxPayloadSize) + { + // doesn't fit one message, send RouterInfo in separate message + uint32_t packetNum = session->SendData (packet->payload, packet->payloadSize, SSU2_FLAG_IMMEDIATE_ACK_REQUESTED); + packet->sendTime = ts; + session->m_SentPackets.emplace (packetNum, packet); + packet = m_Server.GetSentPacketsPool ().AcquireShared (); // new packet + } + // PeerTest to Charlie + packet->payloadSize += CreatePeerTestBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize, 2, + eSSU2PeerTestCodeAccept, GetRemoteIdentity ()->GetIdentHash (), buf + offset, len - offset); + packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize); uint32_t packetNum = session->SendData (packet->payload, packet->payloadSize, SSU2_FLAG_IMMEDIATE_ACK_REQUESTED); packet->sendTime = ts; session->m_SentPackets.emplace (packetNum, packet); - packet = m_Server.GetSentPacketsPool ().AcquireShared (); // new packet } - // PeerTest to Charlie - packet->payloadSize += CreatePeerTestBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize, 2, - eSSU2PeerTestCodeAccept, GetRemoteIdentity ()->GetIdentHash (), buf + offset, len - offset); - packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize); - uint32_t packetNum = session->SendData (packet->payload, packet->payloadSize, SSU2_FLAG_IMMEDIATE_ACK_REQUESTED); - packet->sendTime = ts; - session->m_SentPackets.emplace (packetNum, packet); + else + LogPrint (eLogInfo, "SSU2: Peer test 1 nonce ", nonce, " already exists. Ignored"); } else { From ec4fe9a1e680e677b94fab21c7febd8151478ab4 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 27 Oct 2024 18:17:28 -0400 Subject: [PATCH 07/20] set congesion cap G if symmetric NAT and ipv4 in only transport --- libi2pd/RouterContext.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libi2pd/RouterContext.cpp b/libi2pd/RouterContext.cpp index 1efb25db..f5f8b4fb 100644 --- a/libi2pd/RouterContext.cpp +++ b/libi2pd/RouterContext.cpp @@ -1489,7 +1489,7 @@ namespace i2p void RouterContext::UpdateCongestion () { auto c = i2p::data::RouterInfo::eLowCongestion; - if (!AcceptsTunnels () || !m_ShareRatio) + if (!AcceptsTunnels () || !m_ShareRatio || (m_Error == eRouterErrorSymmetricNAT && !SupportsV6 () && !SupportsMesh ())) c = i2p::data::RouterInfo::eRejectAll; else { From 4c66608caf0971902df0a528ea5738802d2fe828 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 27 Oct 2024 21:58:19 -0400 Subject: [PATCH 08/20] random tunnel reject when medium congestion --- libi2pd/I2NPProtocol.cpp | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/libi2pd/I2NPProtocol.cpp b/libi2pd/I2NPProtocol.cpp index 3b2c204e..198f798b 100644 --- a/libi2pd/I2NPProtocol.cpp +++ b/libi2pd/I2NPProtocol.cpp @@ -396,8 +396,26 @@ namespace i2p return false; } uint8_t retCode = 0; + // decide if we should accept tunnel + bool accept = i2p::context.AcceptsTunnels (); + if (accept) + { + auto congestionLevel = i2p::context.GetCongestionLevel (false); + if (congestionLevel >= CONGESTION_LEVEL_MEDIUM) + { + if (congestionLevel < CONGESTION_LEVEL_FULL) + { + // random reject depending on congestion level + int level = i2p::tunnel::tunnels.GetRng ()() % (CONGESTION_LEVEL_FULL - CONGESTION_LEVEL_MEDIUM) + CONGESTION_LEVEL_MEDIUM; + if (congestionLevel > level) + accept = false; + } + else + accept = false; + } + } // replace record to reply - if (i2p::context.AcceptsTunnels () && i2p::context.GetCongestionLevel (false) < CONGESTION_LEVEL_FULL) + if (accept) { auto transitTunnel = i2p::tunnel::CreateTransitTunnel ( bufbe32toh (clearText + ECIES_BUILD_REQUEST_RECORD_RECEIVE_TUNNEL_OFFSET), From 43939cedf4aa0ba2db17b96ba424f132daf9f079 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 27 Oct 2024 22:19:06 -0400 Subject: [PATCH 09/20] random tunnel reject when medium congestion --- libi2pd/Tunnel.cpp | 3 ++- libi2pd/Tunnel.h | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/libi2pd/Tunnel.cpp b/libi2pd/Tunnel.cpp index c41bb775..1e2a75d9 100644 --- a/libi2pd/Tunnel.cpp +++ b/libi2pd/Tunnel.cpp @@ -350,7 +350,8 @@ namespace tunnel Tunnels::Tunnels (): m_IsRunning (false), m_Thread (nullptr), m_MaxNumTransitTunnels (DEFAULT_MAX_NUM_TRANSIT_TUNNELS), m_TotalNumSuccesiveTunnelCreations (0), m_TotalNumFailedTunnelCreations (0), // for normal average - m_TunnelCreationSuccessRate (TCSR_START_VALUE), m_TunnelCreationAttemptsNum(0) + m_TunnelCreationSuccessRate (TCSR_START_VALUE), m_TunnelCreationAttemptsNum(0), + m_Rng(i2p::util::GetMonotonicMicroseconds ()%1000000LL) { } diff --git a/libi2pd/Tunnel.h b/libi2pd/Tunnel.h index 00a05386..02bfb374 100644 --- a/libi2pd/Tunnel.h +++ b/libi2pd/Tunnel.h @@ -18,6 +18,7 @@ #include #include #include +#include #include "util.h" #include "Queue.h" #include "Crypto.h" @@ -244,6 +245,8 @@ namespace tunnel uint32_t GetMaxNumTransitTunnels () const { return m_MaxNumTransitTunnels; }; int GetCongestionLevel() const { return m_MaxNumTransitTunnels ? CONGESTION_LEVEL_FULL * m_TransitTunnels.size() / m_MaxNumTransitTunnels : CONGESTION_LEVEL_FULL; } + std::mt19937& GetRng () { return m_Rng; }; + private: template @@ -307,6 +310,7 @@ namespace tunnel int m_TotalNumSuccesiveTunnelCreations, m_TotalNumFailedTunnelCreations; double m_TunnelCreationSuccessRate; int m_TunnelCreationAttemptsNum; + std::mt19937 m_Rng; public: From 8f9874570a01698fe739b759feed4e0a8d67148b Mon Sep 17 00:00:00 2001 From: mittwerk Date: Mon, 28 Oct 2024 09:11:09 +0200 Subject: [PATCH 10/20] hardening iterator --- libi2pd_client/UDPTunnel.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libi2pd_client/UDPTunnel.cpp b/libi2pd_client/UDPTunnel.cpp index cd17bbf0..9f7fe864 100644 --- a/libi2pd_client/UDPTunnel.cpp +++ b/libi2pd_client/UDPTunnel.cpp @@ -203,7 +203,7 @@ namespace client std::vector > sessions; std::lock_guard lock (m_SessionsMutex); - for (auto it: m_Sessions) + for (const auto &it: m_Sessions) { auto s = it.second; if (!s->m_Destination) continue; From 0e8d624d861bd08c705e8390f98609e3876d0acf Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 28 Oct 2024 08:38:04 -0400 Subject: [PATCH 11/20] move UpdatePacingTime out of loop --- libi2pd/Streaming.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index f9257974..966c172d 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -1276,10 +1276,12 @@ namespace stream else m_WindowSize += (m_WindowSize - (1 - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowSize; if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; - m_WindowIncCounter --; - UpdatePacingTime (); + m_WindowIncCounter--; } + else + break; } + UpdatePacingTime (); } if (m_IsNAcked) ResendPacket (); From ec67f48d855c196669938a2da71064da8a501d4b Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 28 Oct 2024 08:46:01 -0400 Subject: [PATCH 12/20] fixed possible memory leak --- libi2pd/NTCP2.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/libi2pd/NTCP2.cpp b/libi2pd/NTCP2.cpp index 728ac01d..a05469f1 100644 --- a/libi2pd/NTCP2.cpp +++ b/libi2pd/NTCP2.cpp @@ -1822,7 +1822,7 @@ namespace transport LogPrint(eLogError, "NTCP2: HTTP proxy write error ", ec.message()); }); - boost::asio::streambuf * readbuff = new boost::asio::streambuf; + auto readbuff = std::make_shared(); boost::asio::async_read_until(conn->GetSocket(), *readbuff, "\r\n\r\n", [readbuff, timer, conn] (const boost::system::error_code & ec, std::size_t transferred) { @@ -1842,7 +1842,6 @@ namespace transport { timer->cancel(); conn->ClientLogin(); - delete readbuff; return; } else @@ -1852,7 +1851,6 @@ namespace transport LogPrint(eLogError, "NTCP2: HTTP proxy gave malformed response"); timer->cancel(); conn->Terminate(); - delete readbuff; } }); break; From 23e66671c2fcd1fc11abd11f07ec18f93a346aab Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 28 Oct 2024 20:36:50 -0400 Subject: [PATCH 13/20] intermediate queue for transport sessions. use std::list instead std::vector for multiple I2NP messages --- libi2pd/NTCP2.cpp | 40 ++++++++++++++++++++++++++++---------- libi2pd/NTCP2.h | 9 ++++++--- libi2pd/NetDb.cpp | 2 +- libi2pd/SSU2Session.cpp | 25 +++++++++++++++++++----- libi2pd/SSU2Session.h | 6 ++++-- libi2pd/TransportSession.h | 8 ++++++-- libi2pd/Transports.cpp | 36 ++++++++++++++++++++++++---------- libi2pd/Transports.h | 7 ++++--- libi2pd/TunnelGateway.cpp | 2 +- 9 files changed, 98 insertions(+), 37 deletions(-) diff --git a/libi2pd/NTCP2.cpp b/libi2pd/NTCP2.cpp index a05469f1..747dc0b0 100644 --- a/libi2pd/NTCP2.cpp +++ b/libi2pd/NTCP2.cpp @@ -375,6 +375,8 @@ namespace transport m_Socket.close (); transports.PeerDisconnected (shared_from_this ()); m_Server.RemoveNTCP2Session (shared_from_this ()); + if (!m_IntermediateQueue.empty ()) + m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue); for (auto& it: m_SendQueue) it->Drop (); m_SendQueue.clear (); @@ -1207,7 +1209,7 @@ namespace transport void NTCP2Session::MoveSendQueue (std::shared_ptr other) { if (!other || m_SendQueue.empty ()) return; - std::vector > msgs; + std::list > msgs; auto ts = i2p::util::GetMillisecondsSinceEpoch (); for (auto it: m_SendQueue) if (!it->IsExpired (ts)) @@ -1216,7 +1218,7 @@ namespace transport it->Drop (); m_SendQueue.clear (); if (!msgs.empty ()) - other->PostI2NPMessages (msgs); + other->SendI2NPMessages (msgs); } size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len) @@ -1297,20 +1299,38 @@ namespace transport m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); // let termination message go } - void NTCP2Session::SendI2NPMessages (const std::vector >& msgs) + void NTCP2Session::SendI2NPMessages (std::list >& msgs) { - m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this (), msgs)); + if (m_IsTerminated || msgs.empty ()) return; + bool empty = false; + { + std::lock_guard l(m_IntermediateQueueMutex); + empty = m_IntermediateQueue.empty (); + m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs); + } + if (empty) + m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this ())); } - void NTCP2Session::PostI2NPMessages (std::vector > msgs) + void NTCP2Session::PostI2NPMessages () { if (m_IsTerminated) return; + std::list > msgs; + { + std::lock_guard l(m_IntermediateQueueMutex); + m_IntermediateQueue.swap (msgs); + } bool isSemiFull = m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE/2; - for (auto it: msgs) - if (isSemiFull && it->onDrop) - it->Drop (); // drop earlier because we can handle it - else - m_SendQueue.push_back (std::move (it)); + if (isSemiFull) + { + for (auto it: msgs) + if (it->onDrop) + it->Drop (); // drop earlier because we can handle it + else + m_SendQueue.push_back (std::move (it)); + } + else + m_SendQueue.splice (m_SendQueue.end (), msgs); if (!m_IsSending && m_IsEstablished) SendQueue (); diff --git a/libi2pd/NTCP2.h b/libi2pd/NTCP2.h index f7912b54..27acb529 100644 --- a/libi2pd/NTCP2.h +++ b/libi2pd/NTCP2.h @@ -153,7 +153,7 @@ namespace transport void ServerLogin (); // Bob void SendLocalRouterInfo (bool update) override; // after handshake or by update - void SendI2NPMessages (const std::vector >& msgs) override; + void SendI2NPMessages (std::list >& msgs) override; void MoveSendQueue (std::shared_ptr other); private: @@ -196,7 +196,7 @@ namespace transport void SendRouterInfo (); void SendTermination (NTCP2TerminationReason reason); void SendTerminationAndTerminate (NTCP2TerminationReason reason); - void PostI2NPMessages (std::vector > msgs); + void PostI2NPMessages (); private: @@ -229,7 +229,10 @@ namespace transport bool m_IsSending, m_IsReceiving; std::list > m_SendQueue; uint64_t m_NextRouterInfoResendTime; // seconds since epoch - + + std::list > m_IntermediateQueue; // from transports + mutable std::mutex m_IntermediateQueueMutex; + uint16_t m_PaddingSizes[16]; int m_NextPaddingSize; }; diff --git a/libi2pd/NetDb.cpp b/libi2pd/NetDb.cpp index 341d617e..c96bcf95 100644 --- a/libi2pd/NetDb.cpp +++ b/libi2pd/NetDb.cpp @@ -480,7 +480,7 @@ namespace data void NetDb::ReseedFromFloodfill(const RouterInfo & ri, int numRouters, int numFloodfills) { LogPrint(eLogInfo, "NetDB: Reseeding from floodfill ", ri.GetIdentHashBase64()); - std::vector > requests; + std::list > requests; i2p::data::IdentHash ourIdent = i2p::context.GetIdentHash(); i2p::data::IdentHash ih = ri.GetIdentHash(); diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 3489a6ba..9ceb491e 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -293,6 +293,8 @@ namespace transport m_SentHandshakePacket.reset (nullptr); m_SessionConfirmedFragment.reset (nullptr); m_PathChallenge.reset (nullptr); + if (!m_IntermediateQueue.empty ()) + m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue); for (auto& it: m_SendQueue) it->Drop (); m_SendQueue.clear (); @@ -372,14 +374,27 @@ namespace transport } - void SSU2Session::SendI2NPMessages (const std::vector >& msgs) + void SSU2Session::SendI2NPMessages (std::list >& msgs) { - m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this (), msgs)); + if (m_State == eSSU2SessionStateTerminated || msgs.empty ()) return; + bool empty = false; + { + std::lock_guard l(m_IntermediateQueueMutex); + empty = m_IntermediateQueue.empty (); + m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs); + } + if (empty) + m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this ())); } - void SSU2Session::PostI2NPMessages (std::vector > msgs) + void SSU2Session::PostI2NPMessages () { if (m_State == eSSU2SessionStateTerminated) return; + std::list > msgs; + { + std::lock_guard l(m_IntermediateQueueMutex); + m_IntermediateQueue.swap (msgs); + } uint64_t mts = i2p::util::GetMonotonicMicroseconds (); bool isSemiFull = false; if (m_SendQueue.size ()) @@ -415,7 +430,7 @@ namespace transport void SSU2Session::MoveSendQueue (std::shared_ptr other) { if (!other || m_SendQueue.empty ()) return; - std::vector > msgs; + std::list > msgs; auto ts = i2p::util::GetMillisecondsSinceEpoch (); for (auto it: m_SendQueue) if (!it->IsExpired (ts)) @@ -424,7 +439,7 @@ namespace transport it->Drop (); m_SendQueue.clear (); if (!msgs.empty ()) - other->PostI2NPMessages (msgs); + other->SendI2NPMessages (msgs); } bool SSU2Session::SendQueue () diff --git a/libi2pd/SSU2Session.h b/libi2pd/SSU2Session.h index d54731dc..4b3139a7 100644 --- a/libi2pd/SSU2Session.h +++ b/libi2pd/SSU2Session.h @@ -261,7 +261,7 @@ namespace transport void FlushData (); void Done () override; void SendLocalRouterInfo (bool update) override; - void SendI2NPMessages (const std::vector >& msgs) override; + void SendI2NPMessages (std::list >& msgs) override; void MoveSendQueue (std::shared_ptr other); uint32_t GetRelayTag () const override { return m_RelayTag; }; size_t Resend (uint64_t ts); // return number of resent packets @@ -307,7 +307,7 @@ namespace transport void Established (); void ScheduleConnectTimer (); void HandleConnectTimer (const boost::system::error_code& ecode); - void PostI2NPMessages (std::vector > msgs); + void PostI2NPMessages (); bool SendQueue (); // returns true if ack block was sent bool SendFragmentedMessage (std::shared_ptr msg); void ResendHandshakePacket (); @@ -381,6 +381,8 @@ namespace transport std::unordered_map, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice std::list > m_SendQueue; i2p::I2NPMessagesHandler m_Handler; + std::list > m_IntermediateQueue; // from transports + mutable std::mutex m_IntermediateQueueMutex; bool m_IsDataReceived; double m_RTT; int m_MsgLocalExpirationTimeout; diff --git a/libi2pd/TransportSession.h b/libi2pd/TransportSession.h index c6bf0de3..6c878d11 100644 --- a/libi2pd/TransportSession.h +++ b/libi2pd/TransportSession.h @@ -144,8 +144,12 @@ namespace transport void SetLastActivityTimestamp (uint64_t ts) { m_LastActivityTimestamp = ts; }; virtual uint32_t GetRelayTag () const { return 0; }; - virtual void SendLocalRouterInfo (bool update = false) { SendI2NPMessages ({ CreateDatabaseStoreMsg () }); }; - virtual void SendI2NPMessages (const std::vector >& msgs) = 0; + virtual void SendLocalRouterInfo (bool update = false) + { + std::list > msgs{ CreateDatabaseStoreMsg () }; + SendI2NPMessages (msgs); + }; + virtual void SendI2NPMessages (std::list >& msgs) = 0; virtual bool IsEstablished () const = 0; private: diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index 34bc6142..3954e2cf 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -450,15 +450,23 @@ namespace transport void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg) { if (m_IsOnline) - SendMessages (ident, std::vector > {msg }); + SendMessages (ident, { msg }); } - void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector >& msgs) + void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::list >& msgs) { m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs)); } - void Transports::PostMessages (i2p::data::IdentHash ident, std::vector > msgs) + void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs) + { + m_Service->post ([this, ident, msgs = std::move(msgs)] () + { + PostMessages (ident, msgs); + }); + } + + void Transports::PostMessages (i2p::data::IdentHash ident, std::list > msgs) { if (ident == i2p::context.GetRouterInfo ().GetIdentHash ()) { @@ -517,11 +525,16 @@ namespace transport return; } } - for (auto& it1: msgs) - if (sz > MAX_NUM_DELAYED_MESSAGES/2 && it1->onDrop) - it1->Drop (); // drop earlier because we can handle it - else - peer->delayedMessages.push_back (it1); + if (sz > MAX_NUM_DELAYED_MESSAGES/2) + { + for (auto& it1: msgs) + if (it1->onDrop) + it1->Drop (); // drop earlier because we can handle it + else + peer->delayedMessages.push_back (it1); + } + else + peer->delayedMessages.splice (peer->delayedMessages.end (), msgs); } else { @@ -865,7 +878,7 @@ namespace transport if (it->second->delayedMessages.size () > 0) { // check if first message is our DatabaseStore (publishing) - auto firstMsg = peer->delayedMessages[0]; + auto firstMsg = peer->delayedMessages.front (); if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore && i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ()) sendDatabaseStore = false; // we have it in the list already @@ -887,7 +900,10 @@ namespace transport return; } if (!session->IsOutgoing ()) // incoming - session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore + { + std::list > msgs{ CreateDatabaseStoreMsg () }; + session->SendI2NPMessages (msgs); // send DatabaseStore + } auto r = i2p::data::netdb.FindRouter (ident); // router should be in netdb after SessionConfirmed if (r) r->GetProfile ()->Connected (); auto ts = i2p::util::GetSecondsSinceEpoch (); diff --git a/libi2pd/Transports.h b/libi2pd/Transports.h index 70273094..e18ec29d 100644 --- a/libi2pd/Transports.h +++ b/libi2pd/Transports.h @@ -71,7 +71,7 @@ namespace transport std::shared_ptr router; std::list > sessions; uint64_t creationTime, nextRouterInfoUpdateTime, lastSelectionTime; - std::vector > delayedMessages; + std::list > delayedMessages; std::vector priority; bool isHighBandwidth, isEligible; @@ -141,7 +141,8 @@ namespace transport void ReuseX25519KeysPair (std::shared_ptr pair); void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg); - void SendMessages (const i2p::data::IdentHash& ident, const std::vector >& msgs); + void SendMessages (const i2p::data::IdentHash& ident, const std::list >& msgs); + void SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs); void PeerConnected (std::shared_ptr session); void PeerDisconnected (std::shared_ptr session); @@ -185,7 +186,7 @@ namespace transport void Run (); void RequestComplete (std::shared_ptr r, const i2p::data::IdentHash& ident); void HandleRequestComplete (std::shared_ptr r, i2p::data::IdentHash ident); - void PostMessages (i2p::data::IdentHash ident, std::vector > msgs); + void PostMessages (i2p::data::IdentHash ident, std::list > msgs); bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr peer); void SetPriority (std::shared_ptr peer) const; void HandlePeerCleanupTimer (const boost::system::error_code& ecode); diff --git a/libi2pd/TunnelGateway.cpp b/libi2pd/TunnelGateway.cpp index 85ff224e..77110c39 100644 --- a/libi2pd/TunnelGateway.cpp +++ b/libi2pd/TunnelGateway.cpp @@ -221,7 +221,7 @@ namespace tunnel void TunnelGateway::SendBuffer () { m_Buffer.CompleteCurrentTunnelDataMessage (); - std::vector > newTunnelMsgs; + std::list > newTunnelMsgs; const auto& tunnelDataMsgs = m_Buffer.GetTunnelDataMsgs (); for (auto& tunnelMsg : tunnelDataMsgs) { From 4c90a88b85c736dad7e70ef1e2503a9360358042 Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 28 Oct 2024 21:10:30 -0400 Subject: [PATCH 14/20] eliminate extra copy of I2NP messages list --- libi2pd/TransitTunnel.cpp | 5 ++--- libi2pd/Transports.cpp | 9 +++++---- libi2pd/Transports.h | 2 +- libi2pd/TunnelGateway.cpp | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/libi2pd/TransitTunnel.cpp b/libi2pd/TransitTunnel.cpp index 6c2c52a7..72d7b8c2 100644 --- a/libi2pd/TransitTunnel.cpp +++ b/libi2pd/TransitTunnel.cpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2022, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -59,8 +59,7 @@ namespace tunnel auto num = m_TunnelDataMsgs.size (); if (num > 1) LogPrint (eLogDebug, "TransitTunnel: ", GetTunnelID (), "->", GetNextTunnelID (), " ", num); - i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs); - m_TunnelDataMsgs.clear (); + i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs); // send and clear } } diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index 3954e2cf..d18d3429 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -453,9 +453,11 @@ namespace transport SendMessages (ident, { msg }); } - void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::list >& msgs) + void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >& msgs) { - m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs)); + std::list > msgs1; + msgs.swap (msgs1); + SendMessages (ident, std::move (msgs1)); } void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs) @@ -888,8 +890,7 @@ namespace transport else session->SetTerminationTimeout (10); // most likely it's publishing, no follow-up messages expected, set timeout to 10 seconds peer->sessions.push_back (session); - session->SendI2NPMessages (peer->delayedMessages); - peer->delayedMessages.clear (); + session->SendI2NPMessages (peer->delayedMessages); // send and clear } else // incoming connection or peer test { diff --git a/libi2pd/Transports.h b/libi2pd/Transports.h index e18ec29d..bfabc6b3 100644 --- a/libi2pd/Transports.h +++ b/libi2pd/Transports.h @@ -141,7 +141,7 @@ namespace transport void ReuseX25519KeysPair (std::shared_ptr pair); void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg); - void SendMessages (const i2p::data::IdentHash& ident, const std::list >& msgs); + void SendMessages (const i2p::data::IdentHash& ident, std::list >& msgs); void SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs); void PeerConnected (std::shared_ptr session); diff --git a/libi2pd/TunnelGateway.cpp b/libi2pd/TunnelGateway.cpp index 77110c39..78a63fc4 100644 --- a/libi2pd/TunnelGateway.cpp +++ b/libi2pd/TunnelGateway.cpp @@ -234,7 +234,7 @@ namespace tunnel m_NumSentBytes += TUNNEL_DATA_MSG_SIZE; } m_Buffer.ClearTunnelDataMsgs (); - i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), newTunnelMsgs); + i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), std::move (newTunnelMsgs)); } } } From 361f3649667593dd95d2ad6dd79b38959ceaf1b2 Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 28 Oct 2024 21:15:16 -0400 Subject: [PATCH 15/20] intermediate queue for transport sessions. use std::list instead std::vector for multiple I2NP messages --- libi2pd/TransitTunnel.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libi2pd/TransitTunnel.h b/libi2pd/TransitTunnel.h index f83007a9..b3381fb7 100644 --- a/libi2pd/TransitTunnel.h +++ b/libi2pd/TransitTunnel.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2023, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -10,7 +10,7 @@ #define TRANSIT_TUNNEL_H__ #include -#include +#include #include #include #include "Crypto.h" @@ -61,7 +61,7 @@ namespace tunnel private: size_t m_NumTransmittedBytes; - std::vector > m_TunnelDataMsgs; + std::list > m_TunnelDataMsgs; }; class TransitTunnelGateway: public TransitTunnel From f04048717dc1633c2d21ef2d223f79138b2b03bd Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 28 Oct 2024 21:34:33 -0400 Subject: [PATCH 16/20] cleanup messages to send if session was terminated --- libi2pd/NTCP2.cpp | 6 +++++- libi2pd/SSU2Session.cpp | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/libi2pd/NTCP2.cpp b/libi2pd/NTCP2.cpp index 747dc0b0..33c33596 100644 --- a/libi2pd/NTCP2.cpp +++ b/libi2pd/NTCP2.cpp @@ -1301,7 +1301,11 @@ namespace transport void NTCP2Session::SendI2NPMessages (std::list >& msgs) { - if (m_IsTerminated || msgs.empty ()) return; + if (m_IsTerminated || msgs.empty ()) + { + msgs.clear (); + return; + } bool empty = false; { std::lock_guard l(m_IntermediateQueueMutex); diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 9ceb491e..5d5d5249 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -376,7 +376,11 @@ namespace transport void SSU2Session::SendI2NPMessages (std::list >& msgs) { - if (m_State == eSSU2SessionStateTerminated || msgs.empty ()) return; + if (m_State == eSSU2SessionStateTerminated || msgs.empty ()) + { + msgs.clear (); + return; + } bool empty = false; { std::lock_guard l(m_IntermediateQueueMutex); From 9bc595a9a2849743e85b3707e6f30163ede676d1 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 29 Oct 2024 08:41:59 -0400 Subject: [PATCH 17/20] eliminate extra copy --- libi2pd/Transports.cpp | 4 ++-- libi2pd/Transports.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index d18d3429..178188f2 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -462,13 +462,13 @@ namespace transport void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs) { - m_Service->post ([this, ident, msgs = std::move(msgs)] () + m_Service->post ([this, ident, msgs = std::move(msgs)] () mutable { PostMessages (ident, msgs); }); } - void Transports::PostMessages (i2p::data::IdentHash ident, std::list > msgs) + void Transports::PostMessages (const i2p::data::IdentHash& ident, std::list >& msgs) { if (ident == i2p::context.GetRouterInfo ().GetIdentHash ()) { diff --git a/libi2pd/Transports.h b/libi2pd/Transports.h index bfabc6b3..095cc81a 100644 --- a/libi2pd/Transports.h +++ b/libi2pd/Transports.h @@ -186,7 +186,7 @@ namespace transport void Run (); void RequestComplete (std::shared_ptr r, const i2p::data::IdentHash& ident); void HandleRequestComplete (std::shared_ptr r, i2p::data::IdentHash ident); - void PostMessages (i2p::data::IdentHash ident, std::list > msgs); + void PostMessages (const i2p::data::IdentHash& ident, std::list >& msgs); bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr peer); void SetPriority (std::shared_ptr peer) const; void HandlePeerCleanupTimer (const boost::system::error_code& ecode); From 3f10f6651d7b966a23c68621d63389ea35e55839 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 29 Oct 2024 12:46:14 -0400 Subject: [PATCH 18/20] use splice if queue is not semi-full --- libi2pd/SSU2Session.cpp | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 5d5d5249..aba8195e 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -412,16 +412,24 @@ namespace transport " is semi-full (size = ", m_SendQueue.size (), ", lag = ", queueLag / 1000, ", rtt = ", (int)m_RTT, ")"); } } - for (auto it: msgs) - { - if (isSemiFull && it->onDrop) - it->Drop (); // drop earlier because we can handle it - else + if (isSemiFull) + { + for (auto it: msgs) { - it->SetEnqueueTime (mts); - m_SendQueue.push_back (std::move (it)); + if (it->onDrop) + it->Drop (); // drop earlier because we can handle it + else + { + it->SetEnqueueTime (mts); + m_SendQueue.push_back (std::move (it)); + } } - } + } + else + { + for (auto& it: msgs) it->SetEnqueueTime (mts); + m_SendQueue.splice (m_SendQueue.end (), msgs); + } if (IsEstablished ()) { SendQueue (); From 8a8277edda0392a3fd99278409b811fc44f9e739 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 29 Oct 2024 13:59:21 -0400 Subject: [PATCH 19/20] check for empty URL string --- libi2pd/HTTP.cpp | 1 + libi2pd_client/AddressBook.cpp | 9 ++++----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/libi2pd/HTTP.cpp b/libi2pd/HTTP.cpp index 990781bc..258d3ada 100644 --- a/libi2pd/HTTP.cpp +++ b/libi2pd/HTTP.cpp @@ -103,6 +103,7 @@ namespace http bool URL::parse(std::string_view url) { + if (url.empty ()) return false; std::size_t pos_p = 0; /* < current parse position */ std::size_t pos_c = 0; /* < work position */ if(url.at(0) != '/' || pos_p > 0) diff --git a/libi2pd_client/AddressBook.cpp b/libi2pd_client/AddressBook.cpp index 14599cf7..802b7996 100644 --- a/libi2pd_client/AddressBook.cpp +++ b/libi2pd_client/AddressBook.cpp @@ -582,16 +582,15 @@ namespace client } else { - LogPrint (eLogInfo, "Addressbook: Loading subscriptions from config file"); + LogPrint (eLogInfo, "Addressbook: Loading subscriptions from config"); // using config file items std::string subscriptionURLs; i2p::config::GetOption("addressbook.subscriptions", subscriptionURLs); std::vector subsList; boost::split(subsList, subscriptionURLs, boost::is_any_of(","), boost::token_compress_on); for (const auto& s: subsList) - { - m_Subscriptions.push_back (std::make_shared (*this, s)); - } + if (!s.empty ()) + m_Subscriptions.push_back (std::make_shared (*this, s)); LogPrint (eLogInfo, "Addressbook: ", m_Subscriptions.size (), " subscriptions urls loaded"); } } @@ -823,7 +822,7 @@ namespace client } } - AddressBookSubscription::AddressBookSubscription (AddressBook& book, const std::string& link): + AddressBookSubscription::AddressBookSubscription (AddressBook& book, std::string_view link): m_Book (book), m_Link (link) { } From 0086f8e27ad0f4fd702838c4782bb057df74ca77 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 29 Oct 2024 15:32:06 -0400 Subject: [PATCH 20/20] use std::async for address book download --- libi2pd_client/AddressBook.cpp | 51 +++++++++++++++++++++------------- libi2pd_client/AddressBook.h | 7 +++-- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/libi2pd_client/AddressBook.cpp b/libi2pd_client/AddressBook.cpp index 802b7996..c9452cc6 100644 --- a/libi2pd_client/AddressBook.cpp +++ b/libi2pd_client/AddressBook.cpp @@ -305,7 +305,7 @@ namespace client identHash = hash; } - AddressBook::AddressBook (): m_Storage(nullptr), m_IsLoaded (false), m_IsDownloading (false), + AddressBook::AddressBook (): m_Storage(nullptr), m_IsLoaded (false), m_NumRetries (0), m_DefaultSubscription (nullptr), m_SubscriptionsUpdateTimer (nullptr), m_IsEnabled (true) { @@ -344,20 +344,28 @@ namespace client delete m_SubscriptionsUpdateTimer; m_SubscriptionsUpdateTimer = nullptr; } - if (m_IsDownloading) + bool isDownloading = m_Downloading.valid (); + if (isDownloading) { - LogPrint (eLogInfo, "Addressbook: Subscriptions are downloading, abort"); - for (int i = 0; i < 30; i++) - { - if (!m_IsDownloading) + if (m_Downloading.wait_for(std::chrono::seconds(0)) == std::future_status::ready) + isDownloading = false; + else + { + LogPrint (eLogInfo, "Addressbook: Subscriptions are downloading, abort"); + for (int i = 0; i < 30; i++) { - LogPrint (eLogInfo, "Addressbook: Subscriptions download complete"); - break; + if (m_Downloading.wait_for(std::chrono::seconds(1)) == std::future_status::ready) // wait for 1 seconds + { + isDownloading = false; + LogPrint (eLogInfo, "Addressbook: Subscriptions download complete"); + break; + } } - std::this_thread::sleep_for (std::chrono::seconds (1)); // wait for 1 seconds - } - LogPrint (eLogError, "Addressbook: Subscription download timeout"); - m_IsDownloading = false; + } + if (!isDownloading) + m_Downloading.get (); + else + LogPrint (eLogError, "Addressbook: Subscription download timeout"); } if (m_Storage) { @@ -644,7 +652,6 @@ namespace client void AddressBook::DownloadComplete (bool success, const i2p::data::IdentHash& subscription, const std::string& etag, const std::string& lastModified) { - m_IsDownloading = false; m_NumRetries++; int nextUpdateTimeout = m_NumRetries*CONTINIOUS_SUBSCRIPTION_RETRY_TIMEOUT; if (m_NumRetries > CONTINIOUS_SUBSCRIPTION_MAX_NUM_RETRIES || nextUpdateTimeout > CONTINIOUS_SUBSCRIPTION_UPDATE_TIMEOUT) @@ -699,7 +706,13 @@ namespace client LogPrint(eLogWarning, "Addressbook: Missing local destination, skip subscription update"); return; } - if (!m_IsDownloading && dest->IsReady ()) + bool isDownloading = m_Downloading.valid (); + if (isDownloading && m_Downloading.wait_for(std::chrono::seconds(0)) == std::future_status::ready) // still active? + { + m_Downloading.get (); + isDownloading = false; + } + if (!isDownloading && dest->IsReady ()) { if (!m_IsLoaded) { @@ -708,17 +721,15 @@ namespace client std::string defaultSubURL; i2p::config::GetOption("addressbook.defaulturl", defaultSubURL); if (!m_DefaultSubscription) m_DefaultSubscription = std::make_shared(*this, defaultSubURL); - m_IsDownloading = true; - std::thread load_hosts(std::bind (&AddressBookSubscription::CheckUpdates, m_DefaultSubscription)); - load_hosts.detach(); // TODO: use join + m_Downloading = std::async (std::launch::async, + std::bind (&AddressBookSubscription::CheckUpdates, m_DefaultSubscription)); } else if (!m_Subscriptions.empty ()) { // pick random subscription auto ind = rand () % m_Subscriptions.size(); - m_IsDownloading = true; - std::thread load_hosts(std::bind (&AddressBookSubscription::CheckUpdates, m_Subscriptions[ind])); - load_hosts.detach(); // TODO: use join + m_Downloading = std::async (std::launch::async, + std::bind (&AddressBookSubscription::CheckUpdates, m_Subscriptions[ind])); } } else diff --git a/libi2pd_client/AddressBook.h b/libi2pd_client/AddressBook.h index fc4f19a7..553ae51b 100644 --- a/libi2pd_client/AddressBook.h +++ b/libi2pd_client/AddressBook.h @@ -11,10 +11,12 @@ #include #include +#include #include #include #include #include +#include #include #include #include "Base.h" @@ -124,7 +126,8 @@ namespace client std::mutex m_LookupsMutex; std::map m_Lookups; // nonce -> address AddressBookStorage * m_Storage; - volatile bool m_IsLoaded, m_IsDownloading; + volatile bool m_IsLoaded; + std::future m_Downloading; int m_NumRetries; std::vector > m_Subscriptions; std::shared_ptr m_DefaultSubscription; // in case if we don't know any addresses yet @@ -136,7 +139,7 @@ namespace client { public: - AddressBookSubscription (AddressBook& book, const std::string& link); + AddressBookSubscription (AddressBook& book, std::string_view link); void CheckUpdates (); private: