diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index 70c59067..3d71e8d5 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -78,9 +78,9 @@ namespace stream m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_RTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_SlowRTT2 (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_LastWindowDropSize (0), m_WindowDropTargetSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO), - m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrevRTTSample (INITIAL_RTT), + m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrevRTTSample (INITIAL_RTT), m_WindowSizeTail (0), m_Jitter (0), m_MinPacingTime (0), - m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0), m_RemoteLeaseChangeTime (0), + m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0), m_LastACKRecieveTime (0), m_ACKRecieveInterval (local.GetOwner ()->GetStreamingAckDelay ()), m_RemoteLeaseChangeTime (0), m_LastACKSendTime (0), m_PacketACKInterval (1), m_PacketACKIntervalRem (0), // for limit inbound speed m_NumResendAttempts (0), m_NumPacketsToSend (0), m_MTU (STREAMING_MTU) { @@ -106,8 +106,8 @@ namespace stream m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_SlowRTT2 (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_LastWindowDropSize (0), m_WindowDropTargetSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), - m_PrevRTTSample (INITIAL_RTT), m_Jitter (0), m_MinPacingTime (0), - m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0), m_RemoteLeaseChangeTime (0), + m_PrevRTTSample (INITIAL_RTT), m_WindowSizeTail (0), m_Jitter (0), m_MinPacingTime (0), + m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0), m_LastACKRecieveTime (0), m_ACKRecieveInterval (local.GetOwner ()->GetStreamingAckDelay ()), m_RemoteLeaseChangeTime (0), m_LastACKSendTime (0), m_PacketACKInterval (1), m_PacketACKIntervalRem (0), // for limit inbound speed m_NumResendAttempts (0), m_NumPacketsToSend (0), m_MTU (STREAMING_MTU) { @@ -366,6 +366,7 @@ namespace stream { if (!m_IsWinDropped) { + LogPrint (eLogDebug, "Streaming: Client choked, set min. window size"); m_WindowDropTargetSize = MIN_WINDOW_SIZE; m_LastWindowDropSize = 0; m_WindowIncCounter = 0; @@ -537,12 +538,21 @@ namespace stream m_SentPackets.erase (it++); m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; - if (m_WindowSize < MAX_WINDOW_SIZE && !m_IsFirstACK) + if (m_WindowIncCounter < MAX_WINDOW_SIZE && !m_IsFirstACK && !m_IsWinDropped) incCounter++; } else break; } + if (m_LastACKRecieveTime) + { + uint64_t interval = ts - m_LastACKRecieveTime; + if (m_ACKRecieveInterval) + m_ACKRecieveInterval = (m_ACKRecieveInterval + interval) / 2; + else + m_ACKRecieveInterval = interval; + } + m_LastACKRecieveTime = ts; if (rttSample != INT_MAX) { if (m_IsFirstRttSample && !m_IsFirstACK) @@ -589,12 +599,15 @@ namespace stream // // delay-based CC if ((m_SlowRTT2 > m_SlowRTT + m_Jitter && rttSample > m_SlowRTT2 && rttSample > m_PrevRTTSample) && !m_IsWinDropped) // Drop window if RTT grows too fast, late detection + { + LogPrint (eLogDebug, "Streaming: Congestion detected, reduce window size"); ProcessWindowDrop (); + } UpdatePacingTime (); m_PrevRTTSample = rttSample; bool wasInitial = m_RTO == INITIAL_RTO; - m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.3 + m_Jitter)); // TODO: implement it better + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.3 + m_Jitter + m_ACKRecieveInterval)); // TODO: implement it better if (wasInitial) ScheduleResend (); @@ -604,20 +617,10 @@ namespace stream m_IsFirstRttSample = true; m_IsWinDropped = false; } - if (m_WindowDropTargetSize && m_WindowSize <= m_WindowDropTargetSize) + if (m_WindowDropTargetSize && int(m_SentPackets.size ()) <= m_WindowDropTargetSize) { + m_WindowSize = m_WindowDropTargetSize; m_WindowDropTargetSize = 0; - m_DropWindowDelaySequenceNumber = m_SequenceNumber; - } - if (acknowledged && m_WindowDropTargetSize && m_WindowSize > m_WindowDropTargetSize) - { - m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5 + m_Jitter)); // we assume that the next rtt sample may be much larger than the current - m_IsResendNeeded = true; - m_WindowSize = m_SentPackets.size () + 1; // if there are no packets to resend, just send one regular packet - if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; - if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; - m_WindowIncCounter = 0; - UpdatePacingTime (); } if (acknowledged || m_IsNAcked) { @@ -626,12 +629,14 @@ namespace stream if (m_SendBuffer.IsEmpty () && m_SentPackets.size () > 0) // tail loss { m_IsResendNeeded = true; - m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5 + m_Jitter)); // to prevent spurious retransmit + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5 + m_Jitter + m_ACKRecieveInterval)); // to prevent spurious retransmit } if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) { m_ResendTimer.cancel (); m_SendTimer.cancel (); + m_LastACKRecieveTime = 0; + m_ACKRecieveInterval = m_AckDelay; } if (acknowledged && m_IsFirstACK) { @@ -1158,6 +1163,7 @@ namespace stream } if (m_RemoteLeaseChangeTime && m_IsRemoteLeaseChangeInProgress && ts > m_RemoteLeaseChangeTime + INITIAL_RTO) { + LogPrint (eLogDebug, "Streaming: RemoteLease changed, set initial window size"); CancelRemoteLeaseChange (); m_CurrentRemoteLease = m_NextRemoteLease; HalveWindowSize (); @@ -1305,6 +1311,11 @@ namespace stream } UpdatePacingTime (); } + else if (m_WindowIncCounter && m_WindowSize == MAX_WINDOW_SIZE && !m_SendBuffer.IsEmpty () && m_PacingTime > m_MinPacingTime) + { + m_WindowSizeTail = m_WindowSizeTail + m_WindowIncCounter; + if (m_WindowSizeTail > MAX_WINDOW_SIZE) m_WindowSizeTail = MAX_WINDOW_SIZE; + } if (m_IsNAcked) ResendPacket (); else if (m_IsResendNeeded) // resend packets @@ -1409,7 +1420,10 @@ namespace stream { // loss-based CC if (!m_IsWinDropped && LOSS_BASED_CONTROL_ENABLED) + { + LogPrint (eLogDebug, "Streaming: Packet loss, reduce window size"); ProcessWindowDrop (); + } } else if (m_IsTimeOutResend) { @@ -1422,6 +1436,8 @@ namespace stream m_IsFirstRttSample = true; m_DropWindowDelaySequenceNumber = 0; m_IsFirstACK = true; + m_LastACKRecieveTime = 0; + m_ACKRecieveInterval = m_AckDelay; UpdatePacingTime (); if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); if (m_NumResendAttempts & 1) @@ -1585,6 +1601,7 @@ namespace stream } if (isLeaseChanged && !m_IsRemoteLeaseChangeInProgress) { + LogPrint (eLogDebug, "Streaming: RemoteLease changed, set initial window size"); HalveWindowSize (); } } @@ -1601,7 +1618,10 @@ namespace stream void Stream::UpdatePacingTime () { - m_PacingTime = std::round (m_RTT*1000/m_WindowSize); + if (m_WindowDropTargetSize) + m_PacingTime = std::round (m_RTT*1000/m_WindowDropTargetSize); + else + m_PacingTime = std::round (m_RTT*1000/m_WindowSize); if (m_MinPacingTime && m_PacingTime < m_MinPacingTime) m_PacingTime = m_MinPacingTime; } @@ -1609,18 +1629,20 @@ namespace stream void Stream::ProcessWindowDrop () { if (m_WindowSize > m_LastWindowDropSize) - m_LastWindowDropSize = (m_LastWindowDropSize + m_WindowSize) / 2; + { + m_LastWindowDropSize = (m_LastWindowDropSize + m_WindowSize + m_WindowSizeTail) / 2; + if (m_LastWindowDropSize > MAX_WINDOW_SIZE) m_LastWindowDropSize = MAX_WINDOW_SIZE; + } else m_LastWindowDropSize = m_WindowSize; m_WindowDropTargetSize = m_LastWindowDropSize - (m_LastWindowDropSize / 4); // -25%; - if (m_WindowDropTargetSize < MIN_WINDOW_SIZE + 1) - m_WindowDropTargetSize = MIN_WINDOW_SIZE + 1; - m_WindowSize = m_SentPackets.size (); // stop sending now - if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; + if (m_WindowDropTargetSize < MIN_WINDOW_SIZE) + m_WindowDropTargetSize = MIN_WINDOW_SIZE; m_WindowIncCounter = 0; // disable window growth - m_DropWindowDelaySequenceNumber = m_SequenceNumber; + m_DropWindowDelaySequenceNumber = m_SequenceNumber + int(m_WindowDropTargetSize); m_IsFirstACK = true; // ignore first RTT sample m_IsWinDropped = true; // don't drop window twice + m_WindowSizeTail = 0; UpdatePacingTime (); } @@ -1638,6 +1660,7 @@ namespace stream m_WindowIncCounter = 0; m_IsFirstRttSample = true; m_IsFirstACK = true; + m_WindowSizeTail = 0; UpdatePacingTime (); } diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 71450b2b..a9e8a404 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -295,10 +295,10 @@ namespace stream SendBufferQueue m_SendBuffer; double m_RTT, m_SlowRTT, m_SlowRTT2; float m_WindowSize, m_LastWindowDropSize, m_WindowDropTargetSize; - int m_WindowIncCounter, m_RTO, m_AckDelay, m_PrevRTTSample; + int m_WindowIncCounter, m_RTO, m_AckDelay, m_PrevRTTSample, m_WindowSizeTail; double m_Jitter; uint64_t m_MinPacingTime, m_PacingTime, m_PacingTimeRem, // microseconds - m_LastSendTime, m_RemoteLeaseChangeTime; // milliseconds + m_LastSendTime, m_LastACKRecieveTime, m_ACKRecieveInterval, m_RemoteLeaseChangeTime; // milliseconds uint64_t m_LastACKSendTime, m_PacketACKInterval, m_PacketACKIntervalRem; // for limit inbound speed int m_NumResendAttempts, m_NumPacketsToSend; size_t m_MTU;