Update Indentation and don't spam route changes in datagram sessions

This commit is contained in:
Jeff Becker 2016-09-03 11:46:47 -04:00
parent c770bcbf96
commit 9acbb2203c
No known key found for this signature in database
GPG key ID: AB950234D6EA286B
7 changed files with 461 additions and 382 deletions

View file

@ -17,8 +17,8 @@ namespace client
ClientContext::ClientContext (): m_SharedLocalDestination (nullptr), ClientContext::ClientContext (): m_SharedLocalDestination (nullptr),
m_HttpProxy (nullptr), m_SocksProxy (nullptr), m_SamBridge (nullptr), m_HttpProxy (nullptr), m_SocksProxy (nullptr), m_SamBridge (nullptr),
m_BOBCommandChannel (nullptr), m_I2CPServer (nullptr), m_BOBCommandChannel (nullptr), m_I2CPServer (nullptr),
m_CleanupUDPTimer(m_Service, boost::posix_time::seconds(1)) m_CleanupUDPTimer(m_Service, boost::posix_time::seconds(1))
{ {
} }
@ -88,14 +88,14 @@ namespace client
} }
} }
if ( m_ServiceThread == nullptr ) { if ( m_ServiceThread == nullptr ) {
m_ServiceThread = new std::thread([&] () { m_ServiceThread = new std::thread([&] () {
LogPrint(eLogInfo, "ClientContext: starting service"); LogPrint(eLogInfo, "ClientContext: starting service");
m_Service.run(); m_Service.run();
LogPrint(eLogError, "ClientContext: service died"); LogPrint(eLogError, "ClientContext: service died");
}); });
ScheduleCleanupUDP(); ScheduleCleanupUDP();
} }
// I2P tunnels // I2P tunnels
@ -211,23 +211,24 @@ namespace client
m_AddressBook.Stop (); m_AddressBook.Stop ();
{ {
std::lock_guard<std::mutex> lock(m_ForwardsMutex); std::lock_guard<std::mutex> lock(m_ForwardsMutex);
m_ServerForwards.clear(); m_ServerForwards.clear();
m_ClientForwards.clear(); m_ClientForwards.clear();
} }
for (auto& it: m_Destinations) for (auto& it: m_Destinations)
it.second->Stop (); it.second->Stop ();
m_Destinations.clear (); m_Destinations.clear ();
m_SharedLocalDestination = nullptr; m_SharedLocalDestination = nullptr;
// stop io service thread // stop io service thread
if(m_ServiceThread) { if(m_ServiceThread)
m_Service.stop(); {
m_ServiceThread->join(); m_Service.stop();
delete m_ServiceThread; m_ServiceThread->join();
m_ServiceThread = nullptr; delete m_ServiceThread;
} m_ServiceThread = nullptr;
}
} }
void ClientContext::ReloadConfig () void ClientContext::ReloadConfig ()
@ -402,34 +403,34 @@ namespace client
localDestination = CreateNewLocalDestination (k, type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT, &options); localDestination = CreateNewLocalDestination (k, type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT, &options);
} }
} }
if (type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) { if (type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) {
// udp client // udp client
// TODO: hostnames // TODO: hostnames
boost::asio::ip::udp::endpoint end(boost::asio::ip::address::from_string(address), port); boost::asio::ip::udp::endpoint end(boost::asio::ip::address::from_string(address), port);
if (!localDestination) if (!localDestination)
{ {
localDestination = m_SharedLocalDestination; localDestination = m_SharedLocalDestination;
} }
auto clientTunnel = new I2PUDPClientTunnel(name, dest, end, localDestination, destinationPort); auto clientTunnel = new I2PUDPClientTunnel(name, dest, end, localDestination, destinationPort);
if(m_ClientForwards.insert(std::make_pair(end, std::unique_ptr<I2PUDPClientTunnel>(clientTunnel))).second) if(m_ClientForwards.insert(std::make_pair(end, std::unique_ptr<I2PUDPClientTunnel>(clientTunnel))).second)
{ {
clientTunnel->Start(); clientTunnel->Start();
} }
else else
LogPrint(eLogError, "Clients: I2P Client forward for endpoint ", end, " already exists"); LogPrint(eLogError, "Clients: I2P Client forward for endpoint ", end, " already exists");
} else { } else {
// tcp client // tcp client
auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort); auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort);
if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (), if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (),
std::unique_ptr<I2PClientTunnel>(clientTunnel))).second) std::unique_ptr<I2PClientTunnel>(clientTunnel))).second)
{ {
clientTunnel->Start (); clientTunnel->Start ();
numClientTunnels++; numClientTunnels++;
} }
else else
LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists"); LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists");
} }
} }
else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC || type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC || type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER)
{ {
@ -458,28 +459,28 @@ namespace client
localDestination = FindLocalDestination (k.GetPublic ()->GetIdentHash ()); localDestination = FindLocalDestination (k.GetPublic ()->GetIdentHash ());
if (!localDestination) if (!localDestination)
localDestination = CreateNewLocalDestination (k, true, &options); localDestination = CreateNewLocalDestination (k, true, &options);
if (type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) if (type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER)
{ {
// udp server tunnel // udp server tunnel
// TODO: hostnames // TODO: hostnames
auto localAddress = boost::asio::ip::address::from_string(address); auto localAddress = boost::asio::ip::address::from_string(address);
boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address::from_string(host), port); boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address::from_string(host), port);
I2PUDPServerTunnel * serverTunnel = new I2PUDPServerTunnel(name, localDestination, localAddress, endpoint, port); I2PUDPServerTunnel * serverTunnel = new I2PUDPServerTunnel(name, localDestination, localAddress, endpoint, port);
std::lock_guard<std::mutex> lock(m_ForwardsMutex); std::lock_guard<std::mutex> lock(m_ForwardsMutex);
if(m_ServerForwards.insert( if(m_ServerForwards.insert(
std::make_pair( std::make_pair(
std::make_pair( std::make_pair(
localDestination->GetIdentHash(), port), localDestination->GetIdentHash(), port),
std::unique_ptr<I2PUDPServerTunnel>(serverTunnel))).second) std::unique_ptr<I2PUDPServerTunnel>(serverTunnel))).second)
{ {
serverTunnel->Start(); serverTunnel->Start();
LogPrint(eLogInfo, "Clients: I2P Server Forward created for UDP Endpoint ", host, ":", port, " bound on ", address, " for ",localDestination->GetIdentHash().ToBase32()); LogPrint(eLogInfo, "Clients: I2P Server Forward created for UDP Endpoint ", host, ":", port, " bound on ", address, " for ",localDestination->GetIdentHash().ToBase32());
} }
else else
LogPrint(eLogError, "Clients: I2P Server Forward for destination/port ", m_AddressBook.ToAddress(localDestination->GetIdentHash()), "/", port, "already exists"); LogPrint(eLogError, "Clients: I2P Server Forward for destination/port ", m_AddressBook.ToAddress(localDestination->GetIdentHash()), "/", port, "already exists");
continue; continue;
} }
I2PServerTunnel * serverTunnel; I2PServerTunnel * serverTunnel;
if (type == I2P_TUNNELS_SECTION_TYPE_HTTP) if (type == I2P_TUNNELS_SECTION_TYPE_HTTP)
@ -512,7 +513,7 @@ namespace client
std::make_pair (localDestination->GetIdentHash (), inPort), std::make_pair (localDestination->GetIdentHash (), inPort),
std::unique_ptr<I2PServerTunnel>(serverTunnel))).second) std::unique_ptr<I2PServerTunnel>(serverTunnel))).second)
{ {
serverTunnel->Start (); serverTunnel->Start ();
numServerTunnels++; numServerTunnels++;
} }
else else
@ -532,21 +533,21 @@ namespace client
LogPrint (eLogInfo, "Clients: ", numServerTunnels, " I2P server tunnels created"); LogPrint (eLogInfo, "Clients: ", numServerTunnels, " I2P server tunnels created");
} }
void ClientContext::ScheduleCleanupUDP() void ClientContext::ScheduleCleanupUDP()
{ {
// schedule cleanup in 1 second // schedule cleanup in 1 second
m_CleanupUDPTimer.expires_at(m_CleanupUDPTimer.expires_at() + boost::posix_time::seconds(1)); m_CleanupUDPTimer.expires_at(m_CleanupUDPTimer.expires_at() + boost::posix_time::seconds(1));
m_CleanupUDPTimer.async_wait(std::bind(&ClientContext::CleanupUDP, this, std::placeholders::_1)); m_CleanupUDPTimer.async_wait(std::bind(&ClientContext::CleanupUDP, this, std::placeholders::_1));
} }
void ClientContext::CleanupUDP(const boost::system::error_code & ecode) void ClientContext::CleanupUDP(const boost::system::error_code & ecode)
{ {
if(!ecode) if(!ecode)
{ {
std::lock_guard<std::mutex> lock(m_ForwardsMutex); std::lock_guard<std::mutex> lock(m_ForwardsMutex);
for ( auto & s : m_ServerForwards ) s.second->ExpireStale(); for ( auto & s : m_ServerForwards ) s.second->ExpireStale();
ScheduleCleanupUDP(); ScheduleCleanupUDP();
} }
} }
} }
} }

View file

@ -24,8 +24,8 @@ namespace client
const char I2P_TUNNELS_SECTION_TYPE_SERVER[] = "server"; const char I2P_TUNNELS_SECTION_TYPE_SERVER[] = "server";
const char I2P_TUNNELS_SECTION_TYPE_HTTP[] = "http"; const char I2P_TUNNELS_SECTION_TYPE_HTTP[] = "http";
const char I2P_TUNNELS_SECTION_TYPE_IRC[] = "irc"; const char I2P_TUNNELS_SECTION_TYPE_IRC[] = "irc";
const char I2P_TUNNELS_SECTION_TYPE_UDPCLIENT[] = "udpclient"; const char I2P_TUNNELS_SECTION_TYPE_UDPCLIENT[] = "udpclient";
const char I2P_TUNNELS_SECTION_TYPE_UDPSERVER[] = "udpserver"; const char I2P_TUNNELS_SECTION_TYPE_UDPSERVER[] = "udpserver";
const char I2P_CLIENT_TUNNEL_PORT[] = "port"; const char I2P_CLIENT_TUNNEL_PORT[] = "port";
const char I2P_CLIENT_TUNNEL_ADDRESS[] = "address"; const char I2P_CLIENT_TUNNEL_ADDRESS[] = "address";
const char I2P_CLIENT_TUNNEL_DESTINATION[] = "destination"; const char I2P_CLIENT_TUNNEL_DESTINATION[] = "destination";
@ -75,10 +75,10 @@ namespace client
template<typename Section> template<typename Section>
void ReadI2CPOptions (const Section& section, std::map<std::string, std::string>& options) const; void ReadI2CPOptions (const Section& section, std::map<std::string, std::string>& options) const;
void CleanupUDP(const boost::system::error_code & ecode); void CleanupUDP(const boost::system::error_code & ecode);
void ScheduleCleanupUDP(); void ScheduleCleanupUDP();
private: private:
std::mutex m_DestinationsMutex; std::mutex m_DestinationsMutex;
std::map<i2p::data::IdentHash, std::shared_ptr<ClientDestination> > m_Destinations; std::map<i2p::data::IdentHash, std::shared_ptr<ClientDestination> > m_Destinations;
@ -91,25 +91,26 @@ namespace client
std::map<boost::asio::ip::tcp::endpoint, std::unique_ptr<I2PClientTunnel> > m_ClientTunnels; // local endpoint->tunnel std::map<boost::asio::ip::tcp::endpoint, std::unique_ptr<I2PClientTunnel> > m_ClientTunnels; // local endpoint->tunnel
std::map<std::pair<i2p::data::IdentHash, int>, std::unique_ptr<I2PServerTunnel> > m_ServerTunnels; // <destination,port>->tunnel std::map<std::pair<i2p::data::IdentHash, int>, std::unique_ptr<I2PServerTunnel> > m_ServerTunnels; // <destination,port>->tunnel
std::mutex m_ForwardsMutex; std::mutex m_ForwardsMutex;
std::map<boost::asio::ip::udp::endpoint, std::unique_ptr<I2PUDPClientTunnel> > m_ClientForwards; // local endpoint -> udp tunnel
std::map<boost::asio::ip::udp::endpoint, std::unique_ptr<I2PUDPClientTunnel> > m_ClientForwards; // local endpoint -> udp tunnel std::map<std::pair<i2p::data::IdentHash, int>, std::unique_ptr<I2PUDPServerTunnel> > m_ServerForwards; // <destination,port> -> udp tunnel
std::map<std::pair<i2p::data::IdentHash, int>, std::unique_ptr<I2PUDPServerTunnel> > m_ServerForwards; // <destination,port> -> udp tunnel
SAMBridge * m_SamBridge;
SAMBridge * m_SamBridge;
BOBCommandChannel * m_BOBCommandChannel; BOBCommandChannel * m_BOBCommandChannel;
I2CPServer * m_I2CPServer; I2CPServer * m_I2CPServer;
boost::asio::io_service m_Service; boost::asio::io_service m_Service;
std::thread * m_ServiceThread; std::thread * m_ServiceThread;
boost::asio::deadline_timer m_CleanupUDPTimer; boost::asio::deadline_timer m_CleanupUDPTimer;
public: public:
// for HTTP // for HTTP
const decltype(m_Destinations)& GetDestinations () const { return m_Destinations; }; const decltype(m_Destinations)& GetDestinations () const { return m_Destinations; };
const decltype(m_ClientTunnels)& GetClientTunnels () const { return m_ClientTunnels; }; const decltype(m_ClientTunnels)& GetClientTunnels () const { return m_ClientTunnels; };
const decltype(m_ServerTunnels)& GetServerTunnels () const { return m_ServerTunnels; }; const decltype(m_ServerTunnels)& GetServerTunnels () const { return m_ServerTunnels; };
const decltype(m_ClientForwards)& GetClientForwards () const { return m_ClientForwards; }
const decltype(m_ServerForwards)& GetServerForwards () const { return m_ServerForwards; }
}; };
extern ClientContext context; extern ClientContext context;

View file

@ -13,22 +13,22 @@ namespace datagram
{ {
DatagramDestination::DatagramDestination (std::shared_ptr<i2p::client::ClientDestination> owner): DatagramDestination::DatagramDestination (std::shared_ptr<i2p::client::ClientDestination> owner):
m_Owner (owner.get()), m_Owner (owner.get()),
m_CleanupTimer(owner->GetService()), m_CleanupTimer(owner->GetService()),
m_Receiver (nullptr) m_Receiver (nullptr)
{ {
ScheduleCleanup(); ScheduleCleanup();
} }
DatagramDestination::~DatagramDestination () DatagramDestination::~DatagramDestination ()
{ {
m_CleanupTimer.cancel(); m_CleanupTimer.cancel();
m_Sessions.clear(); m_Sessions.clear();
} }
void DatagramDestination::SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash& ident, uint16_t fromPort, uint16_t toPort) void DatagramDestination::SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash& ident, uint16_t fromPort, uint16_t toPort)
{ {
auto owner = m_Owner; auto owner = m_Owner;
auto i = owner->GetIdentity(); auto i = owner->GetIdentity();
uint8_t buf[MAX_DATAGRAM_SIZE]; uint8_t buf[MAX_DATAGRAM_SIZE];
auto identityLen = i->ToBuffer (buf, MAX_DATAGRAM_SIZE); auto identityLen = i->ToBuffer (buf, MAX_DATAGRAM_SIZE);
uint8_t * signature = buf + identityLen; uint8_t * signature = buf + identityLen;
@ -41,15 +41,15 @@ namespace datagram
{ {
uint8_t hash[32]; uint8_t hash[32];
SHA256(buf1, len, hash); SHA256(buf1, len, hash);
owner->Sign (hash, 32, signature); owner->Sign (hash, 32, signature);
} }
else else
owner->Sign (buf1, len, signature); owner->Sign (buf1, len, signature);
auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort); auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort);
auto session = ObtainSession(ident); auto session = ObtainSession(ident);
session->SendMsg(msg); session->SendMsg(msg);
} }
void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
@ -120,224 +120,274 @@ namespace datagram
return msg; return msg;
} }
void DatagramDestination::ScheduleCleanup() void DatagramDestination::ScheduleCleanup()
{ {
m_CleanupTimer.expires_from_now(boost::posix_time::seconds(DATAGRAM_SESSION_CLEANUP_INTERVAL)); m_CleanupTimer.expires_from_now(boost::posix_time::seconds(DATAGRAM_SESSION_CLEANUP_INTERVAL));
m_CleanupTimer.async_wait(std::bind(&DatagramDestination::HandleCleanUp, this, std::placeholders::_1)); m_CleanupTimer.async_wait(std::bind(&DatagramDestination::HandleCleanUp, this, std::placeholders::_1));
} }
void DatagramDestination::HandleCleanUp(const boost::system::error_code & ecode) void DatagramDestination::HandleCleanUp(const boost::system::error_code & ecode)
{ {
if(ecode) if(ecode)
return; return;
std::lock_guard<std::mutex> lock(m_SessionsMutex); std::lock_guard<std::mutex> lock(m_SessionsMutex);
auto now = i2p::util::GetMillisecondsSinceEpoch(); auto now = i2p::util::GetMillisecondsSinceEpoch();
LogPrint(eLogDebug, "DatagramDestination: clean up sessions"); LogPrint(eLogDebug, "DatagramDestination: clean up sessions");
std::vector<i2p::data::IdentHash> expiredSessions; std::vector<i2p::data::IdentHash> expiredSessions;
// for each session ... // for each session ...
for (auto & e : m_Sessions) { for (auto & e : m_Sessions) {
// check if expired // check if expired
if(now - e.second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE) if(now - e.second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE)
expiredSessions.push_back(e.first); // we are expired expiredSessions.push_back(e.first); // we are expired
} }
// for each expired session ... // for each expired session ...
for (auto & ident : expiredSessions) { for (auto & ident : expiredSessions) {
// remove the expired session // remove the expired session
LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", ident.ToBase32()); LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", ident.ToBase32());
m_Sessions.erase(ident); m_Sessions.erase(ident);
} }
m_Owner->CleanupExpiredTags(); m_Owner->CleanupExpiredTags();
ScheduleCleanup(); ScheduleCleanup();
} }
std::shared_ptr<DatagramSession> DatagramDestination::ObtainSession(const i2p::data::IdentHash & ident) std::shared_ptr<DatagramSession> DatagramDestination::ObtainSession(const i2p::data::IdentHash & ident)
{ {
std::shared_ptr<DatagramSession> session = nullptr; std::shared_ptr<DatagramSession> session = nullptr;
std::lock_guard<std::mutex> lock(m_SessionsMutex); std::lock_guard<std::mutex> lock(m_SessionsMutex);
auto itr = m_Sessions.find(ident); auto itr = m_Sessions.find(ident);
if (itr == m_Sessions.end()) { if (itr == m_Sessions.end()) {
// not found, create new session // not found, create new session
session = std::make_shared<DatagramSession>(m_Owner, ident); session = std::make_shared<DatagramSession>(m_Owner, ident);
m_Sessions[ident] = session; m_Sessions[ident] = session;
} else { } else {
session = itr->second; session = itr->second;
} }
return session; return session;
} }
DatagramSession::DatagramSession(i2p::client::ClientDestination * localDestination, DatagramSession::DatagramSession(i2p::client::ClientDestination * localDestination,
const i2p::data::IdentHash & remoteIdent) : const i2p::data::IdentHash & remoteIdent) :
m_LocalDestination(localDestination), m_LocalDestination(localDestination),
m_RemoteIdentity(remoteIdent), m_RemoteIdentity(remoteIdent),
m_LastUse(i2p::util::GetMillisecondsSinceEpoch()) m_LastUse(i2p::util::GetMillisecondsSinceEpoch ()),
{ m_LastPathChange(0),
} m_LastSuccess(0)
{
}
void DatagramSession::SendMsg(std::shared_ptr<I2NPMessage> msg) void DatagramSession::SendMsg(std::shared_ptr<I2NPMessage> msg)
{ {
// we used this session // we used this session
m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); m_LastUse = i2p::util::GetMillisecondsSinceEpoch();
// schedule send // schedule send
m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, this, msg)); m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, this, msg));
} }
void DatagramSession::HandleSend(std::shared_ptr<I2NPMessage> msg) void DatagramSession::HandleSend(std::shared_ptr<I2NPMessage> msg)
{ {
// do we have a routing session? // do we have a routing session?
if(m_RoutingSession) if(m_RoutingSession)
{ {
// do we have a routing path ? // should we switch paths?
auto routingPath = m_RoutingSession->GetSharedRoutingPath(); if(ShouldUpdateRoutingPath ())
if(!routingPath) {
{ LogPrint(eLogDebug, "DatagramSession: try getting new routing path");
LogPrint(eLogDebug, "DatagramSession: try getting new routing path"); // try switching paths
// no routing path, try getting one UpdateRoutingPath (GetNextRoutingPath ());
routingPath = GetNextRoutingPath(); }
if(routingPath) // remember the routing path if we got one auto routingPath = m_RoutingSession->GetSharedRoutingPath ();
m_RoutingSession->SetSharedRoutingPath(routingPath); // make sure we have a routing path
} if (routingPath)
// make sure we have a routing path {
if (routingPath) auto outboundTunnel = routingPath->outboundTunnel;
{ if (outboundTunnel)
auto outboundTunnel = routingPath->outboundTunnel; {
if (outboundTunnel) if(outboundTunnel->IsEstablished())
{ {
if(outboundTunnel->IsEstablished()) m_LastSuccess = i2p::util::GetMillisecondsSinceEpoch ();
{ // we have a routing path and routing session and the outbound tunnel we are using is good
// we have a routing path and routing session and the outbound tunnel we are using is good // wrap message with routing session and send down routing path's outbound tunnel wrapped for the IBGW
// wrap message with routing session and send down routing path's outbound tunnel wrapped for the IBGW auto m = m_RoutingSession->WrapSingleMessage(msg);
auto m = m_RoutingSession->WrapSingleMessage(msg); routingPath->outboundTunnel->SendTunnelDataMsg({i2p::tunnel::TunnelMessageBlock{
routingPath->outboundTunnel->SendTunnelDataMsg({i2p::tunnel::TunnelMessageBlock{ i2p::tunnel::eDeliveryTypeTunnel,
i2p::tunnel::eDeliveryTypeTunnel, routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID,
routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, m
m }});
}}); return;
return; }
} }
} }
} }
} auto now = i2p::util::GetMillisecondsSinceEpoch ();
// we couldn't send so let's try resetting the routing path and updating lease set // if this path looks dead reset the routing path since we didn't seem to be able to get a path in time
ResetRoutingPath(); if (now - m_LastPathChange >= DATAGRAM_SESSION_PATH_TIMEOUT ) ResetRoutingPath();
UpdateLeaseSet(msg); UpdateLeaseSet(msg);
} }
std::shared_ptr<i2p::garlic::GarlicRoutingPath> DatagramSession::GetNextRoutingPath() void DatagramSession::UpdateRoutingPath(const std::shared_ptr<i2p::garlic::GarlicRoutingPath> & path)
{ {
std::shared_ptr<i2p::tunnel::OutboundTunnel> outboundTunnel = nullptr; // we can't update routing path because we have no routing session
std::shared_ptr<i2p::garlic::GarlicRoutingPath> routingPath = nullptr; if(!m_RoutingSession) return;
// get existing routing path if we have one // set routing path and update time we last updated the routing path
if(m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (path);
routingPath = m_RoutingSession->GetSharedRoutingPath(); m_LastPathChange = i2p::util::GetMillisecondsSinceEpoch ();
// do we have an existing outbound tunnel and routing path? }
if(routingPath && routingPath->outboundTunnel)
{
// is the outbound tunnel we are using good?
if (routingPath->outboundTunnel->IsEstablished())
{
// ya so let's stick with it
outboundTunnel = routingPath->outboundTunnel;
}
else
outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(routingPath->outboundTunnel); // no so we'll switch outbound tunnels
// don't reuse the old path as we are making a new one
routingPath = nullptr;
}
// do we have an outbound tunnel that works already ?
if(!outboundTunnel)
outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(); // no, let's get a new outbound tunnel as we probably just started
if(outboundTunnel) bool DatagramSession::ShouldUpdateRoutingPath() const
{ {
// get next available lease auto now = i2p::util::GetMillisecondsSinceEpoch ();
auto lease = GetNextLease(); // we need to rotate paths becuase the routing path is too old
if(lease) if (now - m_LastPathChange >= DATAGRAM_SESSION_PATH_SWITCH_INTERVAL) return true;
{ // our path looks dead so we need to rotate paths
// we have a valid lease to use and an outbound tunnel if (now - m_LastSuccess >= DATAGRAM_SESSION_PATH_TIMEOUT) return true;
// create new routing path // if we have a routing session and routing path we don't need to switch paths
uint32_t now = i2p::util::GetSecondsSinceEpoch(); return m_RoutingSession != nullptr && m_RoutingSession->GetSharedRoutingPath () != nullptr;
routingPath = std::make_shared<i2p::garlic::GarlicRoutingPath>(i2p::garlic::GarlicRoutingPath{ }
outboundTunnel,
lease,
0,
now,
0
});
}
}
return routingPath;
}
void DatagramSession::ResetRoutingPath()
{
if(m_RoutingSession)
{
auto routingPath = m_RoutingSession->GetSharedRoutingPath();
if(routingPath && routingPath->remoteLease) // we have a remote lease already specified and a routing path
{
// get outbound tunnel on this path
auto outboundTunnel = routingPath->outboundTunnel;
// is this outbound tunnel there and established
if (outboundTunnel && outboundTunnel->IsEstablished())
m_InvalidIBGW.push_back(routingPath->remoteLease->tunnelGateway); // yes, let's mark remote lease as dead because the outbound tunnel seems fine
}
// reset the routing path
m_RoutingSession->SetSharedRoutingPath(nullptr);
}
}
std::shared_ptr<const i2p::data::Lease> DatagramSession::GetNextLease() bool DatagramSession::ShouldSwitchLease() const
{ {
std::shared_ptr<const i2p::data::Lease> next = nullptr; auto now = i2p::util::GetMillisecondsSinceEpoch ();
if(m_RemoteLeaseSet) std::shared_ptr<i2p::garlic::GarlicRoutingPath> routingPath = nullptr;
{ std::shared_ptr<const i2p::data::Lease> currentLease = nullptr;
std::vector<i2p::data::IdentHash> exclude; if(m_RoutingSession)
for(const auto & ident : m_InvalidIBGW) routingPath = m_RoutingSession->GetSharedRoutingPath ();
exclude.push_back(ident); if(routingPath)
// find get all leases that are not in our ban list currentLease = routingPath->remoteLease;
auto leases = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( [&exclude] (const i2p::data::Lease & l) -> bool { if(currentLease) // if we have a lease return true if it's about to expire otherwise return false
if(exclude.size()) return now - currentLease->ExpiresWithin( DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE );
{ // we have no current lease, we should switch
auto end = std::end(exclude); return true;
return std::find_if(exclude.begin(), end, [l] ( const i2p::data::IdentHash & ident) -> bool { }
return ident == l.tunnelGateway;
}) != end; std::shared_ptr<i2p::garlic::GarlicRoutingPath> DatagramSession::GetNextRoutingPath()
} {
else std::shared_ptr<i2p::tunnel::OutboundTunnel> outboundTunnel = nullptr;
return false; std::shared_ptr<i2p::garlic::GarlicRoutingPath> routingPath = nullptr;
}); // get existing routing path if we have one
if(leases.size()) if(m_RoutingSession)
{ routingPath = m_RoutingSession->GetSharedRoutingPath();
// pick random valid next lease // do we have an existing outbound tunnel and routing path?
uint32_t idx = rand() % leases.size(); if(routingPath && routingPath->outboundTunnel)
next = leases[idx]; {
} // is the outbound tunnel we are using good?
} if (routingPath->outboundTunnel->IsEstablished())
return next; {
} // ya so let's stick with it
outboundTunnel = routingPath->outboundTunnel;
void DatagramSession::UpdateLeaseSet(std::shared_ptr<I2NPMessage> msg) }
{ else
LogPrint(eLogInfo, "DatagramSession: updating lease set"); outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(routingPath->outboundTunnel); // no so we'll switch outbound tunnels
m_LocalDestination->RequestDestination(m_RemoteIdentity, std::bind(&DatagramSession::HandleGotLeaseSet, this, std::placeholders::_1, msg)); }
} // do we have an outbound tunnel that works already ?
if(!outboundTunnel)
outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(); // no, let's get a new outbound tunnel as we probably just started
void DatagramSession::HandleGotLeaseSet(std::shared_ptr<const i2p::data::LeaseSet> remoteIdent, std::shared_ptr<I2NPMessage> msg) if(outboundTunnel)
{ {
if(remoteIdent) { std::shared_ptr<const i2p::data::Lease> lease = nullptr;
// update routing session // should we switch leases ?
if(m_RoutingSession) if (ShouldSwitchLease ())
m_RoutingSession = nullptr; {
m_RoutingSession = m_LocalDestination->GetRoutingSession(remoteIdent, true); // yes, get next available lease
// clear invalid IBGW as we have a new lease set lease = GetNextLease();
m_InvalidIBGW.clear(); }
m_RemoteLeaseSet = remoteIdent; else if (routingPath)
// send the message that was queued if it was provided {
if(msg) // stick with the lease we have if we have one
HandleSend(msg); lease = routingPath->remoteLease;
} }
} if(lease)
{
// we have a valid lease to use and an outbound tunnel
// create new routing path
uint32_t now = i2p::util::GetSecondsSinceEpoch();
routingPath = std::make_shared<i2p::garlic::GarlicRoutingPath>(i2p::garlic::GarlicRoutingPath{
outboundTunnel,
lease,
0,
now,
0
});
}
else // we don't have a new routing path to give
routingPath = nullptr;
}
return routingPath;
}
void DatagramSession::ResetRoutingPath()
{
if(m_RoutingSession)
{
auto routingPath = m_RoutingSession->GetSharedRoutingPath();
if(routingPath && routingPath->remoteLease) // we have a remote lease already specified and a routing path
{
// get outbound tunnel on this path
auto outboundTunnel = routingPath->outboundTunnel;
// is this outbound tunnel there and established
if (outboundTunnel && outboundTunnel->IsEstablished())
m_InvalidIBGW.push_back(routingPath->remoteLease->tunnelGateway); // yes, let's mark remote lease as dead because the outbound tunnel seems fine
}
// reset the routing path
UpdateRoutingPath(nullptr);
}
}
std::shared_ptr<const i2p::data::Lease> DatagramSession::GetNextLease()
{
auto now = i2p::util::GetMillisecondsSinceEpoch ();
std::shared_ptr<const i2p::data::Lease> next = nullptr;
if(m_RemoteLeaseSet)
{
std::vector<i2p::data::IdentHash> exclude;
for(const auto & ident : m_InvalidIBGW)
exclude.push_back(ident);
// find get all leases that are not in our ban list and are not going to expire within our lease set handover window + fudge
auto leases = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( [&exclude, now] (const i2p::data::Lease & l) -> bool {
if(exclude.size())
{
auto end = std::end(exclude);
return std::find_if(exclude.begin(), end, [l, now] ( const i2p::data::IdentHash & ident) -> bool {
return ident == l.tunnelGateway || l.ExpiresWithin (DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE);
}) != end;
}
else
return l.ExpiresWithin (DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE);
});
if(leases.size())
{
// pick random valid next lease
uint32_t idx = rand() % leases.size();
next = leases[idx];
}
}
return next;
}
void DatagramSession::UpdateLeaseSet(std::shared_ptr<I2NPMessage> msg)
{
LogPrint(eLogInfo, "DatagramSession: updating lease set");
m_LocalDestination->RequestDestination(m_RemoteIdentity, std::bind(&DatagramSession::HandleGotLeaseSet, this, std::placeholders::_1, msg));
}
void DatagramSession::HandleGotLeaseSet(std::shared_ptr<const i2p::data::LeaseSet> remoteIdent, std::shared_ptr<I2NPMessage> msg)
{
if(remoteIdent)
{
// update routing session
if(m_RoutingSession)
m_RoutingSession = nullptr;
m_RoutingSession = m_LocalDestination->GetRoutingSession(remoteIdent, true);
// clear invalid IBGW as we have a new lease set
m_InvalidIBGW.clear();
m_RemoteLeaseSet = remoteIdent;
// send the message that was queued if it was provided
if(msg)
HandleSend(msg);
}
}
} }
} }

View file

@ -20,51 +20,70 @@ namespace client
namespace datagram namespace datagram
{ {
// seconds interval for cleanup timer // seconds interval for cleanup timer
const int DATAGRAM_SESSION_CLEANUP_INTERVAL = 3; const int DATAGRAM_SESSION_CLEANUP_INTERVAL = 3;
// milliseconds for max session idle time (10 minutes) // milliseconds for max session idle time
const uint64_t DATAGRAM_SESSION_MAX_IDLE = 3600 * 1000; const uint64_t DATAGRAM_SESSION_MAX_IDLE = 10 * 60 * 1000;
// milliseconds for how long we try sticking to a dead routing path before trying to switch
const uint64_t DATAGRAM_SESSION_PATH_TIMEOUT = 5000;
// milliseconds interval a routing path is used before switching
const uint64_t DATAGRAM_SESSION_PATH_SWITCH_INTERVAL = 60 * 1000;
// milliseconds before lease expire should we try switching leases
const uint64_t DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW = 10 * 1000;
// milliseconds fudge factor for leases handover
const uint64_t DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE = 1000;
class DatagramSession class DatagramSession
{ {
public: public:
DatagramSession(i2p::client::ClientDestination * localDestination, DatagramSession(i2p::client::ClientDestination * localDestination,
const i2p::data::IdentHash & remoteIdent); const i2p::data::IdentHash & remoteIdent);
/** send an i2np message to remote endpoint for this session */ /** send an i2np message to remote endpoint for this session */
void SendMsg(std::shared_ptr<I2NPMessage> msg); void SendMsg(std::shared_ptr<I2NPMessage> msg);
/** get the last time in milliseconds for when we used this datagram session */ /** get the last time in milliseconds for when we used this datagram session */
uint64_t LastActivity() const { return m_LastUse; } uint64_t LastActivity() const { return m_LastUse; }
private: private:
/** get next usable routing path, try reusing outbound tunnels */ /** update our routing path we are using, mark that we have changed paths */
std::shared_ptr<i2p::garlic::GarlicRoutingPath> GetNextRoutingPath(); void UpdateRoutingPath(const std::shared_ptr<i2p::garlic::GarlicRoutingPath> & path);
/**
* mark current routing path as invalid and clear it
* if the outbound tunnel we were using was okay don't use the IBGW in the routing path's lease next time
*/
void ResetRoutingPath();
/** get next usable lease, does not fetch or update if expired or have no lease set */ /** return true if we should switch routing paths because of path lifetime or timeout otherwise false */
std::shared_ptr<const i2p::data::Lease> GetNextLease(); bool ShouldUpdateRoutingPath() const;
void HandleSend(std::shared_ptr<I2NPMessage> msg); /** return true if we should switch the lease for out routing path otherwise return false */
void HandleGotLeaseSet(std::shared_ptr<const i2p::data::LeaseSet> remoteIdent, bool ShouldSwitchLease() const;
std::shared_ptr<I2NPMessage> msg);
void UpdateLeaseSet(std::shared_ptr<I2NPMessage> msg=nullptr); /** get next usable routing path, try reusing outbound tunnels */
std::shared_ptr<i2p::garlic::GarlicRoutingPath> GetNextRoutingPath();
private: /**
i2p::client::ClientDestination * m_LocalDestination; * mark current routing path as invalid and clear it
i2p::data::IdentHash m_RemoteIdentity; * if the outbound tunnel we were using was okay don't use the IBGW in the routing path's lease next time
std::shared_ptr<i2p::garlic::GarlicRoutingSession> m_RoutingSession; */
// Ident hash of IBGW that are invalid void ResetRoutingPath();
std::vector<i2p::data::IdentHash> m_InvalidIBGW;
std::shared_ptr<const i2p::data::LeaseSet> m_RemoteLeaseSet; /** get next usable lease, does not fetch or update if expired or have no lease set */
uint64_t m_LastUse; std::shared_ptr<const i2p::data::Lease> GetNextLease();
};
void HandleSend(std::shared_ptr<I2NPMessage> msg);
const size_t MAX_DATAGRAM_SIZE = 32768; void HandleGotLeaseSet(std::shared_ptr<const i2p::data::LeaseSet> remoteIdent,
std::shared_ptr<I2NPMessage> msg);
void UpdateLeaseSet(std::shared_ptr<I2NPMessage> msg=nullptr);
private:
i2p::client::ClientDestination * m_LocalDestination;
i2p::data::IdentHash m_RemoteIdentity;
std::shared_ptr<i2p::garlic::GarlicRoutingSession> m_RoutingSession;
// Ident hash of IBGW that are invalid
std::vector<i2p::data::IdentHash> m_InvalidIBGW;
std::shared_ptr<const i2p::data::LeaseSet> m_RemoteLeaseSet;
uint64_t m_LastUse;
uint64_t m_LastPathChange;
uint64_t m_LastSuccess;
};
const size_t MAX_DATAGRAM_SIZE = 32768;
class DatagramDestination class DatagramDestination
{ {
typedef std::function<void (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)> Receiver; typedef std::function<void (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)> Receiver;
@ -82,15 +101,15 @@ namespace datagram
void SetReceiver (const Receiver& receiver, uint16_t port) { std::lock_guard<std::mutex> lock(m_ReceiversMutex); m_ReceiversByPorts[port] = receiver; }; void SetReceiver (const Receiver& receiver, uint16_t port) { std::lock_guard<std::mutex> lock(m_ReceiversMutex); m_ReceiversByPorts[port] = receiver; };
void ResetReceiver (uint16_t port) { std::lock_guard<std::mutex> lock(m_ReceiversMutex); m_ReceiversByPorts.erase (port); }; void ResetReceiver (uint16_t port) { std::lock_guard<std::mutex> lock(m_ReceiversMutex); m_ReceiversByPorts.erase (port); };
private: private:
// clean up after next tick // clean up after next tick
void ScheduleCleanup(); void ScheduleCleanup();
// clean up stale sessions and expire tags // clean up stale sessions and expire tags
void HandleCleanUp(const boost::system::error_code & ecode); void HandleCleanUp(const boost::system::error_code & ecode);
std::shared_ptr<DatagramSession> ObtainSession(const i2p::data::IdentHash & ident); std::shared_ptr<DatagramSession> ObtainSession(const i2p::data::IdentHash & ident);
std::shared_ptr<I2NPMessage> CreateDataMessage (const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort); std::shared_ptr<I2NPMessage> CreateDataMessage (const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort);

View file

@ -163,9 +163,10 @@ namespace data
return ExtractTimestamp (buf, len) > ExtractTimestamp (m_Buffer, m_BufferLen); return ExtractTimestamp (buf, len) > ExtractTimestamp (m_Buffer, m_BufferLen);
} }
bool LeaseSet::ExpiresSoon(const uint64_t dlt) const bool LeaseSet::ExpiresSoon(const uint64_t dlt, const uint64_t fudge) const
{ {
auto now = i2p::util::GetMillisecondsSinceEpoch (); auto now = i2p::util::GetMillisecondsSinceEpoch ();
if (fudge) now += rand() % fudge;
if (now >= m_ExpirationTime) return true; if (now >= m_ExpirationTime) return true;
return m_ExpirationTime - now <= dlt; return m_ExpirationTime - now <= dlt;
} }

View file

@ -7,6 +7,7 @@
#include <set> #include <set>
#include <memory> #include <memory>
#include "Identity.h" #include "Identity.h"
#include "Timestamp.h"
namespace i2p namespace i2p
{ {
@ -24,7 +25,13 @@ namespace data
IdentHash tunnelGateway; IdentHash tunnelGateway;
uint32_t tunnelID; uint32_t tunnelID;
uint64_t endDate; // 0 means invalid uint64_t endDate; // 0 means invalid
bool isUpdated; // trasient bool isUpdated; // trasient
/* return true if this lease expires within t millisecond + fudge factor */
bool ExpiresWithin( const uint64_t t, const uint64_t fudge = 1000 ) const {
auto expire = i2p::util::GetMillisecondsSinceEpoch ();
if(fudge) expire += rand() % fudge;
return expire - endDate >= t;
}
}; };
struct LeaseCmp struct LeaseCmp
@ -63,7 +70,7 @@ namespace data
bool IsExpired () const; bool IsExpired () const;
bool IsEmpty () const { return m_Leases.empty (); }; bool IsEmpty () const { return m_Leases.empty (); };
uint64_t GetExpirationTime () const { return m_ExpirationTime; }; uint64_t GetExpirationTime () const { return m_ExpirationTime; };
bool ExpiresSoon(const uint64_t dlt=1000 * 5) const ; bool ExpiresSoon(const uint64_t dlt=1000 * 5, const uint64_t fudge = 0) const ;
bool operator== (const LeaseSet& other) const bool operator== (const LeaseSet& other) const
{ return m_BufferLen == other.m_BufferLen && !memcmp (m_Buffer, other.m_Buffer, m_BufferLen); }; { return m_BufferLen == other.m_BufferLen && !memcmp (m_Buffer, other.m_Buffer, m_BufferLen); };

View file

@ -268,12 +268,12 @@ namespace stream
} }
} }
auto sentPacket = *it; auto sentPacket = *it;
uint64_t rtt = ts - sentPacket->sendTime; uint64_t rtt = ts - sentPacket->sendTime;
if(ts < sentPacket->sendTime) if(ts < sentPacket->sendTime)
{ {
LogPrint(eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime); LogPrint(eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime);
rtt = 1; rtt = 1;
} }
m_RTT = (m_RTT*seqn + rtt)/(seqn + 1); m_RTT = (m_RTT*seqn + rtt)/(seqn + 1);
m_RTO = m_RTT*1.5; // TODO: implement it better m_RTO = m_RTT*1.5; // TODO: implement it better
LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime);