diff --git a/libi2pd/NTCP2.cpp b/libi2pd/NTCP2.cpp index 33c33596..a05469f1 100644 --- a/libi2pd/NTCP2.cpp +++ b/libi2pd/NTCP2.cpp @@ -375,8 +375,6 @@ namespace transport m_Socket.close (); transports.PeerDisconnected (shared_from_this ()); m_Server.RemoveNTCP2Session (shared_from_this ()); - if (!m_IntermediateQueue.empty ()) - m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue); for (auto& it: m_SendQueue) it->Drop (); m_SendQueue.clear (); @@ -1209,7 +1207,7 @@ namespace transport void NTCP2Session::MoveSendQueue (std::shared_ptr other) { if (!other || m_SendQueue.empty ()) return; - std::list > msgs; + std::vector > msgs; auto ts = i2p::util::GetMillisecondsSinceEpoch (); for (auto it: m_SendQueue) if (!it->IsExpired (ts)) @@ -1218,7 +1216,7 @@ namespace transport it->Drop (); m_SendQueue.clear (); if (!msgs.empty ()) - other->SendI2NPMessages (msgs); + other->PostI2NPMessages (msgs); } size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len) @@ -1299,42 +1297,20 @@ namespace transport m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); // let termination message go } - void NTCP2Session::SendI2NPMessages (std::list >& msgs) + void NTCP2Session::SendI2NPMessages (const std::vector >& msgs) { - if (m_IsTerminated || msgs.empty ()) - { - msgs.clear (); - return; - } - bool empty = false; - { - std::lock_guard l(m_IntermediateQueueMutex); - empty = m_IntermediateQueue.empty (); - m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs); - } - if (empty) - m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this ())); + m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this (), msgs)); } - void NTCP2Session::PostI2NPMessages () + void NTCP2Session::PostI2NPMessages (std::vector > msgs) { if (m_IsTerminated) return; - std::list > msgs; - { - std::lock_guard l(m_IntermediateQueueMutex); - m_IntermediateQueue.swap (msgs); - } bool isSemiFull = m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE/2; - if (isSemiFull) - { - for (auto it: msgs) - if (it->onDrop) - it->Drop (); // drop earlier because we can handle it - else - m_SendQueue.push_back (std::move (it)); - } - else - m_SendQueue.splice (m_SendQueue.end (), msgs); + for (auto it: msgs) + if (isSemiFull && it->onDrop) + it->Drop (); // drop earlier because we can handle it + else + m_SendQueue.push_back (std::move (it)); if (!m_IsSending && m_IsEstablished) SendQueue (); diff --git a/libi2pd/NTCP2.h b/libi2pd/NTCP2.h index 27acb529..f7912b54 100644 --- a/libi2pd/NTCP2.h +++ b/libi2pd/NTCP2.h @@ -153,7 +153,7 @@ namespace transport void ServerLogin (); // Bob void SendLocalRouterInfo (bool update) override; // after handshake or by update - void SendI2NPMessages (std::list >& msgs) override; + void SendI2NPMessages (const std::vector >& msgs) override; void MoveSendQueue (std::shared_ptr other); private: @@ -196,7 +196,7 @@ namespace transport void SendRouterInfo (); void SendTermination (NTCP2TerminationReason reason); void SendTerminationAndTerminate (NTCP2TerminationReason reason); - void PostI2NPMessages (); + void PostI2NPMessages (std::vector > msgs); private: @@ -229,10 +229,7 @@ namespace transport bool m_IsSending, m_IsReceiving; std::list > m_SendQueue; uint64_t m_NextRouterInfoResendTime; // seconds since epoch - - std::list > m_IntermediateQueue; // from transports - mutable std::mutex m_IntermediateQueueMutex; - + uint16_t m_PaddingSizes[16]; int m_NextPaddingSize; }; diff --git a/libi2pd/NetDb.cpp b/libi2pd/NetDb.cpp index c96bcf95..341d617e 100644 --- a/libi2pd/NetDb.cpp +++ b/libi2pd/NetDb.cpp @@ -480,7 +480,7 @@ namespace data void NetDb::ReseedFromFloodfill(const RouterInfo & ri, int numRouters, int numFloodfills) { LogPrint(eLogInfo, "NetDB: Reseeding from floodfill ", ri.GetIdentHashBase64()); - std::list > requests; + std::vector > requests; i2p::data::IdentHash ourIdent = i2p::context.GetIdentHash(); i2p::data::IdentHash ih = ri.GetIdentHash(); diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 5d5d5249..3489a6ba 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -293,8 +293,6 @@ namespace transport m_SentHandshakePacket.reset (nullptr); m_SessionConfirmedFragment.reset (nullptr); m_PathChallenge.reset (nullptr); - if (!m_IntermediateQueue.empty ()) - m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue); for (auto& it: m_SendQueue) it->Drop (); m_SendQueue.clear (); @@ -374,31 +372,14 @@ namespace transport } - void SSU2Session::SendI2NPMessages (std::list >& msgs) + void SSU2Session::SendI2NPMessages (const std::vector >& msgs) { - if (m_State == eSSU2SessionStateTerminated || msgs.empty ()) - { - msgs.clear (); - return; - } - bool empty = false; - { - std::lock_guard l(m_IntermediateQueueMutex); - empty = m_IntermediateQueue.empty (); - m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs); - } - if (empty) - m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this ())); + m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this (), msgs)); } - void SSU2Session::PostI2NPMessages () + void SSU2Session::PostI2NPMessages (std::vector > msgs) { if (m_State == eSSU2SessionStateTerminated) return; - std::list > msgs; - { - std::lock_guard l(m_IntermediateQueueMutex); - m_IntermediateQueue.swap (msgs); - } uint64_t mts = i2p::util::GetMonotonicMicroseconds (); bool isSemiFull = false; if (m_SendQueue.size ()) @@ -434,7 +415,7 @@ namespace transport void SSU2Session::MoveSendQueue (std::shared_ptr other) { if (!other || m_SendQueue.empty ()) return; - std::list > msgs; + std::vector > msgs; auto ts = i2p::util::GetMillisecondsSinceEpoch (); for (auto it: m_SendQueue) if (!it->IsExpired (ts)) @@ -443,7 +424,7 @@ namespace transport it->Drop (); m_SendQueue.clear (); if (!msgs.empty ()) - other->SendI2NPMessages (msgs); + other->PostI2NPMessages (msgs); } bool SSU2Session::SendQueue () diff --git a/libi2pd/SSU2Session.h b/libi2pd/SSU2Session.h index 4b3139a7..d54731dc 100644 --- a/libi2pd/SSU2Session.h +++ b/libi2pd/SSU2Session.h @@ -261,7 +261,7 @@ namespace transport void FlushData (); void Done () override; void SendLocalRouterInfo (bool update) override; - void SendI2NPMessages (std::list >& msgs) override; + void SendI2NPMessages (const std::vector >& msgs) override; void MoveSendQueue (std::shared_ptr other); uint32_t GetRelayTag () const override { return m_RelayTag; }; size_t Resend (uint64_t ts); // return number of resent packets @@ -307,7 +307,7 @@ namespace transport void Established (); void ScheduleConnectTimer (); void HandleConnectTimer (const boost::system::error_code& ecode); - void PostI2NPMessages (); + void PostI2NPMessages (std::vector > msgs); bool SendQueue (); // returns true if ack block was sent bool SendFragmentedMessage (std::shared_ptr msg); void ResendHandshakePacket (); @@ -381,8 +381,6 @@ namespace transport std::unordered_map, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice std::list > m_SendQueue; i2p::I2NPMessagesHandler m_Handler; - std::list > m_IntermediateQueue; // from transports - mutable std::mutex m_IntermediateQueueMutex; bool m_IsDataReceived; double m_RTT; int m_MsgLocalExpirationTimeout; diff --git a/libi2pd/TransitTunnel.cpp b/libi2pd/TransitTunnel.cpp index 72d7b8c2..6c2c52a7 100644 --- a/libi2pd/TransitTunnel.cpp +++ b/libi2pd/TransitTunnel.cpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2024, The PurpleI2P Project +* Copyright (c) 2013-2022, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -59,7 +59,8 @@ namespace tunnel auto num = m_TunnelDataMsgs.size (); if (num > 1) LogPrint (eLogDebug, "TransitTunnel: ", GetTunnelID (), "->", GetNextTunnelID (), " ", num); - i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs); // send and clear + i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs); + m_TunnelDataMsgs.clear (); } } diff --git a/libi2pd/TransitTunnel.h b/libi2pd/TransitTunnel.h index b3381fb7..f83007a9 100644 --- a/libi2pd/TransitTunnel.h +++ b/libi2pd/TransitTunnel.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2024, The PurpleI2P Project +* Copyright (c) 2013-2023, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -10,7 +10,7 @@ #define TRANSIT_TUNNEL_H__ #include -#include +#include #include #include #include "Crypto.h" @@ -61,7 +61,7 @@ namespace tunnel private: size_t m_NumTransmittedBytes; - std::list > m_TunnelDataMsgs; + std::vector > m_TunnelDataMsgs; }; class TransitTunnelGateway: public TransitTunnel diff --git a/libi2pd/TransportSession.h b/libi2pd/TransportSession.h index 6c878d11..c6bf0de3 100644 --- a/libi2pd/TransportSession.h +++ b/libi2pd/TransportSession.h @@ -144,12 +144,8 @@ namespace transport void SetLastActivityTimestamp (uint64_t ts) { m_LastActivityTimestamp = ts; }; virtual uint32_t GetRelayTag () const { return 0; }; - virtual void SendLocalRouterInfo (bool update = false) - { - std::list > msgs{ CreateDatabaseStoreMsg () }; - SendI2NPMessages (msgs); - }; - virtual void SendI2NPMessages (std::list >& msgs) = 0; + virtual void SendLocalRouterInfo (bool update = false) { SendI2NPMessages ({ CreateDatabaseStoreMsg () }); }; + virtual void SendI2NPMessages (const std::vector >& msgs) = 0; virtual bool IsEstablished () const = 0; private: diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index d18d3429..34bc6142 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -450,25 +450,15 @@ namespace transport void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg) { if (m_IsOnline) - SendMessages (ident, { msg }); + SendMessages (ident, std::vector > {msg }); } - void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >& msgs) + void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector >& msgs) { - std::list > msgs1; - msgs.swap (msgs1); - SendMessages (ident, std::move (msgs1)); + m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs)); } - void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs) - { - m_Service->post ([this, ident, msgs = std::move(msgs)] () - { - PostMessages (ident, msgs); - }); - } - - void Transports::PostMessages (i2p::data::IdentHash ident, std::list > msgs) + void Transports::PostMessages (i2p::data::IdentHash ident, std::vector > msgs) { if (ident == i2p::context.GetRouterInfo ().GetIdentHash ()) { @@ -527,16 +517,11 @@ namespace transport return; } } - if (sz > MAX_NUM_DELAYED_MESSAGES/2) - { - for (auto& it1: msgs) - if (it1->onDrop) - it1->Drop (); // drop earlier because we can handle it - else - peer->delayedMessages.push_back (it1); - } - else - peer->delayedMessages.splice (peer->delayedMessages.end (), msgs); + for (auto& it1: msgs) + if (sz > MAX_NUM_DELAYED_MESSAGES/2 && it1->onDrop) + it1->Drop (); // drop earlier because we can handle it + else + peer->delayedMessages.push_back (it1); } else { @@ -880,7 +865,7 @@ namespace transport if (it->second->delayedMessages.size () > 0) { // check if first message is our DatabaseStore (publishing) - auto firstMsg = peer->delayedMessages.front (); + auto firstMsg = peer->delayedMessages[0]; if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore && i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ()) sendDatabaseStore = false; // we have it in the list already @@ -890,7 +875,8 @@ namespace transport else session->SetTerminationTimeout (10); // most likely it's publishing, no follow-up messages expected, set timeout to 10 seconds peer->sessions.push_back (session); - session->SendI2NPMessages (peer->delayedMessages); // send and clear + session->SendI2NPMessages (peer->delayedMessages); + peer->delayedMessages.clear (); } else // incoming connection or peer test { @@ -901,10 +887,7 @@ namespace transport return; } if (!session->IsOutgoing ()) // incoming - { - std::list > msgs{ CreateDatabaseStoreMsg () }; - session->SendI2NPMessages (msgs); // send DatabaseStore - } + session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore auto r = i2p::data::netdb.FindRouter (ident); // router should be in netdb after SessionConfirmed if (r) r->GetProfile ()->Connected (); auto ts = i2p::util::GetSecondsSinceEpoch (); diff --git a/libi2pd/Transports.h b/libi2pd/Transports.h index bfabc6b3..70273094 100644 --- a/libi2pd/Transports.h +++ b/libi2pd/Transports.h @@ -71,7 +71,7 @@ namespace transport std::shared_ptr router; std::list > sessions; uint64_t creationTime, nextRouterInfoUpdateTime, lastSelectionTime; - std::list > delayedMessages; + std::vector > delayedMessages; std::vector priority; bool isHighBandwidth, isEligible; @@ -141,8 +141,7 @@ namespace transport void ReuseX25519KeysPair (std::shared_ptr pair); void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg); - void SendMessages (const i2p::data::IdentHash& ident, std::list >& msgs); - void SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs); + void SendMessages (const i2p::data::IdentHash& ident, const std::vector >& msgs); void PeerConnected (std::shared_ptr session); void PeerDisconnected (std::shared_ptr session); @@ -186,7 +185,7 @@ namespace transport void Run (); void RequestComplete (std::shared_ptr r, const i2p::data::IdentHash& ident); void HandleRequestComplete (std::shared_ptr r, i2p::data::IdentHash ident); - void PostMessages (i2p::data::IdentHash ident, std::list > msgs); + void PostMessages (i2p::data::IdentHash ident, std::vector > msgs); bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr peer); void SetPriority (std::shared_ptr peer) const; void HandlePeerCleanupTimer (const boost::system::error_code& ecode); diff --git a/libi2pd/TunnelGateway.cpp b/libi2pd/TunnelGateway.cpp index 78a63fc4..85ff224e 100644 --- a/libi2pd/TunnelGateway.cpp +++ b/libi2pd/TunnelGateway.cpp @@ -221,7 +221,7 @@ namespace tunnel void TunnelGateway::SendBuffer () { m_Buffer.CompleteCurrentTunnelDataMessage (); - std::list > newTunnelMsgs; + std::vector > newTunnelMsgs; const auto& tunnelDataMsgs = m_Buffer.GetTunnelDataMsgs (); for (auto& tunnelMsg : tunnelDataMsgs) { @@ -234,7 +234,7 @@ namespace tunnel m_NumSentBytes += TUNNEL_DATA_MSG_SIZE; } m_Buffer.ClearTunnelDataMsgs (); - i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), std::move (newTunnelMsgs)); + i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), newTunnelMsgs); } } }