Compare commits

..

1 commit

Author SHA1 Message Date
self-related
40e5d4c5e3
Merge 32a70562c4 into d3630fb2b2 2024-10-25 21:32:24 +02:00
24 changed files with 153 additions and 301 deletions

View file

@ -221,7 +221,7 @@ namespace i2p
void DaemonLinux::run ()
{
i2p::util::SetThreadName ("i2pd-daemon");
i2p::util::SetThreadName ("Daemon");
while (running)
{
std::this_thread::sleep_for (std::chrono::seconds(1));

View file

@ -103,7 +103,6 @@ namespace http
bool URL::parse(std::string_view url)
{
if (url.empty ()) return false;
std::size_t pos_p = 0; /* < current parse position */
std::size_t pos_c = 0; /* < work position */
if(url.at(0) != '/' || pos_p > 0)

View file

@ -396,26 +396,8 @@ namespace i2p
return false;
}
uint8_t retCode = 0;
// decide if we should accept tunnel
bool accept = i2p::context.AcceptsTunnels ();
if (accept)
{
auto congestionLevel = i2p::context.GetCongestionLevel (false);
if (congestionLevel >= CONGESTION_LEVEL_MEDIUM)
{
if (congestionLevel < CONGESTION_LEVEL_FULL)
{
// random reject depending on congestion level
int level = i2p::tunnel::tunnels.GetRng ()() % (CONGESTION_LEVEL_FULL - CONGESTION_LEVEL_MEDIUM) + CONGESTION_LEVEL_MEDIUM;
if (congestionLevel > level)
accept = false;
}
else
accept = false;
}
}
// replace record to reply
if (accept)
if (i2p::context.AcceptsTunnels () && i2p::context.GetCongestionLevel (false) < CONGESTION_LEVEL_FULL)
{
auto transitTunnel = i2p::tunnel::CreateTransitTunnel (
bufbe32toh (clearText + ECIES_BUILD_REQUEST_RECORD_RECEIVE_TUNNEL_OFFSET),

View file

@ -375,8 +375,6 @@ namespace transport
m_Socket.close ();
transports.PeerDisconnected (shared_from_this ());
m_Server.RemoveNTCP2Session (shared_from_this ());
if (!m_IntermediateQueue.empty ())
m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue);
for (auto& it: m_SendQueue)
it->Drop ();
m_SendQueue.clear ();
@ -1209,7 +1207,7 @@ namespace transport
void NTCP2Session::MoveSendQueue (std::shared_ptr<NTCP2Session> other)
{
if (!other || m_SendQueue.empty ()) return;
std::list<std::shared_ptr<I2NPMessage> > msgs;
std::vector<std::shared_ptr<I2NPMessage> > msgs;
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
for (auto it: m_SendQueue)
if (!it->IsExpired (ts))
@ -1218,7 +1216,7 @@ namespace transport
it->Drop ();
m_SendQueue.clear ();
if (!msgs.empty ())
other->SendI2NPMessages (msgs);
other->PostI2NPMessages (msgs);
}
size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len)
@ -1299,42 +1297,20 @@ namespace transport
m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); // let termination message go
}
void NTCP2Session::SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs)
void NTCP2Session::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
{
if (m_IsTerminated || msgs.empty ())
{
msgs.clear ();
return;
}
bool empty = false;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
empty = m_IntermediateQueue.empty ();
m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs);
}
if (empty)
m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this ()));
m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this (), msgs));
}
void NTCP2Session::PostI2NPMessages ()
void NTCP2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs)
{
if (m_IsTerminated) return;
std::list<std::shared_ptr<I2NPMessage> > msgs;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
m_IntermediateQueue.swap (msgs);
}
bool isSemiFull = m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE/2;
if (isSemiFull)
{
for (auto it: msgs)
if (it->onDrop)
it->Drop (); // drop earlier because we can handle it
else
m_SendQueue.push_back (std::move (it));
}
else
m_SendQueue.splice (m_SendQueue.end (), msgs);
for (auto it: msgs)
if (isSemiFull && it->onDrop)
it->Drop (); // drop earlier because we can handle it
else
m_SendQueue.push_back (std::move (it));
if (!m_IsSending && m_IsEstablished)
SendQueue ();
@ -1846,7 +1822,7 @@ namespace transport
LogPrint(eLogError, "NTCP2: HTTP proxy write error ", ec.message());
});
auto readbuff = std::make_shared<boost::asio::streambuf>();
boost::asio::streambuf * readbuff = new boost::asio::streambuf;
boost::asio::async_read_until(conn->GetSocket(), *readbuff, "\r\n\r\n",
[readbuff, timer, conn] (const boost::system::error_code & ec, std::size_t transferred)
{
@ -1866,6 +1842,7 @@ namespace transport
{
timer->cancel();
conn->ClientLogin();
delete readbuff;
return;
}
else
@ -1875,6 +1852,7 @@ namespace transport
LogPrint(eLogError, "NTCP2: HTTP proxy gave malformed response");
timer->cancel();
conn->Terminate();
delete readbuff;
}
});
break;

View file

@ -153,7 +153,7 @@ namespace transport
void ServerLogin (); // Bob
void SendLocalRouterInfo (bool update) override; // after handshake or by update
void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) override;
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override;
void MoveSendQueue (std::shared_ptr<NTCP2Session> other);
private:
@ -196,7 +196,7 @@ namespace transport
void SendRouterInfo ();
void SendTermination (NTCP2TerminationReason reason);
void SendTerminationAndTerminate (NTCP2TerminationReason reason);
void PostI2NPMessages ();
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs);
private:
@ -229,10 +229,7 @@ namespace transport
bool m_IsSending, m_IsReceiving;
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
uint64_t m_NextRouterInfoResendTime; // seconds since epoch
std::list<std::shared_ptr<I2NPMessage> > m_IntermediateQueue; // from transports
mutable std::mutex m_IntermediateQueueMutex;
uint16_t m_PaddingSizes[16];
int m_NextPaddingSize;
};

View file

@ -480,7 +480,7 @@ namespace data
void NetDb::ReseedFromFloodfill(const RouterInfo & ri, int numRouters, int numFloodfills)
{
LogPrint(eLogInfo, "NetDB: Reseeding from floodfill ", ri.GetIdentHashBase64());
std::list<std::shared_ptr<i2p::I2NPMessage> > requests;
std::vector<std::shared_ptr<i2p::I2NPMessage> > requests;
i2p::data::IdentHash ourIdent = i2p::context.GetIdentHash();
i2p::data::IdentHash ih = ri.GetIdentHash();

View file

@ -1489,7 +1489,7 @@ namespace i2p
void RouterContext::UpdateCongestion ()
{
auto c = i2p::data::RouterInfo::eLowCongestion;
if (!AcceptsTunnels () || !m_ShareRatio || (m_Error == eRouterErrorSymmetricNAT && !SupportsV6 () && !SupportsMesh ()))
if (!AcceptsTunnels () || !m_ShareRatio)
c = i2p::data::RouterInfo::eRejectAll;
else
{

View file

@ -477,7 +477,7 @@ namespace transport
HandleReceivedPackets (std::move (receivedPackets));
}
bool SSU2Server::AddSession (std::shared_ptr<SSU2Session> session)
void SSU2Server::AddSession (std::shared_ptr<SSU2Session> session)
{
if (session)
{
@ -485,10 +485,8 @@ namespace transport
{
if (session->GetState () != eSSU2SessionStatePeerTest)
AddSessionByRouterHash (session);
return true;
}
}
return false;
}
void SSU2Server::RemoveSession (uint64_t connID)

View file

@ -42,8 +42,8 @@ namespace transport
const int SSU2_KEEP_ALIVE_INTERVAL = 15; // in seconds
const int SSU2_KEEP_ALIVE_INTERVAL_VARIANCE = 4; // in seconds
const int SSU2_PROXY_CONNECT_RETRY_TIMEOUT = 30; // in seconds
const int SSU2_MIN_HOLE_PUNCH_EXPIRATION = 30; // in seconds
const int SSU2_MAX_HOLE_PUNCH_EXPIRATION = 160; // in seconds
const int SSU2_MIN_HOLE_PUNCH_EXPIRATION = 45; // in seconds
const int SSU2_MAX_HOLE_PUNCH_EXPIRATION = 181; // in seconds
const size_t SSU2_MAX_NUM_PACKETS_PER_BATCH = 64;
class SSU2Server: private i2p::util::RunnableServiceWithWork
@ -85,7 +85,7 @@ namespace transport
bool IsSyncClockFromPeers () const { return m_IsSyncClockFromPeers; };
void AdjustTimeOffset (int64_t offset, std::shared_ptr<const i2p::data::IdentityEx> from);
bool AddSession (std::shared_ptr<SSU2Session> session);
void AddSession (std::shared_ptr<SSU2Session> session);
void RemoveSession (uint64_t connID);
void RequestRemoveSession (uint64_t connID);
void AddSessionByRouterHash (std::shared_ptr<SSU2Session> session);

View file

@ -83,7 +83,7 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo::Address> addr, bool noise):
TransportSession (in_RemoteRouter, SSU2_CONNECT_TIMEOUT),
m_Server (server), m_Address (addr), m_RemoteTransports (0), m_RemotePeerTestTransports (0),
m_RemoteVersion (0), m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown),
m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown),
m_SendPacketNum (0), m_ReceivePacketNum (0), m_LastDatetimeSentPacketNum (0),
m_IsDataReceived (false), m_RTT (SSU2_UNKNOWN_RTT),
m_MsgLocalExpirationTimeout (I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MAX),
@ -103,7 +103,6 @@ namespace transport
InitNoiseXKState1 (*m_NoiseState, m_Address->s);
m_RemoteEndpoint = boost::asio::ip::udp::endpoint (m_Address->host, m_Address->port);
m_RemoteTransports = in_RemoteRouter->GetCompatibleTransports (false);
m_RemoteVersion = in_RemoteRouter->GetVersion ();
if (in_RemoteRouter->IsSSU2PeerTesting (true)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V4;
if (in_RemoteRouter->IsSSU2PeerTesting (false)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V6;
RAND_bytes ((uint8_t *)&m_DestConnID, 8);
@ -293,8 +292,6 @@ namespace transport
m_SentHandshakePacket.reset (nullptr);
m_SessionConfirmedFragment.reset (nullptr);
m_PathChallenge.reset (nullptr);
if (!m_IntermediateQueue.empty ())
m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue);
for (auto& it: m_SendQueue)
it->Drop ();
m_SendQueue.clear ();
@ -374,31 +371,14 @@ namespace transport
}
void SSU2Session::SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs)
void SSU2Session::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
{
if (m_State == eSSU2SessionStateTerminated || msgs.empty ())
{
msgs.clear ();
return;
}
bool empty = false;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
empty = m_IntermediateQueue.empty ();
m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs);
}
if (empty)
m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this ()));
m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this (), msgs));
}
void SSU2Session::PostI2NPMessages ()
void SSU2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs)
{
if (m_State == eSSU2SessionStateTerminated) return;
std::list<std::shared_ptr<I2NPMessage> > msgs;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
m_IntermediateQueue.swap (msgs);
}
uint64_t mts = i2p::util::GetMonotonicMicroseconds ();
bool isSemiFull = false;
if (m_SendQueue.size ())
@ -412,24 +392,16 @@ namespace transport
" is semi-full (size = ", m_SendQueue.size (), ", lag = ", queueLag / 1000, ", rtt = ", (int)m_RTT, ")");
}
}
if (isSemiFull)
{
for (auto it: msgs)
{
if (it->onDrop)
it->Drop (); // drop earlier because we can handle it
else
{
it->SetEnqueueTime (mts);
m_SendQueue.push_back (std::move (it));
}
}
}
else
for (auto it: msgs)
{
for (auto& it: msgs) it->SetEnqueueTime (mts);
m_SendQueue.splice (m_SendQueue.end (), msgs);
}
if (isSemiFull && it->onDrop)
it->Drop (); // drop earlier because we can handle it
else
{
it->SetEnqueueTime (mts);
m_SendQueue.push_back (std::move (it));
}
}
if (IsEstablished ())
{
SendQueue ();
@ -442,7 +414,7 @@ namespace transport
void SSU2Session::MoveSendQueue (std::shared_ptr<SSU2Session> other)
{
if (!other || m_SendQueue.empty ()) return;
std::list<std::shared_ptr<I2NPMessage> > msgs;
std::vector<std::shared_ptr<I2NPMessage> > msgs;
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
for (auto it: m_SendQueue)
if (!it->IsExpired (ts))
@ -451,7 +423,7 @@ namespace transport
it->Drop ();
m_SendQueue.clear ();
if (!msgs.empty ())
other->SendI2NPMessages (msgs);
other->PostI2NPMessages (msgs);
}
bool SSU2Session::SendQueue ()
@ -1213,8 +1185,7 @@ namespace transport
m_RemotePeerTestTransports = 0;
if (ri->IsSSU2PeerTesting (true)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V4;
if (ri->IsSSU2PeerTesting (false)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V6;
m_RemoteVersion = ri->GetVersion ();
// handle other blocks
HandlePayload (decryptedPayload.data () + riSize + 3, decryptedPayload.size () - riSize - 3);
Established ();
@ -1958,28 +1929,25 @@ namespace transport
return;
}
auto mts = i2p::util::GetMillisecondsSinceEpoch ();
uint32_t nonce = bufbe32toh (buf + 1);
if (session->m_RelaySessions.emplace (nonce, std::make_pair (shared_from_this (), mts/1000)).second)
{
// send relay intro to Charlie
auto r = i2p::data::netdb.FindRouter (GetRemoteIdentity ()->GetIdentHash ()); // Alice's RI
if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr;
if (!r) LogPrint (eLogWarning, "SSU2: RelayRequest Alice's router info not found");
session->m_RelaySessions.emplace (bufbe32toh (buf + 1), // nonce
std::make_pair (shared_from_this (), mts/1000) );
auto packet = m_Server.GetSentPacketsPool ().AcquireShared ();
packet->payloadSize = r ? CreateRouterInfoBlock (packet->payload, m_MaxPayloadSize - len - 32, r) : 0;
if (!packet->payloadSize && r)
session->SendFragmentedMessage (CreateDatabaseStoreMsg (r));
packet->payloadSize += CreateRelayIntroBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize, buf + 1, len -1);
if (packet->payloadSize < m_MaxPayloadSize)
packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize);
uint32_t packetNum = session->SendData (packet->payload, packet->payloadSize);
packet->sendTime = mts;
// Charlie always responds with RelayResponse
session->m_SentPackets.emplace (packetNum, packet);
}
else
LogPrint (eLogInfo, "SSU2: Relay request nonce ", nonce, " already exists. Ignore");
// send relay intro to Charlie
auto r = i2p::data::netdb.FindRouter (GetRemoteIdentity ()->GetIdentHash ()); // Alice's RI
if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr;
if (!r) LogPrint (eLogWarning, "SSU2: RelayRequest Alice's router info not found");
auto packet = m_Server.GetSentPacketsPool ().AcquireShared ();
packet->payloadSize = r ? CreateRouterInfoBlock (packet->payload, m_MaxPayloadSize - len - 32, r) : 0;
if (!packet->payloadSize && r)
session->SendFragmentedMessage (CreateDatabaseStoreMsg (r));
packet->payloadSize += CreateRelayIntroBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize, buf + 1, len -1);
if (packet->payloadSize < m_MaxPayloadSize)
packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize);
uint32_t packetNum = session->SendData (packet->payload, packet->payloadSize);
packet->sendTime = mts;
// Charlie always responds with RelayResponse
session->m_SentPackets.emplace (packetNum, packet);
}
void SSU2Session::HandleRelayIntro (const uint8_t * buf, size_t len, int attempts)
@ -2065,22 +2033,15 @@ namespace transport
{
// send HolePunch
auto holePunchSession = std::make_shared<SSU2HolePunchSession>(m_Server, nonce, ep, addr);
if (m_Server.AddSession (holePunchSession))
holePunchSession->SendHolePunch (packet->payload, packet->payloadSize); // relay response block
else
{
LogPrint (eLogInfo, "SSU2: Relay intro nonce ", nonce, " already exists. Ignore");
return;
}
m_Server.AddSession (holePunchSession);
holePunchSession->SendHolePunch (packet->payload, packet->payloadSize); // relay response block
}
packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize);
uint32_t packetNum = SendData (packet->payload, packet->payloadSize);
if (m_RemoteVersion >= SSU2_MIN_RELAY_RESPONSE_RESEND_VERSION)
{
// sometimes Bob doesn't ack this RelayResponse in older versions
packet->sendTime = i2p::util::GetMillisecondsSinceEpoch ();
m_SentPackets.emplace (packetNum, packet);
}
/*uint32_t packetNum = */SendData (packet->payload, packet->payloadSize);
// sometimes Bob doesn't ack this RelayResponse
// TODO: uncomment line below once the problem is resolved
//packet->sendTime = mts;
//m_SentPackets.emplace (packetNum, packet);
}
void SSU2Session::HandleRelayResponse (const uint8_t * buf, size_t len)
@ -2115,13 +2076,11 @@ namespace transport
memcpy (payload + 3, buf, len); // forward to Alice as is
packet->payloadSize = len + 3;
packet->payloadSize += CreatePaddingBlock (payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize);
uint32_t packetNum = it->second.first->SendData (packet->payload, packet->payloadSize);
if (m_RemoteVersion >= SSU2_MIN_RELAY_RESPONSE_RESEND_VERSION)
{
// sometimes Alice doesn't ack this RelayResponse in older versions
packet->sendTime = i2p::util::GetMillisecondsSinceEpoch ();
it->second.first->m_SentPackets.emplace (packetNum, packet);
}
/*uint32_t packetNum = */it->second.first->SendData (packet->payload, packet->payloadSize);
// sometimes Alice doesn't ack this RelayResponse
// TODO: uncomment line below once the problem is resolved
//packet->sendTime = i2p::util::GetMillisecondsSinceEpoch ();
//it->second.first->m_SentPackets.emplace (packetNum, packet);
}
else
{
@ -2189,33 +2148,29 @@ namespace transport
GetRemoteIdentity ()->GetIdentHash ());
if (session) // session with Charlie
{
if (m_Server.AddPeerTest (nonce, shared_from_this (), ts/1000))
{
auto packet = m_Server.GetSentPacketsPool ().AcquireShared ();
// Alice's RouterInfo
auto r = i2p::data::netdb.FindRouter (GetRemoteIdentity ()->GetIdentHash ());
if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr;
packet->payloadSize = r ? CreateRouterInfoBlock (packet->payload, m_MaxPayloadSize - len - 32, r) : 0;
if (!packet->payloadSize && r)
session->SendFragmentedMessage (CreateDatabaseStoreMsg (r));
if (packet->payloadSize + len + 48 > m_MaxPayloadSize)
{
// doesn't fit one message, send RouterInfo in separate message
uint32_t packetNum = session->SendData (packet->payload, packet->payloadSize, SSU2_FLAG_IMMEDIATE_ACK_REQUESTED);
packet->sendTime = ts;
session->m_SentPackets.emplace (packetNum, packet);
packet = m_Server.GetSentPacketsPool ().AcquireShared (); // new packet
}
// PeerTest to Charlie
packet->payloadSize += CreatePeerTestBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize, 2,
eSSU2PeerTestCodeAccept, GetRemoteIdentity ()->GetIdentHash (), buf + offset, len - offset);
packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize);
m_Server.AddPeerTest (nonce, shared_from_this (), ts/1000);
auto packet = m_Server.GetSentPacketsPool ().AcquireShared ();
// Alice's RouterInfo
auto r = i2p::data::netdb.FindRouter (GetRemoteIdentity ()->GetIdentHash ());
if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr;
packet->payloadSize = r ? CreateRouterInfoBlock (packet->payload, m_MaxPayloadSize - len - 32, r) : 0;
if (!packet->payloadSize && r)
session->SendFragmentedMessage (CreateDatabaseStoreMsg (r));
if (packet->payloadSize + len + 48 > m_MaxPayloadSize)
{
// doesn't fit one message, send RouterInfo in separate message
uint32_t packetNum = session->SendData (packet->payload, packet->payloadSize, SSU2_FLAG_IMMEDIATE_ACK_REQUESTED);
packet->sendTime = ts;
session->m_SentPackets.emplace (packetNum, packet);
packet = m_Server.GetSentPacketsPool ().AcquireShared (); // new packet
}
else
LogPrint (eLogInfo, "SSU2: Peer test 1 nonce ", nonce, " already exists. Ignored");
// PeerTest to Charlie
packet->payloadSize += CreatePeerTestBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize, 2,
eSSU2PeerTestCodeAccept, GetRemoteIdentity ()->GetIdentHash (), buf + offset, len - offset);
packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize);
uint32_t packetNum = session->SendData (packet->payload, packet->payloadSize, SSU2_FLAG_IMMEDIATE_ACK_REQUESTED);
packet->sendTime = ts;
session->m_SentPackets.emplace (packetNum, packet);
}
else
{
@ -3082,7 +3037,7 @@ namespace transport
{
if (ts > it->second.second + SSU2_RELAY_NONCE_EXPIRATION_TIMEOUT)
{
LogPrint (eLogInfo, "SSU2: Relay nonce ", it->first, " was not responded in ", SSU2_RELAY_NONCE_EXPIRATION_TIMEOUT, " seconds, deleted");
LogPrint (eLogWarning, "SSU2: Relay nonce ", it->first, " was not responded in ", SSU2_RELAY_NONCE_EXPIRATION_TIMEOUT, " seconds, deleted");
it = m_RelaySessions.erase (it);
}
else

View file

@ -15,7 +15,6 @@
#include <set>
#include <list>
#include <boost/asio.hpp>
#include "version.h"
#include "Crypto.h"
#include "RouterInfo.h"
#include "RouterContext.h"
@ -56,7 +55,6 @@ namespace transport
const int SSU2_MAX_NUM_ACK_RANGES = 32; // to send
const uint8_t SSU2_MAX_NUM_FRAGMENTS = 64;
const int SSU2_SEND_DATETIME_NUM_PACKETS = 256;
const int SSU2_MIN_RELAY_RESPONSE_RESEND_VERSION = MAKE_VERSION_NUMBER(0, 9, 64); // 0.9.64
// flags
const uint8_t SSU2_FLAG_IMMEDIATE_ACK_REQUESTED = 0x01;
@ -261,7 +259,7 @@ namespace transport
void FlushData ();
void Done () override;
void SendLocalRouterInfo (bool update) override;
void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) override;
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override;
void MoveSendQueue (std::shared_ptr<SSU2Session> other);
uint32_t GetRelayTag () const override { return m_RelayTag; };
size_t Resend (uint64_t ts); // return number of resent packets
@ -307,7 +305,7 @@ namespace transport
void Established ();
void ScheduleConnectTimer ();
void HandleConnectTimer (const boost::system::error_code& ecode);
void PostI2NPMessages ();
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs);
bool SendQueue (); // returns true if ack block was sent
bool SendFragmentedMessage (std::shared_ptr<I2NPMessage> msg);
void ResendHandshakePacket ();
@ -370,7 +368,6 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo::Address> m_Address;
boost::asio::ip::udp::endpoint m_RemoteEndpoint;
i2p::data::RouterInfo::CompatibleTransports m_RemoteTransports, m_RemotePeerTestTransports;
int m_RemoteVersion;
uint64_t m_DestConnID, m_SourceConnID;
SSU2SessionState m_State;
uint8_t m_KeyDataSend[64], m_KeyDataReceive[64];
@ -381,8 +378,6 @@ namespace transport
std::unordered_map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
i2p::I2NPMessagesHandler m_Handler;
std::list<std::shared_ptr<I2NPMessage> > m_IntermediateQueue; // from transports
mutable std::mutex m_IntermediateQueueMutex;
bool m_IsDataReceived;
double m_RTT;
int m_MsgLocalExpirationTimeout;

View file

@ -1237,8 +1237,7 @@ namespace stream
if (m_Status != eStreamStatusTerminated)
{
m_SendTimer.cancel ();
m_SendTimer.expires_from_now (boost::posix_time::microseconds(
SEND_INTERVAL + m_LocalDestination.GetRandom () % SEND_INTERVAL_VARIANCE));
m_SendTimer.expires_from_now (boost::posix_time::microseconds(SEND_INTERVAL));
m_SendTimer.async_wait (std::bind (&Stream::HandleSendTimer,
shared_from_this (), std::placeholders::_1));
}
@ -1251,17 +1250,8 @@ namespace stream
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
if (m_LastSendTime && ts*1000 > m_LastSendTime*1000 + m_PacingTime)
{
if (m_PacingTime)
{
auto numPackets = std::lldiv (m_PacingTimeRem + ts*1000 - m_LastSendTime*1000, m_PacingTime);
m_NumPacketsToSend = numPackets.quot;
m_PacingTimeRem = numPackets.rem;
}
else
{
LogPrint (eLogError, "Streaming: pacing time is zero");
m_NumPacketsToSend = 1; m_PacingTimeRem = 0;
}
m_NumPacketsToSend = ((ts*1000 - m_LastSendTime*1000) + m_PacingTimeRem) / m_PacingTime;
m_PacingTimeRem = ((ts*1000 - m_LastSendTime*1000) + m_PacingTimeRem) - (m_NumPacketsToSend * m_PacingTime);
m_IsSendTime = true;
if (m_WindowIncCounter && m_WindowSize < MAX_WINDOW_SIZE && !m_SendBuffer.IsEmpty () && m_PacingTime > m_MinPacingTime)
{
@ -1276,12 +1266,10 @@ namespace stream
else
m_WindowSize += (m_WindowSize - (1 - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowSize;
if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE;
m_WindowIncCounter--;
m_WindowIncCounter --;
UpdatePacingTime ();
}
else
break;
}
UpdatePacingTime ();
}
if (m_IsNAcked)
ResendPacket ();

View file

@ -69,8 +69,7 @@ namespace stream
const int PENDING_INCOMING_TIMEOUT = 10; // in seconds
const int MAX_RECEIVE_TIMEOUT = 20; // in seconds
const uint16_t DELAY_CHOKING = 60000; // in milliseconds
const uint64_t SEND_INTERVAL = 10000; // in microseconds
const uint64_t SEND_INTERVAL_VARIANCE = 2000; // in microseconds
const uint64_t SEND_INTERVAL = 1000; // in microseconds
const uint64_t REQUEST_IMMEDIATE_ACK_INTERVAL = 7500; // in milliseconds
const uint64_t REQUEST_IMMEDIATE_ACK_INTERVAL_VARIANCE = 3200; // in milliseconds
const bool LOSS_BASED_CONTROL_ENABLED = 1; // 0/1

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2024, The PurpleI2P Project
* Copyright (c) 2013-2022, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
@ -59,7 +59,8 @@ namespace tunnel
auto num = m_TunnelDataMsgs.size ();
if (num > 1)
LogPrint (eLogDebug, "TransitTunnel: ", GetTunnelID (), "->", GetNextTunnelID (), " ", num);
i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs); // send and clear
i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs);
m_TunnelDataMsgs.clear ();
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2024, The PurpleI2P Project
* Copyright (c) 2013-2023, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
@ -10,7 +10,7 @@
#define TRANSIT_TUNNEL_H__
#include <inttypes.h>
#include <list>
#include <vector>
#include <mutex>
#include <memory>
#include "Crypto.h"
@ -61,7 +61,7 @@ namespace tunnel
private:
size_t m_NumTransmittedBytes;
std::list<std::shared_ptr<i2p::I2NPMessage> > m_TunnelDataMsgs;
std::vector<std::shared_ptr<i2p::I2NPMessage> > m_TunnelDataMsgs;
};
class TransitTunnelGateway: public TransitTunnel

View file

@ -144,12 +144,8 @@ namespace transport
void SetLastActivityTimestamp (uint64_t ts) { m_LastActivityTimestamp = ts; };
virtual uint32_t GetRelayTag () const { return 0; };
virtual void SendLocalRouterInfo (bool update = false)
{
std::list<std::shared_ptr<I2NPMessage> > msgs{ CreateDatabaseStoreMsg () };
SendI2NPMessages (msgs);
};
virtual void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) = 0;
virtual void SendLocalRouterInfo (bool update = false) { SendI2NPMessages ({ CreateDatabaseStoreMsg () }); };
virtual void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) = 0;
virtual bool IsEstablished () const = 0;
private:

View file

@ -450,25 +450,15 @@ namespace transport
void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg)
{
if (m_IsOnline)
SendMessages (ident, { msg });
SendMessages (ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > {msg });
}
void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs)
void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs)
{
std::list<std::shared_ptr<i2p::I2NPMessage> > msgs1;
msgs.swap (msgs1);
SendMessages (ident, std::move (msgs1));
m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs));
}
void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >&& msgs)
{
m_Service->post ([this, ident, msgs = std::move(msgs)] () mutable
{
PostMessages (ident, msgs);
});
}
void Transports::PostMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs)
void Transports::PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs)
{
if (ident == i2p::context.GetRouterInfo ().GetIdentHash ())
{
@ -527,16 +517,11 @@ namespace transport
return;
}
}
if (sz > MAX_NUM_DELAYED_MESSAGES/2)
{
for (auto& it1: msgs)
if (it1->onDrop)
it1->Drop (); // drop earlier because we can handle it
else
peer->delayedMessages.push_back (it1);
}
else
peer->delayedMessages.splice (peer->delayedMessages.end (), msgs);
for (auto& it1: msgs)
if (sz > MAX_NUM_DELAYED_MESSAGES/2 && it1->onDrop)
it1->Drop (); // drop earlier because we can handle it
else
peer->delayedMessages.push_back (it1);
}
else
{
@ -880,7 +865,7 @@ namespace transport
if (it->second->delayedMessages.size () > 0)
{
// check if first message is our DatabaseStore (publishing)
auto firstMsg = peer->delayedMessages.front ();
auto firstMsg = peer->delayedMessages[0];
if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore &&
i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ())
sendDatabaseStore = false; // we have it in the list already
@ -890,7 +875,8 @@ namespace transport
else
session->SetTerminationTimeout (10); // most likely it's publishing, no follow-up messages expected, set timeout to 10 seconds
peer->sessions.push_back (session);
session->SendI2NPMessages (peer->delayedMessages); // send and clear
session->SendI2NPMessages (peer->delayedMessages);
peer->delayedMessages.clear ();
}
else // incoming connection or peer test
{
@ -901,10 +887,7 @@ namespace transport
return;
}
if (!session->IsOutgoing ()) // incoming
{
std::list<std::shared_ptr<I2NPMessage> > msgs{ CreateDatabaseStoreMsg () };
session->SendI2NPMessages (msgs); // send DatabaseStore
}
session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore
auto r = i2p::data::netdb.FindRouter (ident); // router should be in netdb after SessionConfirmed
if (r) r->GetProfile ()->Connected ();
auto ts = i2p::util::GetSecondsSinceEpoch ();

View file

@ -71,7 +71,7 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo> router;
std::list<std::shared_ptr<TransportSession> > sessions;
uint64_t creationTime, nextRouterInfoUpdateTime, lastSelectionTime;
std::list<std::shared_ptr<i2p::I2NPMessage> > delayedMessages;
std::vector<std::shared_ptr<i2p::I2NPMessage> > delayedMessages;
std::vector<i2p::data::RouterInfo::SupportedTransports> priority;
bool isHighBandwidth, isEligible;
@ -141,8 +141,7 @@ namespace transport
void ReuseX25519KeysPair (std::shared_ptr<i2p::crypto::X25519Keys> pair);
void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg);
void SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs);
void SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >&& msgs);
void SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs);
void PeerConnected (std::shared_ptr<TransportSession> session);
void PeerDisconnected (std::shared_ptr<TransportSession> session);
@ -186,7 +185,7 @@ namespace transport
void Run ();
void RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident);
void HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, i2p::data::IdentHash ident);
void PostMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs);
void PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs);
bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr<Peer> peer);
void SetPriority (std::shared_ptr<Peer> peer) const;
void HandlePeerCleanupTimer (const boost::system::error_code& ecode);

View file

@ -350,8 +350,7 @@ namespace tunnel
Tunnels::Tunnels (): m_IsRunning (false), m_Thread (nullptr), m_MaxNumTransitTunnels (DEFAULT_MAX_NUM_TRANSIT_TUNNELS),
m_TotalNumSuccesiveTunnelCreations (0), m_TotalNumFailedTunnelCreations (0), // for normal average
m_TunnelCreationSuccessRate (TCSR_START_VALUE), m_TunnelCreationAttemptsNum(0),
m_Rng(i2p::util::GetMonotonicMicroseconds ()%1000000LL)
m_TunnelCreationSuccessRate (TCSR_START_VALUE), m_TunnelCreationAttemptsNum(0)
{
}

View file

@ -18,7 +18,6 @@
#include <thread>
#include <mutex>
#include <memory>
#include <random>
#include "util.h"
#include "Queue.h"
#include "Crypto.h"
@ -245,8 +244,6 @@ namespace tunnel
uint32_t GetMaxNumTransitTunnels () const { return m_MaxNumTransitTunnels; };
int GetCongestionLevel() const { return m_MaxNumTransitTunnels ? CONGESTION_LEVEL_FULL * m_TransitTunnels.size() / m_MaxNumTransitTunnels : CONGESTION_LEVEL_FULL; }
std::mt19937& GetRng () { return m_Rng; };
private:
template<class TTunnel>
@ -310,7 +307,6 @@ namespace tunnel
int m_TotalNumSuccesiveTunnelCreations, m_TotalNumFailedTunnelCreations;
double m_TunnelCreationSuccessRate;
int m_TunnelCreationAttemptsNum;
std::mt19937 m_Rng;
public:

View file

@ -221,7 +221,7 @@ namespace tunnel
void TunnelGateway::SendBuffer ()
{
m_Buffer.CompleteCurrentTunnelDataMessage ();
std::list<std::shared_ptr<I2NPMessage> > newTunnelMsgs;
std::vector<std::shared_ptr<I2NPMessage> > newTunnelMsgs;
const auto& tunnelDataMsgs = m_Buffer.GetTunnelDataMsgs ();
for (auto& tunnelMsg : tunnelDataMsgs)
{
@ -234,7 +234,7 @@ namespace tunnel
m_NumSentBytes += TUNNEL_DATA_MSG_SIZE;
}
m_Buffer.ClearTunnelDataMsgs ();
i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), std::move (newTunnelMsgs));
i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), newTunnelMsgs);
}
}
}

View file

@ -305,7 +305,7 @@ namespace client
identHash = hash;
}
AddressBook::AddressBook (): m_Storage(nullptr), m_IsLoaded (false),
AddressBook::AddressBook (): m_Storage(nullptr), m_IsLoaded (false), m_IsDownloading (false),
m_NumRetries (0), m_DefaultSubscription (nullptr), m_SubscriptionsUpdateTimer (nullptr),
m_IsEnabled (true)
{
@ -344,28 +344,20 @@ namespace client
delete m_SubscriptionsUpdateTimer;
m_SubscriptionsUpdateTimer = nullptr;
}
bool isDownloading = m_Downloading.valid ();
if (isDownloading)
if (m_IsDownloading)
{
if (m_Downloading.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
isDownloading = false;
else
{
LogPrint (eLogInfo, "Addressbook: Subscriptions are downloading, abort");
for (int i = 0; i < 30; i++)
LogPrint (eLogInfo, "Addressbook: Subscriptions are downloading, abort");
for (int i = 0; i < 30; i++)
{
if (!m_IsDownloading)
{
if (m_Downloading.wait_for(std::chrono::seconds(1)) == std::future_status::ready) // wait for 1 seconds
{
isDownloading = false;
LogPrint (eLogInfo, "Addressbook: Subscriptions download complete");
break;
}
LogPrint (eLogInfo, "Addressbook: Subscriptions download complete");
break;
}
}
if (!isDownloading)
m_Downloading.get ();
else
LogPrint (eLogError, "Addressbook: Subscription download timeout");
std::this_thread::sleep_for (std::chrono::seconds (1)); // wait for 1 seconds
}
LogPrint (eLogError, "Addressbook: Subscription download timeout");
m_IsDownloading = false;
}
if (m_Storage)
{
@ -590,15 +582,16 @@ namespace client
}
else
{
LogPrint (eLogInfo, "Addressbook: Loading subscriptions from config");
LogPrint (eLogInfo, "Addressbook: Loading subscriptions from config file");
// using config file items
std::string subscriptionURLs; i2p::config::GetOption("addressbook.subscriptions", subscriptionURLs);
std::vector<std::string> subsList;
boost::split(subsList, subscriptionURLs, boost::is_any_of(","), boost::token_compress_on);
for (const auto& s: subsList)
if (!s.empty ())
m_Subscriptions.push_back (std::make_shared<AddressBookSubscription> (*this, s));
{
m_Subscriptions.push_back (std::make_shared<AddressBookSubscription> (*this, s));
}
LogPrint (eLogInfo, "Addressbook: ", m_Subscriptions.size (), " subscriptions urls loaded");
}
}
@ -652,6 +645,7 @@ namespace client
void AddressBook::DownloadComplete (bool success, const i2p::data::IdentHash& subscription, const std::string& etag, const std::string& lastModified)
{
m_IsDownloading = false;
m_NumRetries++;
int nextUpdateTimeout = m_NumRetries*CONTINIOUS_SUBSCRIPTION_RETRY_TIMEOUT;
if (m_NumRetries > CONTINIOUS_SUBSCRIPTION_MAX_NUM_RETRIES || nextUpdateTimeout > CONTINIOUS_SUBSCRIPTION_UPDATE_TIMEOUT)
@ -706,13 +700,7 @@ namespace client
LogPrint(eLogWarning, "Addressbook: Missing local destination, skip subscription update");
return;
}
bool isDownloading = m_Downloading.valid ();
if (isDownloading && m_Downloading.wait_for(std::chrono::seconds(0)) == std::future_status::ready) // still active?
{
m_Downloading.get ();
isDownloading = false;
}
if (!isDownloading && dest->IsReady ())
if (!m_IsDownloading && dest->IsReady ())
{
if (!m_IsLoaded)
{
@ -721,15 +709,17 @@ namespace client
std::string defaultSubURL; i2p::config::GetOption("addressbook.defaulturl", defaultSubURL);
if (!m_DefaultSubscription)
m_DefaultSubscription = std::make_shared<AddressBookSubscription>(*this, defaultSubURL);
m_Downloading = std::async (std::launch::async,
std::bind (&AddressBookSubscription::CheckUpdates, m_DefaultSubscription));
m_IsDownloading = true;
std::thread load_hosts(std::bind (&AddressBookSubscription::CheckUpdates, m_DefaultSubscription));
load_hosts.detach(); // TODO: use join
}
else if (!m_Subscriptions.empty ())
{
// pick random subscription
auto ind = rand () % m_Subscriptions.size();
m_Downloading = std::async (std::launch::async,
std::bind (&AddressBookSubscription::CheckUpdates, m_Subscriptions[ind]));
m_IsDownloading = true;
std::thread load_hosts(std::bind (&AddressBookSubscription::CheckUpdates, m_Subscriptions[ind]));
load_hosts.detach(); // TODO: use join
}
}
else
@ -833,7 +823,7 @@ namespace client
}
}
AddressBookSubscription::AddressBookSubscription (AddressBook& book, std::string_view link):
AddressBookSubscription::AddressBookSubscription (AddressBook& book, const std::string& link):
m_Book (book), m_Link (link)
{
}

View file

@ -11,12 +11,10 @@
#include <string.h>
#include <string>
#include <string_view>
#include <map>
#include <vector>
#include <iostream>
#include <mutex>
#include <future>
#include <memory>
#include <boost/asio.hpp>
#include "Base.h"
@ -126,8 +124,7 @@ namespace client
std::mutex m_LookupsMutex;
std::map<uint32_t, std::string> m_Lookups; // nonce -> address
AddressBookStorage * m_Storage;
volatile bool m_IsLoaded;
std::future<void> m_Downloading;
volatile bool m_IsLoaded, m_IsDownloading;
int m_NumRetries;
std::vector<std::shared_ptr<AddressBookSubscription> > m_Subscriptions;
std::shared_ptr<AddressBookSubscription> m_DefaultSubscription; // in case if we don't know any addresses yet
@ -139,7 +136,7 @@ namespace client
{
public:
AddressBookSubscription (AddressBook& book, std::string_view link);
AddressBookSubscription (AddressBook& book, const std::string& link);
void CheckUpdates ();
private:

View file

@ -203,7 +203,7 @@ namespace client
std::vector<std::shared_ptr<DatagramSessionInfo> > sessions;
std::lock_guard<std::mutex> lock (m_SessionsMutex);
for (const auto &it: m_Sessions)
for (auto it: m_Sessions)
{
auto s = it.second;
if (!s->m_Destination) continue;