Compare commits

...

4 commits

Author SHA1 Message Date
orignal f04048717d cleanup messages to send if session was terminated
Some checks are pending
Build Debian packages / ${{ matrix.dist }} (buster) (push) Waiting to run
Build Debian packages / ${{ matrix.dist }} (bookworm) (push) Waiting to run
Build Debian packages / ${{ matrix.dist }} (bullseye) (push) Waiting to run
Build on FreeBSD / with UPnP (push) Waiting to run
Build on OSX / With USE_UPNP=${{ matrix.with_upnp }} (no) (push) Waiting to run
Build on OSX / With USE_UPNP=${{ matrix.with_upnp }} (yes) (push) Waiting to run
Build on Windows / ${{ matrix.arch }} (x86_64, x64, gcc, MINGW64) (push) Waiting to run
Build on Windows / CMake ${{ matrix.arch }} (clang-x86_64, x64-clang, clang, CLANG64) (push) Waiting to run
Build on Windows / CMake ${{ matrix.arch }} (i686, x86, gcc, MINGW32) (push) Waiting to run
Build on Windows / CMake ${{ matrix.arch }} (ucrt-x86_64, x64-ucrt, gcc, UCRT64) (push) Waiting to run
Build on Windows / ${{ matrix.arch }} (clang-x86_64, x64-clang, clang, CLANG64) (push) Waiting to run
Build on Windows / ${{ matrix.arch }} (i686, x86, gcc, MINGW32) (push) Waiting to run
Build on Windows / ${{ matrix.arch }} (ucrt-x86_64, x64-ucrt, gcc, UCRT64) (push) Waiting to run
Build on Windows / CMake ${{ matrix.arch }} (x86_64, x64, gcc, MINGW64) (push) Waiting to run
Build on Windows / XP (push) Waiting to run
Build on Ubuntu / Make with USE_UPNP=${{ matrix.with_upnp }} (no) (push) Waiting to run
Build on Ubuntu / Make with USE_UPNP=${{ matrix.with_upnp }} (yes) (push) Waiting to run
Build on Ubuntu / CMake with -DWITH_UPNP=${{ matrix.with_upnp }} (OFF) (push) Waiting to run
Build on Ubuntu / CMake with -DWITH_UPNP=${{ matrix.with_upnp }} (ON) (push) Waiting to run
Build containers / Building container for ${{ matrix.platform }} (amd64, linux/amd64) (push) Waiting to run
Build containers / Building container for ${{ matrix.platform }} (arm64, linux/arm64) (push) Waiting to run
Build containers / Building container for ${{ matrix.platform }} (armv7, linux/arm/v7) (push) Waiting to run
Build containers / Building container for ${{ matrix.platform }} (i386, linux/386) (push) Waiting to run
Build containers / Pushing merged manifest (push) Blocked by required conditions
2024-10-28 21:34:33 -04:00
orignal 361f364966 intermediate queue for transport sessions. use std::list instead std::vector for multiple I2NP messages 2024-10-28 21:15:16 -04:00
orignal 4c90a88b85 eliminate extra copy of I2NP messages list 2024-10-28 21:10:30 -04:00
orignal 23e66671c2 intermediate queue for transport sessions. use std::list instead std::vector for multiple I2NP messages 2024-10-28 20:36:50 -04:00
11 changed files with 116 additions and 47 deletions

View file

@ -375,6 +375,8 @@ namespace transport
m_Socket.close (); m_Socket.close ();
transports.PeerDisconnected (shared_from_this ()); transports.PeerDisconnected (shared_from_this ());
m_Server.RemoveNTCP2Session (shared_from_this ()); m_Server.RemoveNTCP2Session (shared_from_this ());
if (!m_IntermediateQueue.empty ())
m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue);
for (auto& it: m_SendQueue) for (auto& it: m_SendQueue)
it->Drop (); it->Drop ();
m_SendQueue.clear (); m_SendQueue.clear ();
@ -1207,7 +1209,7 @@ namespace transport
void NTCP2Session::MoveSendQueue (std::shared_ptr<NTCP2Session> other) void NTCP2Session::MoveSendQueue (std::shared_ptr<NTCP2Session> other)
{ {
if (!other || m_SendQueue.empty ()) return; if (!other || m_SendQueue.empty ()) return;
std::vector<std::shared_ptr<I2NPMessage> > msgs; std::list<std::shared_ptr<I2NPMessage> > msgs;
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
for (auto it: m_SendQueue) for (auto it: m_SendQueue)
if (!it->IsExpired (ts)) if (!it->IsExpired (ts))
@ -1216,7 +1218,7 @@ namespace transport
it->Drop (); it->Drop ();
m_SendQueue.clear (); m_SendQueue.clear ();
if (!msgs.empty ()) if (!msgs.empty ())
other->PostI2NPMessages (msgs); other->SendI2NPMessages (msgs);
} }
size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len) size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len)
@ -1297,20 +1299,42 @@ namespace transport
m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); // let termination message go m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); // let termination message go
} }
void NTCP2Session::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) void NTCP2Session::SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs)
{ {
m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this (), msgs)); if (m_IsTerminated || msgs.empty ())
{
msgs.clear ();
return;
}
bool empty = false;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
empty = m_IntermediateQueue.empty ();
m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs);
}
if (empty)
m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this ()));
} }
void NTCP2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs) void NTCP2Session::PostI2NPMessages ()
{ {
if (m_IsTerminated) return; if (m_IsTerminated) return;
std::list<std::shared_ptr<I2NPMessage> > msgs;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
m_IntermediateQueue.swap (msgs);
}
bool isSemiFull = m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE/2; bool isSemiFull = m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE/2;
for (auto it: msgs) if (isSemiFull)
if (isSemiFull && it->onDrop) {
it->Drop (); // drop earlier because we can handle it for (auto it: msgs)
else if (it->onDrop)
m_SendQueue.push_back (std::move (it)); it->Drop (); // drop earlier because we can handle it
else
m_SendQueue.push_back (std::move (it));
}
else
m_SendQueue.splice (m_SendQueue.end (), msgs);
if (!m_IsSending && m_IsEstablished) if (!m_IsSending && m_IsEstablished)
SendQueue (); SendQueue ();

View file

@ -153,7 +153,7 @@ namespace transport
void ServerLogin (); // Bob void ServerLogin (); // Bob
void SendLocalRouterInfo (bool update) override; // after handshake or by update void SendLocalRouterInfo (bool update) override; // after handshake or by update
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override; void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) override;
void MoveSendQueue (std::shared_ptr<NTCP2Session> other); void MoveSendQueue (std::shared_ptr<NTCP2Session> other);
private: private:
@ -196,7 +196,7 @@ namespace transport
void SendRouterInfo (); void SendRouterInfo ();
void SendTermination (NTCP2TerminationReason reason); void SendTermination (NTCP2TerminationReason reason);
void SendTerminationAndTerminate (NTCP2TerminationReason reason); void SendTerminationAndTerminate (NTCP2TerminationReason reason);
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs); void PostI2NPMessages ();
private: private:
@ -230,6 +230,9 @@ namespace transport
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue; std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
uint64_t m_NextRouterInfoResendTime; // seconds since epoch uint64_t m_NextRouterInfoResendTime; // seconds since epoch
std::list<std::shared_ptr<I2NPMessage> > m_IntermediateQueue; // from transports
mutable std::mutex m_IntermediateQueueMutex;
uint16_t m_PaddingSizes[16]; uint16_t m_PaddingSizes[16];
int m_NextPaddingSize; int m_NextPaddingSize;
}; };

View file

@ -480,7 +480,7 @@ namespace data
void NetDb::ReseedFromFloodfill(const RouterInfo & ri, int numRouters, int numFloodfills) void NetDb::ReseedFromFloodfill(const RouterInfo & ri, int numRouters, int numFloodfills)
{ {
LogPrint(eLogInfo, "NetDB: Reseeding from floodfill ", ri.GetIdentHashBase64()); LogPrint(eLogInfo, "NetDB: Reseeding from floodfill ", ri.GetIdentHashBase64());
std::vector<std::shared_ptr<i2p::I2NPMessage> > requests; std::list<std::shared_ptr<i2p::I2NPMessage> > requests;
i2p::data::IdentHash ourIdent = i2p::context.GetIdentHash(); i2p::data::IdentHash ourIdent = i2p::context.GetIdentHash();
i2p::data::IdentHash ih = ri.GetIdentHash(); i2p::data::IdentHash ih = ri.GetIdentHash();

View file

@ -293,6 +293,8 @@ namespace transport
m_SentHandshakePacket.reset (nullptr); m_SentHandshakePacket.reset (nullptr);
m_SessionConfirmedFragment.reset (nullptr); m_SessionConfirmedFragment.reset (nullptr);
m_PathChallenge.reset (nullptr); m_PathChallenge.reset (nullptr);
if (!m_IntermediateQueue.empty ())
m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue);
for (auto& it: m_SendQueue) for (auto& it: m_SendQueue)
it->Drop (); it->Drop ();
m_SendQueue.clear (); m_SendQueue.clear ();
@ -372,14 +374,31 @@ namespace transport
} }
void SSU2Session::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) void SSU2Session::SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs)
{ {
m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this (), msgs)); if (m_State == eSSU2SessionStateTerminated || msgs.empty ())
{
msgs.clear ();
return;
}
bool empty = false;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
empty = m_IntermediateQueue.empty ();
m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs);
}
if (empty)
m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this ()));
} }
void SSU2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs) void SSU2Session::PostI2NPMessages ()
{ {
if (m_State == eSSU2SessionStateTerminated) return; if (m_State == eSSU2SessionStateTerminated) return;
std::list<std::shared_ptr<I2NPMessage> > msgs;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
m_IntermediateQueue.swap (msgs);
}
uint64_t mts = i2p::util::GetMonotonicMicroseconds (); uint64_t mts = i2p::util::GetMonotonicMicroseconds ();
bool isSemiFull = false; bool isSemiFull = false;
if (m_SendQueue.size ()) if (m_SendQueue.size ())
@ -415,7 +434,7 @@ namespace transport
void SSU2Session::MoveSendQueue (std::shared_ptr<SSU2Session> other) void SSU2Session::MoveSendQueue (std::shared_ptr<SSU2Session> other)
{ {
if (!other || m_SendQueue.empty ()) return; if (!other || m_SendQueue.empty ()) return;
std::vector<std::shared_ptr<I2NPMessage> > msgs; std::list<std::shared_ptr<I2NPMessage> > msgs;
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
for (auto it: m_SendQueue) for (auto it: m_SendQueue)
if (!it->IsExpired (ts)) if (!it->IsExpired (ts))
@ -424,7 +443,7 @@ namespace transport
it->Drop (); it->Drop ();
m_SendQueue.clear (); m_SendQueue.clear ();
if (!msgs.empty ()) if (!msgs.empty ())
other->PostI2NPMessages (msgs); other->SendI2NPMessages (msgs);
} }
bool SSU2Session::SendQueue () bool SSU2Session::SendQueue ()

View file

@ -261,7 +261,7 @@ namespace transport
void FlushData (); void FlushData ();
void Done () override; void Done () override;
void SendLocalRouterInfo (bool update) override; void SendLocalRouterInfo (bool update) override;
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override; void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) override;
void MoveSendQueue (std::shared_ptr<SSU2Session> other); void MoveSendQueue (std::shared_ptr<SSU2Session> other);
uint32_t GetRelayTag () const override { return m_RelayTag; }; uint32_t GetRelayTag () const override { return m_RelayTag; };
size_t Resend (uint64_t ts); // return number of resent packets size_t Resend (uint64_t ts); // return number of resent packets
@ -307,7 +307,7 @@ namespace transport
void Established (); void Established ();
void ScheduleConnectTimer (); void ScheduleConnectTimer ();
void HandleConnectTimer (const boost::system::error_code& ecode); void HandleConnectTimer (const boost::system::error_code& ecode);
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs); void PostI2NPMessages ();
bool SendQueue (); // returns true if ack block was sent bool SendQueue (); // returns true if ack block was sent
bool SendFragmentedMessage (std::shared_ptr<I2NPMessage> msg); bool SendFragmentedMessage (std::shared_ptr<I2NPMessage> msg);
void ResendHandshakePacket (); void ResendHandshakePacket ();
@ -381,6 +381,8 @@ namespace transport
std::unordered_map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice std::unordered_map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue; std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
i2p::I2NPMessagesHandler m_Handler; i2p::I2NPMessagesHandler m_Handler;
std::list<std::shared_ptr<I2NPMessage> > m_IntermediateQueue; // from transports
mutable std::mutex m_IntermediateQueueMutex;
bool m_IsDataReceived; bool m_IsDataReceived;
double m_RTT; double m_RTT;
int m_MsgLocalExpirationTimeout; int m_MsgLocalExpirationTimeout;

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2022, The PurpleI2P Project * Copyright (c) 2013-2024, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -59,8 +59,7 @@ namespace tunnel
auto num = m_TunnelDataMsgs.size (); auto num = m_TunnelDataMsgs.size ();
if (num > 1) if (num > 1)
LogPrint (eLogDebug, "TransitTunnel: ", GetTunnelID (), "->", GetNextTunnelID (), " ", num); LogPrint (eLogDebug, "TransitTunnel: ", GetTunnelID (), "->", GetNextTunnelID (), " ", num);
i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs); i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs); // send and clear
m_TunnelDataMsgs.clear ();
} }
} }

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2023, The PurpleI2P Project * Copyright (c) 2013-2024, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -10,7 +10,7 @@
#define TRANSIT_TUNNEL_H__ #define TRANSIT_TUNNEL_H__
#include <inttypes.h> #include <inttypes.h>
#include <vector> #include <list>
#include <mutex> #include <mutex>
#include <memory> #include <memory>
#include "Crypto.h" #include "Crypto.h"
@ -61,7 +61,7 @@ namespace tunnel
private: private:
size_t m_NumTransmittedBytes; size_t m_NumTransmittedBytes;
std::vector<std::shared_ptr<i2p::I2NPMessage> > m_TunnelDataMsgs; std::list<std::shared_ptr<i2p::I2NPMessage> > m_TunnelDataMsgs;
}; };
class TransitTunnelGateway: public TransitTunnel class TransitTunnelGateway: public TransitTunnel

View file

@ -144,8 +144,12 @@ namespace transport
void SetLastActivityTimestamp (uint64_t ts) { m_LastActivityTimestamp = ts; }; void SetLastActivityTimestamp (uint64_t ts) { m_LastActivityTimestamp = ts; };
virtual uint32_t GetRelayTag () const { return 0; }; virtual uint32_t GetRelayTag () const { return 0; };
virtual void SendLocalRouterInfo (bool update = false) { SendI2NPMessages ({ CreateDatabaseStoreMsg () }); }; virtual void SendLocalRouterInfo (bool update = false)
virtual void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) = 0; {
std::list<std::shared_ptr<I2NPMessage> > msgs{ CreateDatabaseStoreMsg () };
SendI2NPMessages (msgs);
};
virtual void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) = 0;
virtual bool IsEstablished () const = 0; virtual bool IsEstablished () const = 0;
private: private:

View file

@ -450,15 +450,25 @@ namespace transport
void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg) void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg)
{ {
if (m_IsOnline) if (m_IsOnline)
SendMessages (ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > {msg }); SendMessages (ident, { msg });
} }
void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs) void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs)
{ {
m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs)); std::list<std::shared_ptr<i2p::I2NPMessage> > msgs1;
msgs.swap (msgs1);
SendMessages (ident, std::move (msgs1));
} }
void Transports::PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs) void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >&& msgs)
{
m_Service->post ([this, ident, msgs = std::move(msgs)] ()
{
PostMessages (ident, msgs);
});
}
void Transports::PostMessages (i2p::data::IdentHash ident, std::list<std::shared_ptr<i2p::I2NPMessage> > msgs)
{ {
if (ident == i2p::context.GetRouterInfo ().GetIdentHash ()) if (ident == i2p::context.GetRouterInfo ().GetIdentHash ())
{ {
@ -517,11 +527,16 @@ namespace transport
return; return;
} }
} }
for (auto& it1: msgs) if (sz > MAX_NUM_DELAYED_MESSAGES/2)
if (sz > MAX_NUM_DELAYED_MESSAGES/2 && it1->onDrop) {
it1->Drop (); // drop earlier because we can handle it for (auto& it1: msgs)
else if (it1->onDrop)
peer->delayedMessages.push_back (it1); it1->Drop (); // drop earlier because we can handle it
else
peer->delayedMessages.push_back (it1);
}
else
peer->delayedMessages.splice (peer->delayedMessages.end (), msgs);
} }
else else
{ {
@ -865,7 +880,7 @@ namespace transport
if (it->second->delayedMessages.size () > 0) if (it->second->delayedMessages.size () > 0)
{ {
// check if first message is our DatabaseStore (publishing) // check if first message is our DatabaseStore (publishing)
auto firstMsg = peer->delayedMessages[0]; auto firstMsg = peer->delayedMessages.front ();
if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore && if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore &&
i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ()) i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ())
sendDatabaseStore = false; // we have it in the list already sendDatabaseStore = false; // we have it in the list already
@ -875,8 +890,7 @@ namespace transport
else else
session->SetTerminationTimeout (10); // most likely it's publishing, no follow-up messages expected, set timeout to 10 seconds session->SetTerminationTimeout (10); // most likely it's publishing, no follow-up messages expected, set timeout to 10 seconds
peer->sessions.push_back (session); peer->sessions.push_back (session);
session->SendI2NPMessages (peer->delayedMessages); session->SendI2NPMessages (peer->delayedMessages); // send and clear
peer->delayedMessages.clear ();
} }
else // incoming connection or peer test else // incoming connection or peer test
{ {
@ -887,7 +901,10 @@ namespace transport
return; return;
} }
if (!session->IsOutgoing ()) // incoming if (!session->IsOutgoing ()) // incoming
session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore {
std::list<std::shared_ptr<I2NPMessage> > msgs{ CreateDatabaseStoreMsg () };
session->SendI2NPMessages (msgs); // send DatabaseStore
}
auto r = i2p::data::netdb.FindRouter (ident); // router should be in netdb after SessionConfirmed auto r = i2p::data::netdb.FindRouter (ident); // router should be in netdb after SessionConfirmed
if (r) r->GetProfile ()->Connected (); if (r) r->GetProfile ()->Connected ();
auto ts = i2p::util::GetSecondsSinceEpoch (); auto ts = i2p::util::GetSecondsSinceEpoch ();

View file

@ -71,7 +71,7 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo> router; std::shared_ptr<const i2p::data::RouterInfo> router;
std::list<std::shared_ptr<TransportSession> > sessions; std::list<std::shared_ptr<TransportSession> > sessions;
uint64_t creationTime, nextRouterInfoUpdateTime, lastSelectionTime; uint64_t creationTime, nextRouterInfoUpdateTime, lastSelectionTime;
std::vector<std::shared_ptr<i2p::I2NPMessage> > delayedMessages; std::list<std::shared_ptr<i2p::I2NPMessage> > delayedMessages;
std::vector<i2p::data::RouterInfo::SupportedTransports> priority; std::vector<i2p::data::RouterInfo::SupportedTransports> priority;
bool isHighBandwidth, isEligible; bool isHighBandwidth, isEligible;
@ -141,7 +141,8 @@ namespace transport
void ReuseX25519KeysPair (std::shared_ptr<i2p::crypto::X25519Keys> pair); void ReuseX25519KeysPair (std::shared_ptr<i2p::crypto::X25519Keys> pair);
void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg); void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg);
void SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs); void SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs);
void SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >&& msgs);
void PeerConnected (std::shared_ptr<TransportSession> session); void PeerConnected (std::shared_ptr<TransportSession> session);
void PeerDisconnected (std::shared_ptr<TransportSession> session); void PeerDisconnected (std::shared_ptr<TransportSession> session);
@ -185,7 +186,7 @@ namespace transport
void Run (); void Run ();
void RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident); void RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident);
void HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, i2p::data::IdentHash ident); void HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, i2p::data::IdentHash ident);
void PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs); void PostMessages (i2p::data::IdentHash ident, std::list<std::shared_ptr<i2p::I2NPMessage> > msgs);
bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr<Peer> peer); bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr<Peer> peer);
void SetPriority (std::shared_ptr<Peer> peer) const; void SetPriority (std::shared_ptr<Peer> peer) const;
void HandlePeerCleanupTimer (const boost::system::error_code& ecode); void HandlePeerCleanupTimer (const boost::system::error_code& ecode);

View file

@ -221,7 +221,7 @@ namespace tunnel
void TunnelGateway::SendBuffer () void TunnelGateway::SendBuffer ()
{ {
m_Buffer.CompleteCurrentTunnelDataMessage (); m_Buffer.CompleteCurrentTunnelDataMessage ();
std::vector<std::shared_ptr<I2NPMessage> > newTunnelMsgs; std::list<std::shared_ptr<I2NPMessage> > newTunnelMsgs;
const auto& tunnelDataMsgs = m_Buffer.GetTunnelDataMsgs (); const auto& tunnelDataMsgs = m_Buffer.GetTunnelDataMsgs ();
for (auto& tunnelMsg : tunnelDataMsgs) for (auto& tunnelMsg : tunnelDataMsgs)
{ {
@ -234,7 +234,7 @@ namespace tunnel
m_NumSentBytes += TUNNEL_DATA_MSG_SIZE; m_NumSentBytes += TUNNEL_DATA_MSG_SIZE;
} }
m_Buffer.ClearTunnelDataMsgs (); m_Buffer.ClearTunnelDataMsgs ();
i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), newTunnelMsgs); i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), std::move (newTunnelMsgs));
} }
} }
} }