drop is choked
Some checks failed
Build Debian packages / bookworm (push) Has been cancelled
Build Debian packages / bullseye (push) Has been cancelled
Build Debian packages / trixie (push) Has been cancelled
Build on FreeBSD / with UPnP (push) Has been cancelled
Build on OSX / With USE_UPNP=no (push) Has been cancelled
Build on OSX / With USE_UPNP=yes (push) Has been cancelled
Build on Windows / clang-x86_64 (push) Has been cancelled
Build on Windows / i686 (push) Has been cancelled
Build on Windows / ucrt-x86_64 (push) Has been cancelled
Build on Windows / x86_64 (push) Has been cancelled
Build on Windows / CMake clang-x86_64 (push) Has been cancelled
Build on Windows / CMake i686 (push) Has been cancelled
Build on Windows / CMake ucrt-x86_64 (push) Has been cancelled
Build on Windows / CMake x86_64 (push) Has been cancelled
Build on Windows / XP (push) Has been cancelled
Build on Ubuntu / Make with USE_UPNP=no (push) Has been cancelled
Build on Ubuntu / Make with USE_UPNP=yes (push) Has been cancelled
Build on Ubuntu / CMake with -DWITH_UPNP=OFF (push) Has been cancelled
Build on Ubuntu / CMake with -DWITH_UPNP=ON (push) Has been cancelled
Build containers / Building container for linux/amd64 (push) Has been cancelled
Build containers / Building container for linux/arm64 (push) Has been cancelled
Build containers / Building container for linux/arm/v7 (push) Has been cancelled
Build containers / Building container for linux/386 (push) Has been cancelled
Build containers / Pushing merged manifest (push) Has been cancelled

This commit is contained in:
orignal 2025-07-14 19:23:26 -04:00
parent 2c2452cd3b
commit edc27d5bcb
2 changed files with 65 additions and 18 deletions

View file

@ -88,7 +88,7 @@ namespace stream
m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1),
m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed
m_Status (eStreamStatusNew), m_IsIncoming (false), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false), m_Status (eStreamStatusNew), m_IsIncoming (false), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false),
m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsClientChoked (false), m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsChoking2 (false), m_IsClientChoked (false), m_IsClientChoked2 (false),
m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_IsBufferEmpty (false), m_LocalDestination (local), m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_IsBufferEmpty (false), m_LocalDestination (local),
m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service),
m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port),
@ -116,7 +116,7 @@ namespace stream
m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1),
m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed
m_Status (eStreamStatusNew), m_IsIncoming (true), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false), m_Status (eStreamStatusNew), m_IsIncoming (true), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false),
m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsClientChoked (false), m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (true), m_IsChoking2 (false), m_IsClientChoked (false), m_IsClientChoked2 (false),
m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_IsBufferEmpty (false), m_LocalDestination (local), m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_IsBufferEmpty (false), m_LocalDestination (local),
m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service),
m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), m_MinRTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_FastRTT (INITIAL_RTT), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), m_MinRTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_FastRTT (INITIAL_RTT),
@ -288,8 +288,41 @@ namespace stream
else else
{ {
LogPrint (eLogInfo, "Streaming: Missing messages on sSID=", m_SendStreamID, ": from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); LogPrint (eLogInfo, "Streaming: Missing messages on sSID=", m_SendStreamID, ": from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1);
// save message and wait for missing message again if ((receivedSeqn - m_LastReceivedSequenceNumber) >= MAX_WINDOW_SIZE*3)
SavePacket (packet); {
m_LocalDestination.DeletePacket (packet);
m_IsChoking2 = true;
}
else if (m_SavedPackets.empty () && (receivedSeqn - m_LastReceivedSequenceNumber) >= 256)
{
m_LocalDestination.DeletePacket (packet);
m_IsChoking2 = true;
}
else if (!m_SavedPackets.empty ())
{
uint8_t numNacks = 0;
auto lastSavedSeq = 0;
auto nextSeqn = m_LastReceivedSequenceNumber + 1;
for (auto it: m_SavedPackets)
{
auto seqn = it->GetSeqn ();
if ((int)seqn > lastSavedSeq) lastSavedSeq = seqn;
for (uint32_t i = nextSeqn; i < seqn; i++) numNacks++;
nextSeqn = seqn + 1;
}
if (numNacks + (receivedSeqn - lastSavedSeq) >= 256)
{
m_LocalDestination.DeletePacket (packet);
m_IsChoking2 = true;
}
else
// save message and wait for missing message again
SavePacket (packet);
}
else
// save message and wait for missing message again
SavePacket (packet);
if (m_LastReceivedSequenceNumber >= 0) if (m_LastReceivedSequenceNumber >= 0)
{ {
if (!m_IsAckSendScheduled) if (!m_IsAckSendScheduled)
@ -380,7 +413,12 @@ namespace stream
} }
if (delayRequested >= DELAY_CHOKING) if (delayRequested >= DELAY_CHOKING)
{ {
if (!m_IsClientChoked) if (delayRequested == 65535)
{
m_IsClientChoked2 = true;
m_DropWindowDelaySequenceNumber = m_SequenceNumber-1;
}
else if (!m_IsClientChoked)
{ {
LogPrint (eLogDebug, "Streaming: Client choked, set min. window size"); LogPrint (eLogDebug, "Streaming: Client choked, set min. window size");
m_WindowDropTargetSize = MIN_WINDOW_SIZE; m_WindowDropTargetSize = MIN_WINDOW_SIZE;
@ -388,7 +426,7 @@ namespace stream
m_WindowIncCounter = 0; m_WindowIncCounter = 0;
m_IsClientChoked = true; m_IsClientChoked = true;
m_IsWinDropped = false; m_IsWinDropped = false;
m_DropWindowDelaySequenceNumber = m_SequenceNumber; m_DropWindowDelaySequenceNumber = m_SequenceNumber-1;
m_IsFirstRttSample = true; m_IsFirstRttSample = true;
UpdatePacingTime (); UpdatePacingTime ();
} }
@ -690,6 +728,8 @@ namespace stream
} }
if (m_IsClientChoked && (ackThrough >= m_DropWindowDelaySequenceNumber || m_SentPackets.empty ())) if (m_IsClientChoked && (ackThrough >= m_DropWindowDelaySequenceNumber || m_SentPackets.empty ()))
m_IsClientChoked = false; m_IsClientChoked = false;
if (m_IsClientChoked2 && (ackThrough >= m_DropWindowDelaySequenceNumber || m_SentPackets.empty ()))
m_IsClientChoked2 = false;
if (m_IsWinDropped && ackThrough > m_DropWindowDelaySequenceNumber) if (m_IsWinDropped && ackThrough > m_DropWindowDelaySequenceNumber)
{ {
m_IsFirstRttSample = true; m_IsFirstRttSample = true;
@ -993,7 +1033,7 @@ namespace stream
htobe32buf (packet + size, lastReceivedSeqn); htobe32buf (packet + size, lastReceivedSeqn);
size += 4; // ack Through size += 4; // ack Through
uint8_t numNacks = 0; uint8_t numNacks = 0;
bool choking = false; bool choking = m_IsChoking2;
if (lastReceivedSeqn > m_LastReceivedSequenceNumber) if (lastReceivedSeqn > m_LastReceivedSequenceNumber)
{ {
// fill NACKs // fill NACKs
@ -1043,7 +1083,10 @@ namespace stream
if (choking || requestImmediateAck) if (choking || requestImmediateAck)
{ {
htobe16buf (packet + size, 2); // 2 bytes delay interval htobe16buf (packet + size, 2); // 2 bytes delay interval
htobe16buf (packet + size + 2, choking ? DELAY_CHOKING : 0); // set choking or immediate ack interval if (m_IsChoking2)
htobe16buf (packet + size + 2, DELAY_CHOKING_2); // set choking2
else
htobe16buf (packet + size + 2, choking ? DELAY_CHOKING : 0); // set choking or immediate ack interval
size += 2; size += 2;
if (requestImmediateAck) // ack request sent if (requestImmediateAck) // ack request sent
{ {
@ -1059,6 +1102,7 @@ namespace stream
SendPackets (std::vector<Packet *> { &p }); SendPackets (std::vector<Packet *> { &p });
m_LastACKSendTime = ts; // for limit inbound speed m_LastACKSendTime = ts; // for limit inbound speed
m_LastConfirmedReceivedSequenceNumber = lastReceivedSeqn; // for limit inbound speed m_LastConfirmedReceivedSequenceNumber = lastReceivedSeqn; // for limit inbound speed
m_IsChoking2 = false;
LogPrint (eLogDebug, "Streaming: Quick Ack sent. ", (int)numNacks, " NACKs"); LogPrint (eLogDebug, "Streaming: Quick Ack sent. ", (int)numNacks, " NACKs");
} }
@ -1423,7 +1467,7 @@ namespace stream
UpdatePacingTime (); UpdatePacingTime ();
} }
m_LastWindowIncTime = ts; m_LastWindowIncTime = ts;
if (m_IsNAcked || m_IsResendNeeded || m_IsClientChoked) // resend packets if (m_IsNAcked || m_IsResendNeeded || m_IsClientChoked || m_IsClientChoked2) // resend packets
ResendPacket (); ResendPacket ();
else if (m_WindowSize > int(m_SentPackets.size ())) // send packets else if (m_WindowSize > int(m_SentPackets.size ())) // send packets
SendBuffer (); SendBuffer ();
@ -1455,7 +1499,6 @@ namespace stream
m_SendTimer.cancel (); // if no ack's in RTO, disable fast retransmit m_SendTimer.cancel (); // if no ack's in RTO, disable fast retransmit
m_IsTimeOutResend = true; m_IsTimeOutResend = true;
m_IsNAcked = false; m_IsNAcked = false;
m_IsClientChoked = false;
m_IsResendNeeded = false; m_IsResendNeeded = false;
m_NumPacketsToSend = 1; m_NumPacketsToSend = 1;
ResendPacket (); // send one packet per RTO, waiting for ack ResendPacket (); // send one packet per RTO, waiting for ack
@ -1483,7 +1526,7 @@ namespace stream
// collect packets to resend // collect packets to resend
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
std::vector<Packet *> packets; std::vector<Packet *> packets;
if (m_IsNAcked) if (m_IsNAcked && !m_IsClientChoked && !m_IsClientChoked2)
{ {
for (auto it : m_NACKedPackets) for (auto it : m_NACKedPackets)
{ {
@ -1511,6 +1554,8 @@ namespace stream
it->resent = false; it->resent = false;
it->sendTime = ts; it->sendTime = ts;
packets.push_back (it); packets.push_back (it);
if (m_IsClientChoked2 && it->GetSeqn () == m_DropWindowDelaySequenceNumber)
m_IsClientChoked2 = false;
if ((int)packets.size () >= m_NumPacketsToSend) break; if ((int)packets.size () >= m_NumPacketsToSend) break;
} }
} }
@ -1531,7 +1576,7 @@ namespace stream
m_LastWindowDropSize = m_WindowDropTargetSize; m_LastWindowDropSize = m_WindowDropTargetSize;
else else
m_LastWindowDropSize = m_WindowSize; m_LastWindowDropSize = m_WindowSize;
m_WindowDropTargetSize = m_LastWindowDropSize * 0.75; // -25% to drain queue m_WindowDropTargetSize = m_LastWindowDropSize * 0.5; // -50% to drain queue
if (m_WindowDropTargetSize < MIN_WINDOW_SIZE) if (m_WindowDropTargetSize < MIN_WINDOW_SIZE)
m_WindowDropTargetSize = MIN_WINDOW_SIZE; m_WindowDropTargetSize = MIN_WINDOW_SIZE;
m_WindowIncCounter = 0; // disable window growth m_WindowIncCounter = 0; // disable window growth
@ -1543,7 +1588,6 @@ namespace stream
} }
else if (m_IsTimeOutResend) else if (m_IsTimeOutResend)
{ {
m_IsTimeOutResend = false;
m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change
m_WindowDropTargetSize = INITIAL_WINDOW_SIZE; m_WindowDropTargetSize = INITIAL_WINDOW_SIZE;
m_LastWindowDropSize = 0; m_LastWindowDropSize = 0;
@ -1574,12 +1618,12 @@ namespace stream
SendPackets (packets); SendPackets (packets);
m_LastSendTime = ts; m_LastSendTime = ts;
m_IsSendTime = false; m_IsSendTime = false;
if (m_IsNAcked || m_IsResendNeeded || m_IsClientChoked) ScheduleSend ();
} }
else if (!m_IsClientChoked) else if (!m_IsClientChoked && !m_IsClientChoked2)
SendBuffer (); SendBuffer ();
if (!m_IsNAcked && !m_IsResendNeeded && !m_IsClientChoked) ScheduleResend (); m_IsSendTime = false;
if (m_IsClientChoked) ScheduleSend (); if (m_IsTimeOutResend) ScheduleResend (); // ^ m_IsTimeOutResend = false
else if (m_IsNAcked || m_IsResendNeeded || m_IsClientChoked || m_IsClientChoked2) ScheduleSend ();
} }
void Stream::ScheduleAck (int timeout) void Stream::ScheduleAck (int timeout)
@ -1750,7 +1794,7 @@ namespace stream
m_LastWindowDropSize = m_WindowDropTargetSize * ((m_MinRTT + m_Jitter*4) / m_FastRTT); m_LastWindowDropSize = m_WindowDropTargetSize * ((m_MinRTT + m_Jitter*4) / m_FastRTT);
else else
m_LastWindowDropSize = m_WindowSize * ((m_MinRTT + m_Jitter*4) / m_FastRTT); m_LastWindowDropSize = m_WindowSize * ((m_MinRTT + m_Jitter*4) / m_FastRTT);
m_WindowDropTargetSize = m_LastWindowDropSize * 0.75; // -25% to drain queue m_WindowDropTargetSize = m_LastWindowDropSize * 0.5; // -50% to drain queue
if (m_WindowDropTargetSize < MIN_WINDOW_SIZE) if (m_WindowDropTargetSize < MIN_WINDOW_SIZE)
m_WindowDropTargetSize = MIN_WINDOW_SIZE; m_WindowDropTargetSize = MIN_WINDOW_SIZE;
m_WindowIncCounter = 0; // disable window growth m_WindowIncCounter = 0; // disable window growth

View file

@ -76,6 +76,7 @@ namespace stream
const int PENDING_INCOMING_TIMEOUT = 10; // in seconds const int PENDING_INCOMING_TIMEOUT = 10; // in seconds
const int MAX_RECEIVE_TIMEOUT = 20; // in seconds const int MAX_RECEIVE_TIMEOUT = 20; // in seconds
const uint16_t DELAY_CHOKING = 60000; // in milliseconds const uint16_t DELAY_CHOKING = 60000; // in milliseconds
const uint16_t DELAY_CHOKING_2 = 65535; // in milliseconds
const uint64_t SEND_INTERVAL = 10000; // in microseconds const uint64_t SEND_INTERVAL = 10000; // in microseconds
const uint64_t SEND_INTERVAL_VARIANCE = 2000; // in microseconds const uint64_t SEND_INTERVAL_VARIANCE = 2000; // in microseconds
const uint64_t REQUEST_IMMEDIATE_ACK_INTERVAL = 7500; // in milliseconds const uint64_t REQUEST_IMMEDIATE_ACK_INTERVAL = 7500; // in milliseconds
@ -279,7 +280,9 @@ namespace stream
bool m_IsFirstRttSample; bool m_IsFirstRttSample;
bool m_IsSendTime; bool m_IsSendTime;
bool m_IsWinDropped; bool m_IsWinDropped;
bool m_IsChoking2;
bool m_IsClientChoked; bool m_IsClientChoked;
bool m_IsClientChoked2;
bool m_IsTimeOutResend; bool m_IsTimeOutResend;
bool m_IsImmediateAckRequested; bool m_IsImmediateAckRequested;
bool m_IsRemoteLeaseChangeInProgress; bool m_IsRemoteLeaseChangeInProgress;