From cf48c205e199677df54b80405bcb6d56f836468c Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 6 Oct 2025 21:10:59 -0400 Subject: [PATCH] save unacked seqn. handle acked option. calculate RTT --- libi2pd_client/UDPTunnel.cpp | 37 +++++++++++++++++++++++++++--------- libi2pd_client/UDPTunnel.h | 12 ++++++++---- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/libi2pd_client/UDPTunnel.cpp b/libi2pd_client/UDPTunnel.cpp index 200d6034..edbb116b 100644 --- a/libi2pd_client/UDPTunnel.cpp +++ b/libi2pd_client/UDPTunnel.cpp @@ -38,10 +38,10 @@ namespace client if (options->Get (UDP_SESSION_SEQN, seqn) && seqn > m_LastSession->m_LastReceivedPacketNum) { m_LastSession->m_LastReceivedPacketNum = seqn; - i2p::util::Mapping options; - options.Put (UDP_SESSION_ACKED, m_LastSession->m_LastReceivedPacketNum); + i2p::util::Mapping replyOptions; + replyOptions.Put (UDP_SESSION_ACKED, m_LastSession->m_LastReceivedPacketNum); m_LastSession->m_Destination->SendDatagram(m_LastSession->GetDatagramSession (), - nullptr, 0, m_LastSession->LocalPort, m_LastSession->RemotePort, &options); // Ack only, no payload + nullptr, 0, m_LastSession->LocalPort, m_LastSession->RemotePort, &replyOptions); // Ack only, no payload } } } @@ -277,7 +277,7 @@ namespace client m_Name (name), m_RemoteDest (remoteDest), m_LocalDest (localDestination), m_LocalEndpoint (localEndpoint), m_ResolveThread (nullptr), m_LocalSocket (nullptr), RemotePort (remotePort), m_LastPort (0), m_cancel_resolve (false), m_Gzip (gzip), m_DatagramVersion (datagramVersion), - m_NextSendPacketNum (1), m_LastReceivedPacketNum (0) + m_NextSendPacketNum (1), m_LastReceivedPacketNum (0), m_RTT (0) { } @@ -377,10 +377,13 @@ namespace client auto ts = i2p::util::GetMillisecondsSinceEpoch (); LogPrint (eLogDebug, "UDP Client: Send ", transferred, " to ", m_RemoteAddr->identHash.ToBase32 (), ":", RemotePort); auto session = GetDatagramSession (); - if (ts > m_LastSession->second + I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL) + uint64_t repliableDatagramInterval = I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL; + if (m_RTT && m_RTT < I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL*10) repliableDatagramInterval = m_RTT/10; + if (ts > m_LastSession->second + repliableDatagramInterval ) { if (m_DatagramVersion == i2p::datagram::eDatagramV3) { + m_UnackedDatagrams.push_back ({ m_NextSendPacketNum, ts }); i2p::util::Mapping options; options.Put (UDP_SESSION_SEQN, m_NextSendPacketNum); if (m_LastReceivedPacketNum > 0) @@ -467,11 +470,13 @@ namespace client if (options->Get (UDP_SESSION_SEQN, seqn) && seqn > m_LastReceivedPacketNum) { m_LastReceivedPacketNum = seqn; - i2p::util::Mapping options; - options.Put (UDP_SESSION_ACKED, m_LastReceivedPacketNum); + i2p::util::Mapping replyOptions; + replyOptions.Put (UDP_SESSION_ACKED, m_LastReceivedPacketNum); m_LocalDest->GetDatagramDestination ()->SendDatagram (GetDatagramSession (), - nullptr, 0, m_LastPort, RemotePort, &options); // Ack only, no payload - } + nullptr, 0, m_LastPort, RemotePort, &replyOptions); // Ack only, no payload + } + if (options->Get (UDP_SESSION_ACKED, seqn)) + Acked (seqn); } if (len > 0) HandleRecvFromI2PRaw (fromPort, toPort, buf, len); @@ -503,5 +508,19 @@ namespace client LogPrint (eLogWarning, "UDP Client: Not tracking udp session using port ", (int) toPort); } + void I2PUDPClientTunnel::Acked (uint32_t seqn) + { + if (m_UnackedDatagrams.empty () && seqn < m_UnackedDatagrams.front ().first) return; + auto it = m_UnackedDatagrams.begin (); + while (it != m_UnackedDatagrams.end ()) + { + if (it->first > seqn) break; + if (it->first == seqn) + m_RTT = i2p::util::GetMillisecondsSinceEpoch () - it->second; + it++; + } + m_UnackedDatagrams.erase (m_UnackedDatagrams.begin (), it); + } + } } diff --git a/libi2pd_client/UDPTunnel.h b/libi2pd_client/UDPTunnel.h index 5a43fcbc..6a8d3c9f 100644 --- a/libi2pd_client/UDPTunnel.h +++ b/libi2pd_client/UDPTunnel.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include "util.h" @@ -32,7 +33,7 @@ namespace client /** max size for i2p udp */ const size_t I2P_UDP_MAX_MTU = 64*1024; - + struct UDPSession { i2p::datagram::DatagramDestination * m_Destination; @@ -110,7 +111,7 @@ namespace client void HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); UDPSessionPtr ObtainUDPSession (const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort); uint32_t GetSessionIndex (uint16_t fromPort, uint16_t toPort) const { return ((uint32_t)fromPort << 16) + toPort; } - + private: bool m_IsUniqueLocal; @@ -165,7 +166,8 @@ namespace client void HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); void TryResolving (); std::shared_ptr GetDatagramSession (); - + void Acked (uint32_t seqn); + private: const std::string m_Name; @@ -186,7 +188,9 @@ namespace client std::shared_ptr m_LastSession; std::weak_ptr m_LastDatagramSession; uint32_t m_NextSendPacketNum, m_LastReceivedPacketNum; - + std::list > m_UnackedDatagrams; // list of sent but not acked repliable datagrams(seqn, timestamp) in ascending order + uint64_t m_RTT; // milliseconds + public: bool isUpdated; // transient, used during reload only