mirror of
synced 2025-03-22 17:10:32 +01:00
Compare commits
No commits in common. "e695f1e060f5560676592ca8fc3db6eb259b9254" and "ecc635e9bce5deb48c3ff3fa37a50e172b0e7134" have entirely different histories.
34 changed files with 676 additions and 1038 deletions
@ -45,29 +45,15 @@ namespace client
i2pcp_crt = i2p::fs::DataDirPath(i2pcp_crt);
if (i2pcp_key.at(0) != '/')
i2pcp_key = i2p::fs::DataDirPath(i2pcp_key);
if (!i2p::fs::Exists (i2pcp_crt) || !i2p::fs::Exists (i2pcp_key))
if (!i2p::fs::Exists (i2pcp_crt) || !i2p::fs::Exists (i2pcp_key)) {
LogPrint (eLogInfo, "I2PControl: Creating new certificate for control connection");
CreateCertificate (i2pcp_crt.c_str(), i2pcp_key.c_str());
} else {
LogPrint(eLogDebug, "I2PControl: Using cert from ", i2pcp_crt);
m_SSLContext.set_options (boost::asio::ssl::context::default_workarounds | boost::asio::ssl::context::no_sslv2 | boost::asio::ssl::context::single_dh_use);
boost::system::error_code ec;
m_SSLContext.use_certificate_file (i2pcp_crt, boost::asio::ssl::context::pem, ec);
if (!ec)
m_SSLContext.use_private_key_file (i2pcp_key, boost::asio::ssl::context::pem, ec);
if (ec)
LogPrint (eLogInfo, "I2PControl: Failed to load ceritifcate: ", ec.message (), ". Recreating");
CreateCertificate (i2pcp_crt.c_str(), i2pcp_key.c_str());
m_SSLContext.use_certificate_file (i2pcp_crt, boost::asio::ssl::context::pem, ec);
if (!ec)
m_SSLContext.use_private_key_file (i2pcp_key, boost::asio::ssl::context::pem, ec);
if (ec)
// give up
LogPrint (eLogError, "I2PControl: Can't load certificates");
m_SSLContext.set_options (boost::asio::ssl::context::default_workarounds | boost::asio::ssl::context::no_sslv2 | boost::asio::ssl::context::single_dh_use);
m_SSLContext.use_certificate_file (i2pcp_crt, boost::asio::ssl::context::pem);
m_SSLContext.use_private_key_file (i2pcp_key, boost::asio::ssl::context::pem);
// handlers
m_MethodHandlers["Authenticate"] = &I2PControlService::AuthenticateHandler;
@ -417,7 +403,7 @@ namespace client
X509_NAME_add_entry_by_txt (name, "O", MBSTRING_ASC, (unsigned char *)I2P_CONTROL_CERTIFICATE_ORGANIZATION, -1, -1, 0); // organization
X509_NAME_add_entry_by_txt (name, "CN", MBSTRING_ASC, (unsigned char *)I2P_CONTROL_CERTIFICATE_COMMON_NAME, -1, -1, 0); // common name
X509_set_issuer_name (x509, name); // set issuer to ourselves
X509_sign (x509, pkey, EVP_sha1 ()); // sign, last param must be NULL for EdDSA
X509_sign (x509, pkey, EVP_sha1 ()); // sign
// save cert
if ((f = fopen (crt_path, "wb")) != NULL) {
@ -1,5 +1,5 @@
* Copyright (c) 2013-2024, The PurpleI2P Project
* Copyright (c) 2013-2020, The PurpleI2P Project
* This file is part of Purple i2pd project and licensed under BSD3
@ -25,7 +25,6 @@
#include "RouterContext.h"
#include "ClientContext.h"
#include "Transports.h"
#include "util.h"
void handle_signal(int sig)
@ -221,7 +220,6 @@ namespace i2p
void DaemonLinux::run ()
i2p::util::SetThreadName ("i2pd-daemon");
while (running)
std::this_thread::sleep_for (std::chrono::seconds(1));
@ -997,7 +997,7 @@ namespace crypto
std::vector<uint8_t> m(msgLen + 16);
if (msg == buf)
@ -588,11 +588,8 @@ namespace client
i2p::garlic::GarlicDestination::HandleDeliveryStatusMessage (msgID);
void LeaseSetDestination::SetLeaseSetUpdated (bool post)
void LeaseSetDestination::SetLeaseSetUpdated ()
if (post)
m_Service.post([s = shared_from_this ()]() { s->UpdateLeaseSet (); });
UpdateLeaseSet ();
@ -142,15 +142,15 @@ namespace client
void CancelDestinationRequestWithEncryptedLeaseSet (std::shared_ptr<const i2p::data::BlindedPublicKey> dest, bool notify = true);
// implements GarlicDestination
std::shared_ptr<const i2p::data::LocalLeaseSet> GetLeaseSet () override;
std::shared_ptr<i2p::tunnel::TunnelPool> GetTunnelPool () const override { return m_Pool; }
std::shared_ptr<const i2p::data::LocalLeaseSet> GetLeaseSet ();
std::shared_ptr<i2p::tunnel::TunnelPool> GetTunnelPool () const { return m_Pool; }
// override GarlicDestination
bool SubmitSessionKey (const uint8_t * key, const uint8_t * tag) override;
void SubmitECIESx25519Key (const uint8_t * key, uint64_t tag) override;
void ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg) override;
void ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg) override;
void SetLeaseSetUpdated (bool post) override;
bool SubmitSessionKey (const uint8_t * key, const uint8_t * tag);
void SubmitECIESx25519Key (const uint8_t * key, uint64_t tag);
void ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg);
void ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg);
void SetLeaseSetUpdated ();
bool IsPublic () const { return m_IsPublic; };
void SetPublic (bool pub) { m_IsPublic = pub; };
@ -158,8 +158,8 @@ namespace client
// implements GarlicDestination
void HandleI2NPMessage (const uint8_t * buf, size_t len) override;
bool HandleCloveI2NPMessage (I2NPMessageType typeID, const uint8_t * payload, size_t len, uint32_t msgID) override;
void HandleI2NPMessage (const uint8_t * buf, size_t len);
bool HandleCloveI2NPMessage (I2NPMessageType typeID, const uint8_t * payload, size_t len, uint32_t msgID);
void SetLeaseSet (std::shared_ptr<const i2p::data::LocalLeaseSet> newLeaseSet);
int GetLeaseSetType () const { return m_LeaseSetType; };
@ -1,5 +1,5 @@
* Copyright (c) 2013-2024, The PurpleI2P Project
* Copyright (c) 2013-2023, The PurpleI2P Project
* This file is part of Purple i2pd project and licensed under BSD3
@ -7,6 +7,7 @@
#include <string.h>
#include <openssl/evp.h>
#include <openssl/ssl.h>
#include "Crypto.h"
#include "FS.h"
@ -24,8 +25,6 @@ namespace data
Families::~Families ()
for (auto it : m_SigningKeys)
if (it.second.first) EVP_PKEY_free (it.second.first);
void Families::LoadCertificate (const std::string& filename)
@ -48,16 +47,48 @@ namespace data
cn += 3;
char * family = strstr (cn, ".family");
if (family) family[0] = 0;
auto pkey = X509_get_pubkey (cert);
if (pkey)
int keyType = EVP_PKEY_base_id (pkey);
switch (keyType)
if (!m_SigningKeys.emplace (cn, std::make_pair(pkey, (int)m_SigningKeys.size () + 1)).second)
// TODO:
EC_KEY * ecKey = EVP_PKEY_get1_EC_KEY (pkey);
if (ecKey)
auto group = EC_KEY_get0_group (ecKey);
if (group)
int curve = EC_GROUP_get_curve_name (group);
if (curve == NID_X9_62_prime256v1)
uint8_t signingKey[64];
BIGNUM * x = BN_new(), * y = BN_new();
EC_POINT_get_affine_coordinates_GFp (group,
EC_KEY_get0_public_key (ecKey), x, y, NULL);
i2p::crypto::bn2buf (x, signingKey, 32);
i2p::crypto::bn2buf (y, signingKey + 32, 32);
BN_free (x); BN_free (y);
verifier = std::make_shared<i2p::crypto::ECDSAP256Verifier>();
verifier->SetPublicKey (signingKey);
LogPrint (eLogWarning, "Family: elliptic curve ", curve, " is not supported");
EC_KEY_free (ecKey);
LogPrint (eLogWarning, "Family: Certificate key type ", keyType, " is not supported");
EVP_PKEY_free (pkey);
LogPrint (eLogError, "Family: Duplicated family name ", cn);
if (verifier && cn)
m_SigningKeys.emplace (cn, std::make_pair(verifier, (int)m_SigningKeys.size () + 1));
SSL_free (ssl);
@ -99,22 +130,14 @@ namespace data
LogPrint (eLogError, "Family: ", family, " is too long");
return false;
auto it = m_SigningKeys.find (family);
if (it != m_SigningKeys.end () && it->second.first)
memcpy (buf, family.c_str (), len);
memcpy (buf + len, (const uint8_t *)ident, 32);
len += 32;
auto signatureBufLen = Base64ToByteStream (signature, signatureLen, signatureBuf, 64);
if (signatureBufLen)
EVP_MD_CTX * ctx = EVP_MD_CTX_create ();
EVP_DigestVerifyInit (ctx, NULL, NULL, NULL, it->second.first);
auto ret = EVP_DigestVerify (ctx, signatureBuf, signatureBufLen, buf, len);
EVP_MD_CTX_destroy (ctx);
return ret;
Base64ToByteStream (signature, signatureLen, signatureBuf, 64);
auto it = m_SigningKeys.find (family);
if (it != m_SigningKeys.end ())
return it->second.first->Verify (buf, len, signatureBuf);
// TODO: process key
return true;
@ -1,5 +1,5 @@
* Copyright (c) 2013-2024, The PurpleI2P Project
* Copyright (c) 2013-2022, The PurpleI2P Project
* This file is part of Purple i2pd project and licensed under BSD3
@ -12,7 +12,7 @@
#include <map>
#include <string>
#include <memory>
#include <openssl/evp.h>
#include "Signature.h"
#include "Identity.h"
namespace i2p
@ -37,7 +37,7 @@ namespace data
std::map<std::string, std::pair<EVP_PKEY *, FamilyID> > m_SigningKeys; // family -> (verification pkey, id)
std::map<std::string, std::pair<std::shared_ptr<i2p::crypto::Verifier>, FamilyID> > m_SigningKeys; // family -> (verifier, id)
std::string CreateFamilySignature (const std::string& family, const IdentHash& ident);
@ -897,7 +897,7 @@ namespace garlic
void GarlicDestination::SetLeaseSetUpdated (bool post)
void GarlicDestination::SetLeaseSetUpdated ()
std::unique_lock<std::mutex> l(m_SessionsMutex);
@ -253,7 +253,7 @@ namespace garlic
virtual void ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg);
virtual void ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg);
virtual void SetLeaseSetUpdated (bool post = false);
virtual void SetLeaseSetUpdated ();
virtual std::shared_ptr<const i2p::data::LocalLeaseSet> GetLeaseSet () = 0; // TODO
virtual std::shared_ptr<i2p::tunnel::TunnelPool> GetTunnelPool () const = 0;
@ -396,26 +396,8 @@ namespace i2p
return false;
uint8_t retCode = 0;
// decide if we should accept tunnel
bool accept = i2p::context.AcceptsTunnels ();
if (accept)
auto congestionLevel = i2p::context.GetCongestionLevel (false);
if (congestionLevel >= CONGESTION_LEVEL_MEDIUM)
if (congestionLevel < CONGESTION_LEVEL_FULL)
// random reject depending on congestion level
if (congestionLevel > level)
accept = false;
accept = false;
// replace record to reply
if (accept)
if (i2p::context.AcceptsTunnels () && i2p::context.GetCongestionLevel (false) < CONGESTION_LEVEL_FULL)
auto transitTunnel = i2p::tunnel::CreateTransitTunnel (
@ -950,8 +932,14 @@ namespace i2p
void I2NPMessagesHandler::Flush ()
if (!m_TunnelMsgs.empty ())
i2p::tunnel::tunnels.PostTunnelData (m_TunnelMsgs);
m_TunnelMsgs.clear ();
if (!m_TunnelGatewayMsgs.empty ())
i2p::tunnel::tunnels.PostTunnelData (m_TunnelGatewayMsgs);
m_TunnelGatewayMsgs.clear ();
@ -13,7 +13,6 @@
#include <string.h>
#include <unordered_set>
#include <memory>
#include <list>
#include <functional>
#include "Crypto.h"
#include "I2PEndian.h"
@ -329,7 +328,7 @@ namespace tunnel
std::list<std::shared_ptr<I2NPMessage> > m_TunnelMsgs, m_TunnelGatewayMsgs;
std::vector<std::shared_ptr<I2NPMessage> > m_TunnelMsgs, m_TunnelGatewayMsgs;
@ -1822,7 +1822,7 @@ namespace transport
LogPrint(eLogError, "NTCP2: HTTP proxy write error ", ec.message());
auto readbuff = std::make_shared<boost::asio::streambuf>();
boost::asio::streambuf * readbuff = new boost::asio::streambuf;
boost::asio::async_read_until(conn->GetSocket(), *readbuff, "\r\n\r\n",
[readbuff, timer, conn] (const boost::system::error_code & ec, std::size_t transferred)
@ -1842,6 +1842,7 @@ namespace transport
delete readbuff;
@ -1851,6 +1852,7 @@ namespace transport
LogPrint(eLogError, "NTCP2: HTTP proxy gave malformed response");
delete readbuff;
@ -122,18 +122,16 @@ namespace data
uint64_t lastProfilesCleanup = i2p::util::GetMonotonicMilliseconds (), lastObsoleteProfilesCleanup = lastProfilesCleanup;
int16_t profilesCleanupVariance = 0, obsoleteProfilesCleanVariance = 0;
std::list<std::shared_ptr<const I2NPMessage> > msgs;
while (m_IsRunning)
if (m_Queue.Wait (1,0)) // 1 sec
auto msg = m_Queue.GetNextWithTimeout (1000); // 1 sec
if (msg)
m_Queue.GetWholeQueue (msgs);
while (!msgs.empty ())
int numMsgs = 0;
while (msg)
auto msg = msgs.front (); msgs.pop_front ();
if (!msg) continue;
LogPrint(eLogDebug, "NetDb: Got request with type ", (int) msg->GetTypeID ());
switch (msg->GetTypeID ())
@ -147,6 +145,9 @@ namespace data
LogPrint (eLogError, "NetDb: Unexpected message type ", (int) msg->GetTypeID ());
//i2p::HandleI2NPMessage (msg);
if (numMsgs > 100) break;
msg = m_Queue.Get ();
if (!m_IsRunning) break;
@ -638,74 +639,70 @@ namespace data
if (checkForExpiration && uptime > i2p::transport::SSU2_TO_INTRODUCER_SESSION_DURATION) // 1 hour
expirationTimeout = i2p::context.IsFloodfill () ? NETDB_FLOODFILL_EXPIRATION_TIMEOUT*1000LL :
bool isOffline = checkForExpiration && i2p::transport::transports.GetNumPeers () < NETDB_MIN_TRANSPORTS; // enough routers and uptime, but no tranports
std::list<std::pair<std::string, std::shared_ptr<RouterInfo::Buffer> > > saveToDisk;
std::list<std::string> removeFromDisk;
auto own = i2p::context.GetSharedRouterInfo ();
for (auto [ident, r]: m_RouterInfos)
for (auto& it: m_RouterInfos)
if (!r || r == own) continue; // skip own
if (r->IsBufferScheduledToDelete ()) // from previous SaveUpdated, we assume m_PersistingRouters complete
if (!it.second || it.second == own) continue; // skip own
std::string ident = it.second->GetIdentHashBase64();
if (it.second->IsUpdated ())
std::lock_guard<std::mutex> l(m_RouterInfosMutex); // possible collision between DeleteBuffer and Update
r->DeleteBuffer ();
if (r->IsUpdated ())
if (r->GetBuffer () && !r->IsUnreachable ())
if (it.second->GetBuffer ())
// we have something to save
std::shared_ptr<RouterInfo::Buffer> buffer;
std::lock_guard<std::mutex> l(m_RouterInfosMutex); // possible collision between DeleteBuffer and Update
buffer = r->CopyBuffer ();
r->ScheduleBufferToDelete ();
buffer = it.second->GetSharedBuffer ();
it.second->DeleteBuffer ();
if (buffer)
saveToDisk.push_back(std::make_pair(ident.ToBase64 (), buffer));
if (buffer && !it.second->IsUnreachable ()) // don't save bad router
saveToDisk.push_back(std::make_pair(ident, buffer));
it.second->SetUnreachable (false);
r->SetUpdated (false);
it.second->SetUpdated (false);
if (r->GetProfile ()->IsUnreachable ())
r->SetUnreachable (true);
if (it.second->GetProfile ()->IsUnreachable ())
it.second->SetUnreachable (true);
// make router reachable back if too few routers or floodfills
if (r->IsUnreachable () && (total - deletedCount < NETDB_MIN_ROUTERS || isLowRate || isOffline ||
(r->IsFloodfill () && totalFloodfills - deletedFloodfillsCount < NETDB_MIN_FLOODFILLS)))
r->SetUnreachable (false);
if (!r->IsUnreachable ())
if (it.second->IsUnreachable () && (total - deletedCount < NETDB_MIN_ROUTERS || isLowRate ||
(it.second->IsFloodfill () && totalFloodfills - deletedFloodfillsCount < NETDB_MIN_FLOODFILLS)))
it.second->SetUnreachable (false);
if (!it.second->IsUnreachable ())
// find & mark expired routers
if (!r->GetCompatibleTransports (true)) // non reachable by any transport
r->SetUnreachable (true);
else if (ts + NETDB_EXPIRATION_TIMEOUT_THRESHOLD*1000LL < r->GetTimestamp ())
if (!it.second->GetCompatibleTransports (true)) // non reachable by any transport
it.second->SetUnreachable (true);
else if (ts + NETDB_EXPIRATION_TIMEOUT_THRESHOLD*1000LL < it.second->GetTimestamp ())
LogPrint (eLogWarning, "NetDb: RouterInfo is from future for ", (r->GetTimestamp () - ts)/1000LL, " seconds");
r->SetUnreachable (true);
LogPrint (eLogWarning, "NetDb: RouterInfo is from future for ", (it.second->GetTimestamp () - ts)/1000LL, " seconds");
it.second->SetUnreachable (true);
else if (checkForExpiration)
if (ts > r->GetTimestamp () + expirationTimeout)
r->SetUnreachable (true);
else if ((ts > r->GetTimestamp () + expirationTimeout/2) && // more than half of expiration
total > NETDB_NUM_ROUTERS_THRESHOLD && !r->IsHighBandwidth() && // low bandwidth
!r->IsFloodfill() && (!i2p::context.IsFloodfill () || // non floodfill
(CreateRoutingKey (ident) ^ i2p::context.GetIdentHash ()).metric[0] >= 0x02)) // different first 7 bits
r->SetUnreachable (true);
if (ts > it.second->GetTimestamp () + expirationTimeout)
it.second->SetUnreachable (true);
else if ((ts > it.second->GetTimestamp () + expirationTimeout/2) && // more than half of expiration
total > NETDB_NUM_ROUTERS_THRESHOLD && !it.second->IsHighBandwidth() && // low bandwidth
!it.second->IsFloodfill() && (!i2p::context.IsFloodfill () || // non floodfill
(CreateRoutingKey (it.second->GetIdentHash ()) ^ i2p::context.GetIdentHash ()).metric[0] >= 0x02)) // different first 7 bits
it.second->SetUnreachable (true);
// make router reachable back if connected now
if (r->IsUnreachable () && i2p::transport::transports.IsConnected (ident))
r->SetUnreachable (false);
if (it.second->IsUnreachable () && i2p::transport::transports.IsConnected (it.second->GetIdentHash ()))
it.second->SetUnreachable (false);
if (r->IsUnreachable ())
if (it.second->IsUnreachable ())
if (r->IsFloodfill ()) deletedFloodfillsCount++;
if (it.second->IsFloodfill ()) deletedFloodfillsCount++;
// delete RI file
removeFromDisk.push_back (ident.ToBase64());
removeFromDisk.push_back (ident);
if (total - deletedCount < NETDB_MIN_ROUTERS) checkForExpiration = false;
@ -39,7 +39,6 @@ namespace data
const int NETDB_MIN_ROUTERS = 90;
const int NETDB_MIN_TRANSPORTS = 10 ; // otherwise assume offline
@ -11,7 +11,6 @@
#include <memory>
#include <future>
#include <boost/asio.hpp>
#include "Identity.h"
namespace i2p
@ -69,11 +68,6 @@ namespace data
bool IsUseful() const;
bool IsDuplicated () const { return m_IsDuplicated; };
const boost::asio::ip::udp::endpoint& GetLastEndpoint () const { return m_LastEndpoint; }
void SetLastEndpoint (const boost::asio::ip::udp::endpoint& ep) { m_LastEndpoint = ep; }
bool HasLastEndpoint (bool v4) const { return !m_LastEndpoint.address ().is_unspecified () && m_LastEndpoint.port () &&
((v4 && m_LastEndpoint.address ().is_v4 ()) || (!v4 && m_LastEndpoint.address ().is_v6 ())); }
void UpdateTime ();
@ -96,8 +90,6 @@ namespace data
uint32_t m_NumTimesRejected;
bool m_HasConnected; // successful trusted(incoming or NTCP2) connection
bool m_IsDuplicated;
// connectivity
boost::asio::ip::udp::endpoint m_LastEndpoint; // SSU2 for non-published addresses
std::shared_ptr<RouterProfile> GetRouterProfile (const IdentHash& identHash);
@ -1,5 +1,5 @@
* Copyright (c) 2013-2024, The PurpleI2P Project
* Copyright (c) 2013-2020, The PurpleI2P Project
* This file is part of Purple i2pd project and licensed under BSD3
@ -9,7 +9,8 @@
#ifndef QUEUE_H__
#define QUEUE_H__
#include <list>
#include <queue>
#include <vector>
#include <mutex>
#include <thread>
#include <condition_variable>
@ -28,16 +29,18 @@ namespace util
void Put (Element e)
std::unique_lock<std::mutex> l(m_QueueMutex);
m_Queue.push_back (std::move(e));
m_Queue.push (std::move(e));
m_NonEmpty.notify_one ();
void Put (std::list<Element>& list)
template<template<typename, typename...>class Container, typename... R>
void Put (const Container<Element, R...>& vec)
if (!list.empty ())
if (!vec.empty ())
std::unique_lock<std::mutex> l(m_QueueMutex);
m_Queue.splice (m_Queue.end (), list);
for (const auto& it: vec)
m_Queue.push (std::move(it));
m_NonEmpty.notify_one ();
@ -104,19 +107,6 @@ namespace util
return GetNonThreadSafe (true);
void GetWholeQueue (std::list<Element>& queue)
if (!queue.empty ())
std::list<Element> newQueue;
queue.swap (newQueue);
std::unique_lock<std::mutex> l(m_QueueMutex);
m_Queue.swap (queue);
Element GetNonThreadSafe (bool peek = false)
@ -125,7 +115,7 @@ namespace util
auto el = m_Queue.front ();
if (!peek)
m_Queue.pop_front ();
m_Queue.pop ();
return el;
return nullptr;
@ -133,7 +123,7 @@ namespace util
std::list<Element> m_Queue;
std::queue<Element> m_Queue;
std::mutex m_QueueMutex;
std::condition_variable m_NonEmpty;
@ -1489,7 +1489,7 @@ namespace i2p
void RouterContext::UpdateCongestion ()
auto c = i2p::data::RouterInfo::eLowCongestion;
if (!AcceptsTunnels () || !m_ShareRatio || (m_Error == eRouterErrorSymmetricNAT && !SupportsV6 () && !SupportsMesh ()))
if (!AcceptsTunnels () || !m_ShareRatio)
c = i2p::data::RouterInfo::eRejectAll;
@ -146,6 +146,7 @@ namespace garlic
void SetNetID (int netID) { m_NetID = netID; };
bool DecryptTunnelBuildRecord (const uint8_t * encrypted, uint8_t * data);
bool DecryptTunnelShortRequestRecord (const uint8_t * encrypted, uint8_t * data);
void SubmitECIESx25519Key (const uint8_t * key, uint64_t tag);
void UpdatePort (int port); // called from Daemon
void UpdateAddress (const boost::asio::ip::address& host); // called from SSU2 or Daemon
@ -185,24 +186,24 @@ namespace garlic
void UpdateTimestamp (uint64_t ts); // in seconds, called from NetDb before publishing
// implements LocalDestination
std::shared_ptr<const i2p::data::IdentityEx> GetIdentity () const override{ return m_Keys.GetPublic (); };
bool Decrypt (const uint8_t * encrypted, uint8_t * data, i2p::data::CryptoKeyType preferredCrypto) const override;
void SetLeaseSetUpdated (bool post) override {};
std::shared_ptr<const i2p::data::IdentityEx> GetIdentity () const { return m_Keys.GetPublic (); };
bool Decrypt (const uint8_t * encrypted, uint8_t * data, i2p::data::CryptoKeyType preferredCrypto) const;
void Sign (const uint8_t * buf, int len, uint8_t * signature) const { m_Keys.Sign (buf, len, signature); };
void SetLeaseSetUpdated () {};
// implements GarlicDestination
std::shared_ptr<const i2p::data::LocalLeaseSet> GetLeaseSet () override { return nullptr; };
std::shared_ptr<i2p::tunnel::TunnelPool> GetTunnelPool () const override;
std::shared_ptr<const i2p::data::LocalLeaseSet> GetLeaseSet () { return nullptr; };
std::shared_ptr<i2p::tunnel::TunnelPool> GetTunnelPool () const;
// override GarlicDestination
void ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg) override;
void ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg) override;
void SubmitECIESx25519Key (const uint8_t * key, uint64_t tag) override;
void ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg);
void ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg);
// implements GarlicDestination
void HandleI2NPMessage (const uint8_t * buf, size_t len) override;
bool HandleCloveI2NPMessage (I2NPMessageType typeID, const uint8_t * payload, size_t len, uint32_t msgID) override;
void HandleI2NPMessage (const uint8_t * buf, size_t len);
bool HandleCloveI2NPMessage (I2NPMessageType typeID, const uint8_t * payload, size_t len, uint32_t msgID);
@ -215,7 +216,6 @@ namespace garlic
void UpdateSSU2Keys ();
bool Load ();
void SaveKeys ();
void Sign (const uint8_t * buf, int len, uint8_t * signature) const { m_Keys.Sign (buf, len, signature); };
uint16_t SelectRandomPort () const;
void PublishNTCP2Address (std::shared_ptr<i2p::data::RouterInfo::Address> address, int port, bool publish) const;
@ -45,9 +45,8 @@ namespace data
RouterInfo::RouterInfo (const std::string& fullPath):
m_FamilyID (0), m_IsUpdated (false), m_IsUnreachable (false), m_IsFloodfill (false),
m_IsBufferScheduledToDelete (false), m_SupportedTransports (0),
m_ReachableTransports (0), m_PublishedTransports (0), m_Caps (0), m_Version (0),
m_Congestion (eLowCongestion)
m_SupportedTransports (0),m_ReachableTransports (0), m_PublishedTransports (0),
m_Caps (0), m_Version (0), m_Congestion (eLowCongestion)
m_Addresses = AddressesPtr(new Addresses ()); // create empty list
m_Buffer = RouterInfo::NewBuffer (); // always RouterInfo's
@ -56,7 +55,7 @@ namespace data
RouterInfo::RouterInfo (std::shared_ptr<Buffer>&& buf, size_t len):
m_FamilyID (0), m_IsUpdated (true), m_IsUnreachable (false), m_IsFloodfill (false),
m_IsBufferScheduledToDelete (false), m_SupportedTransports (0), m_ReachableTransports (0), m_PublishedTransports (0),
m_SupportedTransports (0), m_ReachableTransports (0), m_PublishedTransports (0),
m_Caps (0), m_Version (0), m_Congestion (eLowCongestion)
if (len <= MAX_RI_BUFFER_SIZE)
@ -1141,7 +1140,6 @@ namespace data
if (len > m_Buffer->size ()) len = m_Buffer->size ();
memcpy (m_Buffer->data (), buf, len);
m_Buffer->SetBufferLen (len);
m_IsBufferScheduledToDelete = false;
std::shared_ptr<RouterInfo::Buffer> RouterInfo::CopyBuffer () const
@ -290,11 +290,9 @@ namespace data
const uint8_t * GetBuffer () const { return m_Buffer ? m_Buffer->data () : nullptr; };
const uint8_t * LoadBuffer (const std::string& fullPath); // load if necessary
size_t GetBufferLen () const { return m_Buffer ? m_Buffer->GetBufferLen () : 0; };
void DeleteBuffer () { m_Buffer = nullptr; m_IsBufferScheduledToDelete = false; };
void DeleteBuffer () { m_Buffer = nullptr; };
std::shared_ptr<Buffer> GetSharedBuffer () const { return m_Buffer; };
std::shared_ptr<Buffer> CopyBuffer () const;
void ScheduleBufferToDelete () { m_IsBufferScheduledToDelete = true; };
bool IsBufferScheduledToDelete () const { return m_IsBufferScheduledToDelete; };
bool IsUpdated () const { return m_IsUpdated; };
void SetUpdated (bool updated) { m_IsUpdated = updated; };
@ -356,7 +354,7 @@ namespace data
AddressesPtr m_Addresses;
bool m_IsUpdated, m_IsUnreachable, m_IsFloodfill, m_IsBufferScheduledToDelete;
bool m_IsUpdated, m_IsUnreachable, m_IsFloodfill;
CompatibleTransports m_SupportedTransports, m_ReachableTransports, m_PublishedTransports;
uint8_t m_Caps;
char m_BandwidthCap;
@ -157,9 +157,6 @@ namespace transport
m_IntroducersV6.clear ();
m_ConnectedRecently.clear ();
m_RequestedPeerTests.clear ();
m_PacketsPool.ReleaseMt (m_ReceivedPacketsQueue);
m_ReceivedPacketsQueue.clear ();
void SSU2Server::SetLocalAddress (const boost::asio::ip::address& localAddress)
@ -216,16 +213,15 @@ namespace transport
return ep.port ();
bool SSU2Server::IsConnectedRecently (const boost::asio::ip::udp::endpoint& ep, bool max)
bool SSU2Server::IsConnectedRecently (const boost::asio::ip::udp::endpoint& ep)
if (!ep.port () || ep.address ().is_unspecified ()) return false;
std::lock_guard<std::mutex> l(m_ConnectedRecentlyMutex);
auto it = m_ConnectedRecently.find (ep);
if (it != m_ConnectedRecently.end ())
if (i2p::util::GetSecondsSinceEpoch () <= it->second + (max ? SSU2_MAX_HOLE_PUNCH_EXPIRATION : SSU2_MIN_HOLE_PUNCH_EXPIRATION))
if (i2p::util::GetSecondsSinceEpoch () <= it->second + SSU2_HOLE_PUNCH_EXPIRATION)
return true;
else if (max)
m_ConnectedRecently.erase (it);
return false;
@ -234,8 +230,7 @@ namespace transport
void SSU2Server::AddConnectedRecently (const boost::asio::ip::udp::endpoint& ep, uint64_t ts)
if (!ep.port () || ep.address ().is_unspecified () ||
i2p::util::GetSecondsSinceEpoch () > ts + SSU2_MAX_HOLE_PUNCH_EXPIRATION) return;
std::lock_guard<std::mutex> l(m_ConnectedRecentlyMutex);
i2p::util::GetSecondsSinceEpoch () > ts + SSU2_HOLE_PUNCH_EXPIRATION) return;
auto [it, added] = m_ConnectedRecently.try_emplace (ep, ts);
if (!added && ts > it->second)
it->second = ts; // renew timestamp of existing endpoint
@ -374,9 +369,9 @@ namespace transport
size_t moreBytes = socket.available (ec);
if (!ec && moreBytes)
std::list<Packet *> packets;
packets.push_back (packet);
while (moreBytes && packets.size () < SSU2_MAX_NUM_PACKETS_PER_BATCH)
auto packets = m_PacketsArrayPool.AcquireMt ();
packets->AddPacket (packet);
while (moreBytes && packets->numPackets < SSU2_MAX_NUM_PACKETS_PER_BATCH)
packet = m_PacketsPool.AcquireMt ();
packet->len = socket.receive_from (boost::asio::buffer (packet->buf, SSU2_MAX_PACKET_SIZE), packet->from, 0, ec);
@ -384,7 +379,13 @@ namespace transport
i2p::transport::transports.UpdateReceivedBytes (packet->len);
if (packet->len >= SSU2_MIN_RECEIVED_PACKET_SIZE)
packets.push_back (packet);
if (!packets->AddPacket (packet))
LogPrint (eLogError, "SSU2: Received packets array is full");
m_PacketsPool.ReleaseMt (packet);
else // drop too short packets
m_PacketsPool.ReleaseMt (packet);
moreBytes = socket.available(ec);
@ -397,10 +398,10 @@ namespace transport
InsertToReceivedPacketsQueue (packets);
GetService ().post (std::bind (&SSU2Server::HandleReceivedPackets, this, packets));
InsertToReceivedPacketsQueue (packet);
GetService ().post (std::bind (&SSU2Server::HandleReceivedPacket, this, packet));
Receive (socket);
@ -427,69 +428,50 @@ namespace transport
void SSU2Server::HandleReceivedPackets (std::list<Packet *>&& packets)
void SSU2Server::HandleReceivedPacket (Packet * packet)
if (packet)
if (packets.empty ()) return;
if (m_IsThroughProxy)
for (auto it: packets)
ProcessNextPacketFromProxy (it->buf, it->len);
ProcessNextPacketFromProxy (packet->buf, packet->len);
for (auto it: packets)
ProcessNextPacket (it->buf, it->len, it->from);
m_PacketsPool.ReleaseMt (packets);
ProcessNextPacket (packet->buf, packet->len, packet->from);
m_PacketsPool.ReleaseMt (packet);
if (m_LastSession && m_LastSession->GetState () != eSSU2SessionStateTerminated)
m_LastSession->FlushData ();
void SSU2Server::HandleReceivedPackets (Packets * packets)
if (!packets) return;
if (m_IsThroughProxy)
for (size_t i = 0; i < packets->numPackets; i++)
auto& packet = (*packets)[i];
ProcessNextPacketFromProxy (packet->buf, packet->len);
for (size_t i = 0; i < packets->numPackets; i++)
auto& packet = (*packets)[i];
ProcessNextPacket (packet->buf, packet->len, packet->from);
m_PacketsPool.ReleaseMt (packets->data (), packets->numPackets);
m_PacketsArrayPool.ReleaseMt (packets);
if (m_LastSession && m_LastSession->GetState () != eSSU2SessionStateTerminated)
m_LastSession->FlushData ();
void SSU2Server::InsertToReceivedPacketsQueue (Packet * packet)
if (!packet) return;
bool empty = false;
std::lock_guard<std::mutex> l(m_ReceivedPacketsQueueMutex);
empty = m_ReceivedPacketsQueue.empty ();
m_ReceivedPacketsQueue.push_back (packet);
if (empty)
GetService ().post([this]() { HandleReceivedPacketsQueue (); });
void SSU2Server::InsertToReceivedPacketsQueue (std::list<Packet *>& packets)
if (packets.empty ()) return;
bool empty = false;
std::lock_guard<std::mutex> l(m_ReceivedPacketsQueueMutex);
empty = m_ReceivedPacketsQueue.empty ();
m_ReceivedPacketsQueue.splice (m_ReceivedPacketsQueue.end (), packets);
if (empty)
GetService ().post([this]() { HandleReceivedPacketsQueue (); });
void SSU2Server::HandleReceivedPacketsQueue ()
std::list<Packet *> receivedPackets;
std::lock_guard<std::mutex> l(m_ReceivedPacketsQueueMutex);
m_ReceivedPacketsQueue.swap (receivedPackets);
HandleReceivedPackets (std::move (receivedPackets));
bool SSU2Server::AddSession (std::shared_ptr<SSU2Session> session)
void SSU2Server::AddSession (std::shared_ptr<SSU2Session> session)
if (session)
if (m_Sessions.emplace (session->GetConnID (), session).second)
m_Sessions.emplace (session->GetConnID (), session);
if (session->GetState () != eSSU2SessionStatePeerTest)
AddSessionByRouterHash (session);
return true;
return false;
void SSU2Server::RemoveSession (uint64_t connID)
@ -721,9 +703,6 @@ namespace transport
m_LastSession->SetRemoteEndpoint (senderEndpoint);
m_LastSession->ProcessPeerTest (buf, len);
case eSSU2SessionStateHolePunch:
m_LastSession->ProcessFirstIncomingMessage (connID, buf, len); // SessionRequest
case eSSU2SessionStateClosing:
m_LastSession->ProcessData (buf, len, senderEndpoint); // we might receive termintaion block
if (m_LastSession && m_LastSession->GetState () == eSSU2SessionStateClosing)
@ -837,29 +816,6 @@ namespace transport
bool SSU2Server::CheckPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep, bool peerTest)
auto s = FindPendingOutgoingSession (ep);
if (s)
if (peerTest)
// if peer test requested add it to the list for pending session
auto onEstablished = s->GetOnEstablished ();
if (onEstablished)
s->SetOnEstablished ([s, onEstablished]()
onEstablished ();
s->SendPeerTest ();
s->SetOnEstablished ([s]() { s->SendPeerTest (); });
return true;
return false;
bool SSU2Server::CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router,
std::shared_ptr<const i2p::data::RouterInfo::Address> address, bool peerTest)
@ -879,28 +835,34 @@ namespace transport
if (isValidEndpoint)
if (i2p::transport::transports.IsInReservedRange(address->host)) return false;
if (CheckPendingOutgoingSession (boost::asio::ip::udp::endpoint (address->host, address->port), peerTest)) return false;
auto s = FindPendingOutgoingSession (boost::asio::ip::udp::endpoint (address->host, address->port));
if (s)
if (peerTest)
// if peer test requested add it to the list for pending session
auto onEstablished = s->GetOnEstablished ();
if (onEstablished)
s->SetOnEstablished ([s, onEstablished]()
onEstablished ();
s->SendPeerTest ();
s->SetOnEstablished ([s]() { s->SendPeerTest (); });
return false;
auto session = std::make_shared<SSU2Session> (*this, router, address);
if (!isValidEndpoint && router->GetProfile ()->HasLastEndpoint (address->IsV4 ()))
// router doesn't publish endpoint, but we connected before and hole punch might be alive
auto ep = router->GetProfile ()->GetLastEndpoint ();
if (IsConnectedRecently (ep, false))
if (CheckPendingOutgoingSession (ep, peerTest)) return false;
session->SetRemoteEndpoint (ep);
isValidEndpoint = true;
if (peerTest)
session->SetOnEstablished ([session]() {session->SendPeerTest (); });
if (isValidEndpoint) // we know endpoint
GetService ().post ([session]() { session->Connect (); });
else if (address->UsesIntroducer ()) // we don't know endpoint yet
if (address->UsesIntroducer ())
GetService ().post (std::bind (&SSU2Server::ConnectThroughIntroducer, this, session));
else if (isValidEndpoint) // we can't connect without endpoint
GetService ().post ([session]() { session->Connect (); });
return false;
@ -1150,7 +1112,7 @@ namespace transport
for (auto it = m_ConnectedRecently.begin (); it != m_ConnectedRecently.end (); )
if (ts > it->second + SSU2_MAX_HOLE_PUNCH_EXPIRATION)
if (ts > it->second + SSU2_HOLE_PUNCH_EXPIRATION)
it = m_ConnectedRecently.erase (it);
@ -1176,6 +1138,7 @@ namespace transport
m_PacketsPool.CleanUpMt ();
m_PacketsArrayPool.CleanUpMt ();
m_SentPacketsPool.CleanUp ();
m_IncompleteMessagesPool.CleanUp ();
m_FragmentsPool.CleanUp ();
@ -12,13 +12,11 @@
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <list>
#include <array>
#include <mutex>
#include <random>
#include "util.h"
#include "SSU2Session.h"
#include "SSU2OutOfSession.h"
#include "Socks5.h"
namespace i2p
@ -42,9 +40,8 @@ namespace transport
const int SSU2_KEEP_ALIVE_INTERVAL = 15; // in seconds
const int SSU2_KEEP_ALIVE_INTERVAL_VARIANCE = 4; // in seconds
const int SSU2_PROXY_CONNECT_RETRY_TIMEOUT = 30; // in seconds
const int SSU2_MIN_HOLE_PUNCH_EXPIRATION = 30; // in seconds
const int SSU2_MAX_HOLE_PUNCH_EXPIRATION = 160; // in seconds
const size_t SSU2_MAX_NUM_PACKETS_PER_BATCH = 64;
const int SSU2_HOLE_PUNCH_EXPIRATION = 150; // in seconds
const size_t SSU2_MAX_NUM_PACKETS_PER_BATCH = 32;
class SSU2Server: private i2p::util::RunnableServiceWithWork
@ -55,6 +52,20 @@ namespace transport
boost::asio::ip::udp::endpoint from;
struct Packets: public std::array<Packet *, SSU2_MAX_NUM_PACKETS_PER_BATCH>
size_t numPackets = 0;
bool AddPacket (Packet *p)
if (p && numPackets < size ())
data()[numPackets] = p; numPackets++;
return true;
return false;
class ReceiveService: public i2p::util::RunnableService
@ -78,14 +89,14 @@ namespace transport
bool UsesProxy () const { return m_IsThroughProxy; };
bool IsSupported (const boost::asio::ip::address& addr) const;
uint16_t GetPort (bool v4) const;
bool IsConnectedRecently (const boost::asio::ip::udp::endpoint& ep, bool max = true);
bool IsConnectedRecently (const boost::asio::ip::udp::endpoint& ep);
void AddConnectedRecently (const boost::asio::ip::udp::endpoint& ep, uint64_t ts);
std::mt19937& GetRng () { return m_Rng; }
bool IsMaxNumIntroducers (bool v4) const { return (v4 ? m_Introducers.size () : m_IntroducersV6.size ()) >= SSU2_MAX_NUM_INTRODUCERS; }
bool IsSyncClockFromPeers () const { return m_IsSyncClockFromPeers; };
void AdjustTimeOffset (int64_t offset, std::shared_ptr<const i2p::data::IdentityEx> from);
bool AddSession (std::shared_ptr<SSU2Session> session);
void AddSession (std::shared_ptr<SSU2Session> session);
void RemoveSession (uint64_t connID);
void RequestRemoveSession (uint64_t connID);
void AddSessionByRouterHash (std::shared_ptr<SSU2Session> session);
@ -133,11 +144,9 @@ namespace transport
void Receive (boost::asio::ip::udp::socket& socket);
void HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred,
Packet * packet, boost::asio::ip::udp::socket& socket);
void HandleReceivedPackets (std::list<Packet *>&& packets);
void HandleReceivedPacket (Packet * packet);
void HandleReceivedPackets (Packets * packets);
void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint);
void InsertToReceivedPacketsQueue (Packet * packet);
void InsertToReceivedPacketsQueue (std::list<Packet *>& packets);
void HandleReceivedPacketsQueue ();
void ScheduleTermination ();
void HandleTerminationTimer (const boost::system::error_code& ecode);
@ -148,7 +157,6 @@ namespace transport
void ScheduleResend (bool more);
void HandleResendTimer (const boost::system::error_code& ecode);
bool CheckPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep, bool peerTest);
void ConnectThroughIntroducer (std::shared_ptr<SSU2Session> session);
std::vector<std::shared_ptr<SSU2Session> > FindIntroducers (int maxNumIntroducers,
bool v4, const std::unordered_set<i2p::data::IdentHash>& excluded);
@ -183,6 +191,7 @@ namespace transport
std::unordered_map<uint32_t, std::pair <std::weak_ptr<SSU2Session>, uint64_t > > m_PeerTests; // nonce->(Alice, timestamp). We are Bob
std::list<std::pair<i2p::data::IdentHash, uint32_t> > m_Introducers, m_IntroducersV6; // introducers we are connected to
i2p::util::MemoryPoolMt<Packet> m_PacketsPool;
i2p::util::MemoryPoolMt<Packets> m_PacketsArrayPool;
i2p::util::MemoryPool<SSU2SentPacket> m_SentPacketsPool;
i2p::util::MemoryPool<SSU2IncompleteMessage> m_IncompleteMessagesPool;
i2p::util::MemoryPool<SSU2IncompleteMessage::Fragment> m_FragmentsPool;
@ -195,10 +204,7 @@ namespace transport
std::shared_ptr<const i2p::data::IdentityEx> m_PendingTimeOffsetFrom;
std::mt19937 m_Rng;
std::map<boost::asio::ip::udp::endpoint, uint64_t> m_ConnectedRecently; // endpoint -> last activity time in seconds
mutable std::mutex m_ConnectedRecentlyMutex;
std::unordered_map<uint32_t, std::pair <std::weak_ptr<SSU2PeerTestSession>, uint64_t > > m_RequestedPeerTests; // nonce->(Alice, timestamp)
std::list<Packet *> m_ReceivedPacketsQueue;
mutable std::mutex m_ReceivedPacketsQueueMutex;
// proxy
bool m_IsThroughProxy;
@ -1,355 +0,0 @@
* Copyright (c) 2024, The PurpleI2P Project
* This file is part of Purple i2pd project and licensed under BSD3
* See full license text in LICENSE file at top of project tree
#include "Log.h"
#include "SSU2.h"
#include "SSU2OutOfSession.h"
namespace i2p
namespace transport
SSU2PeerTestSession::SSU2PeerTestSession (SSU2Server& server, uint64_t sourceConnID, uint64_t destConnID):
SSU2Session (server, nullptr, nullptr, false),
m_MsgNumReceived (0), m_NumResends (0),m_IsConnectedRecently (false), m_IsStatusChanged (false),
m_PeerTestResendTimer (server.GetService ())
if (!sourceConnID) sourceConnID = ~destConnID;
if (!destConnID) destConnID = ~sourceConnID;
SetSourceConnID (sourceConnID);
SetDestConnID (destConnID);
SetState (eSSU2SessionStatePeerTest);
bool SSU2PeerTestSession::ProcessPeerTest (uint8_t * buf, size_t len)
// we are Alice or Charlie, msgs 5,6,7
Header header;
memcpy (header.buf, buf, 16);
header.ll[0] ^= CreateHeaderMask (i2p::context.GetSSU2IntroKey (), buf + (len - 24));
header.ll[1] ^= CreateHeaderMask (i2p::context.GetSSU2IntroKey (), buf + (len - 12));
if (header.h.type != eSSU2PeerTest)
LogPrint (eLogWarning, "SSU2: Unexpected message type ", (int)header.h.type, " instead ", (int)eSSU2PeerTest);
return false;
if (len < 48)
LogPrint (eLogWarning, "SSU2: PeerTest message too short ", len);
return false;
uint8_t nonce[12] = {0};
uint64_t headerX[2]; // sourceConnID, token
i2p::crypto::ChaCha20 (buf + 16, 16, i2p::context.GetSSU2IntroKey (), nonce, (uint8_t *)headerX);
SetDestConnID (headerX[0]);
// decrypt and handle payload
uint8_t * payload = buf + 32;
CreateNonce (be32toh (header.h.packetNum), nonce);
uint8_t h[32];
memcpy (h, header.buf, 16);
memcpy (h + 16, &headerX, 16);
if (!i2p::crypto::AEADChaCha20Poly1305 (payload, len - 48, h, 32,
i2p::context.GetSSU2IntroKey (), nonce, payload, len - 48, false))
LogPrint (eLogWarning, "SSU2: PeerTest AEAD verification failed ");
return false;
HandlePayload (payload, len - 48);
SetIsDataReceived (false);
return true;
void SSU2PeerTestSession::HandleAddress (const uint8_t * buf, size_t len)
if (!ExtractEndpoint (buf, len, m_OurEndpoint))
LogPrint (eLogWarning, "SSU2: Can't hanlde address block from peer test message");
void SSU2PeerTestSession::HandlePeerTest (const uint8_t * buf, size_t len)
// msgs 5-7
if (len < 8) return;
uint8_t msg = buf[0];
if (msg <= m_MsgNumReceived)
LogPrint (eLogDebug, "SSU2: PeerTest msg num ", msg, " received after ", m_MsgNumReceived, ". Ignored");
size_t offset = 3; // points to signed data after msg + code + flag
uint32_t nonce = bufbe32toh (buf + offset + 1); // 1 - ver
switch (msg) // msg
case 5: // Alice from Charlie 1
if (htobe64 (((uint64_t)nonce << 32) | nonce) == GetSourceConnID ())
m_PeerTestResendTimer.cancel (); // calcel delayed msg 6 if any
m_IsConnectedRecently = GetServer ().IsConnectedRecently (GetRemoteEndpoint ());
if (GetAddress ())
if (!m_IsConnectedRecently)
SetRouterStatus (eRouterStatusOK);
else if (m_IsStatusChanged && GetRouterStatus () == eRouterStatusFirewalled)
SetRouterStatus (eRouterStatusUnknown);
SendPeerTest (6, buf + offset, len - offset);
LogPrint (eLogWarning, "SSU2: Peer test 5 nonce mismatch ", nonce, " connID=", GetSourceConnID ());
case 6: // Charlie from Alice
m_PeerTestResendTimer.cancel (); // no more msg 5 resends
if (GetAddress ())
SendPeerTest (7, buf + offset, len - offset);
LogPrint (eLogWarning, "SSU2: Unknown address for peer test 6");
GetServer ().AddConnectedRecently (GetRemoteEndpoint (), i2p::util::GetSecondsSinceEpoch ());
GetServer ().RequestRemoveSession (GetConnID ());
case 7: // Alice from Charlie 2
m_PeerTestResendTimer.cancel (); // no more msg 6 resends
if (m_MsgNumReceived < 5 && m_OurEndpoint.port ()) // msg 5 was not received
if (m_OurEndpoint.address ().is_v4 ()) // ipv4
if (i2p::context.GetStatus () == eRouterStatusFirewalled)
if (m_OurEndpoint.port () != GetServer ().GetPort (true))
i2p::context.SetError (eRouterErrorSymmetricNAT);
else if (i2p::context.GetError () == eRouterErrorSymmetricNAT)
i2p::context.SetError (eRouterErrorNone);
if (i2p::context.GetStatusV6 () == eRouterStatusFirewalled)
if (m_OurEndpoint.port () != GetServer ().GetPort (false))
i2p::context.SetErrorV6 (eRouterErrorSymmetricNAT);
else if (i2p::context.GetErrorV6 () == eRouterErrorSymmetricNAT)
i2p::context.SetErrorV6 (eRouterErrorNone);
GetServer ().AddConnectedRecently (GetRemoteEndpoint (), i2p::util::GetSecondsSinceEpoch ());
GetServer ().RequestRemoveSession (GetConnID ());
LogPrint (eLogWarning, "SSU2: PeerTest unexpected msg num ", msg);
m_MsgNumReceived = msg;
void SSU2PeerTestSession::SendPeerTest (uint8_t msg)
auto addr = GetAddress ();
if (!addr) return;
Header header;
uint8_t h[32], payload[SSU2_MAX_PACKET_SIZE];
// fill packet
header.h.connID = GetDestConnID (); // dest id
RAND_bytes (header.buf + 8, 4); // random packet num
header.h.type = eSSU2PeerTest;
header.h.flags[0] = 2; // ver
header.h.flags[1] = (uint8_t)i2p::context.GetNetID (); // netID
header.h.flags[2] = 0; // flag
memcpy (h, header.buf, 16);
htobuf64 (h + 16, GetSourceConnID ()); // source id
// payload
payload[0] = eSSU2BlkDateTime;
htobe16buf (payload + 1, 4);
htobe32buf (payload + 3, (i2p::util::GetMillisecondsSinceEpoch () + 500)/1000);
size_t payloadSize = 7;
if (msg == 6 || msg == 7)
payloadSize += CreateAddressBlock (payload + payloadSize, GetMaxPayloadSize () - payloadSize, GetRemoteEndpoint ());
payloadSize += CreatePeerTestBlock (payload + payloadSize, GetMaxPayloadSize () - payloadSize,
msg, eSSU2PeerTestCodeAccept, nullptr, m_SignedData.data (), m_SignedData.size ());
payloadSize += CreatePaddingBlock (payload + payloadSize, GetMaxPayloadSize () - payloadSize);
// encrypt
uint8_t n[12];
CreateNonce (be32toh (header.h.packetNum), n);
i2p::crypto::AEADChaCha20Poly1305 (payload, payloadSize, h, 32, addr->i, n, payload, payloadSize + 16, true);
payloadSize += 16;
header.ll[0] ^= CreateHeaderMask (addr->i, payload + (payloadSize - 24));
header.ll[1] ^= CreateHeaderMask (addr->i, payload + (payloadSize - 12));
memset (n, 0, 12);
i2p::crypto::ChaCha20 (h + 16, 16, addr->i, n, h + 16);
// send
GetServer ().Send (header.buf, 16, h + 16, 16, payload, payloadSize, GetRemoteEndpoint ());
void SSU2PeerTestSession::SendPeerTest (uint8_t msg, const uint8_t * signedData, size_t signedDataLen, bool delayed)
#if __cplusplus >= 202002L // C++20
m_SignedData.assign (signedData, signedData + signedDataLen);
m_SignedData.resize (signedDataLen);
memcpy (m_SignedData.data (), signedData, signedDataLen);
if (!delayed)
SendPeerTest (msg);
// schedule resend for msgs 5 or 6
if (msg == 5 || msg == 6)
ScheduleResend (msg);
void SSU2PeerTestSession::SendPeerTest (uint8_t msg, const uint8_t * signedData, size_t signedDataLen,
std::shared_ptr<const i2p::data::RouterInfo::Address> addr, bool delayed)
if (!addr) return;
SetAddress (addr);
SendPeerTest (msg, signedData, signedDataLen, delayed);
void SSU2PeerTestSession::Connect ()
LogPrint (eLogError, "SSU2: Can't connect peer test session");
bool SSU2PeerTestSession::ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len)
LogPrint (eLogError, "SSU2: Can't handle incoming message in peer test session");
return false;
void SSU2PeerTestSession::ScheduleResend (uint8_t msg)
m_PeerTestResendTimer.expires_from_now (boost::posix_time::milliseconds(
std::weak_ptr<SSU2PeerTestSession> s(std::static_pointer_cast<SSU2PeerTestSession>(shared_from_this ()));
m_PeerTestResendTimer.async_wait ([s, msg](const boost::system::error_code& ecode)
if (ecode != boost::asio::error::operation_aborted)
auto s1 = s.lock ();
if (s1)
if (msg > s1->m_MsgNumReceived)
s1->SendPeerTest (msg);
s1->ScheduleResend (msg);
SSU2HolePunchSession::SSU2HolePunchSession (SSU2Server& server, uint32_t nonce,
const boost::asio::ip::udp::endpoint& remoteEndpoint,
std::shared_ptr<const i2p::data::RouterInfo::Address> addr):
SSU2Session (server), // we create full incoming session
m_NumResends (0), m_HolePunchResendTimer (server.GetService ())
// we are Charlie
uint64_t destConnID = htobe64 (((uint64_t)nonce << 32) | nonce); // dest id
uint32_t sourceConnID = ~destConnID;
SetSourceConnID (sourceConnID);
SetDestConnID (destConnID);
SetState (eSSU2SessionStateHolePunch);
SetRemoteEndpoint (remoteEndpoint);
SetAddress (addr);
void SSU2HolePunchSession::SendHolePunch ()
auto addr = GetAddress ();
if (!addr) return;
auto& ep = GetRemoteEndpoint ();
LogPrint (eLogDebug, "SSU2: Sending HolePunch to ", ep);
Header header;
uint8_t h[32], payload[SSU2_MAX_PACKET_SIZE];
// fill packet
header.h.connID = GetDestConnID (); // dest id
RAND_bytes (header.buf + 8, 4); // random packet num
header.h.type = eSSU2HolePunch;
header.h.flags[0] = 2; // ver
header.h.flags[1] = (uint8_t)i2p::context.GetNetID (); // netID
header.h.flags[2] = 0; // flag
memcpy (h, header.buf, 16);
htobuf64 (h + 16, GetSourceConnID ()); // source id
RAND_bytes (h + 24, 8); // header token, to be ignored by Alice
// payload
payload[0] = eSSU2BlkDateTime;
htobe16buf (payload + 1, 4);
htobe32buf (payload + 3, (i2p::util::GetMillisecondsSinceEpoch () + 500)/1000);
size_t payloadSize = 7;
payloadSize += CreateAddressBlock (payload + payloadSize, GetMaxPayloadSize () - payloadSize, ep);
// relay response block
if (payloadSize + m_RelayResponseBlock.size () < GetMaxPayloadSize ())
memcpy (payload + payloadSize, m_RelayResponseBlock.data (), m_RelayResponseBlock.size ());
payloadSize += m_RelayResponseBlock.size ();
payloadSize += CreatePaddingBlock (payload + payloadSize, GetMaxPayloadSize () - payloadSize);
// encrypt
uint8_t n[12];
CreateNonce (be32toh (header.h.packetNum), n);
i2p::crypto::AEADChaCha20Poly1305 (payload, payloadSize, h, 32, addr->i, n, payload, payloadSize + 16, true);
payloadSize += 16;
header.ll[0] ^= CreateHeaderMask (addr->i, payload + (payloadSize - 24));
header.ll[1] ^= CreateHeaderMask (addr->i, payload + (payloadSize - 12));
memset (n, 0, 12);
i2p::crypto::ChaCha20 (h + 16, 16, addr->i, n, h + 16);
// send
GetServer ().Send (header.buf, 16, h + 16, 16, payload, payloadSize, ep);
void SSU2HolePunchSession::SendHolePunch (const uint8_t * relayResponseBlock, size_t relayResponseBlockLen)
#if __cplusplus >= 202002L // C++20
m_RelayResponseBlock.assign (relayResponseBlock, relayResponseBlock + relayResponseBlockLen);
m_RelayResponseBlock.resize (relayResponseBlockLen);
memcpy (m_RelayResponseBlock.data (), relayResponseBlock, relayResponseBlockLen);
SendHolePunch ();
ScheduleResend ();
void SSU2HolePunchSession::ScheduleResend ()
m_HolePunchResendTimer.expires_from_now (boost::posix_time::milliseconds(
std::weak_ptr<SSU2HolePunchSession> s(std::static_pointer_cast<SSU2HolePunchSession>(shared_from_this ()));
m_HolePunchResendTimer.async_wait ([s](const boost::system::error_code& ecode)
if (ecode != boost::asio::error::operation_aborted)
auto s1 = s.lock ();
if (s1 && s1->GetState () == eSSU2SessionStateHolePunch)
s1->SendHolePunch ();
s1->ScheduleResend ();
bool SSU2HolePunchSession::ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len)
m_HolePunchResendTimer.cancel ();
return SSU2Session::ProcessFirstIncomingMessage (connID, buf, len);
@ -1,86 +0,0 @@
* Copyright (c) 2024, The PurpleI2P Project
* This file is part of Purple i2pd project and licensed under BSD3
* See full license text in LICENSE file at top of project tree
#include <vector>
#include "SSU2Session.h"
namespace i2p
namespace transport
const int SSU2_PEER_TEST_RESEND_INTERVAL = 3000; // in milliseconds
const int SSU2_PEER_TEST_RESEND_INTERVAL_VARIANCE = 2000; // in milliseconds
class SSU2PeerTestSession: public SSU2Session // for PeerTest msgs 5,6,7
SSU2PeerTestSession (SSU2Server& server, uint64_t sourceConnID, uint64_t destConnID);
uint8_t GetMsgNumReceived () const { return m_MsgNumReceived; }
bool IsConnectedRecently () const { return m_IsConnectedRecently; }
void SetStatusChanged () { m_IsStatusChanged = true; }
void SendPeerTest (uint8_t msg, const uint8_t * signedData, size_t signedDataLen,
std::shared_ptr<const i2p::data::RouterInfo::Address> addr, bool delayed = false);
bool ProcessPeerTest (uint8_t * buf, size_t len) override;
void Connect () override; // outgoing
bool ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len) override; // incoming
void SendPeerTest (uint8_t msg, const uint8_t * signedData, size_t signedDataLen, bool delayed = false); // PeerTest message
void SendPeerTest (uint8_t msg); // send or resend m_SignedData
void HandlePeerTest (const uint8_t * buf, size_t len) override;
void HandleAddress (const uint8_t * buf, size_t len) override;
void ScheduleResend (uint8_t msg);
uint8_t m_MsgNumReceived, m_NumResends;
bool m_IsConnectedRecently, m_IsStatusChanged;
std::vector<uint8_t> m_SignedData; // for resends
boost::asio::deadline_timer m_PeerTestResendTimer;
boost::asio::ip::udp::endpoint m_OurEndpoint; // as seen by peer
const int SSU2_HOLE_PUNCH_RESEND_INTERVAL = 1000; // in milliseconds
const int SSU2_HOLE_PUNCH_RESEND_INTERVAL_VARIANCE = 500; // in milliseconds
class SSU2HolePunchSession: public SSU2Session // Charlie
SSU2HolePunchSession (SSU2Server& server, uint32_t nonce, const boost::asio::ip::udp::endpoint& remoteEndpoint,
std::shared_ptr<const i2p::data::RouterInfo::Address> addr);
void SendHolePunch (const uint8_t * relayResponseBlock, size_t relayResponseBlockLen);
bool ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len) override; // SessionRequest
void SendHolePunch ();
void ScheduleResend ();
int m_NumResends;
std::vector<uint8_t> m_RelayResponseBlock;
boost::asio::deadline_timer m_HolePunchResendTimer;
@ -13,12 +13,17 @@
#include "Gzip.h"
#include "NetDb.hpp"
#include "SSU2.h"
#include "SSU2Session.h"
namespace i2p
namespace transport
static inline void CreateNonce (uint64_t seqn, uint8_t * nonce)
memset (nonce, 0, 4);
htole64buf (nonce + 4, seqn);
void SSU2IncompleteMessage::AttachNextFragment (const uint8_t * fragment, size_t fragmentSize)
if (msg->len + fragmentSize > msg->maxLen)
@ -83,7 +88,7 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo::Address> addr, bool noise):
TransportSession (in_RemoteRouter, SSU2_CONNECT_TIMEOUT),
m_Server (server), m_Address (addr), m_RemoteTransports (0), m_RemotePeerTestTransports (0),
m_RemoteVersion (0), m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown),
m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown),
m_SendPacketNum (0), m_ReceivePacketNum (0), m_LastDatetimeSentPacketNum (0),
m_IsDataReceived (false), m_RTT (SSU2_UNKNOWN_RTT),
@ -103,7 +108,6 @@ namespace transport
InitNoiseXKState1 (*m_NoiseState, m_Address->s);
m_RemoteEndpoint = boost::asio::ip::udp::endpoint (m_Address->host, m_Address->port);
m_RemoteTransports = in_RemoteRouter->GetCompatibleTransports (false);
m_RemoteVersion = in_RemoteRouter->GetVersion ();
if (in_RemoteRouter->IsSSU2PeerTesting (true)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V4;
if (in_RemoteRouter->IsSSU2PeerTesting (false)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V6;
RAND_bytes ((uint8_t *)&m_DestConnID, 8);
@ -227,13 +231,6 @@ namespace transport
if (m_Server.AddPendingOutgoingSession (shared_from_this ()))
m_Server.RemoveSession (GetConnID ());
// update endpoint in profile because we know it now
auto identity = GetRemoteIdentity ();
if (identity)
auto profile = i2p::data::GetRouterProfile (identity->GetIdentHash ());
if (profile) profile->SetLastEndpoint (m_RemoteEndpoint);
// connect
LogPrint (eLogDebug, "SSU2: Connecting after introduction to ", GetIdentHashBase64());
Connect ();
@ -1177,8 +1174,6 @@ namespace transport
" and actual endpoint ", m_RemoteEndpoint.address (), " from ", i2p::data::GetIdentHashAbbreviation (ri->GetIdentHash ()));
return false;
if (!m_Address->published)
ri->GetProfile ()->SetLastEndpoint (m_RemoteEndpoint);
SetRemoteIdentity (ri->GetRouterIdentity ());
AdjustMaxPayloadSize ();
m_Server.AddSessionByRouterHash (shared_from_this ()); // we know remote router now
@ -1186,7 +1181,6 @@ namespace transport
m_RemotePeerTestTransports = 0;
if (ri->IsSSU2PeerTesting (true)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V4;
if (ri->IsSSU2PeerTesting (false)) m_RemotePeerTestTransports |= i2p::data::RouterInfo::eSSU2V6;
m_RemoteVersion = ri->GetVersion ();
// handle other blocks
HandlePayload (decryptedPayload.data () + riSize + 3, decryptedPayload.size () - riSize - 3);
@ -1363,6 +1357,46 @@ namespace transport
return true;
void SSU2Session::SendHolePunch (uint32_t nonce, const boost::asio::ip::udp::endpoint& ep,
const uint8_t * introKey, uint64_t token)
// we are Charlie
LogPrint (eLogDebug, "SSU2: Sending HolePunch to ", ep);
Header header;
uint8_t h[32], payload[SSU2_MAX_PACKET_SIZE];
// fill packet
header.h.connID = htobe64 (((uint64_t)nonce << 32) | nonce); // dest id
RAND_bytes (header.buf + 8, 4); // random packet num
header.h.type = eSSU2HolePunch;
header.h.flags[0] = 2; // ver
header.h.flags[1] = (uint8_t)i2p::context.GetNetID (); // netID
header.h.flags[2] = 0; // flag
memcpy (h, header.buf, 16);
uint64_t c = ~header.h.connID;
memcpy (h + 16, &c, 8); // source id
RAND_bytes (h + 24, 8); // token
// payload
payload[0] = eSSU2BlkDateTime;
htobe16buf (payload + 1, 4);
htobe32buf (payload + 3, (i2p::util::GetMillisecondsSinceEpoch () + 500)/1000);
size_t payloadSize = 7;
payloadSize += CreateAddressBlock (payload + payloadSize, m_MaxPayloadSize - payloadSize, ep);
payloadSize += CreateRelayResponseBlock (payload + payloadSize, m_MaxPayloadSize - payloadSize,
eSSU2RelayResponseCodeAccept, nonce, token, ep.address ().is_v4 ());
payloadSize += CreatePaddingBlock (payload + payloadSize, m_MaxPayloadSize - payloadSize);
// encrypt
uint8_t n[12];
CreateNonce (be32toh (header.h.packetNum), n);
i2p::crypto::AEADChaCha20Poly1305 (payload, payloadSize, h, 32, introKey, n, payload, payloadSize + 16, true);
payloadSize += 16;
header.ll[0] ^= CreateHeaderMask (introKey, payload + (payloadSize - 24));
header.ll[1] ^= CreateHeaderMask (introKey, payload + (payloadSize - 12));
memset (n, 0, 12);
i2p::crypto::ChaCha20 (h + 16, 16, introKey, n, h + 16);
// send
m_Server.Send (header.buf, 16, h + 16, 16, payload, payloadSize, ep);
bool SSU2Session::ProcessHolePunch (uint8_t * buf, size_t len)
// we are Alice
@ -1931,9 +1965,9 @@ namespace transport
auto mts = i2p::util::GetMillisecondsSinceEpoch ();
uint32_t nonce = bufbe32toh (buf + 1);
if (session->m_RelaySessions.emplace (nonce, std::make_pair (shared_from_this (), mts/1000)).second)
session->m_RelaySessions.emplace (bufbe32toh (buf + 1), // nonce
std::make_pair (shared_from_this (), mts/1000) );
// send relay intro to Charlie
auto r = i2p::data::netdb.FindRouter (GetRemoteIdentity ()->GetIdentHash ()); // Alice's RI
if (r && (r->IsUnreachable () || !i2p::data::netdb.PopulateRouterInfoBuffer (r))) r = nullptr;
@ -1951,16 +1985,14 @@ namespace transport
// Charlie always responds with RelayResponse
session->m_SentPackets.emplace (packetNum, packet);
LogPrint (eLogInfo, "SSU2: Relay request nonce ", nonce, " already exists. Ignore");
void SSU2Session::HandleRelayIntro (const uint8_t * buf, size_t len, int attempts)
// we are Charlie
auto mts = i2p::util::GetMillisecondsSinceEpoch ();
SSU2RelayResponseCode code = eSSU2RelayResponseCodeAccept;
boost::asio::ip::udp::endpoint ep;
std::shared_ptr<const i2p::data::RouterInfo::Address> addr;
uint64_t token = 0;
bool isV4 = false;
auto r = i2p::data::netdb.FindRouter (buf + 1); // Alice
if (r)
@ -1973,19 +2005,21 @@ namespace transport
s.Insert (buf + 47, asz); // Alice Port, Alice IP
if (s.Verify (r->GetIdentity (), buf + 47 + asz))
// obtain and check endpoint and address for HolePunch
// send HolePunch
boost::asio::ip::udp::endpoint ep;
if (ExtractEndpoint (buf + 47, asz, ep))
std::shared_ptr<const i2p::data::RouterInfo::Address> addr;
if (!ep.address ().is_unspecified () && ep.port ())
addr = ep.address ().is_v6 () ? r->GetSSU2V6Address () : r->GetSSU2V4Address ();
if (addr)
if (m_Server.IsSupported (ep.address ()))
addr = ep.address ().is_v6 () ? r->GetSSU2V6Address () : r->GetSSU2V4Address ();
if (!addr)
LogPrint (eLogWarning, "SSU2: RelayIntro address for endpoint not found");
code = eSSU2RelayResponseCodeCharlieAliceIsUnknown;
token = m_Server.GetIncomingToken (ep);
isV4 = ep.address ().is_v4 ();
SendHolePunch (bufbe32toh (buf + 33), ep, addr->i, token);
m_Server.AddConnectedRecently (ep, mts/1000);
@ -1995,7 +2029,7 @@ namespace transport
LogPrint (eLogWarning, "SSU2: RelayIntro invalid endpoint");
LogPrint (eLogWarning, "SSU2: RelayIntro unknown address");
code = eSSU2RelayResponseCodeCharlieAliceIsUnknown;
@ -2031,29 +2065,14 @@ namespace transport
// send relay response to Bob
auto packet = m_Server.GetSentPacketsPool ().AcquireShared ();
uint32_t nonce = bufbe32toh (buf + 33);
packet->payloadSize = CreateRelayResponseBlock (packet->payload, m_MaxPayloadSize,
code, nonce, m_Server.GetIncomingToken (ep), ep.address ().is_v4 ());
if (code == eSSU2RelayResponseCodeAccept && addr)
// send HolePunch
auto holePunchSession = std::make_shared<SSU2HolePunchSession>(m_Server, nonce, ep, addr);
if (m_Server.AddSession (holePunchSession))
holePunchSession->SendHolePunch (packet->payload, packet->payloadSize); // relay response block
LogPrint (eLogInfo, "SSU2: Relay intro nonce ", nonce, " already exists. Ignore");
code, bufbe32toh (buf + 33), token, isV4);
packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize);
uint32_t packetNum = SendData (packet->payload, packet->payloadSize);
// sometimes Bob doesn't ack this RelayResponse in older versions
packet->sendTime = i2p::util::GetMillisecondsSinceEpoch ();
m_SentPackets.emplace (packetNum, packet);
/*uint32_t packetNum = */SendData (packet->payload, packet->payloadSize);
// sometimes Bob doesn't ack this RelayResponse
// TODO: uncomment line below once the problem is resolved
//packet->sendTime = mts;
//m_SentPackets.emplace (packetNum, packet);
void SSU2Session::HandleRelayResponse (const uint8_t * buf, size_t len)
@ -2088,13 +2107,11 @@ namespace transport
memcpy (payload + 3, buf, len); // forward to Alice as is
packet->payloadSize = len + 3;
packet->payloadSize += CreatePaddingBlock (payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize);
uint32_t packetNum = it->second.first->SendData (packet->payload, packet->payloadSize);
// sometimes Alice doesn't ack this RelayResponse in older versions
packet->sendTime = i2p::util::GetMillisecondsSinceEpoch ();
it->second.first->m_SentPackets.emplace (packetNum, packet);
/*uint32_t packetNum = */it->second.first->SendData (packet->payload, packet->payloadSize);
// sometimes Alice doesn't ack this RelayResponse
// TODO: uncomment line below once the problem is resolved
//packet->sendTime = i2p::util::GetMillisecondsSinceEpoch ();
//it->second.first->m_SentPackets.emplace (packetNum, packet);
@ -2162,8 +2179,7 @@ namespace transport
GetRemoteIdentity ()->GetIdentHash ());
if (session) // session with Charlie
if (m_Server.AddPeerTest (nonce, shared_from_this (), ts/1000))
m_Server.AddPeerTest (nonce, shared_from_this (), ts/1000);
auto packet = m_Server.GetSentPacketsPool ().AcquireShared ();
// Alice's RouterInfo
auto r = i2p::data::netdb.FindRouter (GetRemoteIdentity ()->GetIdentHash ());
@ -2187,9 +2203,6 @@ namespace transport
packet->sendTime = ts;
session->m_SentPackets.emplace (packetNum, packet);
LogPrint (eLogInfo, "SSU2: Peer test 1 nonce ", nonce, " already exists. Ignored");
// Charlie not found, send error back to Alice
@ -2330,16 +2343,16 @@ namespace transport
session->SetRemoteIdentity (r->GetIdentity ());
auto addr = r->GetSSU2Address (m_Address->IsV4 ());
if (addr && addr->IsPeerTesting ())
if (addr)
if (session->GetMsgNumReceived () >= 5)
// msg 5 already received and we know remote endpoint
// msg 5 already received
if (session->GetMsgNumReceived () == 5)
if (!session->IsConnectedRecently ())
SetRouterStatus (eRouterStatusOK);
// send msg 6 immeditely
// send msg 6
session->SendPeerTest (6, buf + offset, len - offset, addr);
@ -2350,12 +2363,6 @@ namespace transport
session->m_Address = addr;
if (GetTestingState ())
// schedule msg 6 with delay
if (!addr->host.is_unspecified () && addr->port)
session->SetRemoteEndpoint (boost::asio::ip::udp::endpoint (addr->host, addr->port));
session->SendPeerTest (6, buf + offset, len - offset, addr, true);
SetTestingState (false);
if (GetRouterStatus () != eRouterStatusFirewalled && addr->IsPeerTesting ())
@ -2373,7 +2380,7 @@ namespace transport
LogPrint (eLogWarning, "SSU2: Peer test 4 address not found or not supported");
LogPrint (eLogWarning, "SSU2: Peer test 4 address not found");
session->Done ();
@ -3055,7 +3062,7 @@ namespace transport
if (ts > it->second.second + SSU2_RELAY_NONCE_EXPIRATION_TIMEOUT)
LogPrint (eLogInfo, "SSU2: Relay nonce ", it->first, " was not responded in ", SSU2_RELAY_NONCE_EXPIRATION_TIMEOUT, " seconds, deleted");
LogPrint (eLogWarning, "SSU2: Relay nonce ", it->first, " was not responded in ", SSU2_RELAY_NONCE_EXPIRATION_TIMEOUT, " seconds, deleted");
it = m_RelaySessions.erase (it);
@ -3079,5 +3086,216 @@ namespace transport
else if (!sent && !m_SentPackets.empty ()) // if only acks received, nothing sent and we still have something to resend
Resend (i2p::util::GetMillisecondsSinceEpoch ()); // than right time to resend
SSU2PeerTestSession::SSU2PeerTestSession (SSU2Server& server, uint64_t sourceConnID, uint64_t destConnID):
SSU2Session (server, nullptr, nullptr, false),
m_MsgNumReceived (0), m_NumResends (0),m_IsConnectedRecently (false), m_IsStatusChanged (false),
m_PeerTestResendTimer (server.GetService ())
if (!sourceConnID) sourceConnID = ~destConnID;
if (!destConnID) destConnID = ~sourceConnID;
SetSourceConnID (sourceConnID);
SetDestConnID (destConnID);
SetState (eSSU2SessionStatePeerTest);
bool SSU2PeerTestSession::ProcessPeerTest (uint8_t * buf, size_t len)
// we are Alice or Charlie, msgs 5,6,7
Header header;
memcpy (header.buf, buf, 16);
header.ll[0] ^= CreateHeaderMask (i2p::context.GetSSU2IntroKey (), buf + (len - 24));
header.ll[1] ^= CreateHeaderMask (i2p::context.GetSSU2IntroKey (), buf + (len - 12));
if (header.h.type != eSSU2PeerTest)
LogPrint (eLogWarning, "SSU2: Unexpected message type ", (int)header.h.type, " instead ", (int)eSSU2PeerTest);
return false;
if (len < 48)
LogPrint (eLogWarning, "SSU2: PeerTest message too short ", len);
return false;
uint8_t nonce[12] = {0};
uint64_t headerX[2]; // sourceConnID, token
i2p::crypto::ChaCha20 (buf + 16, 16, i2p::context.GetSSU2IntroKey (), nonce, (uint8_t *)headerX);
SetDestConnID (headerX[0]);
// decrypt and handle payload
uint8_t * payload = buf + 32;
CreateNonce (be32toh (header.h.packetNum), nonce);
uint8_t h[32];
memcpy (h, header.buf, 16);
memcpy (h + 16, &headerX, 16);
if (!i2p::crypto::AEADChaCha20Poly1305 (payload, len - 48, h, 32,
i2p::context.GetSSU2IntroKey (), nonce, payload, len - 48, false))
LogPrint (eLogWarning, "SSU2: PeerTest AEAD verification failed ");
return false;
HandlePayload (payload, len - 48);
SetIsDataReceived (false);
return true;
void SSU2PeerTestSession::HandlePeerTest (const uint8_t * buf, size_t len)
// msgs 5-7
if (len < 8) return;
uint8_t msg = buf[0];
if (msg <= m_MsgNumReceived)
LogPrint (eLogDebug, "SSU2: PeerTest msg num ", msg, " received after ", m_MsgNumReceived, ". Ignored");
size_t offset = 3; // points to signed data after msg + code + flag
uint32_t nonce = bufbe32toh (buf + offset + 1); // 1 - ver
switch (msg) // msg
case 5: // Alice from Charlie 1
if (htobe64 (((uint64_t)nonce << 32) | nonce) == GetSourceConnID ())
m_IsConnectedRecently = GetServer ().IsConnectedRecently (GetRemoteEndpoint ());
if (GetAddress ())
if (!m_IsConnectedRecently)
SetRouterStatus (eRouterStatusOK);
else if (m_IsStatusChanged && GetRouterStatus () == eRouterStatusFirewalled)
SetRouterStatus (eRouterStatusUnknown);
SendPeerTest (6, buf + offset, len - offset);
LogPrint (eLogWarning, "SSU2: Peer test 5 nonce mismatch ", nonce, " connID=", GetSourceConnID ());
case 6: // Charlie from Alice
m_PeerTestResendTimer.cancel (); // no more msg 5 resends
if (GetAddress ())
SendPeerTest (7, buf + offset, len - offset);
LogPrint (eLogWarning, "SSU2: Unknown address for peer test 6");
GetServer ().AddConnectedRecently (GetRemoteEndpoint (), i2p::util::GetSecondsSinceEpoch ());
GetServer ().RequestRemoveSession (GetConnID ());
case 7: // Alice from Charlie 2
m_PeerTestResendTimer.cancel (); // no more msg 6 resends
auto addr = GetAddress ();
if (addr && addr->IsV6 ())
i2p::context.SetStatusV6 (eRouterStatusOK); // set status OK for ipv6 even if from SSU2
GetServer ().AddConnectedRecently (GetRemoteEndpoint (), i2p::util::GetSecondsSinceEpoch ());
GetServer ().RequestRemoveSession (GetConnID ());
LogPrint (eLogWarning, "SSU2: PeerTest unexpected msg num ", msg);
m_MsgNumReceived = msg;
void SSU2PeerTestSession::SendPeerTest (uint8_t msg)
auto addr = GetAddress ();
if (!addr) return;
Header header;
uint8_t h[32], payload[SSU2_MAX_PACKET_SIZE];
// fill packet
header.h.connID = GetDestConnID (); // dest id
RAND_bytes (header.buf + 8, 4); // random packet num
header.h.type = eSSU2PeerTest;
header.h.flags[0] = 2; // ver
header.h.flags[1] = (uint8_t)i2p::context.GetNetID (); // netID
header.h.flags[2] = 0; // flag
memcpy (h, header.buf, 16);
htobuf64 (h + 16, GetSourceConnID ()); // source id
// payload
payload[0] = eSSU2BlkDateTime;
htobe16buf (payload + 1, 4);
htobe32buf (payload + 3, (i2p::util::GetMillisecondsSinceEpoch () + 500)/1000);
size_t payloadSize = 7;
if (msg == 6 || msg == 7)
payloadSize += CreateAddressBlock (payload + payloadSize, GetMaxPayloadSize () - payloadSize, GetRemoteEndpoint ());
payloadSize += CreatePeerTestBlock (payload + payloadSize, GetMaxPayloadSize () - payloadSize,
msg, eSSU2PeerTestCodeAccept, nullptr, m_SignedData.data (), m_SignedData.size ());
payloadSize += CreatePaddingBlock (payload + payloadSize, GetMaxPayloadSize () - payloadSize);
// encrypt
uint8_t n[12];
CreateNonce (be32toh (header.h.packetNum), n);
i2p::crypto::AEADChaCha20Poly1305 (payload, payloadSize, h, 32, addr->i, n, payload, payloadSize + 16, true);
payloadSize += 16;
header.ll[0] ^= CreateHeaderMask (addr->i, payload + (payloadSize - 24));
header.ll[1] ^= CreateHeaderMask (addr->i, payload + (payloadSize - 12));
memset (n, 0, 12);
i2p::crypto::ChaCha20 (h + 16, 16, addr->i, n, h + 16);
// send
GetServer ().Send (header.buf, 16, h + 16, 16, payload, payloadSize, GetRemoteEndpoint ());
void SSU2PeerTestSession::SendPeerTest (uint8_t msg, const uint8_t * signedData, size_t signedDataLen)
#if __cplusplus >= 202002L // C++20
m_SignedData.assign (signedData, signedData + signedDataLen);
m_SignedData.resize (signedDataLen);
memcpy (m_SignedData.data (), signedData, signedDataLen);
SendPeerTest (msg);
// schedule resend for msgs 5 or 6
if (msg == 5 || msg == 6)
ScheduleResend ();
void SSU2PeerTestSession::SendPeerTest (uint8_t msg, const uint8_t * signedData, size_t signedDataLen,
std::shared_ptr<const i2p::data::RouterInfo::Address> addr)
if (!addr) return;
SetAddress (addr);
SendPeerTest (msg, signedData, signedDataLen);
void SSU2PeerTestSession::Connect ()
LogPrint (eLogError, "SSU2: Can't connect peer test session");
bool SSU2PeerTestSession::ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len)
LogPrint (eLogError, "SSU2: Can't handle incoming message in peer test session");
return false;
void SSU2PeerTestSession::ScheduleResend ()
m_PeerTestResendTimer.expires_from_now (boost::posix_time::milliseconds(
std::weak_ptr<SSU2PeerTestSession> s(std::static_pointer_cast<SSU2PeerTestSession>(shared_from_this ()));
m_PeerTestResendTimer.async_wait ([s](const boost::system::error_code& ecode)
if (ecode != boost::asio::error::operation_aborted)
auto s1 = s.lock ();
if (s1)
int msg = 0;
if (s1->m_MsgNumReceived < 6)
msg = (s1->m_MsgNumReceived == 5) ? 6 : 5;
if (msg) // 5 or 6
s1->SendPeerTest (msg);
s1->ScheduleResend ();
@ -15,7 +15,6 @@
#include <set>
#include <list>
#include <boost/asio.hpp>
#include "version.h"
#include "Crypto.h"
#include "RouterInfo.h"
#include "RouterContext.h"
@ -56,7 +55,6 @@ namespace transport
const int SSU2_MAX_NUM_ACK_RANGES = 32; // to send
const uint8_t SSU2_MAX_NUM_FRAGMENTS = 64;
// flags
@ -114,7 +112,6 @@ namespace transport
@ -299,8 +296,6 @@ namespace transport
size_t CreatePaddingBlock (uint8_t * buf, size_t len, size_t minSize = 0);
size_t CreatePeerTestBlock (uint8_t * buf, size_t len, uint8_t msg, SSU2PeerTestCode code, const uint8_t * routerHash, const uint8_t * signedData, size_t signedDataLen);
bool ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep);
void Terminate ();
@ -325,6 +320,7 @@ namespace transport
uint32_t SendData (const uint8_t * buf, size_t len, uint8_t flags = 0); // returns packet num
void SendQuickAck ();
void SendTermination ();
void SendHolePunch (uint32_t nonce, const boost::asio::ip::udp::endpoint& ep, const uint8_t * introKey, uint64_t token);
void SendPathResponse (const uint8_t * data, size_t len);
void SendPathChallenge ();
@ -332,7 +328,8 @@ namespace transport
void HandleRouterInfo (const uint8_t * buf, size_t len);
void HandleAck (const uint8_t * buf, size_t len);
void HandleAckRange (uint32_t firstPacketNum, uint32_t lastPacketNum, uint64_t ts);
virtual void HandleAddress (const uint8_t * buf, size_t len);
void HandleAddress (const uint8_t * buf, size_t len);
bool ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep);
size_t CreateEndpoint (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& ep);
std::shared_ptr<const i2p::data::RouterInfo::Address> FindLocalAddress () const;
void AdjustMaxPayloadSize ();
@ -356,7 +353,6 @@ namespace transport
size_t CreateFollowOnFragmentBlock (uint8_t * buf, size_t len, std::shared_ptr<I2NPMessage> msg, uint8_t& fragmentNum, uint32_t msgID);
size_t CreateRelayIntroBlock (uint8_t * buf, size_t len, const uint8_t * introData, size_t introDataLen);
size_t CreateRelayResponseBlock (uint8_t * buf, size_t len, SSU2RelayResponseCode code, uint32_t nonce, uint64_t token, bool v4);
size_t CreatePeerTestBlock (uint8_t * buf, size_t len, uint32_t nonce); // Alice
size_t CreateTerminationBlock (uint8_t * buf, size_t len);
@ -370,7 +366,6 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo::Address> m_Address;
boost::asio::ip::udp::endpoint m_RemoteEndpoint;
i2p::data::RouterInfo::CompatibleTransports m_RemoteTransports, m_RemotePeerTestTransports;
int m_RemoteVersion;
uint64_t m_DestConnID, m_SourceConnID;
SSU2SessionState m_State;
uint8_t m_KeyDataSend[64], m_KeyDataReceive[64];
@ -396,18 +391,49 @@ namespace transport
uint64_t m_LastResendTime, m_LastResendAttemptTime; // in milliseconds
const int SSU2_PEER_TEST_RESEND_INTERVAL = 3000; // in milliseconds
const int SSU2_PEER_TEST_RESEND_INTERVAL_VARIANCE = 2000; // in milliseconds
class SSU2PeerTestSession: public SSU2Session // for PeerTest msgs 5,6,7
SSU2PeerTestSession (SSU2Server& server, uint64_t sourceConnID, uint64_t destConnID);
uint8_t GetMsgNumReceived () const { return m_MsgNumReceived; }
bool IsConnectedRecently () const { return m_IsConnectedRecently; }
void SetStatusChanged () { m_IsStatusChanged = true; }
void SendPeerTest (uint8_t msg, const uint8_t * signedData, size_t signedDataLen,
std::shared_ptr<const i2p::data::RouterInfo::Address> addr);
bool ProcessPeerTest (uint8_t * buf, size_t len) override;
void Connect () override; // outgoing
bool ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len) override; // incoming
void SendPeerTest (uint8_t msg, const uint8_t * signedData, size_t signedDataLen); // PeerTest message
void SendPeerTest (uint8_t msg); // send or resend m_SignedData
void HandlePeerTest (const uint8_t * buf, size_t len) override;
void ScheduleResend ();
uint8_t m_MsgNumReceived, m_NumResends;
bool m_IsConnectedRecently, m_IsStatusChanged;
std::vector<uint8_t> m_SignedData; // for resends
boost::asio::deadline_timer m_PeerTestResendTimer;
inline uint64_t CreateHeaderMask (const uint8_t * kh, const uint8_t * nonce)
uint64_t data = 0;
i2p::crypto::ChaCha20 ((uint8_t *)&data, 8, kh, nonce, (uint8_t *)&data);
return data;
inline void CreateNonce (uint64_t seqn, uint8_t * nonce)
memset (nonce, 0, 4);
htole64buf (nonce + 4, seqn);
@ -73,14 +73,14 @@ namespace stream
m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed
m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false),
m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (false),
m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_LocalDestination (local),
m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_LocalDestination (local),
m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service),
m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port),
m_RTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_SlowRTT2 (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), m_LastWindowDropSize (0),
m_WindowDropTargetSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO),
m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrevRTTSample (INITIAL_RTT),
m_Jitter (0), m_MinPacingTime (0),
m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0), m_RemoteLeaseChangeTime (0),
m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0),
m_LastACKSendTime (0), m_PacketACKInterval (1), m_PacketACKIntervalRem (0), // for limit inbound speed
m_NumResendAttempts (0), m_NumPacketsToSend (0), m_MTU (STREAMING_MTU)
@ -101,13 +101,13 @@ namespace stream
m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed
m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false),
m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), m_IsWinDropped (false),
m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), m_LocalDestination (local),
m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_LocalDestination (local),
m_ReceiveTimer (m_Service), m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service),
m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_SlowRTT2 (INITIAL_RTT),
m_WindowSize (INITIAL_WINDOW_SIZE), m_LastWindowDropSize (0), m_WindowDropTargetSize (0), m_WindowIncCounter (0),
m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()),
m_PrevRTTSample (INITIAL_RTT), m_Jitter (0), m_MinPacingTime (0),
m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0), m_RemoteLeaseChangeTime (0),
m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0),
m_LastACKSendTime (0), m_PacketACKInterval (1), m_PacketACKIntervalRem (0), // for limit inbound speed
m_NumResendAttempts (0), m_NumPacketsToSend (0), m_MTU (STREAMING_MTU)
@ -256,7 +256,6 @@ namespace stream
if (receivedSeqn <= m_PreviousReceivedSequenceNumber || receivedSeqn == m_LastReceivedSequenceNumber)
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
CancelRemoteLeaseChange ();
UpdateCurrentRemoteLease ();
m_PreviousReceivedSequenceNumber = receivedSeqn;
@ -1105,7 +1104,6 @@ namespace stream
if (!m_RemoteLeaseSet)
CancelRemoteLeaseChange ();
UpdateCurrentRemoteLease ();
if (!m_RemoteLeaseSet)
@ -1129,30 +1127,9 @@ namespace stream
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
if (!m_CurrentRemoteLease || !m_CurrentRemoteLease->endDate) // excluded from LeaseSet
CancelRemoteLeaseChange ();
if (!m_CurrentRemoteLease || !m_CurrentRemoteLease->endDate || // excluded from LeaseSet
ts >= m_CurrentRemoteLease->endDate - i2p::data::LEASE_ENDDATE_THRESHOLD)
UpdateCurrentRemoteLease (true);
if (m_RemoteLeaseChangeTime && m_IsRemoteLeaseChangeInProgress && ts > m_RemoteLeaseChangeTime + INITIAL_RTT)
CancelRemoteLeaseChange ();
m_CurrentRemoteLease = m_NextRemoteLease;
HalveWindowSize ();
auto currentRemoteLease = m_CurrentRemoteLease;
if (!m_IsRemoteLeaseChangeInProgress && m_RemoteLeaseSet && m_CurrentRemoteLease && ts >= m_CurrentRemoteLease->endDate - i2p::data::LEASE_ENDDATE_THRESHOLD)
auto leases = m_RemoteLeaseSet->GetNonExpiredLeases (false);
if (leases.size ())
m_IsRemoteLeaseChangeInProgress = true;
UpdateCurrentRemoteLease (true);
m_NextRemoteLease = m_CurrentRemoteLease;
UpdateCurrentRemoteLease (true);
if (m_CurrentRemoteLease && ts < m_CurrentRemoteLease->endDate + i2p::data::LEASE_ENDDATE_THRESHOLD)
bool freshTunnel = false;
@ -1189,11 +1166,6 @@ namespace stream
m_NumSentBytes += it->GetLength ();
if (m_IsRemoteLeaseChangeInProgress && !m_RemoteLeaseChangeTime)
m_RemoteLeaseChangeTime = ts;
m_CurrentRemoteLease = currentRemoteLease; // change it back before new lease is confirmed
m_CurrentOutboundTunnel->SendTunnelDataMsgs (msgs);
@ -1237,8 +1209,7 @@ namespace stream
if (m_Status != eStreamStatusTerminated)
m_SendTimer.cancel ();
m_SendTimer.expires_from_now (boost::posix_time::microseconds(
SEND_INTERVAL + m_LocalDestination.GetRandom () % SEND_INTERVAL_VARIANCE));
m_SendTimer.expires_from_now (boost::posix_time::microseconds(SEND_INTERVAL));
m_SendTimer.async_wait (std::bind (&Stream::HandleSendTimer,
shared_from_this (), std::placeholders::_1));
@ -1251,19 +1222,10 @@ namespace stream
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
if (m_LastSendTime && ts*1000 > m_LastSendTime*1000 + m_PacingTime)
if (m_PacingTime)
auto numPackets = std::lldiv (m_PacingTimeRem + ts*1000 - m_LastSendTime*1000, m_PacingTime);
m_NumPacketsToSend = numPackets.quot;
m_PacingTimeRem = numPackets.rem;
LogPrint (eLogError, "Streaming: pacing time is zero");
m_NumPacketsToSend = 1; m_PacingTimeRem = 0;
m_NumPacketsToSend = ((ts*1000 - m_LastSendTime*1000) + m_PacingTimeRem) / m_PacingTime;
m_PacingTimeRem = ((ts*1000 - m_LastSendTime*1000) + m_PacingTimeRem) - (m_NumPacketsToSend * m_PacingTime);
m_IsSendTime = true;
if (m_WindowIncCounter && m_WindowSize < MAX_WINDOW_SIZE && !m_SendBuffer.IsEmpty () && m_PacingTime > m_MinPacingTime)
if (m_WindowIncCounter && m_WindowSize < MAX_WINDOW_SIZE && !m_SendBuffer.IsEmpty ())
for (int i = 0; i < m_NumPacketsToSend; i++)
@ -1276,13 +1238,11 @@ namespace stream
m_WindowSize += (m_WindowSize - (1 - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowSize;
if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE;
m_WindowIncCounter --;
UpdatePacingTime ();
if (m_IsNAcked)
ResendPacket ();
else if (m_IsResendNeeded) // resend packets
@ -1406,7 +1366,6 @@ namespace stream
CancelRemoteLeaseChange ();
UpdateCurrentRemoteLease (); // pick another lease
LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts,
", another remote lease has been selected for stream with rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
@ -1547,9 +1506,22 @@ namespace stream
LogPrint (eLogWarning, "Streaming: Remote LeaseSet not found");
m_CurrentRemoteLease = nullptr;
if (isLeaseChanged && !m_IsRemoteLeaseChangeInProgress)
if (isLeaseChanged)
HalveWindowSize ();
// drop window to initial upon RemoteLease change
if (m_WindowSize > INITIAL_WINDOW_SIZE)
m_WindowDropTargetSize = std::max (m_WindowSize/2, (float)INITIAL_WINDOW_SIZE);
m_IsWinDropped = true;
m_LastWindowDropSize = 0;
m_WindowIncCounter = 0;
m_IsFirstRttSample = true;
m_IsFirstACK = true;
UpdatePacingTime ();
@ -1588,29 +1560,6 @@ namespace stream
UpdatePacingTime ();
void Stream::HalveWindowSize ()
if (m_WindowSize > INITIAL_WINDOW_SIZE)
m_WindowDropTargetSize = std::max (m_WindowSize/2, (float)INITIAL_WINDOW_SIZE);
m_IsWinDropped = true;
m_LastWindowDropSize = 0;
m_WindowIncCounter = 0;
m_IsFirstRttSample = true;
m_IsFirstACK = true;
UpdatePacingTime ();
void Stream::CancelRemoteLeaseChange ()
m_RemoteLeaseChangeTime = 0;
m_IsRemoteLeaseChangeInProgress = false;
StreamingDestination::StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort, bool gzip):
m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip),
m_PendingIncomingTimer (m_Owner->GetService ()),
@ -69,8 +69,7 @@ namespace stream
const int PENDING_INCOMING_TIMEOUT = 10; // in seconds
const int MAX_RECEIVE_TIMEOUT = 20; // in seconds
const uint16_t DELAY_CHOKING = 60000; // in milliseconds
const uint64_t SEND_INTERVAL = 10000; // in microseconds
const uint64_t SEND_INTERVAL_VARIANCE = 2000; // in microseconds
const uint64_t SEND_INTERVAL = 1000; // in microseconds
const uint64_t REQUEST_IMMEDIATE_ACK_INTERVAL = 7500; // in milliseconds
const uint64_t REQUEST_IMMEDIATE_ACK_INTERVAL_VARIANCE = 3200; // in milliseconds
const bool LOSS_BASED_CONTROL_ENABLED = 1; // 0/1
@ -249,8 +248,6 @@ namespace stream
void UpdatePacingTime ();
void ProcessWindowDrop ();
void HalveWindowSize ();
void CancelRemoteLeaseChange ();
@ -271,14 +268,12 @@ namespace stream
bool m_IsWinDropped;
bool m_IsTimeOutResend;
bool m_IsImmediateAckRequested;
bool m_IsRemoteLeaseChangeInProgress;
StreamingDestination& m_LocalDestination;
std::shared_ptr<const i2p::data::IdentityEx> m_RemoteIdentity;
std::shared_ptr<const i2p::crypto::Verifier> m_TransientVerifier; // in case of offline key
std::shared_ptr<const i2p::data::LeaseSet> m_RemoteLeaseSet;
std::shared_ptr<i2p::garlic::GarlicRoutingSession> m_RoutingSession;
std::shared_ptr<const i2p::data::Lease> m_CurrentRemoteLease;
std::shared_ptr<const i2p::data::Lease> m_NextRemoteLease;
std::shared_ptr<i2p::tunnel::OutboundTunnel> m_CurrentOutboundTunnel;
std::queue<Packet *> m_ReceiveQueue;
std::set<Packet *, PacketCmp> m_SavedPackets;
@ -294,7 +289,7 @@ namespace stream
int m_WindowIncCounter, m_RTO, m_AckDelay, m_PrevRTTSample;
double m_Jitter;
uint64_t m_MinPacingTime, m_PacingTime, m_PacingTimeRem, // microseconds
m_LastSendTime, m_RemoteLeaseChangeTime; // miliseconds
m_LastSendTime; // miliseconds
uint64_t m_LastACKSendTime, m_PacketACKInterval, m_PacketACKIntervalRem; // for limit inbound speed
int m_NumResendAttempts, m_NumPacketsToSend;
size_t m_MTU;
@ -672,31 +672,6 @@ namespace transport
if (transport & compatibleTransports)
peer->priority.push_back (transport);
if (peer->priority.empty ())
// try recently connected SSU2 if any
auto supportedTransports = context.GetRouterInfo ().GetCompatibleTransports (false) &
peer->router->GetCompatibleTransports (false);
if (supportedTransports & (i2p::data::RouterInfo::eSSU2V4 | i2p::data::RouterInfo::eSSU2V6))
auto ep = peer->router->GetProfile ()->GetLastEndpoint ();
if (!ep.address ().is_unspecified () && ep.port ())
if (ep.address ().is_v4 ())
if ((supportedTransports & i2p::data::RouterInfo::eSSU2V4) &&
m_SSU2Server->IsConnectedRecently (ep, false))
peer->priority.push_back (i2p::data::RouterInfo::eSSU2V4);
else if (ep.address ().is_v6 ())
if ((supportedTransports & i2p::data::RouterInfo::eSSU2V6) &&
m_SSU2Server->IsConnectedRecently (ep))
peer->priority.push_back (i2p::data::RouterInfo::eSSU2V6);
void Transports::RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident)
@ -250,18 +250,7 @@ namespace tunnel
void InboundTunnel::HandleTunnelDataMsg (std::shared_ptr<I2NPMessage>&& msg)
if (!IsEstablished () && GetState () != eTunnelStateExpiring)
// incoming messages means a tunnel is alive
SetState (eTunnelStateEstablished);
auto pool = GetTunnelPool ();
if (pool)
// update LeaseSet
auto dest = pool->GetLocalDestination ();
if (dest) dest->SetLeaseSetUpdated (true);
if (GetState () != eTunnelStateExpiring) SetState (eTunnelStateEstablished); // incoming messages means a tunnel is alive
EncryptTunnelMsg (msg, msg);
msg->from = GetSharedFromThis ();
m_Endpoint.HandleDecryptedTunnelDataMsg (msg);
@ -350,8 +339,7 @@ namespace tunnel
Tunnels::Tunnels (): m_IsRunning (false), m_Thread (nullptr), m_MaxNumTransitTunnels (DEFAULT_MAX_NUM_TRANSIT_TUNNELS),
m_TotalNumSuccesiveTunnelCreations (0), m_TotalNumFailedTunnelCreations (0), // for normal average
m_TunnelCreationSuccessRate (TCSR_START_VALUE), m_TunnelCreationAttemptsNum(0),
m_Rng(i2p::util::GetMonotonicMicroseconds ()%1000000LL)
m_TunnelCreationSuccessRate (TCSR_START_VALUE), m_TunnelCreationAttemptsNum(0)
@ -491,21 +479,18 @@ namespace tunnel
std::this_thread::sleep_for (std::chrono::seconds(1)); // wait for other parts are ready
uint64_t lastTs = 0, lastPoolsTs = 0, lastMemoryPoolTs = 0;
std::list<std::shared_ptr<I2NPMessage> > msgs;
while (m_IsRunning)
if (m_Queue.Wait (1,0)) // 1 sec
auto msg = m_Queue.GetNextWithTimeout (1000); // 1 sec
if (msg)
m_Queue.GetWholeQueue (msgs);
int numMsgs = 0;
uint32_t prevTunnelID = 0, tunnelID = 0;
std::shared_ptr<TunnelBase> prevTunnel;
while (!msgs.empty ())
auto msg = msgs.front (); msgs.pop_front ();
if (!msg) continue;
std::shared_ptr<TunnelBase> tunnel;
uint8_t typeID = msg->GetTypeID ();
switch (typeID)
@ -545,18 +530,17 @@ namespace tunnel
LogPrint (eLogWarning, "Tunnel: Unexpected message type ", (int) typeID);
msg = (numMsgs <= MAX_TUNNEL_MSGS_BATCH_SIZE) ? m_Queue.Get () : nullptr;
if (msg)
prevTunnelID = tunnelID;
prevTunnel = tunnel;
if (msgs.empty ())
if (numMsgs < MAX_TUNNEL_MSGS_BATCH_SIZE && !m_Queue.IsEmpty ())
m_Queue.GetWholeQueue (msgs); // try more
else if (tunnel)
tunnel->FlushTunnelDataMsgs (); // otherwise flush last
tunnel->FlushTunnelDataMsgs ();
while (msg);
if (i2p::transport::transports.IsOnline())
@ -842,7 +826,7 @@ namespace tunnel
if (msg) m_Queue.Put (msg);
void Tunnels::PostTunnelData (std::list<std::shared_ptr<I2NPMessage> >& msgs)
void Tunnels::PostTunnelData (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
m_Queue.Put (msgs);
@ -18,7 +18,6 @@
#include <thread>
#include <mutex>
#include <memory>
#include <random>
#include "util.h"
#include "Queue.h"
#include "Crypto.h"
@ -230,7 +229,7 @@ namespace tunnel
std::shared_ptr<InboundTunnel> CreateInboundTunnel (std::shared_ptr<TunnelConfig> config, std::shared_ptr<TunnelPool> pool, std::shared_ptr<OutboundTunnel> outboundTunnel);
std::shared_ptr<OutboundTunnel> CreateOutboundTunnel (std::shared_ptr<TunnelConfig> config, std::shared_ptr<TunnelPool> pool);
void PostTunnelData (std::shared_ptr<I2NPMessage> msg);
void PostTunnelData (std::list<std::shared_ptr<I2NPMessage> >& msgs); // and cleanup msgs
void PostTunnelData (const std::vector<std::shared_ptr<I2NPMessage> >& msgs);
void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr<InboundTunnel> tunnel);
void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr<OutboundTunnel> tunnel);
std::shared_ptr<TunnelPool> CreateTunnelPool (int numInboundHops,
@ -245,8 +244,6 @@ namespace tunnel
uint32_t GetMaxNumTransitTunnels () const { return m_MaxNumTransitTunnels; };
int GetCongestionLevel() const { return m_MaxNumTransitTunnels ? CONGESTION_LEVEL_FULL * m_TransitTunnels.size() / m_MaxNumTransitTunnels : CONGESTION_LEVEL_FULL; }
std::mt19937& GetRng () { return m_Rng; };
template<class TTunnel>
@ -310,7 +307,6 @@ namespace tunnel
int m_TotalNumSuccesiveTunnelCreations, m_TotalNumFailedTunnelCreations;
double m_TunnelCreationSuccessRate;
int m_TunnelCreationAttemptsNum;
std::mt19937 m_Rng;
@ -141,7 +141,7 @@ namespace tunnel
m_InboundTunnels.insert (createdTunnel);
if (m_LocalDestination)
m_LocalDestination->SetLeaseSetUpdated (true);
m_LocalDestination->SetLeaseSetUpdated ();
void TunnelPool::TunnelExpired (std::shared_ptr<InboundTunnel> expiredTunnel)
@ -330,7 +330,7 @@ namespace tunnel
if (num < m_NumInboundTunnels && m_NumInboundHops <= 0 && m_LocalDestination) // zero hops IB
m_LocalDestination->SetLeaseSetUpdated (true); // update LeaseSet immediately
m_LocalDestination->SetLeaseSetUpdated (); // update LeaseSet immediately
void TunnelPool::TestTunnels ()
@ -377,10 +377,10 @@ namespace tunnel
it.second.second->SetState (eTunnelStateTestFailed);
if (failed && m_LocalDestination)
m_LocalDestination->SetLeaseSetUpdated (true);
m_LocalDestination->SetLeaseSetUpdated ();
if (m_LocalDestination)
m_LocalDestination->SetLeaseSetUpdated (true);
m_LocalDestination->SetLeaseSetUpdated ();
else if (it.second.second->GetState () != eTunnelStateExpiring)
it.second.second->SetState (eTunnelStateTestFailed);
@ -1,5 +1,5 @@
* Copyright (c) 2013-2024, The PurpleI2P Project
* Copyright (c) 2013-2023, The PurpleI2P Project
* This file is part of Purple i2pd project and licensed under BSD3
@ -784,7 +784,7 @@ namespace client
if (!found)
LogPrint (eLogError, "I2PTunnel: Unable to resolve ", m_Address, " to compatible address");
LogPrint (eLogError, "I2PTunnel: Unable to resolve to compatible address");
@ -794,7 +794,7 @@ namespace client
Accept ();
LogPrint (eLogError, "I2PTunnel: Unable to resolve server tunnel address ", m_Address, ": ", ecode.message ());
LogPrint (eLogError, "I2PTunnel: Unable to resolve server tunnel address: ", ecode.message ());
void I2PServerTunnel::SetAccessList (const std::set<i2p::data::IdentHash>& accessList)
@ -203,7 +203,7 @@ namespace client
std::vector<std::shared_ptr<DatagramSessionInfo> > sessions;
std::lock_guard<std::mutex> lock (m_SessionsMutex);
for (const auto &it: m_Sessions)
for (auto it: m_Sessions)
auto s = it.second;
if (!s->m_Destination) continue;
Add table
Reference in a new issue