From 9acbb2203cca1c4e6c3de4e9abe1f22582a54523 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sat, 3 Sep 2016 11:46:47 -0400 Subject: [PATCH] Update Indentation and don't spam route changes in datagram sessions --- ClientContext.cpp | 177 ++++++++--------- ClientContext.h | 33 ++-- Datagram.cpp | 490 +++++++++++++++++++++++++--------------------- Datagram.h | 117 ++++++----- LeaseSet.cpp | 3 +- LeaseSet.h | 11 +- Streaming.cpp | 12 +- 7 files changed, 461 insertions(+), 382 deletions(-) diff --git a/ClientContext.cpp b/ClientContext.cpp index 3e33ae07..3c4d63c0 100644 --- a/ClientContext.cpp +++ b/ClientContext.cpp @@ -17,8 +17,8 @@ namespace client ClientContext::ClientContext (): m_SharedLocalDestination (nullptr), m_HttpProxy (nullptr), m_SocksProxy (nullptr), m_SamBridge (nullptr), - m_BOBCommandChannel (nullptr), m_I2CPServer (nullptr), - m_CleanupUDPTimer(m_Service, boost::posix_time::seconds(1)) + m_BOBCommandChannel (nullptr), m_I2CPServer (nullptr), + m_CleanupUDPTimer(m_Service, boost::posix_time::seconds(1)) { } @@ -88,14 +88,14 @@ namespace client } } - if ( m_ServiceThread == nullptr ) { - m_ServiceThread = new std::thread([&] () { - LogPrint(eLogInfo, "ClientContext: starting service"); - m_Service.run(); - LogPrint(eLogError, "ClientContext: service died"); - }); - ScheduleCleanupUDP(); - } + if ( m_ServiceThread == nullptr ) { + m_ServiceThread = new std::thread([&] () { + LogPrint(eLogInfo, "ClientContext: starting service"); + m_Service.run(); + LogPrint(eLogError, "ClientContext: service died"); + }); + ScheduleCleanupUDP(); + } // I2P tunnels @@ -211,23 +211,24 @@ namespace client m_AddressBook.Stop (); { - std::lock_guard lock(m_ForwardsMutex); - m_ServerForwards.clear(); - m_ClientForwards.clear(); - } - - + std::lock_guard lock(m_ForwardsMutex); + m_ServerForwards.clear(); + m_ClientForwards.clear(); + } + + for (auto& it: m_Destinations) it.second->Stop (); m_Destinations.clear (); m_SharedLocalDestination = nullptr; - // stop io service thread - if(m_ServiceThread) { - m_Service.stop(); - m_ServiceThread->join(); - delete m_ServiceThread; - m_ServiceThread = nullptr; - } + // stop io service thread + if(m_ServiceThread) + { + m_Service.stop(); + m_ServiceThread->join(); + delete m_ServiceThread; + m_ServiceThread = nullptr; + } } void ClientContext::ReloadConfig () @@ -402,34 +403,34 @@ namespace client localDestination = CreateNewLocalDestination (k, type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT, &options); } } - if (type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) { - // udp client - // TODO: hostnames - boost::asio::ip::udp::endpoint end(boost::asio::ip::address::from_string(address), port); - if (!localDestination) - { - localDestination = m_SharedLocalDestination; - } - auto clientTunnel = new I2PUDPClientTunnel(name, dest, end, localDestination, destinationPort); - if(m_ClientForwards.insert(std::make_pair(end, std::unique_ptr(clientTunnel))).second) - { - clientTunnel->Start(); - } - else - LogPrint(eLogError, "Clients: I2P Client forward for endpoint ", end, " already exists"); + if (type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) { + // udp client + // TODO: hostnames + boost::asio::ip::udp::endpoint end(boost::asio::ip::address::from_string(address), port); + if (!localDestination) + { + localDestination = m_SharedLocalDestination; + } + auto clientTunnel = new I2PUDPClientTunnel(name, dest, end, localDestination, destinationPort); + if(m_ClientForwards.insert(std::make_pair(end, std::unique_ptr(clientTunnel))).second) + { + clientTunnel->Start(); + } + else + LogPrint(eLogError, "Clients: I2P Client forward for endpoint ", end, " already exists"); - } else { - // tcp client - auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort); - if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (), - std::unique_ptr(clientTunnel))).second) - { - clientTunnel->Start (); - numClientTunnels++; - } - else - LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists"); - } + } else { + // tcp client + auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort); + if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (), + std::unique_ptr(clientTunnel))).second) + { + clientTunnel->Start (); + numClientTunnels++; + } + else + LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists"); + } } else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC || type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) { @@ -458,28 +459,28 @@ namespace client localDestination = FindLocalDestination (k.GetPublic ()->GetIdentHash ()); if (!localDestination) localDestination = CreateNewLocalDestination (k, true, &options); - if (type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) - { - // udp server tunnel - // TODO: hostnames - auto localAddress = boost::asio::ip::address::from_string(address); - boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address::from_string(host), port); - I2PUDPServerTunnel * serverTunnel = new I2PUDPServerTunnel(name, localDestination, localAddress, endpoint, port); - std::lock_guard lock(m_ForwardsMutex); - if(m_ServerForwards.insert( - std::make_pair( - std::make_pair( - localDestination->GetIdentHash(), port), - std::unique_ptr(serverTunnel))).second) - { - serverTunnel->Start(); - LogPrint(eLogInfo, "Clients: I2P Server Forward created for UDP Endpoint ", host, ":", port, " bound on ", address, " for ",localDestination->GetIdentHash().ToBase32()); - } - else - LogPrint(eLogError, "Clients: I2P Server Forward for destination/port ", m_AddressBook.ToAddress(localDestination->GetIdentHash()), "/", port, "already exists"); - - continue; - } + if (type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) + { + // udp server tunnel + // TODO: hostnames + auto localAddress = boost::asio::ip::address::from_string(address); + boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address::from_string(host), port); + I2PUDPServerTunnel * serverTunnel = new I2PUDPServerTunnel(name, localDestination, localAddress, endpoint, port); + std::lock_guard lock(m_ForwardsMutex); + if(m_ServerForwards.insert( + std::make_pair( + std::make_pair( + localDestination->GetIdentHash(), port), + std::unique_ptr(serverTunnel))).second) + { + serverTunnel->Start(); + LogPrint(eLogInfo, "Clients: I2P Server Forward created for UDP Endpoint ", host, ":", port, " bound on ", address, " for ",localDestination->GetIdentHash().ToBase32()); + } + else + LogPrint(eLogError, "Clients: I2P Server Forward for destination/port ", m_AddressBook.ToAddress(localDestination->GetIdentHash()), "/", port, "already exists"); + + continue; + } I2PServerTunnel * serverTunnel; if (type == I2P_TUNNELS_SECTION_TYPE_HTTP) @@ -512,7 +513,7 @@ namespace client std::make_pair (localDestination->GetIdentHash (), inPort), std::unique_ptr(serverTunnel))).second) { - serverTunnel->Start (); + serverTunnel->Start (); numServerTunnels++; } else @@ -532,21 +533,21 @@ namespace client LogPrint (eLogInfo, "Clients: ", numServerTunnels, " I2P server tunnels created"); } - void ClientContext::ScheduleCleanupUDP() - { - // schedule cleanup in 1 second - m_CleanupUDPTimer.expires_at(m_CleanupUDPTimer.expires_at() + boost::posix_time::seconds(1)); - m_CleanupUDPTimer.async_wait(std::bind(&ClientContext::CleanupUDP, this, std::placeholders::_1)); - } + void ClientContext::ScheduleCleanupUDP() + { + // schedule cleanup in 1 second + m_CleanupUDPTimer.expires_at(m_CleanupUDPTimer.expires_at() + boost::posix_time::seconds(1)); + m_CleanupUDPTimer.async_wait(std::bind(&ClientContext::CleanupUDP, this, std::placeholders::_1)); + } - void ClientContext::CleanupUDP(const boost::system::error_code & ecode) - { - if(!ecode) - { - std::lock_guard lock(m_ForwardsMutex); - for ( auto & s : m_ServerForwards ) s.second->ExpireStale(); - ScheduleCleanupUDP(); - } - } + void ClientContext::CleanupUDP(const boost::system::error_code & ecode) + { + if(!ecode) + { + std::lock_guard lock(m_ForwardsMutex); + for ( auto & s : m_ServerForwards ) s.second->ExpireStale(); + ScheduleCleanupUDP(); + } + } } } diff --git a/ClientContext.h b/ClientContext.h index 0ff7993f..87dbec0f 100644 --- a/ClientContext.h +++ b/ClientContext.h @@ -24,8 +24,8 @@ namespace client const char I2P_TUNNELS_SECTION_TYPE_SERVER[] = "server"; const char I2P_TUNNELS_SECTION_TYPE_HTTP[] = "http"; const char I2P_TUNNELS_SECTION_TYPE_IRC[] = "irc"; - const char I2P_TUNNELS_SECTION_TYPE_UDPCLIENT[] = "udpclient"; - const char I2P_TUNNELS_SECTION_TYPE_UDPSERVER[] = "udpserver"; + const char I2P_TUNNELS_SECTION_TYPE_UDPCLIENT[] = "udpclient"; + const char I2P_TUNNELS_SECTION_TYPE_UDPSERVER[] = "udpserver"; const char I2P_CLIENT_TUNNEL_PORT[] = "port"; const char I2P_CLIENT_TUNNEL_ADDRESS[] = "address"; const char I2P_CLIENT_TUNNEL_DESTINATION[] = "destination"; @@ -75,10 +75,10 @@ namespace client template void ReadI2CPOptions (const Section& section, std::map& options) const; - void CleanupUDP(const boost::system::error_code & ecode); - void ScheduleCleanupUDP(); - - private: + void CleanupUDP(const boost::system::error_code & ecode); + void ScheduleCleanupUDP(); + + private: std::mutex m_DestinationsMutex; std::map > m_Destinations; @@ -91,25 +91,26 @@ namespace client std::map > m_ClientTunnels; // local endpoint->tunnel std::map, std::unique_ptr > m_ServerTunnels; // ->tunnel - std::mutex m_ForwardsMutex; - - std::map > m_ClientForwards; // local endpoint -> udp tunnel - std::map, std::unique_ptr > m_ServerForwards; // -> udp tunnel - - SAMBridge * m_SamBridge; + std::mutex m_ForwardsMutex; + std::map > m_ClientForwards; // local endpoint -> udp tunnel + std::map, std::unique_ptr > m_ServerForwards; // -> udp tunnel + + SAMBridge * m_SamBridge; BOBCommandChannel * m_BOBCommandChannel; I2CPServer * m_I2CPServer; - boost::asio::io_service m_Service; - std::thread * m_ServiceThread; + boost::asio::io_service m_Service; + std::thread * m_ServiceThread; - boost::asio::deadline_timer m_CleanupUDPTimer; - + boost::asio::deadline_timer m_CleanupUDPTimer; + public: // for HTTP const decltype(m_Destinations)& GetDestinations () const { return m_Destinations; }; const decltype(m_ClientTunnels)& GetClientTunnels () const { return m_ClientTunnels; }; const decltype(m_ServerTunnels)& GetServerTunnels () const { return m_ServerTunnels; }; + const decltype(m_ClientForwards)& GetClientForwards () const { return m_ClientForwards; } + const decltype(m_ServerForwards)& GetServerForwards () const { return m_ServerForwards; } }; extern ClientContext context; diff --git a/Datagram.cpp b/Datagram.cpp index cdd468b1..37559922 100644 --- a/Datagram.cpp +++ b/Datagram.cpp @@ -13,22 +13,22 @@ namespace datagram { DatagramDestination::DatagramDestination (std::shared_ptr owner): m_Owner (owner.get()), - m_CleanupTimer(owner->GetService()), - m_Receiver (nullptr) + m_CleanupTimer(owner->GetService()), + m_Receiver (nullptr) { - ScheduleCleanup(); + ScheduleCleanup(); } DatagramDestination::~DatagramDestination () { - m_CleanupTimer.cancel(); - m_Sessions.clear(); + m_CleanupTimer.cancel(); + m_Sessions.clear(); } void DatagramDestination::SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash& ident, uint16_t fromPort, uint16_t toPort) { - auto owner = m_Owner; - auto i = owner->GetIdentity(); + auto owner = m_Owner; + auto i = owner->GetIdentity(); uint8_t buf[MAX_DATAGRAM_SIZE]; auto identityLen = i->ToBuffer (buf, MAX_DATAGRAM_SIZE); uint8_t * signature = buf + identityLen; @@ -41,15 +41,15 @@ namespace datagram { uint8_t hash[32]; SHA256(buf1, len, hash); - owner->Sign (hash, 32, signature); + owner->Sign (hash, 32, signature); } else owner->Sign (buf1, len, signature); auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort); - auto session = ObtainSession(ident); - session->SendMsg(msg); - } + auto session = ObtainSession(ident); + session->SendMsg(msg); + } void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) @@ -120,224 +120,274 @@ namespace datagram return msg; } - void DatagramDestination::ScheduleCleanup() - { - m_CleanupTimer.expires_from_now(boost::posix_time::seconds(DATAGRAM_SESSION_CLEANUP_INTERVAL)); - m_CleanupTimer.async_wait(std::bind(&DatagramDestination::HandleCleanUp, this, std::placeholders::_1)); - } + void DatagramDestination::ScheduleCleanup() + { + m_CleanupTimer.expires_from_now(boost::posix_time::seconds(DATAGRAM_SESSION_CLEANUP_INTERVAL)); + m_CleanupTimer.async_wait(std::bind(&DatagramDestination::HandleCleanUp, this, std::placeholders::_1)); + } - void DatagramDestination::HandleCleanUp(const boost::system::error_code & ecode) - { - if(ecode) - return; - std::lock_guard lock(m_SessionsMutex); - auto now = i2p::util::GetMillisecondsSinceEpoch(); - LogPrint(eLogDebug, "DatagramDestination: clean up sessions"); - std::vector expiredSessions; - // for each session ... - for (auto & e : m_Sessions) { - // check if expired - if(now - e.second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE) - expiredSessions.push_back(e.first); // we are expired - } - // for each expired session ... - for (auto & ident : expiredSessions) { - // remove the expired session - LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", ident.ToBase32()); - m_Sessions.erase(ident); - } - m_Owner->CleanupExpiredTags(); - ScheduleCleanup(); - } - - std::shared_ptr DatagramDestination::ObtainSession(const i2p::data::IdentHash & ident) - { - std::shared_ptr session = nullptr; - std::lock_guard lock(m_SessionsMutex); - auto itr = m_Sessions.find(ident); - if (itr == m_Sessions.end()) { - // not found, create new session - session = std::make_shared(m_Owner, ident); - m_Sessions[ident] = session; - } else { - session = itr->second; - } - return session; - } + void DatagramDestination::HandleCleanUp(const boost::system::error_code & ecode) + { + if(ecode) + return; + std::lock_guard lock(m_SessionsMutex); + auto now = i2p::util::GetMillisecondsSinceEpoch(); + LogPrint(eLogDebug, "DatagramDestination: clean up sessions"); + std::vector expiredSessions; + // for each session ... + for (auto & e : m_Sessions) { + // check if expired + if(now - e.second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE) + expiredSessions.push_back(e.first); // we are expired + } + // for each expired session ... + for (auto & ident : expiredSessions) { + // remove the expired session + LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", ident.ToBase32()); + m_Sessions.erase(ident); + } + m_Owner->CleanupExpiredTags(); + ScheduleCleanup(); + } + + std::shared_ptr DatagramDestination::ObtainSession(const i2p::data::IdentHash & ident) + { + std::shared_ptr session = nullptr; + std::lock_guard lock(m_SessionsMutex); + auto itr = m_Sessions.find(ident); + if (itr == m_Sessions.end()) { + // not found, create new session + session = std::make_shared(m_Owner, ident); + m_Sessions[ident] = session; + } else { + session = itr->second; + } + return session; + } - DatagramSession::DatagramSession(i2p::client::ClientDestination * localDestination, - const i2p::data::IdentHash & remoteIdent) : - m_LocalDestination(localDestination), - m_RemoteIdentity(remoteIdent), - m_LastUse(i2p::util::GetMillisecondsSinceEpoch()) - { - } + DatagramSession::DatagramSession(i2p::client::ClientDestination * localDestination, + const i2p::data::IdentHash & remoteIdent) : + m_LocalDestination(localDestination), + m_RemoteIdentity(remoteIdent), + m_LastUse(i2p::util::GetMillisecondsSinceEpoch ()), + m_LastPathChange(0), + m_LastSuccess(0) + { + } - void DatagramSession::SendMsg(std::shared_ptr msg) - { - // we used this session - m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); - // schedule send - m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, this, msg)); - } + void DatagramSession::SendMsg(std::shared_ptr msg) + { + // we used this session + m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); + // schedule send + m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, this, msg)); + } - void DatagramSession::HandleSend(std::shared_ptr msg) - { - // do we have a routing session? - if(m_RoutingSession) - { - // do we have a routing path ? - auto routingPath = m_RoutingSession->GetSharedRoutingPath(); - if(!routingPath) - { - LogPrint(eLogDebug, "DatagramSession: try getting new routing path"); - // no routing path, try getting one - routingPath = GetNextRoutingPath(); - if(routingPath) // remember the routing path if we got one - m_RoutingSession->SetSharedRoutingPath(routingPath); - } - // make sure we have a routing path - if (routingPath) - { - auto outboundTunnel = routingPath->outboundTunnel; - if (outboundTunnel) - { - if(outboundTunnel->IsEstablished()) - { - // we have a routing path and routing session and the outbound tunnel we are using is good - // wrap message with routing session and send down routing path's outbound tunnel wrapped for the IBGW - auto m = m_RoutingSession->WrapSingleMessage(msg); - routingPath->outboundTunnel->SendTunnelDataMsg({i2p::tunnel::TunnelMessageBlock{ - i2p::tunnel::eDeliveryTypeTunnel, - routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, - m - }}); - return; - } - } - } - } - // we couldn't send so let's try resetting the routing path and updating lease set - ResetRoutingPath(); - UpdateLeaseSet(msg); - - } + void DatagramSession::HandleSend(std::shared_ptr msg) + { + // do we have a routing session? + if(m_RoutingSession) + { + // should we switch paths? + if(ShouldUpdateRoutingPath ()) + { + LogPrint(eLogDebug, "DatagramSession: try getting new routing path"); + // try switching paths + UpdateRoutingPath (GetNextRoutingPath ()); + } + auto routingPath = m_RoutingSession->GetSharedRoutingPath (); + // make sure we have a routing path + if (routingPath) + { + auto outboundTunnel = routingPath->outboundTunnel; + if (outboundTunnel) + { + if(outboundTunnel->IsEstablished()) + { + m_LastSuccess = i2p::util::GetMillisecondsSinceEpoch (); + // we have a routing path and routing session and the outbound tunnel we are using is good + // wrap message with routing session and send down routing path's outbound tunnel wrapped for the IBGW + auto m = m_RoutingSession->WrapSingleMessage(msg); + routingPath->outboundTunnel->SendTunnelDataMsg({i2p::tunnel::TunnelMessageBlock{ + i2p::tunnel::eDeliveryTypeTunnel, + routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, + m + }}); + return; + } + } + } + } + auto now = i2p::util::GetMillisecondsSinceEpoch (); + // if this path looks dead reset the routing path since we didn't seem to be able to get a path in time + if (now - m_LastPathChange >= DATAGRAM_SESSION_PATH_TIMEOUT ) ResetRoutingPath(); + UpdateLeaseSet(msg); + + } - std::shared_ptr DatagramSession::GetNextRoutingPath() - { - std::shared_ptr outboundTunnel = nullptr; - std::shared_ptr routingPath = nullptr; - // get existing routing path if we have one - if(m_RoutingSession) - routingPath = m_RoutingSession->GetSharedRoutingPath(); - // do we have an existing outbound tunnel and routing path? - if(routingPath && routingPath->outboundTunnel) - { - // is the outbound tunnel we are using good? - if (routingPath->outboundTunnel->IsEstablished()) - { - // ya so let's stick with it - outboundTunnel = routingPath->outboundTunnel; - } - else - outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(routingPath->outboundTunnel); // no so we'll switch outbound tunnels - // don't reuse the old path as we are making a new one - routingPath = nullptr; - } - // do we have an outbound tunnel that works already ? - if(!outboundTunnel) - outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(); // no, let's get a new outbound tunnel as we probably just started + void DatagramSession::UpdateRoutingPath(const std::shared_ptr & path) + { + // we can't update routing path because we have no routing session + if(!m_RoutingSession) return; + // set routing path and update time we last updated the routing path + m_RoutingSession->SetSharedRoutingPath (path); + m_LastPathChange = i2p::util::GetMillisecondsSinceEpoch (); + } - if(outboundTunnel) - { - // get next available lease - auto lease = GetNextLease(); - if(lease) - { - // we have a valid lease to use and an outbound tunnel - // create new routing path - uint32_t now = i2p::util::GetSecondsSinceEpoch(); - routingPath = std::make_shared(i2p::garlic::GarlicRoutingPath{ - outboundTunnel, - lease, - 0, - now, - 0 - }); - } - } - return routingPath; - } + bool DatagramSession::ShouldUpdateRoutingPath() const + { + auto now = i2p::util::GetMillisecondsSinceEpoch (); + // we need to rotate paths becuase the routing path is too old + if (now - m_LastPathChange >= DATAGRAM_SESSION_PATH_SWITCH_INTERVAL) return true; + // our path looks dead so we need to rotate paths + if (now - m_LastSuccess >= DATAGRAM_SESSION_PATH_TIMEOUT) return true; + // if we have a routing session and routing path we don't need to switch paths + return m_RoutingSession != nullptr && m_RoutingSession->GetSharedRoutingPath () != nullptr; + } - void DatagramSession::ResetRoutingPath() - { - if(m_RoutingSession) - { - auto routingPath = m_RoutingSession->GetSharedRoutingPath(); - if(routingPath && routingPath->remoteLease) // we have a remote lease already specified and a routing path - { - // get outbound tunnel on this path - auto outboundTunnel = routingPath->outboundTunnel; - // is this outbound tunnel there and established - if (outboundTunnel && outboundTunnel->IsEstablished()) - m_InvalidIBGW.push_back(routingPath->remoteLease->tunnelGateway); // yes, let's mark remote lease as dead because the outbound tunnel seems fine - } - // reset the routing path - m_RoutingSession->SetSharedRoutingPath(nullptr); - } - } - std::shared_ptr DatagramSession::GetNextLease() - { - std::shared_ptr next = nullptr; - if(m_RemoteLeaseSet) - { - std::vector exclude; - for(const auto & ident : m_InvalidIBGW) - exclude.push_back(ident); - // find get all leases that are not in our ban list - auto leases = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( [&exclude] (const i2p::data::Lease & l) -> bool { - if(exclude.size()) - { - auto end = std::end(exclude); - return std::find_if(exclude.begin(), end, [l] ( const i2p::data::IdentHash & ident) -> bool { - return ident == l.tunnelGateway; - }) != end; - } - else - return false; - }); - if(leases.size()) - { - // pick random valid next lease - uint32_t idx = rand() % leases.size(); - next = leases[idx]; - } - } - return next; - } - - void DatagramSession::UpdateLeaseSet(std::shared_ptr msg) - { - LogPrint(eLogInfo, "DatagramSession: updating lease set"); - m_LocalDestination->RequestDestination(m_RemoteIdentity, std::bind(&DatagramSession::HandleGotLeaseSet, this, std::placeholders::_1, msg)); - } + bool DatagramSession::ShouldSwitchLease() const + { + auto now = i2p::util::GetMillisecondsSinceEpoch (); + std::shared_ptr routingPath = nullptr; + std::shared_ptr currentLease = nullptr; + if(m_RoutingSession) + routingPath = m_RoutingSession->GetSharedRoutingPath (); + if(routingPath) + currentLease = routingPath->remoteLease; + if(currentLease) // if we have a lease return true if it's about to expire otherwise return false + return now - currentLease->ExpiresWithin( DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE ); + // we have no current lease, we should switch + return true; + } + + std::shared_ptr DatagramSession::GetNextRoutingPath() + { + std::shared_ptr outboundTunnel = nullptr; + std::shared_ptr routingPath = nullptr; + // get existing routing path if we have one + if(m_RoutingSession) + routingPath = m_RoutingSession->GetSharedRoutingPath(); + // do we have an existing outbound tunnel and routing path? + if(routingPath && routingPath->outboundTunnel) + { + // is the outbound tunnel we are using good? + if (routingPath->outboundTunnel->IsEstablished()) + { + // ya so let's stick with it + outboundTunnel = routingPath->outboundTunnel; + } + else + outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(routingPath->outboundTunnel); // no so we'll switch outbound tunnels + } + // do we have an outbound tunnel that works already ? + if(!outboundTunnel) + outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(); // no, let's get a new outbound tunnel as we probably just started - void DatagramSession::HandleGotLeaseSet(std::shared_ptr remoteIdent, std::shared_ptr msg) - { - if(remoteIdent) { - // update routing session - if(m_RoutingSession) - m_RoutingSession = nullptr; - m_RoutingSession = m_LocalDestination->GetRoutingSession(remoteIdent, true); - // clear invalid IBGW as we have a new lease set - m_InvalidIBGW.clear(); - m_RemoteLeaseSet = remoteIdent; - // send the message that was queued if it was provided - if(msg) - HandleSend(msg); - } - } + if(outboundTunnel) + { + std::shared_ptr lease = nullptr; + // should we switch leases ? + if (ShouldSwitchLease ()) + { + // yes, get next available lease + lease = GetNextLease(); + } + else if (routingPath) + { + // stick with the lease we have if we have one + lease = routingPath->remoteLease; + } + if(lease) + { + // we have a valid lease to use and an outbound tunnel + // create new routing path + uint32_t now = i2p::util::GetSecondsSinceEpoch(); + routingPath = std::make_shared(i2p::garlic::GarlicRoutingPath{ + outboundTunnel, + lease, + 0, + now, + 0 + }); + } + else // we don't have a new routing path to give + routingPath = nullptr; + } + return routingPath; + } + + void DatagramSession::ResetRoutingPath() + { + if(m_RoutingSession) + { + auto routingPath = m_RoutingSession->GetSharedRoutingPath(); + if(routingPath && routingPath->remoteLease) // we have a remote lease already specified and a routing path + { + // get outbound tunnel on this path + auto outboundTunnel = routingPath->outboundTunnel; + // is this outbound tunnel there and established + if (outboundTunnel && outboundTunnel->IsEstablished()) + m_InvalidIBGW.push_back(routingPath->remoteLease->tunnelGateway); // yes, let's mark remote lease as dead because the outbound tunnel seems fine + } + // reset the routing path + UpdateRoutingPath(nullptr); + } + } + + std::shared_ptr DatagramSession::GetNextLease() + { + auto now = i2p::util::GetMillisecondsSinceEpoch (); + std::shared_ptr next = nullptr; + if(m_RemoteLeaseSet) + { + std::vector exclude; + for(const auto & ident : m_InvalidIBGW) + exclude.push_back(ident); + // find get all leases that are not in our ban list and are not going to expire within our lease set handover window + fudge + auto leases = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( [&exclude, now] (const i2p::data::Lease & l) -> bool { + if(exclude.size()) + { + auto end = std::end(exclude); + return std::find_if(exclude.begin(), end, [l, now] ( const i2p::data::IdentHash & ident) -> bool { + return ident == l.tunnelGateway || l.ExpiresWithin (DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE); + }) != end; + } + else + return l.ExpiresWithin (DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE); + }); + if(leases.size()) + { + // pick random valid next lease + uint32_t idx = rand() % leases.size(); + next = leases[idx]; + } + } + return next; + } + + void DatagramSession::UpdateLeaseSet(std::shared_ptr msg) + { + LogPrint(eLogInfo, "DatagramSession: updating lease set"); + m_LocalDestination->RequestDestination(m_RemoteIdentity, std::bind(&DatagramSession::HandleGotLeaseSet, this, std::placeholders::_1, msg)); + } + + void DatagramSession::HandleGotLeaseSet(std::shared_ptr remoteIdent, std::shared_ptr msg) + { + if(remoteIdent) + { + // update routing session + if(m_RoutingSession) + m_RoutingSession = nullptr; + m_RoutingSession = m_LocalDestination->GetRoutingSession(remoteIdent, true); + // clear invalid IBGW as we have a new lease set + m_InvalidIBGW.clear(); + m_RemoteLeaseSet = remoteIdent; + // send the message that was queued if it was provided + if(msg) + HandleSend(msg); + } + } } } diff --git a/Datagram.h b/Datagram.h index 887c82d2..fd1f290d 100644 --- a/Datagram.h +++ b/Datagram.h @@ -20,51 +20,70 @@ namespace client namespace datagram { - // seconds interval for cleanup timer - const int DATAGRAM_SESSION_CLEANUP_INTERVAL = 3; - // milliseconds for max session idle time (10 minutes) - const uint64_t DATAGRAM_SESSION_MAX_IDLE = 3600 * 1000; + // seconds interval for cleanup timer + const int DATAGRAM_SESSION_CLEANUP_INTERVAL = 3; + // milliseconds for max session idle time + const uint64_t DATAGRAM_SESSION_MAX_IDLE = 10 * 60 * 1000; + // milliseconds for how long we try sticking to a dead routing path before trying to switch + const uint64_t DATAGRAM_SESSION_PATH_TIMEOUT = 5000; + // milliseconds interval a routing path is used before switching + const uint64_t DATAGRAM_SESSION_PATH_SWITCH_INTERVAL = 60 * 1000; + // milliseconds before lease expire should we try switching leases + const uint64_t DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW = 10 * 1000; + // milliseconds fudge factor for leases handover + const uint64_t DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE = 1000; - - class DatagramSession - { - public: - DatagramSession(i2p::client::ClientDestination * localDestination, - const i2p::data::IdentHash & remoteIdent); + + class DatagramSession + { + public: + DatagramSession(i2p::client::ClientDestination * localDestination, + const i2p::data::IdentHash & remoteIdent); - /** send an i2np message to remote endpoint for this session */ - void SendMsg(std::shared_ptr msg); - /** get the last time in milliseconds for when we used this datagram session */ - uint64_t LastActivity() const { return m_LastUse; } - private: + /** send an i2np message to remote endpoint for this session */ + void SendMsg(std::shared_ptr msg); + /** get the last time in milliseconds for when we used this datagram session */ + uint64_t LastActivity() const { return m_LastUse; } + private: - /** get next usable routing path, try reusing outbound tunnels */ - std::shared_ptr GetNextRoutingPath(); - /** - * mark current routing path as invalid and clear it - * if the outbound tunnel we were using was okay don't use the IBGW in the routing path's lease next time - */ - void ResetRoutingPath(); + /** update our routing path we are using, mark that we have changed paths */ + void UpdateRoutingPath(const std::shared_ptr & path); - /** get next usable lease, does not fetch or update if expired or have no lease set */ - std::shared_ptr GetNextLease(); - - void HandleSend(std::shared_ptr msg); - void HandleGotLeaseSet(std::shared_ptr remoteIdent, - std::shared_ptr msg); - void UpdateLeaseSet(std::shared_ptr msg=nullptr); - - private: - i2p::client::ClientDestination * m_LocalDestination; - i2p::data::IdentHash m_RemoteIdentity; - std::shared_ptr m_RoutingSession; - // Ident hash of IBGW that are invalid - std::vector m_InvalidIBGW; - std::shared_ptr m_RemoteLeaseSet; - uint64_t m_LastUse; - }; - - const size_t MAX_DATAGRAM_SIZE = 32768; + /** return true if we should switch routing paths because of path lifetime or timeout otherwise false */ + bool ShouldUpdateRoutingPath() const; + + /** return true if we should switch the lease for out routing path otherwise return false */ + bool ShouldSwitchLease() const; + + /** get next usable routing path, try reusing outbound tunnels */ + std::shared_ptr GetNextRoutingPath(); + /** + * mark current routing path as invalid and clear it + * if the outbound tunnel we were using was okay don't use the IBGW in the routing path's lease next time + */ + void ResetRoutingPath(); + + /** get next usable lease, does not fetch or update if expired or have no lease set */ + std::shared_ptr GetNextLease(); + + void HandleSend(std::shared_ptr msg); + void HandleGotLeaseSet(std::shared_ptr remoteIdent, + std::shared_ptr msg); + void UpdateLeaseSet(std::shared_ptr msg=nullptr); + + private: + i2p::client::ClientDestination * m_LocalDestination; + i2p::data::IdentHash m_RemoteIdentity; + std::shared_ptr m_RoutingSession; + // Ident hash of IBGW that are invalid + std::vector m_InvalidIBGW; + std::shared_ptr m_RemoteLeaseSet; + uint64_t m_LastUse; + uint64_t m_LastPathChange; + uint64_t m_LastSuccess; + }; + + const size_t MAX_DATAGRAM_SIZE = 32768; class DatagramDestination { typedef std::function Receiver; @@ -82,15 +101,15 @@ namespace datagram void SetReceiver (const Receiver& receiver, uint16_t port) { std::lock_guard lock(m_ReceiversMutex); m_ReceiversByPorts[port] = receiver; }; void ResetReceiver (uint16_t port) { std::lock_guard lock(m_ReceiversMutex); m_ReceiversByPorts.erase (port); }; - + private: - // clean up after next tick - void ScheduleCleanup(); - - // clean up stale sessions and expire tags - void HandleCleanUp(const boost::system::error_code & ecode); - - std::shared_ptr ObtainSession(const i2p::data::IdentHash & ident); + // clean up after next tick + void ScheduleCleanup(); + + // clean up stale sessions and expire tags + void HandleCleanUp(const boost::system::error_code & ecode); + + std::shared_ptr ObtainSession(const i2p::data::IdentHash & ident); std::shared_ptr CreateDataMessage (const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort); diff --git a/LeaseSet.cpp b/LeaseSet.cpp index ab0407cc..04dc77c5 100644 --- a/LeaseSet.cpp +++ b/LeaseSet.cpp @@ -163,9 +163,10 @@ namespace data return ExtractTimestamp (buf, len) > ExtractTimestamp (m_Buffer, m_BufferLen); } - bool LeaseSet::ExpiresSoon(const uint64_t dlt) const + bool LeaseSet::ExpiresSoon(const uint64_t dlt, const uint64_t fudge) const { auto now = i2p::util::GetMillisecondsSinceEpoch (); + if (fudge) now += rand() % fudge; if (now >= m_ExpirationTime) return true; return m_ExpirationTime - now <= dlt; } diff --git a/LeaseSet.h b/LeaseSet.h index 9b6b7844..ed21eccd 100644 --- a/LeaseSet.h +++ b/LeaseSet.h @@ -7,6 +7,7 @@ #include #include #include "Identity.h" +#include "Timestamp.h" namespace i2p { @@ -24,7 +25,13 @@ namespace data IdentHash tunnelGateway; uint32_t tunnelID; uint64_t endDate; // 0 means invalid - bool isUpdated; // trasient + bool isUpdated; // trasient + /* return true if this lease expires within t millisecond + fudge factor */ + bool ExpiresWithin( const uint64_t t, const uint64_t fudge = 1000 ) const { + auto expire = i2p::util::GetMillisecondsSinceEpoch (); + if(fudge) expire += rand() % fudge; + return expire - endDate >= t; + } }; struct LeaseCmp @@ -63,7 +70,7 @@ namespace data bool IsExpired () const; bool IsEmpty () const { return m_Leases.empty (); }; uint64_t GetExpirationTime () const { return m_ExpirationTime; }; - bool ExpiresSoon(const uint64_t dlt=1000 * 5) const ; + bool ExpiresSoon(const uint64_t dlt=1000 * 5, const uint64_t fudge = 0) const ; bool operator== (const LeaseSet& other) const { return m_BufferLen == other.m_BufferLen && !memcmp (m_Buffer, other.m_Buffer, m_BufferLen); }; diff --git a/Streaming.cpp b/Streaming.cpp index 0908ff4a..308ba02b 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -268,12 +268,12 @@ namespace stream } } auto sentPacket = *it; - uint64_t rtt = ts - sentPacket->sendTime; - if(ts < sentPacket->sendTime) - { - LogPrint(eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime); - rtt = 1; - } + uint64_t rtt = ts - sentPacket->sendTime; + if(ts < sentPacket->sendTime) + { + LogPrint(eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime); + rtt = 1; + } m_RTT = (m_RTT*seqn + rtt)/(seqn + 1); m_RTO = m_RTT*1.5; // TODO: implement it better LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime);