diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 8a5c355c..570fdd1d 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -326,6 +326,7 @@ namespace stream void SendPing (std::shared_ptr remote); void DeleteStream (std::shared_ptr stream); bool DeleteStream (uint32_t recvStreamID); + size_t GetNumStreams () const { return m_Streams.size (); }; void SetAcceptor (const Acceptor& acceptor); void ResetAcceptor (); bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index 3114bb5a..2c0f8d92 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -43,23 +43,21 @@ namespace client m_Stream->AsyncClose (); m_Stream = nullptr; } - auto Session = m_Owner.FindSession(m_ID); switch (m_SocketType) { case eSAMSocketTypeSession: m_Owner.CloseSession (m_ID); break; case eSAMSocketTypeStream: - { - break; - } + break; case eSAMSocketTypeAcceptor: case eSAMSocketTypeForward: { - if (Session) + auto session = m_Owner.FindSession(m_ID); + if (session) { - if (m_IsAccepting && Session->GetLocalDestination ()) - Session->GetLocalDestination ()->StopAcceptingStreams (); + if (m_IsAccepting && session->GetLocalDestination ()) + session->GetLocalDestination ()->StopAcceptingStreams (); } break; } @@ -1479,17 +1477,39 @@ namespace client session->StopLocalDestination (); session->Close (); if (m_IsSingleThread) - { - auto timer = std::make_shared(GetService ()); - timer->expires_from_now (boost::posix_time::seconds(5)); // postpone destination clean for 5 seconds - timer->async_wait ([timer, session](const boost::system::error_code& ecode) - { - // session's destructor is called here - }); - } + ScheduleSessionCleanupTimer (session); // let all session's streams close } } + void SAMBridge::ScheduleSessionCleanupTimer (std::shared_ptr session) + { + auto timer = std::make_shared(GetService ()); + timer->expires_from_now (boost::posix_time::seconds(5)); // postpone destination clean for 5 seconds + timer->async_wait (std::bind (&SAMBridge::HandleSessionCleanupTimer, this, std::placeholders::_1, session, timer)); + } + + void SAMBridge::HandleSessionCleanupTimer (const boost::system::error_code& ecode, + std::shared_ptr session, std::shared_ptr timer) + { + if (ecode != boost::asio::error::operation_aborted && session) + { + auto dest = session->GetLocalDestination (); + if (dest) + { + auto streamingDest = dest->GetStreamingDestination (); + auto numStreams = streamingDest->GetNumStreams (); + if (numStreams > 0) + { + LogPrint (eLogInfo, "SAM: Session ", session->Name, " still has ", numStreams, " streams"); + ScheduleSessionCleanupTimer (session); + } + else + LogPrint (eLogDebug, "SAM: Session ", session->Name, " terminated"); + } + } + // session's destructor is called here unless rescheduled + } + std::shared_ptr SAMBridge::FindSession (const std::string& id) const { std::unique_lock l(m_SessionsMutex); diff --git a/libi2pd_client/SAM.h b/libi2pd_client/SAM.h index 10ef4957..1886324a 100644 --- a/libi2pd_client/SAM.h +++ b/libi2pd_client/SAM.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2024, The PurpleI2P Project +* Copyright (c) 2013-2025, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -271,6 +271,10 @@ namespace client void ReceiveDatagram (); void HandleReceivedDatagram (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void ScheduleSessionCleanupTimer (std::shared_ptr session); + void HandleSessionCleanupTimer (const boost::system::error_code& ecode, + std::shared_ptr session, std::shared_ptr timer); + private: bool m_IsSingleThread;