mirror of
https://github.com/PurpleI2P/i2pd.git
synced 2025-01-22 21:37:17 +01:00
Compare commits
4 commits
ec67f48d85
...
f04048717d
Author | SHA1 | Date | |
---|---|---|---|
f04048717d | |||
361f364966 | |||
4c90a88b85 | |||
23e66671c2 |
|
@ -375,6 +375,8 @@ namespace transport
|
|||
m_Socket.close ();
|
||||
transports.PeerDisconnected (shared_from_this ());
|
||||
m_Server.RemoveNTCP2Session (shared_from_this ());
|
||||
if (!m_IntermediateQueue.empty ())
|
||||
m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue);
|
||||
for (auto& it: m_SendQueue)
|
||||
it->Drop ();
|
||||
m_SendQueue.clear ();
|
||||
|
@ -1207,7 +1209,7 @@ namespace transport
|
|||
void NTCP2Session::MoveSendQueue (std::shared_ptr<NTCP2Session> other)
|
||||
{
|
||||
if (!other || m_SendQueue.empty ()) return;
|
||||
std::vector<std::shared_ptr<I2NPMessage> > msgs;
|
||||
std::list<std::shared_ptr<I2NPMessage> > msgs;
|
||||
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
|
||||
for (auto it: m_SendQueue)
|
||||
if (!it->IsExpired (ts))
|
||||
|
@ -1216,7 +1218,7 @@ namespace transport
|
|||
it->Drop ();
|
||||
m_SendQueue.clear ();
|
||||
if (!msgs.empty ())
|
||||
other->PostI2NPMessages (msgs);
|
||||
other->SendI2NPMessages (msgs);
|
||||
}
|
||||
|
||||
size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len)
|
||||
|
@ -1297,20 +1299,42 @@ namespace transport
|
|||
m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); // let termination message go
|
||||
}
|
||||
|
||||
void NTCP2Session::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
|
||||
void NTCP2Session::SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs)
|
||||
{
|
||||
m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this (), msgs));
|
||||
if (m_IsTerminated || msgs.empty ())
|
||||
{
|
||||
msgs.clear ();
|
||||
return;
|
||||
}
|
||||
bool empty = false;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
|
||||
empty = m_IntermediateQueue.empty ();
|
||||
m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs);
|
||||
}
|
||||
if (empty)
|
||||
m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this ()));
|
||||
}
|
||||
|
||||
void NTCP2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs)
|
||||
void NTCP2Session::PostI2NPMessages ()
|
||||
{
|
||||
if (m_IsTerminated) return;
|
||||
std::list<std::shared_ptr<I2NPMessage> > msgs;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
|
||||
m_IntermediateQueue.swap (msgs);
|
||||
}
|
||||
bool isSemiFull = m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE/2;
|
||||
if (isSemiFull)
|
||||
{
|
||||
for (auto it: msgs)
|
||||
if (isSemiFull && it->onDrop)
|
||||
if (it->onDrop)
|
||||
it->Drop (); // drop earlier because we can handle it
|
||||
else
|
||||
m_SendQueue.push_back (std::move (it));
|
||||
}
|
||||
else
|
||||
m_SendQueue.splice (m_SendQueue.end (), msgs);
|
||||
|
||||
if (!m_IsSending && m_IsEstablished)
|
||||
SendQueue ();
|
||||
|
|
|
@ -153,7 +153,7 @@ namespace transport
|
|||
void ServerLogin (); // Bob
|
||||
|
||||
void SendLocalRouterInfo (bool update) override; // after handshake or by update
|
||||
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override;
|
||||
void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) override;
|
||||
void MoveSendQueue (std::shared_ptr<NTCP2Session> other);
|
||||
|
||||
private:
|
||||
|
@ -196,7 +196,7 @@ namespace transport
|
|||
void SendRouterInfo ();
|
||||
void SendTermination (NTCP2TerminationReason reason);
|
||||
void SendTerminationAndTerminate (NTCP2TerminationReason reason);
|
||||
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs);
|
||||
void PostI2NPMessages ();
|
||||
|
||||
private:
|
||||
|
||||
|
@ -230,6 +230,9 @@ namespace transport
|
|||
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
|
||||
uint64_t m_NextRouterInfoResendTime; // seconds since epoch
|
||||
|
||||
std::list<std::shared_ptr<I2NPMessage> > m_IntermediateQueue; // from transports
|
||||
mutable std::mutex m_IntermediateQueueMutex;
|
||||
|
||||
uint16_t m_PaddingSizes[16];
|
||||
int m_NextPaddingSize;
|
||||
};
|
||||
|
|
|
@ -480,7 +480,7 @@ namespace data
|
|||
void NetDb::ReseedFromFloodfill(const RouterInfo & ri, int numRouters, int numFloodfills)
|
||||
{
|
||||
LogPrint(eLogInfo, "NetDB: Reseeding from floodfill ", ri.GetIdentHashBase64());
|
||||
std::vector<std::shared_ptr<i2p::I2NPMessage> > requests;
|
||||
std::list<std::shared_ptr<i2p::I2NPMessage> > requests;
|
||||
|
||||
i2p::data::IdentHash ourIdent = i2p::context.GetIdentHash();
|
||||
i2p::data::IdentHash ih = ri.GetIdentHash();
|
||||
|
|
|
@ -293,6 +293,8 @@ namespace transport
|
|||
m_SentHandshakePacket.reset (nullptr);
|
||||
m_SessionConfirmedFragment.reset (nullptr);
|
||||
m_PathChallenge.reset (nullptr);
|
||||
if (!m_IntermediateQueue.empty ())
|
||||
m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue);
|
||||
for (auto& it: m_SendQueue)
|
||||
it->Drop ();
|
||||
m_SendQueue.clear ();
|
||||
|
@ -372,14 +374,31 @@ namespace transport
|
|||
|
||||
}
|
||||
|
||||
void SSU2Session::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
|
||||
void SSU2Session::SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs)
|
||||
{
|
||||
m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this (), msgs));
|
||||
if (m_State == eSSU2SessionStateTerminated || msgs.empty ())
|
||||
{
|
||||
msgs.clear ();
|
||||
return;
|
||||
}
|
||||
bool empty = false;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
|
||||
empty = m_IntermediateQueue.empty ();
|
||||
m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs);
|
||||
}
|
||||
if (empty)
|
||||
m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this ()));
|
||||
}
|
||||
|
||||
void SSU2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs)
|
||||
void SSU2Session::PostI2NPMessages ()
|
||||
{
|
||||
if (m_State == eSSU2SessionStateTerminated) return;
|
||||
std::list<std::shared_ptr<I2NPMessage> > msgs;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
|
||||
m_IntermediateQueue.swap (msgs);
|
||||
}
|
||||
uint64_t mts = i2p::util::GetMonotonicMicroseconds ();
|
||||
bool isSemiFull = false;
|
||||
if (m_SendQueue.size ())
|
||||
|
@ -415,7 +434,7 @@ namespace transport
|
|||
void SSU2Session::MoveSendQueue (std::shared_ptr<SSU2Session> other)
|
||||
{
|
||||
if (!other || m_SendQueue.empty ()) return;
|
||||
std::vector<std::shared_ptr<I2NPMessage> > msgs;
|
||||
std::list<std::shared_ptr<I2NPMessage> > msgs;
|
||||
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
|
||||
for (auto it: m_SendQueue)
|
||||
if (!it->IsExpired (ts))
|
||||
|
@ -424,7 +443,7 @@ namespace transport
|
|||
it->Drop ();
|
||||
m_SendQueue.clear ();
|
||||
if (!msgs.empty ())
|
||||
other->PostI2NPMessages (msgs);
|
||||
other->SendI2NPMessages (msgs);
|
||||
}
|
||||
|
||||
bool SSU2Session::SendQueue ()
|
||||
|
|
|
@ -261,7 +261,7 @@ namespace transport
|
|||
void FlushData ();
|
||||
void Done () override;
|
||||
void SendLocalRouterInfo (bool update) override;
|
||||
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override;
|
||||
void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) override;
|
||||
void MoveSendQueue (std::shared_ptr<SSU2Session> other);
|
||||
uint32_t GetRelayTag () const override { return m_RelayTag; };
|
||||
size_t Resend (uint64_t ts); // return number of resent packets
|
||||
|
@ -307,7 +307,7 @@ namespace transport
|
|||
void Established ();
|
||||
void ScheduleConnectTimer ();
|
||||
void HandleConnectTimer (const boost::system::error_code& ecode);
|
||||
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs);
|
||||
void PostI2NPMessages ();
|
||||
bool SendQueue (); // returns true if ack block was sent
|
||||
bool SendFragmentedMessage (std::shared_ptr<I2NPMessage> msg);
|
||||
void ResendHandshakePacket ();
|
||||
|
@ -381,6 +381,8 @@ namespace transport
|
|||
std::unordered_map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice
|
||||
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
|
||||
i2p::I2NPMessagesHandler m_Handler;
|
||||
std::list<std::shared_ptr<I2NPMessage> > m_IntermediateQueue; // from transports
|
||||
mutable std::mutex m_IntermediateQueueMutex;
|
||||
bool m_IsDataReceived;
|
||||
double m_RTT;
|
||||
int m_MsgLocalExpirationTimeout;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (c) 2013-2022, The PurpleI2P Project
|
||||
* Copyright (c) 2013-2024, The PurpleI2P Project
|
||||
*
|
||||
* This file is part of Purple i2pd project and licensed under BSD3
|
||||
*
|
||||
|
@ -59,8 +59,7 @@ namespace tunnel
|
|||
auto num = m_TunnelDataMsgs.size ();
|
||||
if (num > 1)
|
||||
LogPrint (eLogDebug, "TransitTunnel: ", GetTunnelID (), "->", GetNextTunnelID (), " ", num);
|
||||
i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs);
|
||||
m_TunnelDataMsgs.clear ();
|
||||
i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs); // send and clear
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (c) 2013-2023, The PurpleI2P Project
|
||||
* Copyright (c) 2013-2024, The PurpleI2P Project
|
||||
*
|
||||
* This file is part of Purple i2pd project and licensed under BSD3
|
||||
*
|
||||
|
@ -10,7 +10,7 @@
|
|||
#define TRANSIT_TUNNEL_H__
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <vector>
|
||||
#include <list>
|
||||
#include <mutex>
|
||||
#include <memory>
|
||||
#include "Crypto.h"
|
||||
|
@ -61,7 +61,7 @@ namespace tunnel
|
|||
private:
|
||||
|
||||
size_t m_NumTransmittedBytes;
|
||||
std::vector<std::shared_ptr<i2p::I2NPMessage> > m_TunnelDataMsgs;
|
||||
std::list<std::shared_ptr<i2p::I2NPMessage> > m_TunnelDataMsgs;
|
||||
};
|
||||
|
||||
class TransitTunnelGateway: public TransitTunnel
|
||||
|
|
|
@ -144,8 +144,12 @@ namespace transport
|
|||
void SetLastActivityTimestamp (uint64_t ts) { m_LastActivityTimestamp = ts; };
|
||||
|
||||
virtual uint32_t GetRelayTag () const { return 0; };
|
||||
virtual void SendLocalRouterInfo (bool update = false) { SendI2NPMessages ({ CreateDatabaseStoreMsg () }); };
|
||||
virtual void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) = 0;
|
||||
virtual void SendLocalRouterInfo (bool update = false)
|
||||
{
|
||||
std::list<std::shared_ptr<I2NPMessage> > msgs{ CreateDatabaseStoreMsg () };
|
||||
SendI2NPMessages (msgs);
|
||||
};
|
||||
virtual void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) = 0;
|
||||
virtual bool IsEstablished () const = 0;
|
||||
|
||||
private:
|
||||
|
|
|
@ -450,15 +450,25 @@ namespace transport
|
|||
void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg)
|
||||
{
|
||||
if (m_IsOnline)
|
||||
SendMessages (ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > {msg });
|
||||
SendMessages (ident, { msg });
|
||||
}
|
||||
|
||||
void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs)
|
||||
void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs)
|
||||
{
|
||||
m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs));
|
||||
std::list<std::shared_ptr<i2p::I2NPMessage> > msgs1;
|
||||
msgs.swap (msgs1);
|
||||
SendMessages (ident, std::move (msgs1));
|
||||
}
|
||||
|
||||
void Transports::PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs)
|
||||
void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >&& msgs)
|
||||
{
|
||||
m_Service->post ([this, ident, msgs = std::move(msgs)] ()
|
||||
{
|
||||
PostMessages (ident, msgs);
|
||||
});
|
||||
}
|
||||
|
||||
void Transports::PostMessages (i2p::data::IdentHash ident, std::list<std::shared_ptr<i2p::I2NPMessage> > msgs)
|
||||
{
|
||||
if (ident == i2p::context.GetRouterInfo ().GetIdentHash ())
|
||||
{
|
||||
|
@ -517,12 +527,17 @@ namespace transport
|
|||
return;
|
||||
}
|
||||
}
|
||||
if (sz > MAX_NUM_DELAYED_MESSAGES/2)
|
||||
{
|
||||
for (auto& it1: msgs)
|
||||
if (sz > MAX_NUM_DELAYED_MESSAGES/2 && it1->onDrop)
|
||||
if (it1->onDrop)
|
||||
it1->Drop (); // drop earlier because we can handle it
|
||||
else
|
||||
peer->delayedMessages.push_back (it1);
|
||||
}
|
||||
else
|
||||
peer->delayedMessages.splice (peer->delayedMessages.end (), msgs);
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint (eLogWarning, "Transports: Delayed messages queue size to ",
|
||||
|
@ -865,7 +880,7 @@ namespace transport
|
|||
if (it->second->delayedMessages.size () > 0)
|
||||
{
|
||||
// check if first message is our DatabaseStore (publishing)
|
||||
auto firstMsg = peer->delayedMessages[0];
|
||||
auto firstMsg = peer->delayedMessages.front ();
|
||||
if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore &&
|
||||
i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ())
|
||||
sendDatabaseStore = false; // we have it in the list already
|
||||
|
@ -875,8 +890,7 @@ namespace transport
|
|||
else
|
||||
session->SetTerminationTimeout (10); // most likely it's publishing, no follow-up messages expected, set timeout to 10 seconds
|
||||
peer->sessions.push_back (session);
|
||||
session->SendI2NPMessages (peer->delayedMessages);
|
||||
peer->delayedMessages.clear ();
|
||||
session->SendI2NPMessages (peer->delayedMessages); // send and clear
|
||||
}
|
||||
else // incoming connection or peer test
|
||||
{
|
||||
|
@ -887,7 +901,10 @@ namespace transport
|
|||
return;
|
||||
}
|
||||
if (!session->IsOutgoing ()) // incoming
|
||||
session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore
|
||||
{
|
||||
std::list<std::shared_ptr<I2NPMessage> > msgs{ CreateDatabaseStoreMsg () };
|
||||
session->SendI2NPMessages (msgs); // send DatabaseStore
|
||||
}
|
||||
auto r = i2p::data::netdb.FindRouter (ident); // router should be in netdb after SessionConfirmed
|
||||
if (r) r->GetProfile ()->Connected ();
|
||||
auto ts = i2p::util::GetSecondsSinceEpoch ();
|
||||
|
|
|
@ -71,7 +71,7 @@ namespace transport
|
|||
std::shared_ptr<const i2p::data::RouterInfo> router;
|
||||
std::list<std::shared_ptr<TransportSession> > sessions;
|
||||
uint64_t creationTime, nextRouterInfoUpdateTime, lastSelectionTime;
|
||||
std::vector<std::shared_ptr<i2p::I2NPMessage> > delayedMessages;
|
||||
std::list<std::shared_ptr<i2p::I2NPMessage> > delayedMessages;
|
||||
std::vector<i2p::data::RouterInfo::SupportedTransports> priority;
|
||||
bool isHighBandwidth, isEligible;
|
||||
|
||||
|
@ -141,7 +141,8 @@ namespace transport
|
|||
void ReuseX25519KeysPair (std::shared_ptr<i2p::crypto::X25519Keys> pair);
|
||||
|
||||
void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg);
|
||||
void SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs);
|
||||
void SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs);
|
||||
void SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >&& msgs);
|
||||
|
||||
void PeerConnected (std::shared_ptr<TransportSession> session);
|
||||
void PeerDisconnected (std::shared_ptr<TransportSession> session);
|
||||
|
@ -185,7 +186,7 @@ namespace transport
|
|||
void Run ();
|
||||
void RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident);
|
||||
void HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, i2p::data::IdentHash ident);
|
||||
void PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs);
|
||||
void PostMessages (i2p::data::IdentHash ident, std::list<std::shared_ptr<i2p::I2NPMessage> > msgs);
|
||||
bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr<Peer> peer);
|
||||
void SetPriority (std::shared_ptr<Peer> peer) const;
|
||||
void HandlePeerCleanupTimer (const boost::system::error_code& ecode);
|
||||
|
|
|
@ -221,7 +221,7 @@ namespace tunnel
|
|||
void TunnelGateway::SendBuffer ()
|
||||
{
|
||||
m_Buffer.CompleteCurrentTunnelDataMessage ();
|
||||
std::vector<std::shared_ptr<I2NPMessage> > newTunnelMsgs;
|
||||
std::list<std::shared_ptr<I2NPMessage> > newTunnelMsgs;
|
||||
const auto& tunnelDataMsgs = m_Buffer.GetTunnelDataMsgs ();
|
||||
for (auto& tunnelMsg : tunnelDataMsgs)
|
||||
{
|
||||
|
@ -234,7 +234,7 @@ namespace tunnel
|
|||
m_NumSentBytes += TUNNEL_DATA_MSG_SIZE;
|
||||
}
|
||||
m_Buffer.ClearTunnelDataMsgs ();
|
||||
i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), newTunnelMsgs);
|
||||
i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), std::move (newTunnelMsgs));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue