add expiration for messages in SSU2 send queue

This commit is contained in:
Vort 2024-03-09 19:05:02 +02:00
parent 66d0b7aec4
commit 2d06c0cbe6
3 changed files with 28 additions and 16 deletions

View file

@ -353,25 +353,32 @@ namespace transport
void SSU2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs)
{
if (m_State == eSSU2SessionStateTerminated) return;
bool isSemiFull = m_SendQueue.size () > SSU2_MAX_OUTGOING_QUEUE_SIZE/2;
uint64_t mts = i2p::util::GetMonotonicMicroseconds ();
uint64_t localExpiration = mts + I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT;
bool isSemiFull = false;
if (m_SendQueue.size ())
{
isSemiFull = m_SendQueue.front ()->IsLocalSemiExpired (mts);
if (isSemiFull)
{
LogPrint (eLogWarning, "SSU2: Outgoing messages queue to ",
GetIdentHashBase64 (), " is semi-full (", m_SendQueue.size (), ")");
}
}
for (auto it: msgs)
{
if (isSemiFull && it->onDrop)
it->Drop (); // drop earlier because we can handle it
else
{
it->SetLocalExpiration (localExpiration);
m_SendQueue.push_back (std::move (it));
}
}
SendQueue ();
if (m_SendQueue.size () > 0) // windows is full
{
if (m_SendQueue.size () <= SSU2_MAX_OUTGOING_QUEUE_SIZE)
Resend (i2p::util::GetMillisecondsSinceEpoch ());
else
{
LogPrint (eLogWarning, "SSU2: Outgoing messages queue size to ",
GetIdentHashBase64(), " exceeds ", SSU2_MAX_OUTGOING_QUEUE_SIZE);
RequestTermination (eSSU2TerminationReasonTimeout);
}
}
Resend (i2p::util::GetMillisecondsSinceEpoch ());
SetSendQueueSize (m_SendQueue.size ());
}
@ -380,6 +387,7 @@ namespace transport
if (!m_SendQueue.empty () && m_SentPackets.size () <= m_WindowSize)
{
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
uint64_t mts = i2p::util::GetMonotonicMicroseconds ();
auto packet = m_Server.GetSentPacketsPool ().AcquireShared ();
size_t ackBlockSize = CreateAckBlock (packet->payload, m_MaxPayloadSize);
bool ackBlockSent = false;
@ -387,7 +395,7 @@ namespace transport
while (!m_SendQueue.empty () && m_SentPackets.size () <= m_WindowSize)
{
auto msg = m_SendQueue.front ();
if (!msg || msg->IsExpired (ts))
if (!msg || msg->IsExpired (ts) || msg->IsLocalExpired (mts))
{
// drop null or expired message
if (msg) msg->Drop ();