Compare commits

...

4 commits

Author SHA1 Message Date
orignal f04048717d cleanup messages to send if session was terminated
Some checks are pending
Build Debian packages / ${{ matrix.dist }} (buster) (push) Waiting to run
Build Debian packages / ${{ matrix.dist }} (bookworm) (push) Waiting to run
Build Debian packages / ${{ matrix.dist }} (bullseye) (push) Waiting to run
Build on FreeBSD / with UPnP (push) Waiting to run
Build on OSX / With USE_UPNP=${{ matrix.with_upnp }} (no) (push) Waiting to run
Build on OSX / With USE_UPNP=${{ matrix.with_upnp }} (yes) (push) Waiting to run
Build on Windows / ${{ matrix.arch }} (x86_64, x64, gcc, MINGW64) (push) Waiting to run
Build on Windows / CMake ${{ matrix.arch }} (clang-x86_64, x64-clang, clang, CLANG64) (push) Waiting to run
Build on Windows / CMake ${{ matrix.arch }} (i686, x86, gcc, MINGW32) (push) Waiting to run
Build on Windows / CMake ${{ matrix.arch }} (ucrt-x86_64, x64-ucrt, gcc, UCRT64) (push) Waiting to run
Build on Windows / ${{ matrix.arch }} (clang-x86_64, x64-clang, clang, CLANG64) (push) Waiting to run
Build on Windows / ${{ matrix.arch }} (i686, x86, gcc, MINGW32) (push) Waiting to run
Build on Windows / ${{ matrix.arch }} (ucrt-x86_64, x64-ucrt, gcc, UCRT64) (push) Waiting to run
Build on Windows / CMake ${{ matrix.arch }} (x86_64, x64, gcc, MINGW64) (push) Waiting to run
Build on Windows / XP (push) Waiting to run
Build on Ubuntu / Make with USE_UPNP=${{ matrix.with_upnp }} (no) (push) Waiting to run
Build on Ubuntu / Make with USE_UPNP=${{ matrix.with_upnp }} (yes) (push) Waiting to run
Build on Ubuntu / CMake with -DWITH_UPNP=${{ matrix.with_upnp }} (OFF) (push) Waiting to run
Build on Ubuntu / CMake with -DWITH_UPNP=${{ matrix.with_upnp }} (ON) (push) Waiting to run
Build containers / Building container for ${{ matrix.platform }} (amd64, linux/amd64) (push) Waiting to run
Build containers / Building container for ${{ matrix.platform }} (arm64, linux/arm64) (push) Waiting to run
Build containers / Building container for ${{ matrix.platform }} (armv7, linux/arm/v7) (push) Waiting to run
Build containers / Building container for ${{ matrix.platform }} (i386, linux/386) (push) Waiting to run
Build containers / Pushing merged manifest (push) Blocked by required conditions
2024-10-28 21:34:33 -04:00
orignal 361f364966 intermediate queue for transport sessions. use std::list instead std::vector for multiple I2NP messages 2024-10-28 21:15:16 -04:00
orignal 4c90a88b85 eliminate extra copy of I2NP messages list 2024-10-28 21:10:30 -04:00
orignal 23e66671c2 intermediate queue for transport sessions. use std::list instead std::vector for multiple I2NP messages 2024-10-28 20:36:50 -04:00
11 changed files with 116 additions and 47 deletions

View file

@ -375,6 +375,8 @@ namespace transport
m_Socket.close ();
transports.PeerDisconnected (shared_from_this ());
m_Server.RemoveNTCP2Session (shared_from_this ());
if (!m_IntermediateQueue.empty ())
m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue);
for (auto& it: m_SendQueue)
it->Drop ();
m_SendQueue.clear ();
@ -1207,7 +1209,7 @@ namespace transport
void NTCP2Session::MoveSendQueue (std::shared_ptr<NTCP2Session> other)
{
if (!other || m_SendQueue.empty ()) return;
std::vector<std::shared_ptr<I2NPMessage> > msgs;
std::list<std::shared_ptr<I2NPMessage> > msgs;
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
for (auto it: m_SendQueue)
if (!it->IsExpired (ts))
@ -1216,7 +1218,7 @@ namespace transport
it->Drop ();
m_SendQueue.clear ();
if (!msgs.empty ())
other->PostI2NPMessages (msgs);
other->SendI2NPMessages (msgs);
}
size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len)
@ -1297,20 +1299,42 @@ namespace transport
m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); // let termination message go
}
void NTCP2Session::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
void NTCP2Session::SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs)
{
m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this (), msgs));
if (m_IsTerminated || msgs.empty ())
{
msgs.clear ();
return;
}
bool empty = false;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
empty = m_IntermediateQueue.empty ();
m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs);
}
if (empty)
m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this ()));
}
void NTCP2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs)
void NTCP2Session::PostI2NPMessages ()
{
if (m_IsTerminated) return;
std::list<std::shared_ptr<I2NPMessage> > msgs;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
m_IntermediateQueue.swap (msgs);
}
bool isSemiFull = m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE/2;
if (isSemiFull)
{
for (auto it: msgs)
if (isSemiFull && it->onDrop)
if (it->onDrop)
it->Drop (); // drop earlier because we can handle it
else
m_SendQueue.push_back (std::move (it));
}
else
m_SendQueue.splice (m_SendQueue.end (), msgs);
if (!m_IsSending && m_IsEstablished)
SendQueue ();

View file

@ -153,7 +153,7 @@ namespace transport
void ServerLogin (); // Bob
void SendLocalRouterInfo (bool update) override; // after handshake or by update
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override;
void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) override;
void MoveSendQueue (std::shared_ptr<NTCP2Session> other);
private:
@ -196,7 +196,7 @@ namespace transport
void SendRouterInfo ();
void SendTermination (NTCP2TerminationReason reason);
void SendTerminationAndTerminate (NTCP2TerminationReason reason);
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs);
void PostI2NPMessages ();
private:
@ -230,6 +230,9 @@ namespace transport
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
uint64_t m_NextRouterInfoResendTime; // seconds since epoch
std::list<std::shared_ptr<I2NPMessage> > m_IntermediateQueue; // from transports
mutable std::mutex m_IntermediateQueueMutex;
uint16_t m_PaddingSizes[16];
int m_NextPaddingSize;
};

View file

@ -480,7 +480,7 @@ namespace data
void NetDb::ReseedFromFloodfill(const RouterInfo & ri, int numRouters, int numFloodfills)
{
LogPrint(eLogInfo, "NetDB: Reseeding from floodfill ", ri.GetIdentHashBase64());
std::vector<std::shared_ptr<i2p::I2NPMessage> > requests;
std::list<std::shared_ptr<i2p::I2NPMessage> > requests;
i2p::data::IdentHash ourIdent = i2p::context.GetIdentHash();
i2p::data::IdentHash ih = ri.GetIdentHash();

View file

@ -293,6 +293,8 @@ namespace transport
m_SentHandshakePacket.reset (nullptr);
m_SessionConfirmedFragment.reset (nullptr);
m_PathChallenge.reset (nullptr);
if (!m_IntermediateQueue.empty ())
m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue);
for (auto& it: m_SendQueue)
it->Drop ();
m_SendQueue.clear ();
@ -372,14 +374,31 @@ namespace transport
}
void SSU2Session::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
void SSU2Session::SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs)
{
m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this (), msgs));
if (m_State == eSSU2SessionStateTerminated || msgs.empty ())
{
msgs.clear ();
return;
}
bool empty = false;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
empty = m_IntermediateQueue.empty ();
m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs);
}
if (empty)
m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this ()));
}
void SSU2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs)
void SSU2Session::PostI2NPMessages ()
{
if (m_State == eSSU2SessionStateTerminated) return;
std::list<std::shared_ptr<I2NPMessage> > msgs;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
m_IntermediateQueue.swap (msgs);
}
uint64_t mts = i2p::util::GetMonotonicMicroseconds ();
bool isSemiFull = false;
if (m_SendQueue.size ())
@ -415,7 +434,7 @@ namespace transport
void SSU2Session::MoveSendQueue (std::shared_ptr<SSU2Session> other)
{
if (!other || m_SendQueue.empty ()) return;
std::vector<std::shared_ptr<I2NPMessage> > msgs;
std::list<std::shared_ptr<I2NPMessage> > msgs;
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
for (auto it: m_SendQueue)
if (!it->IsExpired (ts))
@ -424,7 +443,7 @@ namespace transport
it->Drop ();
m_SendQueue.clear ();
if (!msgs.empty ())
other->PostI2NPMessages (msgs);
other->SendI2NPMessages (msgs);
}
bool SSU2Session::SendQueue ()

View file

@ -261,7 +261,7 @@ namespace transport
void FlushData ();
void Done () override;
void SendLocalRouterInfo (bool update) override;
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override;
void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) override;
void MoveSendQueue (std::shared_ptr<SSU2Session> other);
uint32_t GetRelayTag () const override { return m_RelayTag; };
size_t Resend (uint64_t ts); // return number of resent packets
@ -307,7 +307,7 @@ namespace transport
void Established ();
void ScheduleConnectTimer ();
void HandleConnectTimer (const boost::system::error_code& ecode);
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs);
void PostI2NPMessages ();
bool SendQueue (); // returns true if ack block was sent
bool SendFragmentedMessage (std::shared_ptr<I2NPMessage> msg);
void ResendHandshakePacket ();
@ -381,6 +381,8 @@ namespace transport
std::unordered_map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
i2p::I2NPMessagesHandler m_Handler;
std::list<std::shared_ptr<I2NPMessage> > m_IntermediateQueue; // from transports
mutable std::mutex m_IntermediateQueueMutex;
bool m_IsDataReceived;
double m_RTT;
int m_MsgLocalExpirationTimeout;

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2022, The PurpleI2P Project
* Copyright (c) 2013-2024, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
@ -59,8 +59,7 @@ namespace tunnel
auto num = m_TunnelDataMsgs.size ();
if (num > 1)
LogPrint (eLogDebug, "TransitTunnel: ", GetTunnelID (), "->", GetNextTunnelID (), " ", num);
i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs);
m_TunnelDataMsgs.clear ();
i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs); // send and clear
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2023, The PurpleI2P Project
* Copyright (c) 2013-2024, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
@ -10,7 +10,7 @@
#define TRANSIT_TUNNEL_H__
#include <inttypes.h>
#include <vector>
#include <list>
#include <mutex>
#include <memory>
#include "Crypto.h"
@ -61,7 +61,7 @@ namespace tunnel
private:
size_t m_NumTransmittedBytes;
std::vector<std::shared_ptr<i2p::I2NPMessage> > m_TunnelDataMsgs;
std::list<std::shared_ptr<i2p::I2NPMessage> > m_TunnelDataMsgs;
};
class TransitTunnelGateway: public TransitTunnel

View file

@ -144,8 +144,12 @@ namespace transport
void SetLastActivityTimestamp (uint64_t ts) { m_LastActivityTimestamp = ts; };
virtual uint32_t GetRelayTag () const { return 0; };
virtual void SendLocalRouterInfo (bool update = false) { SendI2NPMessages ({ CreateDatabaseStoreMsg () }); };
virtual void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) = 0;
virtual void SendLocalRouterInfo (bool update = false)
{
std::list<std::shared_ptr<I2NPMessage> > msgs{ CreateDatabaseStoreMsg () };
SendI2NPMessages (msgs);
};
virtual void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) = 0;
virtual bool IsEstablished () const = 0;
private:

View file

@ -450,15 +450,25 @@ namespace transport
void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg)
{
if (m_IsOnline)
SendMessages (ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > {msg });
SendMessages (ident, { msg });
}
void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs)
void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs)
{
m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs));
std::list<std::shared_ptr<i2p::I2NPMessage> > msgs1;
msgs.swap (msgs1);
SendMessages (ident, std::move (msgs1));
}
void Transports::PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs)
void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >&& msgs)
{
m_Service->post ([this, ident, msgs = std::move(msgs)] ()
{
PostMessages (ident, msgs);
});
}
void Transports::PostMessages (i2p::data::IdentHash ident, std::list<std::shared_ptr<i2p::I2NPMessage> > msgs)
{
if (ident == i2p::context.GetRouterInfo ().GetIdentHash ())
{
@ -517,12 +527,17 @@ namespace transport
return;
}
}
if (sz > MAX_NUM_DELAYED_MESSAGES/2)
{
for (auto& it1: msgs)
if (sz > MAX_NUM_DELAYED_MESSAGES/2 && it1->onDrop)
if (it1->onDrop)
it1->Drop (); // drop earlier because we can handle it
else
peer->delayedMessages.push_back (it1);
}
else
peer->delayedMessages.splice (peer->delayedMessages.end (), msgs);
}
else
{
LogPrint (eLogWarning, "Transports: Delayed messages queue size to ",
@ -865,7 +880,7 @@ namespace transport
if (it->second->delayedMessages.size () > 0)
{
// check if first message is our DatabaseStore (publishing)
auto firstMsg = peer->delayedMessages[0];
auto firstMsg = peer->delayedMessages.front ();
if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore &&
i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ())
sendDatabaseStore = false; // we have it in the list already
@ -875,8 +890,7 @@ namespace transport
else
session->SetTerminationTimeout (10); // most likely it's publishing, no follow-up messages expected, set timeout to 10 seconds
peer->sessions.push_back (session);
session->SendI2NPMessages (peer->delayedMessages);
peer->delayedMessages.clear ();
session->SendI2NPMessages (peer->delayedMessages); // send and clear
}
else // incoming connection or peer test
{
@ -887,7 +901,10 @@ namespace transport
return;
}
if (!session->IsOutgoing ()) // incoming
session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore
{
std::list<std::shared_ptr<I2NPMessage> > msgs{ CreateDatabaseStoreMsg () };
session->SendI2NPMessages (msgs); // send DatabaseStore
}
auto r = i2p::data::netdb.FindRouter (ident); // router should be in netdb after SessionConfirmed
if (r) r->GetProfile ()->Connected ();
auto ts = i2p::util::GetSecondsSinceEpoch ();

View file

@ -71,7 +71,7 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo> router;
std::list<std::shared_ptr<TransportSession> > sessions;
uint64_t creationTime, nextRouterInfoUpdateTime, lastSelectionTime;
std::vector<std::shared_ptr<i2p::I2NPMessage> > delayedMessages;
std::list<std::shared_ptr<i2p::I2NPMessage> > delayedMessages;
std::vector<i2p::data::RouterInfo::SupportedTransports> priority;
bool isHighBandwidth, isEligible;
@ -141,7 +141,8 @@ namespace transport
void ReuseX25519KeysPair (std::shared_ptr<i2p::crypto::X25519Keys> pair);
void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg);
void SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs);
void SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs);
void SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >&& msgs);
void PeerConnected (std::shared_ptr<TransportSession> session);
void PeerDisconnected (std::shared_ptr<TransportSession> session);
@ -185,7 +186,7 @@ namespace transport
void Run ();
void RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident);
void HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, i2p::data::IdentHash ident);
void PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs);
void PostMessages (i2p::data::IdentHash ident, std::list<std::shared_ptr<i2p::I2NPMessage> > msgs);
bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr<Peer> peer);
void SetPriority (std::shared_ptr<Peer> peer) const;
void HandlePeerCleanupTimer (const boost::system::error_code& ecode);

View file

@ -221,7 +221,7 @@ namespace tunnel
void TunnelGateway::SendBuffer ()
{
m_Buffer.CompleteCurrentTunnelDataMessage ();
std::vector<std::shared_ptr<I2NPMessage> > newTunnelMsgs;
std::list<std::shared_ptr<I2NPMessage> > newTunnelMsgs;
const auto& tunnelDataMsgs = m_Buffer.GetTunnelDataMsgs ();
for (auto& tunnelMsg : tunnelDataMsgs)
{
@ -234,7 +234,7 @@ namespace tunnel
m_NumSentBytes += TUNNEL_DATA_MSG_SIZE;
}
m_Buffer.ClearTunnelDataMsgs ();
i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), newTunnelMsgs);
i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), std::move (newTunnelMsgs));
}
}
}