From 253a892b0fec69409bab050ab58c3499753769cb Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 19 Jul 2025 09:25:51 -0400 Subject: [PATCH] limit number of outbound packets if the peer can't handle it --- libi2pd/Garlic.h | 4 +++- libi2pd/Streaming.cpp | 35 ++++++++++++++++++++++++----------- libi2pd/Streaming.h | 4 +++- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/libi2pd/Garlic.h b/libi2pd/Garlic.h index 25106c45..055c778d 100644 --- a/libi2pd/Garlic.h +++ b/libi2pd/Garlic.h @@ -87,8 +87,10 @@ namespace garlic { std::shared_ptr outboundTunnel; std::shared_ptr remoteLease; + // for streaming only int rtt; // RTT uint32_t updateTime; // seconds since epoch + bool isJava; // based on choked value }; class GarlicDestination; @@ -129,7 +131,7 @@ namespace garlic std::shared_ptr GetSharedRoutingPath (); void SetSharedRoutingPath (std::shared_ptr path); - + GarlicDestination * GetOwner () const { return m_Owner; } void SetOwner (GarlicDestination * owner) { m_Owner = owner; } diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index f0469719..4c676e86 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -89,10 +89,10 @@ namespace stream m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed m_Status (eStreamStatusNew), m_IsIncoming (false), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false), m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsChoking2 (false), m_IsClientChoked (false), m_IsClientChoked2 (false), - m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_IsBufferEmpty (false), m_LocalDestination (local), + m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_IsBufferEmpty (false), m_IsJavaClient (false), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), - m_RTT (INITIAL_RTT), m_MinRTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_FastRTT (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_LastWindowDropSize (0), + m_RTT (INITIAL_RTT), m_MinRTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_FastRTT (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_MaxWindowSize (MAX_WINDOW_SIZE), m_LastWindowDropSize (0), m_WindowDropTargetSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrevRTTSample (INITIAL_RTT), m_Jitter (0), m_MinPacingTime (0), @@ -117,10 +117,10 @@ namespace stream m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed m_Status (eStreamStatusNew), m_IsIncoming (true), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false), m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsChoking2 (false), m_IsClientChoked (false), m_IsClientChoked2 (false), - m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_IsBufferEmpty (false), m_LocalDestination (local), + m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_IsBufferEmpty (false), m_IsJavaClient (false), m_LocalDestination (local), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), m_MinRTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_FastRTT (INITIAL_RTT), - m_WindowSize (INITIAL_WINDOW_SIZE), m_LastWindowDropSize (0), m_WindowDropTargetSize (0), m_WindowIncCounter (0), + m_WindowSize (INITIAL_WINDOW_SIZE), m_MaxWindowSize (MAX_WINDOW_SIZE), m_LastWindowDropSize (0), m_WindowDropTargetSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrevRTTSample (INITIAL_RTT), m_Jitter (0), m_MinPacingTime (0), m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0), m_LastACKRecieveTime (0), m_ACKRecieveInterval (local.GetOwner ()->GetStreamingAckDelay ()), m_RemoteLeaseChangeTime (0), m_LastWindowIncTime (0), @@ -421,6 +421,17 @@ namespace stream else if (!m_IsClientChoked) { LogPrint (eLogDebug, "Streaming: Client choked, set min. window size"); + if (delayRequested == DELAY_CHOKING_JAVA) // java detected + { + LogPrint (eLogDebug, "Streaming: limit window size for java client"); + m_MaxWindowSize = 32; + m_IsJavaClient = true; + if (m_RoutingSession) + { + auto path = m_RoutingSession->GetSharedRoutingPath (); + if (path) path->isJava = true; + } + } m_WindowDropTargetSize = MIN_WINDOW_SIZE; m_LastWindowDropSize = 0; m_WindowIncCounter = 0; @@ -646,7 +657,7 @@ namespace stream m_SentPackets.erase (it++); m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; - if (m_WindowIncCounter < MAX_WINDOW_SIZE && !m_IsFirstACK && !m_IsWinDropped) + if (m_WindowIncCounter < m_MaxWindowSize && !m_IsFirstACK && !m_IsWinDropped) incCounter++; } else @@ -761,7 +772,7 @@ namespace stream if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath ( std::make_shared ( - i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, (int)m_RTT, 0})); + i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, (int)m_RTT, 0, false})); m_IsFirstACK = false; } if (acknowledged) @@ -1276,6 +1287,8 @@ namespace stream m_CurrentOutboundTunnel = routingPath->outboundTunnel; m_CurrentRemoteLease = routingPath->remoteLease; m_RTT = routingPath->rtt; + m_IsJavaClient = routingPath->isJava; + if (m_IsJavaClient) m_MaxWindowSize = 32; } } @@ -1416,12 +1429,12 @@ namespace stream m_NumPacketsToSend = 1; m_PacingTimeRem = 0; } m_IsSendTime = true; - if (m_WindowIncCounter && (m_WindowSize < MAX_WINDOW_SIZE || m_WindowDropTargetSize) && !m_SendBuffer.IsEmpty () && m_PacingTime > m_MinPacingTime) + if (m_WindowIncCounter && (m_WindowSize < m_MaxWindowSize || m_WindowDropTargetSize) && !m_SendBuffer.IsEmpty () && m_PacingTime > m_MinPacingTime) { float winSize = m_WindowSize; if (m_WindowDropTargetSize) winSize = m_WindowDropTargetSize; - float maxWinSize = MAX_WINDOW_SIZE; + float maxWinSize = m_MaxWindowSize; if (m_LastWindowIncTime) maxWinSize = (ts - m_LastWindowIncTime) / (m_RTT / MAX_WINDOW_SIZE_INC_PER_RTT) + winSize; for (int i = 0; i < m_NumPacketsToSend; i++) @@ -1436,7 +1449,7 @@ namespace stream m_WindowDropTargetSize += (m_WindowDropTargetSize - (m_LastWindowDropSize - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowDropTargetSize; // some magic here else m_WindowDropTargetSize += (m_WindowDropTargetSize - (1 - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowDropTargetSize; - if (m_WindowDropTargetSize > MAX_WINDOW_SIZE) m_WindowDropTargetSize = MAX_WINDOW_SIZE; + if (m_WindowDropTargetSize > m_MaxWindowSize) m_WindowDropTargetSize = m_MaxWindowSize; m_WindowIncCounter--; if (m_WindowDropTargetSize >= maxWinSize) { @@ -1452,7 +1465,7 @@ namespace stream m_WindowSize += (m_WindowSize - (m_LastWindowDropSize - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowSize; // some magic here else m_WindowSize += (m_WindowSize - (1 - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowSize; - if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; + if (m_WindowSize > m_MaxWindowSize) m_WindowSize = m_MaxWindowSize; m_WindowIncCounter--; if (m_WindowSize >= maxWinSize) { @@ -1555,7 +1568,7 @@ namespace stream it->sendTime = ts; packets.push_back (it); if (m_IsClientChoked2 && it->GetSeqn () == m_DropWindowDelaySequenceNumber) - m_IsClientChoked2 = false; + m_IsClientChoked2 = false; if ((int)packets.size () >= m_NumPacketsToSend) break; } } diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index a3f9734e..91cb59f7 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -76,6 +76,7 @@ namespace stream const int PENDING_INCOMING_TIMEOUT = 10; // in seconds const int MAX_RECEIVE_TIMEOUT = 20; // in seconds const uint16_t DELAY_CHOKING = 60000; // in milliseconds + const uint16_t DELAY_CHOKING_JAVA = 61000; // in milliseconds const uint16_t DELAY_CHOKING_2 = 65535; // in milliseconds const uint64_t SEND_INTERVAL = 10000; // in microseconds const uint64_t SEND_INTERVAL_VARIANCE = 2000; // in microseconds @@ -287,6 +288,7 @@ namespace stream bool m_IsImmediateAckRequested; bool m_IsRemoteLeaseChangeInProgress; bool m_IsBufferEmpty; + bool m_IsJavaClient; StreamingDestination& m_LocalDestination; std::shared_ptr m_RemoteIdentity; std::shared_ptr m_TransientVerifier; // in case of offline key @@ -305,7 +307,7 @@ namespace stream SendBufferQueue m_SendBuffer; double m_RTT, m_MinRTT, m_SlowRTT, m_FastRTT; - float m_WindowSize, m_LastWindowDropSize, m_WindowDropTargetSize; + float m_WindowSize, m_MaxWindowSize, m_LastWindowDropSize, m_WindowDropTargetSize; int m_WindowIncCounter, m_RTO, m_AckDelay, m_PrevRTTSample; double m_Jitter; uint64_t m_MinPacingTime, m_PacingTime, m_PacingTimeRem, // microseconds