diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index 66c8919d..f0469719 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -88,7 +88,7 @@ namespace stream m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed m_Status (eStreamStatusNew), m_IsIncoming (false), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false), - m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsClientChoked (false), + m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsChoking2 (false), m_IsClientChoked (false), m_IsClientChoked2 (false), m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_IsBufferEmpty (false), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), @@ -116,7 +116,7 @@ namespace stream m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed m_Status (eStreamStatusNew), m_IsIncoming (true), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false), - m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsClientChoked (false), + m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsChoking2 (false), m_IsClientChoked (false), m_IsClientChoked2 (false), m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_IsBufferEmpty (false), m_LocalDestination (local), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), m_MinRTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_FastRTT (INITIAL_RTT), @@ -288,8 +288,41 @@ namespace stream else { LogPrint (eLogInfo, "Streaming: Missing messages on sSID=", m_SendStreamID, ": from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); - // save message and wait for missing message again - SavePacket (packet); + if ((receivedSeqn - m_LastReceivedSequenceNumber) >= MAX_WINDOW_SIZE*3) + { + m_LocalDestination.DeletePacket (packet); + m_IsChoking2 = true; + } + else if (m_SavedPackets.empty () && (receivedSeqn - m_LastReceivedSequenceNumber) >= 256) + { + m_LocalDestination.DeletePacket (packet); + m_IsChoking2 = true; + } + else if (!m_SavedPackets.empty ()) + { + uint8_t numNacks = 0; + auto lastSavedSeq = 0; + auto nextSeqn = m_LastReceivedSequenceNumber + 1; + for (auto it: m_SavedPackets) + { + auto seqn = it->GetSeqn (); + if ((int)seqn > lastSavedSeq) lastSavedSeq = seqn; + for (uint32_t i = nextSeqn; i < seqn; i++) numNacks++; + nextSeqn = seqn + 1; + } + + if (numNacks + (receivedSeqn - lastSavedSeq) >= 256) + { + m_LocalDestination.DeletePacket (packet); + m_IsChoking2 = true; + } + else + // save message and wait for missing message again + SavePacket (packet); + } + else + // save message and wait for missing message again + SavePacket (packet); if (m_LastReceivedSequenceNumber >= 0) { if (!m_IsAckSendScheduled) @@ -380,7 +413,12 @@ namespace stream } if (delayRequested >= DELAY_CHOKING) { - if (!m_IsClientChoked) + if (delayRequested == 65535) + { + m_IsClientChoked2 = true; + m_DropWindowDelaySequenceNumber = m_SequenceNumber-1; + } + else if (!m_IsClientChoked) { LogPrint (eLogDebug, "Streaming: Client choked, set min. window size"); m_WindowDropTargetSize = MIN_WINDOW_SIZE; @@ -388,7 +426,7 @@ namespace stream m_WindowIncCounter = 0; m_IsClientChoked = true; m_IsWinDropped = false; - m_DropWindowDelaySequenceNumber = m_SequenceNumber; + m_DropWindowDelaySequenceNumber = m_SequenceNumber-1; m_IsFirstRttSample = true; UpdatePacingTime (); } @@ -690,6 +728,8 @@ namespace stream } if (m_IsClientChoked && (ackThrough >= m_DropWindowDelaySequenceNumber || m_SentPackets.empty ())) m_IsClientChoked = false; + if (m_IsClientChoked2 && (ackThrough >= m_DropWindowDelaySequenceNumber || m_SentPackets.empty ())) + m_IsClientChoked2 = false; if (m_IsWinDropped && ackThrough > m_DropWindowDelaySequenceNumber) { m_IsFirstRttSample = true; @@ -993,7 +1033,7 @@ namespace stream htobe32buf (packet + size, lastReceivedSeqn); size += 4; // ack Through uint8_t numNacks = 0; - bool choking = false; + bool choking = m_IsChoking2; if (lastReceivedSeqn > m_LastReceivedSequenceNumber) { // fill NACKs @@ -1043,7 +1083,10 @@ namespace stream if (choking || requestImmediateAck) { htobe16buf (packet + size, 2); // 2 bytes delay interval - htobe16buf (packet + size + 2, choking ? DELAY_CHOKING : 0); // set choking or immediate ack interval + if (m_IsChoking2) + htobe16buf (packet + size + 2, DELAY_CHOKING_2); // set choking2 + else + htobe16buf (packet + size + 2, choking ? DELAY_CHOKING : 0); // set choking or immediate ack interval size += 2; if (requestImmediateAck) // ack request sent { @@ -1059,6 +1102,7 @@ namespace stream SendPackets (std::vector { &p }); m_LastACKSendTime = ts; // for limit inbound speed m_LastConfirmedReceivedSequenceNumber = lastReceivedSeqn; // for limit inbound speed + m_IsChoking2 = false; LogPrint (eLogDebug, "Streaming: Quick Ack sent. ", (int)numNacks, " NACKs"); } @@ -1423,7 +1467,7 @@ namespace stream UpdatePacingTime (); } m_LastWindowIncTime = ts; - if (m_IsNAcked || m_IsResendNeeded || m_IsClientChoked) // resend packets + if (m_IsNAcked || m_IsResendNeeded || m_IsClientChoked || m_IsClientChoked2) // resend packets ResendPacket (); else if (m_WindowSize > int(m_SentPackets.size ())) // send packets SendBuffer (); @@ -1455,7 +1499,6 @@ namespace stream m_SendTimer.cancel (); // if no ack's in RTO, disable fast retransmit m_IsTimeOutResend = true; m_IsNAcked = false; - m_IsClientChoked = false; m_IsResendNeeded = false; m_NumPacketsToSend = 1; ResendPacket (); // send one packet per RTO, waiting for ack @@ -1483,7 +1526,7 @@ namespace stream // collect packets to resend auto ts = i2p::util::GetMillisecondsSinceEpoch (); std::vector packets; - if (m_IsNAcked) + if (m_IsNAcked && !m_IsClientChoked && !m_IsClientChoked2) { for (auto it : m_NACKedPackets) { @@ -1511,6 +1554,8 @@ namespace stream it->resent = false; it->sendTime = ts; packets.push_back (it); + if (m_IsClientChoked2 && it->GetSeqn () == m_DropWindowDelaySequenceNumber) + m_IsClientChoked2 = false; if ((int)packets.size () >= m_NumPacketsToSend) break; } } @@ -1531,7 +1576,7 @@ namespace stream m_LastWindowDropSize = m_WindowDropTargetSize; else m_LastWindowDropSize = m_WindowSize; - m_WindowDropTargetSize = m_LastWindowDropSize * 0.75; // -25% to drain queue + m_WindowDropTargetSize = m_LastWindowDropSize * 0.5; // -50% to drain queue if (m_WindowDropTargetSize < MIN_WINDOW_SIZE) m_WindowDropTargetSize = MIN_WINDOW_SIZE; m_WindowIncCounter = 0; // disable window growth @@ -1543,7 +1588,6 @@ namespace stream } else if (m_IsTimeOutResend) { - m_IsTimeOutResend = false; m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change m_WindowDropTargetSize = INITIAL_WINDOW_SIZE; m_LastWindowDropSize = 0; @@ -1574,12 +1618,12 @@ namespace stream SendPackets (packets); m_LastSendTime = ts; m_IsSendTime = false; - if (m_IsNAcked || m_IsResendNeeded || m_IsClientChoked) ScheduleSend (); } - else if (!m_IsClientChoked) + else if (!m_IsClientChoked && !m_IsClientChoked2) SendBuffer (); - if (!m_IsNAcked && !m_IsResendNeeded && !m_IsClientChoked) ScheduleResend (); - if (m_IsClientChoked) ScheduleSend (); + m_IsSendTime = false; + if (m_IsTimeOutResend) ScheduleResend (); // ^ m_IsTimeOutResend = false + else if (m_IsNAcked || m_IsResendNeeded || m_IsClientChoked || m_IsClientChoked2) ScheduleSend (); } void Stream::ScheduleAck (int timeout) @@ -1750,7 +1794,7 @@ namespace stream m_LastWindowDropSize = m_WindowDropTargetSize * ((m_MinRTT + m_Jitter*4) / m_FastRTT); else m_LastWindowDropSize = m_WindowSize * ((m_MinRTT + m_Jitter*4) / m_FastRTT); - m_WindowDropTargetSize = m_LastWindowDropSize * 0.75; // -25% to drain queue + m_WindowDropTargetSize = m_LastWindowDropSize * 0.5; // -50% to drain queue if (m_WindowDropTargetSize < MIN_WINDOW_SIZE) m_WindowDropTargetSize = MIN_WINDOW_SIZE; m_WindowIncCounter = 0; // disable window growth diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index eae43c0e..a3f9734e 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -76,6 +76,7 @@ namespace stream const int PENDING_INCOMING_TIMEOUT = 10; // in seconds const int MAX_RECEIVE_TIMEOUT = 20; // in seconds const uint16_t DELAY_CHOKING = 60000; // in milliseconds + const uint16_t DELAY_CHOKING_2 = 65535; // in milliseconds const uint64_t SEND_INTERVAL = 10000; // in microseconds const uint64_t SEND_INTERVAL_VARIANCE = 2000; // in microseconds const uint64_t REQUEST_IMMEDIATE_ACK_INTERVAL = 7500; // in milliseconds @@ -279,7 +280,9 @@ namespace stream bool m_IsFirstRttSample; bool m_IsSendTime; bool m_IsWinDropped; + bool m_IsChoking2; bool m_IsClientChoked; + bool m_IsClientChoked2; bool m_IsTimeOutResend; bool m_IsImmediateAckRequested; bool m_IsRemoteLeaseChangeInProgress;