R4SAS 3f45a11f12
[SSU] handle ICMP responses
Windows network stack can forward ICMP to socket and simple deleting of
packet can cause socket death. Same thing can happen on other systems
but without socket death.

Signed-off-by: R4SAS <r4sas@i2pmail.org>
2020-10-13 15:22:39 +03:00

854 lines
26 KiB

* Copyright (c) 2013-2020, The PurpleI2P Project
* This file is part of Purple i2pd project and licensed under BSD3
* See full license text in LICENSE file at top of project tree
#include <string.h>
#include "Log.h"
#include "Timestamp.h"
#include "RouterContext.h"
#include "NetDb.hpp"
#include "SSU.h"
namespace i2p
namespace transport
SSUServer::SSUServer (const boost::asio::ip::address & addr, int port):
m_OnlyV6(true), m_IsRunning(false),
m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr),
m_ReceiversThreadV6 (nullptr), m_Work (m_Service), m_WorkV6 (m_ServiceV6),
m_ReceiversWork (m_ReceiversService), m_ReceiversWorkV6 (m_ReceiversServiceV6),
m_EndpointV6 (addr, port), m_Socket (m_ReceiversService, m_Endpoint),
m_SocketV6 (m_ReceiversServiceV6), m_IntroducersUpdateTimer (m_Service),
m_PeerTestsCleanupTimer (m_Service), m_TerminationTimer (m_Service),
m_TerminationTimerV6 (m_ServiceV6)
OpenSocketV6 ();
SSUServer::SSUServer (int port):
m_OnlyV6(false), m_IsRunning(false),
m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr),
m_ReceiversThreadV6 (nullptr), m_Work (m_Service), m_WorkV6 (m_ServiceV6),
m_ReceiversWork (m_ReceiversService), m_ReceiversWorkV6 (m_ReceiversServiceV6),
m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port),
m_Socket (m_ReceiversService), m_SocketV6 (m_ReceiversServiceV6),
m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service),
m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6)
OpenSocket ();
if (context.SupportsV6 ())
OpenSocketV6 ();
SSUServer::~SSUServer ()
void SSUServer::OpenSocket ()
m_Socket.open (boost::asio::ip::udp::v4());
m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (SSU_SOCKET_RECEIVE_BUFFER_SIZE));
m_Socket.set_option (boost::asio::socket_base::send_buffer_size (SSU_SOCKET_SEND_BUFFER_SIZE));
m_Socket.bind (m_Endpoint);
LogPrint (eLogInfo, "SSU: Start listening v4 port ", m_Endpoint.port());
catch ( std::exception & ex )
LogPrint (eLogError, "SSU: failed to bind to v4 port ", m_Endpoint.port(), ": ", ex.what());
ThrowFatal ("Unable to start IPv4 SSU transport at port ", m_Endpoint.port(), ": ", ex.what ());
void SSUServer::OpenSocketV6 ()
m_SocketV6.open (boost::asio::ip::udp::v6());
m_SocketV6.set_option (boost::asio::ip::v6_only (true));
m_SocketV6.set_option (boost::asio::socket_base::receive_buffer_size (SSU_SOCKET_RECEIVE_BUFFER_SIZE));
m_SocketV6.set_option (boost::asio::socket_base::send_buffer_size (SSU_SOCKET_SEND_BUFFER_SIZE));
m_SocketV6.bind (m_EndpointV6);
LogPrint (eLogInfo, "SSU: Start listening v6 port ", m_EndpointV6.port());
catch ( std::exception & ex )
LogPrint (eLogError, "SSU: failed to bind to v6 port ", m_EndpointV6.port(), ": ", ex.what());
ThrowFatal ("Unable to start IPv6 SSU transport at port ", m_Endpoint.port(), ": ", ex.what ());
void SSUServer::Start ()
m_IsRunning = true;
if (!m_OnlyV6)
m_ReceiversThread = new std::thread (std::bind (&SSUServer::RunReceivers, this));
m_Thread = new std::thread (std::bind (&SSUServer::Run, this));
m_ReceiversService.post (std::bind (&SSUServer::Receive, this));
ScheduleTermination ();
if (context.SupportsV6 ())
m_ReceiversThreadV6 = new std::thread (std::bind (&SSUServer::RunReceiversV6, this));
m_ThreadV6 = new std::thread (std::bind (&SSUServer::RunV6, this));
m_ReceiversServiceV6.post (std::bind (&SSUServer::ReceiveV6, this));
ScheduleTerminationV6 ();
SchedulePeerTestsCleanupTimer ();
ScheduleIntroducersUpdateTimer (); // wait for 30 seconds and decide if we need introducers
void SSUServer::Stop ()
DeleteAllSessions ();
m_IsRunning = false;
m_TerminationTimer.cancel ();
m_TerminationTimerV6.cancel ();
m_Service.stop ();
m_Socket.close ();
m_ServiceV6.stop ();
m_SocketV6.close ();
m_ReceiversService.stop ();
m_ReceiversServiceV6.stop ();
if (m_ReceiversThread)
m_ReceiversThread->join ();
delete m_ReceiversThread;
m_ReceiversThread = nullptr;
if (m_Thread)
m_Thread->join ();
delete m_Thread;
m_Thread = nullptr;
if (m_ReceiversThreadV6)
m_ReceiversThreadV6->join ();
delete m_ReceiversThreadV6;
m_ReceiversThreadV6 = nullptr;
if (m_ThreadV6)
m_ThreadV6->join ();
delete m_ThreadV6;
m_ThreadV6 = nullptr;
void SSUServer::Run ()
while (m_IsRunning)
m_Service.run ();
catch (std::exception& ex)
LogPrint (eLogError, "SSU: server runtime exception: ", ex.what ());
void SSUServer::RunV6 ()
while (m_IsRunning)
m_ServiceV6.run ();
catch (std::exception& ex)
LogPrint (eLogError, "SSU: v6 server runtime exception: ", ex.what ());
void SSUServer::RunReceivers ()
while (m_IsRunning)
m_ReceiversService.run ();
catch (std::exception& ex)
LogPrint (eLogError, "SSU: receivers runtime exception: ", ex.what ());
if (m_IsRunning)
// restart socket
m_Socket.close ();
OpenSocket ();
Receive ();
void SSUServer::RunReceiversV6 ()
while (m_IsRunning)
m_ReceiversServiceV6.run ();
catch (std::exception& ex)
LogPrint (eLogError, "SSU: v6 receivers runtime exception: ", ex.what ());
if (m_IsRunning)
m_SocketV6.close ();
OpenSocketV6 ();
ReceiveV6 ();
void SSUServer::AddRelay (uint32_t tag, std::shared_ptr<SSUSession> relay)
m_Relays[tag] = relay;
void SSUServer::RemoveRelay (uint32_t tag)
m_Relays.erase (tag);
std::shared_ptr<SSUSession> SSUServer::FindRelaySession (uint32_t tag)
auto it = m_Relays.find (tag);
if (it != m_Relays.end ())
if (it->second->GetState () == eSessionStateEstablished)
return it->second;
m_Relays.erase (it);
return nullptr;
void SSUServer::Send (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& to)
boost::system::error_code ec;
if (to.protocol () == boost::asio::ip::udp::v4())
m_Socket.send_to (boost::asio::buffer (buf, len), to, 0, ec);
m_SocketV6.send_to (boost::asio::buffer (buf, len), to, 0, ec);
if (ec)
LogPrint (eLogError, "SSU: send exception: ", ec.message (), " while trying to send data to ", to.address (), ":", to.port (), " (length: ", len, ")");
void SSUServer::Receive ()
SSUPacket * packet = new SSUPacket ();
m_Socket.async_receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from,
std::bind (&SSUServer::HandleReceivedFrom, this, std::placeholders::_1, std::placeholders::_2, packet));
void SSUServer::ReceiveV6 ()
SSUPacket * packet = new SSUPacket ();
m_SocketV6.async_receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from,
std::bind (&SSUServer::HandleReceivedFromV6, this, std::placeholders::_1, std::placeholders::_2, packet));
void SSUServer::HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet)
if (!ecode ||
ecode == boost::asio::error::connection_refused ||
ecode == boost::asio::error::connection_reset ||
ecode == boost::asio::error::network_unreachable ||
ecode == boost::asio::error::host_unreachable)
// just try continue reading when received ICMP response otherwise socket can crash,
// but better to find out which host were sent it and mark that router as unreachable
packet->len = bytes_transferred;
std::vector<SSUPacket *> packets;
packets.push_back (packet);
boost::system::error_code ec;
size_t moreBytes = m_Socket.available(ec);
if (!ec)
while (moreBytes && packets.size () < 25)
packet = new SSUPacket ();
packet->len = m_Socket.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from, 0, ec);
if (!ec)
packets.push_back (packet);
moreBytes = m_Socket.available(ec);
if (ec) break;
LogPrint (eLogError, "SSU: receive_from error: code ", ec.value(), ": ", ec.message ());
delete packet;
m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, &m_Sessions));
Receive ();
delete packet;
if (ecode != boost::asio::error::operation_aborted)
LogPrint (eLogError, "SSU: receive error: code ", ecode.value(), ": ", ecode.message ());
m_Socket.close ();
OpenSocket ();
Receive ();
void SSUServer::HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet)
if (!ecode ||
ecode == boost::asio::error::connection_refused ||
ecode == boost::asio::error::connection_reset ||
ecode == boost::asio::error::network_unreachable ||
ecode == boost::asio::error::host_unreachable)
// just try continue reading when received ICMP response otherwise socket can crash,
// but better to find out which host were sent it and mark that router as unreachable
packet->len = bytes_transferred;
std::vector<SSUPacket *> packets;
packets.push_back (packet);
boost::system::error_code ec;
size_t moreBytes = m_SocketV6.available (ec);
if (!ec)
while (moreBytes && packets.size () < 25)
packet = new SSUPacket ();
packet->len = m_SocketV6.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from, 0, ec);
if (!ec)
packets.push_back (packet);
moreBytes = m_SocketV6.available(ec);
if (ec) break;
LogPrint (eLogError, "SSU: v6 receive_from error: code ", ec.value(), ": ", ec.message ());
delete packet;
m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, &m_SessionsV6));
ReceiveV6 ();
delete packet;
if (ecode != boost::asio::error::operation_aborted)
LogPrint (eLogError, "SSU: v6 receive error: code ", ecode.value(), ": ", ecode.message ());
m_SocketV6.close ();
OpenSocketV6 ();
ReceiveV6 ();
void SSUServer::HandleReceivedPackets (std::vector<SSUPacket *> packets,
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > * sessions)
std::shared_ptr<SSUSession> session;
for (auto& packet: packets)
if (!session || session->GetRemoteEndpoint () != packet->from) // we received packet for other session than previous
if (session)
session->FlushData ();
session = nullptr;
auto it = sessions->find (packet->from);
if (it != sessions->end ())
session = it->second;
if (!session)
session = std::make_shared<SSUSession> (*this, packet->from);
session->WaitForConnect ();
(*sessions)[packet->from] = session;
LogPrint (eLogDebug, "SSU: new session from ", packet->from.address ().to_string (), ":", packet->from.port (), " created");
session->ProcessNextMessage (packet->buf, packet->len, packet->from);
catch (std::exception& ex)
LogPrint (eLogError, "SSU: HandleReceivedPackets ", ex.what ());
if (session) session->FlushData ();
session = nullptr;
delete packet;
if (session) session->FlushData ();
std::shared_ptr<SSUSession> SSUServer::FindSession (std::shared_ptr<const i2p::data::RouterInfo> router) const
if (!router) return nullptr;
auto address = router->GetSSUAddress (true); // v4 only
if (!address) return nullptr;
auto session = FindSession (boost::asio::ip::udp::endpoint (address->host, address->port));
if (session || !context.SupportsV6 ())
return session;
// try v6
address = router->GetSSUV6Address ();
if (!address) return nullptr;
return FindSession (boost::asio::ip::udp::endpoint (address->host, address->port));
std::shared_ptr<SSUSession> SSUServer::FindSession (const boost::asio::ip::udp::endpoint& e) const
auto& sessions = e.address ().is_v6 () ? m_SessionsV6 : m_Sessions;
auto it = sessions.find (e);
if (it != sessions.end ())
return it->second;
return nullptr;
void SSUServer::CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest, bool v4only)
auto address = router->GetSSUAddress (v4only || !context.SupportsV6 ());
if (address)
CreateSession (router, address->host, address->port, peerTest);
LogPrint (eLogWarning, "SSU: Router ", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()), " doesn't have SSU address");
void SSUServer::CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router,
const boost::asio::ip::address& addr, int port, bool peerTest)
if (router)
if (router->UsesIntroducer ())
m_Service.post (std::bind (&SSUServer::CreateSessionThroughIntroducer, this, router, peerTest)); // always V4 thread
boost::asio::ip::udp::endpoint remoteEndpoint (addr, port);
auto& s = addr.is_v6 () ? m_ServiceV6 : m_Service;
s.post (std::bind (&SSUServer::CreateDirectSession, this, router, remoteEndpoint, peerTest));
void SSUServer::CreateDirectSession (std::shared_ptr<const i2p::data::RouterInfo> router, boost::asio::ip::udp::endpoint remoteEndpoint, bool peerTest)
auto& sessions = remoteEndpoint.address ().is_v6 () ? m_SessionsV6 : m_Sessions;
auto it = sessions.find (remoteEndpoint);
if (it != sessions.end ())
auto session = it->second;
if (peerTest && session->GetState () == eSessionStateEstablished)
session->SendPeerTest ();
// otherwise create new session
auto session = std::make_shared<SSUSession> (*this, remoteEndpoint, router, peerTest);
sessions[remoteEndpoint] = session;
// connect
LogPrint (eLogDebug, "SSU: Creating new session to [", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()), "] ",
remoteEndpoint.address ().to_string (), ":", remoteEndpoint.port ());
session->Connect ();
void SSUServer::CreateSessionThroughIntroducer (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest)
if (router && router->UsesIntroducer ())
auto address = router->GetSSUAddress (true); // v4 only for now
if (address)
boost::asio::ip::udp::endpoint remoteEndpoint (address->host, address->port);
auto it = m_Sessions.find (remoteEndpoint);
// check if session is presented already
if (it != m_Sessions.end ())
auto session = it->second;
if (peerTest && session->GetState () == eSessionStateEstablished)
session->SendPeerTest ();
// create new session
int numIntroducers = address->ssu->introducers.size ();
if (numIntroducers > 0)
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
std::shared_ptr<SSUSession> introducerSession;
const i2p::data::RouterInfo::Introducer * introducer = nullptr;
// we might have a session to introducer already
for (int i = 0; i < numIntroducers; i++)
auto intr = &(address->ssu->introducers[i]);
if (intr->iExp > 0 && ts > intr->iExp) continue; // skip expired introducer
boost::asio::ip::udp::endpoint ep (intr->iHost, intr->iPort);
if (ep.address ().is_v4 ()) // ipv4 only
if (!introducer) introducer = intr; // we pick first one for now
it = m_Sessions.find (ep);
if (it != m_Sessions.end ())
introducerSession = it->second;
if (!introducer)
LogPrint (eLogWarning, "SSU: Can't connect to unreachable router and no ipv4 non-expired introducers presented");
if (introducerSession) // session found
LogPrint (eLogWarning, "SSU: Session to introducer already exists");
else // create new
LogPrint (eLogDebug, "SSU: Creating new session to introducer ", introducer->iHost);
boost::asio::ip::udp::endpoint introducerEndpoint (introducer->iHost, introducer->iPort);
introducerSession = std::make_shared<SSUSession> (*this, introducerEndpoint, router);
m_Sessions[introducerEndpoint] = introducerSession;
#if BOOST_VERSION >= 104900
if (!address->host.is_unspecified () && address->port)
// create session
auto session = std::make_shared<SSUSession> (*this, remoteEndpoint, router, peerTest);
m_Sessions[remoteEndpoint] = session;
// introduce
LogPrint (eLogInfo, "SSU: Introduce new session to [", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()),
"] through introducer ", introducer->iHost, ":", introducer->iPort);
session->WaitForIntroduction ();
if (i2p::context.GetRouterInfo ().UsesIntroducer ()) // if we are unreachable
uint8_t buf[1];
Send (buf, 0, remoteEndpoint); // send HolePunch
introducerSession->Introduce (*introducer, router);
LogPrint (eLogWarning, "SSU: Can't connect to unreachable router and no introducers present");
LogPrint (eLogWarning, "SSU: Router ", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()), " doesn't have SSU address");
void SSUServer::DeleteSession (std::shared_ptr<SSUSession> session)
if (session)
session->Close ();
auto& ep = session->GetRemoteEndpoint ();
if (ep.address ().is_v6 ())
m_SessionsV6.erase (ep);
m_Sessions.erase (ep);
void SSUServer::DeleteAllSessions ()
for (auto& it: m_Sessions)
it.second->Close ();
m_Sessions.clear ();
for (auto& it: m_SessionsV6)
it.second->Close ();
m_SessionsV6.clear ();
template<typename Filter>
std::shared_ptr<SSUSession> SSUServer::GetRandomV4Session (Filter filter) // v4 only
std::vector<std::shared_ptr<SSUSession> > filteredSessions;
for (const auto& s :m_Sessions)
if (filter (s.second)) filteredSessions.push_back (s.second);
if (filteredSessions.size () > 0)
auto ind = rand () % filteredSessions.size ();
return filteredSessions[ind];
return nullptr;
std::shared_ptr<SSUSession> SSUServer::GetRandomEstablishedV4Session (std::shared_ptr<const SSUSession> excluded) // v4 only
return GetRandomV4Session (
[excluded](std::shared_ptr<SSUSession> session)->bool
return session->GetState () == eSessionStateEstablished && session != excluded;
template<typename Filter>
std::shared_ptr<SSUSession> SSUServer::GetRandomV6Session (Filter filter) // v6 only
std::vector<std::shared_ptr<SSUSession> > filteredSessions;
for (const auto& s :m_SessionsV6)
if (filter (s.second)) filteredSessions.push_back (s.second);
if (filteredSessions.size () > 0)
auto ind = rand () % filteredSessions.size ();
return filteredSessions[ind];
return nullptr;
std::shared_ptr<SSUSession> SSUServer::GetRandomEstablishedV6Session (std::shared_ptr<const SSUSession> excluded) // v6 only
return GetRandomV6Session (
[excluded](std::shared_ptr<SSUSession> session)->bool
return session->GetState () == eSessionStateEstablished && session != excluded;
std::set<SSUSession *> SSUServer::FindIntroducers (int maxNumIntroducers)
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
std::set<SSUSession *> ret;
for (int i = 0; i < maxNumIntroducers; i++)
auto session = GetRandomV4Session (
[&ret, ts](std::shared_ptr<SSUSession> session)->bool
return session->GetRelayTag () && !ret.count (session.get ()) &&
session->GetState () == eSessionStateEstablished &&
ts < session->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_DURATION;
if (session)
ret.insert (session.get ());
return ret;
void SSUServer::ScheduleIntroducersUpdateTimer ()
m_IntroducersUpdateTimer.expires_from_now (boost::posix_time::seconds(SSU_KEEP_ALIVE_INTERVAL));
m_IntroducersUpdateTimer.async_wait (std::bind (&SSUServer::HandleIntroducersUpdateTimer,
this, std::placeholders::_1));
void SSUServer::HandleIntroducersUpdateTimer (const boost::system::error_code& ecode)
if (ecode != boost::asio::error::operation_aborted)
// timeout expired
if (i2p::context.GetStatus () == eRouterStatusTesting)
// we still don't know if we need introducers
ScheduleIntroducersUpdateTimer ();
if (i2p::context.GetStatus () == eRouterStatusOK) return; // we don't need introducers anymore
// we are firewalled
if (!i2p::context.IsUnreachable ()) i2p::context.SetUnreachable ();
std::list<boost::asio::ip::udp::endpoint> newList;
size_t numIntroducers = 0;
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
for (const auto& it : m_Introducers)
auto session = FindSession (it);
if (session && ts < session->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_DURATION)
session->SendKeepAlive ();
newList.push_back (it);
i2p::context.RemoveIntroducer (it);
if (numIntroducers < SSU_MAX_NUM_INTRODUCERS)
// create new
auto introducers = FindIntroducers (SSU_MAX_NUM_INTRODUCERS);
for (const auto& it1: introducers)
const auto& ep = it1->GetRemoteEndpoint ();
i2p::data::RouterInfo::Introducer introducer;
introducer.iHost = ep.address ();
introducer.iPort = ep.port ();
introducer.iTag = it1->GetRelayTag ();
introducer.iKey = it1->GetIntroKey ();
if (i2p::context.AddIntroducer (introducer))
newList.push_back (ep);
if (newList.size () >= SSU_MAX_NUM_INTRODUCERS) break;
m_Introducers = newList;
if (m_Introducers.size () < SSU_MAX_NUM_INTRODUCERS)
auto introducer = i2p::data::netdb.GetRandomIntroducer ();
if (introducer)
CreateSession (introducer);
ScheduleIntroducersUpdateTimer ();
void SSUServer::NewPeerTest (uint32_t nonce, PeerTestParticipant role, std::shared_ptr<SSUSession> session)
m_PeerTests[nonce] = { i2p::util::GetMillisecondsSinceEpoch (), role, session };
PeerTestParticipant SSUServer::GetPeerTestParticipant (uint32_t nonce)
auto it = m_PeerTests.find (nonce);
if (it != m_PeerTests.end ())
return it->second.role;
return ePeerTestParticipantUnknown;
std::shared_ptr<SSUSession> SSUServer::GetPeerTestSession (uint32_t nonce)
auto it = m_PeerTests.find (nonce);
if (it != m_PeerTests.end ())
return it->second.session;
return nullptr;
void SSUServer::UpdatePeerTest (uint32_t nonce, PeerTestParticipant role)
auto it = m_PeerTests.find (nonce);
if (it != m_PeerTests.end ())
it->second.role = role;
void SSUServer::RemovePeerTest (uint32_t nonce)
m_PeerTests.erase (nonce);
void SSUServer::SchedulePeerTestsCleanupTimer ()
m_PeerTestsCleanupTimer.expires_from_now (boost::posix_time::seconds(SSU_PEER_TEST_TIMEOUT));
m_PeerTestsCleanupTimer.async_wait (std::bind (&SSUServer::HandlePeerTestsCleanupTimer,
this, std::placeholders::_1));
void SSUServer::HandlePeerTestsCleanupTimer (const boost::system::error_code& ecode)
if (ecode != boost::asio::error::operation_aborted)
int numDeleted = 0;
uint64_t ts = i2p::util::GetMillisecondsSinceEpoch ();
for (auto it = m_PeerTests.begin (); it != m_PeerTests.end ();)
if (ts > it->second.creationTime + SSU_PEER_TEST_TIMEOUT*1000LL)
it = m_PeerTests.erase (it);
if (numDeleted > 0)
LogPrint (eLogDebug, "SSU: ", numDeleted, " peer tests have been expired");
SchedulePeerTestsCleanupTimer ();
void SSUServer::ScheduleTermination ()
m_TerminationTimer.expires_from_now (boost::posix_time::seconds(SSU_TERMINATION_CHECK_TIMEOUT));
m_TerminationTimer.async_wait (std::bind (&SSUServer::HandleTerminationTimer,
this, std::placeholders::_1));
void SSUServer::HandleTerminationTimer (const boost::system::error_code& ecode)
if (ecode != boost::asio::error::operation_aborted)
auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto& it: m_Sessions)
if (it.second->IsTerminationTimeoutExpired (ts))
auto session = it.second;
if (it.first != session->GetRemoteEndpoint ())
LogPrint (eLogWarning, "SSU: remote endpoint ", session->GetRemoteEndpoint (), " doesn't match key ", it.first, " adjusted");
m_Service.post ([session]
LogPrint (eLogWarning, "SSU: no activity with ", session->GetRemoteEndpoint (), " for ", session->GetTerminationTimeout (), " seconds");
session->Failed ();
ScheduleTermination ();
void SSUServer::ScheduleTerminationV6 ()
m_TerminationTimerV6.expires_from_now (boost::posix_time::seconds(SSU_TERMINATION_CHECK_TIMEOUT));
m_TerminationTimerV6.async_wait (std::bind (&SSUServer::HandleTerminationTimerV6,
this, std::placeholders::_1));
void SSUServer::HandleTerminationTimerV6 (const boost::system::error_code& ecode)
if (ecode != boost::asio::error::operation_aborted)
auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto& it: m_SessionsV6)
if (it.second->IsTerminationTimeoutExpired (ts))
auto session = it.second;
if (it.first != session->GetRemoteEndpoint ())
LogPrint (eLogWarning, "SSU: remote endpoint ", session->GetRemoteEndpoint (), " doesn't match key ", it.first);
m_ServiceV6.post ([session]
LogPrint (eLogWarning, "SSU: no activity with ", session->GetRemoteEndpoint (), " for ", session->GetTerminationTimeout (), " seconds");
session->Failed ();
ScheduleTerminationV6 ();