try adding garlic and session tags to datagram destination

This commit is contained in:
Jeff Becker 2016-08-27 13:17:34 -04:00
parent abaf36a2de
commit 0b21fce94e
No known key found for this signature in database
GPG key ID: AB950234D6EA286B
7 changed files with 300 additions and 51 deletions

View file

@ -12,12 +12,17 @@ namespace i2p
namespace datagram
{
DatagramDestination::DatagramDestination (std::shared_ptr<i2p::client::ClientDestination> owner):
m_Owner (owner.get()), m_Receiver (nullptr)
m_Owner (owner.get()),
m_CleanupTimer(owner->GetService()),
m_Receiver (nullptr)
{
ScheduleCleanup();
}
DatagramDestination::~DatagramDestination ()
{
m_CleanupTimer.cancel();
m_Sessions.clear();
}
void DatagramDestination::SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash& ident, uint16_t fromPort, uint16_t toPort)
@ -41,45 +46,11 @@ namespace datagram
else
owner->Sign (buf1, len, signature);
auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort);
auto remote = owner->FindLeaseSet (ident);
if (remote)
owner->GetService ().post (std::bind (&DatagramDestination::SendMsg, this, msg, remote));
else
owner->RequestDestination (ident, std::bind (&DatagramDestination::HandleLeaseSetRequestComplete, this, std::placeholders::_1, msg));
}
auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort);
auto session = ObtainSession(ident);
session->SendMsg(msg);
}
void DatagramDestination::HandleLeaseSetRequestComplete (std::shared_ptr<i2p::data::LeaseSet> remote, std::shared_ptr<I2NPMessage> msg)
{
if (remote)
SendMsg (msg, remote);
}
void DatagramDestination::SendMsg (std::shared_ptr<I2NPMessage> msg, std::shared_ptr<const i2p::data::LeaseSet> remote)
{
auto outboundTunnel = m_Owner->GetTunnelPool ()->GetNextOutboundTunnel ();
auto leases = remote->GetNonExpiredLeases ();
if (!leases.empty () && outboundTunnel)
{
std::vector<i2p::tunnel::TunnelMessageBlock> msgs;
uint32_t i = rand () % leases.size ();
auto garlic = m_Owner->WrapMessage (remote, msg, true);
msgs.push_back (i2p::tunnel::TunnelMessageBlock
{
i2p::tunnel::eDeliveryTypeTunnel,
leases[i]->tunnelGateway, leases[i]->tunnelID,
garlic
});
outboundTunnel->SendTunnelDataMsg (msgs);
}
else
{
if (outboundTunnel)
LogPrint (eLogWarning, "Failed to send datagram. All leases expired");
else
LogPrint (eLogWarning, "Failed to send datagram. No outbound tunnels");
}
}
void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
{
@ -139,7 +110,223 @@ namespace datagram
else
msg = nullptr;
return msg;
}
}
void DatagramDestination::ScheduleCleanup()
{
m_CleanupTimer.expires_from_now(boost::posix_time::seconds(DATAGRAM_SESSION_CLEANUP_INTERVAL));
m_CleanupTimer.async_wait(std::bind(&DatagramDestination::HandleCleanUp, this, std::placeholders::_1));
}
void DatagramDestination::HandleCleanUp(const boost::system::error_code & ecode)
{
if(ecode)
return;
std::lock_guard<std::mutex> lock(m_SessionsMutex);
auto now = i2p::util::GetMillisecondsSinceEpoch();
LogPrint(eLogDebug, "DatagramDestination: clean up sessions");
std::vector<i2p::data::IdentHash> expiredSessions;
// for each session ...
for (auto & e : m_Sessions) {
// check if expired
if(now - e.second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE)
expiredSessions.push_back(e.first); // we are expired
}
// for each expired session ...
for (auto & ident : expiredSessions) {
// remove the expired session
LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", ident.ToBase32());
m_Sessions.erase(ident);
}
}
std::shared_ptr<DatagramSession> DatagramDestination::ObtainSession(const i2p::data::IdentHash & ident)
{
std::shared_ptr<DatagramSession> session = nullptr;
std::lock_guard<std::mutex> lock(m_SessionsMutex);
auto itr = m_Sessions.find(ident);
if (itr == m_Sessions.end()) {
// not found, create new session
session = std::make_shared<DatagramSession>(m_Owner, ident);
m_Sessions[ident] = session;
} else {
session = itr->second;
}
return session;
}
DatagramSession::DatagramSession(i2p::client::ClientDestination * localDestination,
const i2p::data::IdentHash & remoteIdent) :
m_LocalDestination(localDestination),
m_RemoteIdentity(remoteIdent),
m_LastUse(i2p::util::GetMillisecondsSinceEpoch())
{
}
void DatagramSession::SendMsg(std::shared_ptr<I2NPMessage> msg)
{
// we used this session
m_LastUse = i2p::util::GetMillisecondsSinceEpoch();
// schedule send
m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, this, msg));
}
void DatagramSession::HandleSend(std::shared_ptr<I2NPMessage> msg)
{
// do we have a routing session?
if(m_RoutingSession)
{
// do we have a routing path ?
auto routingPath = m_RoutingSession->GetSharedRoutingPath();
if(!routingPath)
{
// no routing path, try getting one
routingPath = GetNextRoutingPath();
if(routingPath) // remember the routing path if we got one
m_RoutingSession->SetSharedRoutingPath(routingPath);
}
// make sure we have a routing path
if (routingPath)
{
auto outboundTunnel = routingPath->outboundTunnel;
if (outboundTunnel)
{
if(outboundTunnel->IsEstablished())
{
// we have a routing path and routing session and the outbound tunnel we are using is good
// wrap message with routing session and send down routing path's outbound tunnel wrapped for the IBGW
auto m = m_RoutingSession->WrapSingleMessage(msg);
routingPath->outboundTunnel->SendTunnelDataMsg({i2p::tunnel::TunnelMessageBlock{
i2p::tunnel::eDeliveryTypeTunnel,
routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID,
m
}});
return;
}
}
}
}
// we couldn't send so let's try resetting the routing path and updating lease set
ResetRoutingPath();
UpdateLeaseSet(msg);
}
std::shared_ptr<i2p::garlic::GarlicRoutingPath> DatagramSession::GetNextRoutingPath()
{
std::shared_ptr<i2p::tunnel::OutboundTunnel> outboundTunnel = nullptr;
std::shared_ptr<i2p::garlic::GarlicRoutingPath> routingPath = nullptr;
// get existing routing path if we have one
if(m_RoutingSession)
routingPath = m_RoutingSession->GetSharedRoutingPath();
// do we have an existing outbound tunnel and routing path?
if(routingPath && routingPath->outboundTunnel)
{
// is the outbound tunnel we are using good?
if (routingPath->outboundTunnel->IsEstablished())
{
// ya so let's stick with it
outboundTunnel = routingPath->outboundTunnel;
}
else
outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(routingPath->outboundTunnel); // no so we'll switch outbound tunnels
// don't reuse the old path as we are making a new one
routingPath = nullptr;
}
// do we have an outbound tunnel that works already ?
if(!outboundTunnel)
outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(); // no, let's get a new outbound tunnel as we probably just started
if(outboundTunnel)
{
// get next available lease
auto lease = GetNextLease();
if(lease)
{
// we have a valid lease to use and an outbound tunnel
// create new routing path
uint32_t now = i2p::util::GetSecondsSinceEpoch();
routingPath = std::make_shared<i2p::garlic::GarlicRoutingPath>(i2p::garlic::GarlicRoutingPath{
outboundTunnel,
lease,
0,
now,
0
});
}
}
return routingPath;
}
void DatagramSession::ResetRoutingPath()
{
if(m_RoutingSession)
{
auto routingPath = m_RoutingSession->GetSharedRoutingPath();
if(routingPath && routingPath->remoteLease) // we have a remote lease already specified and a routing path
{
// get outbound tunnel on this path
auto outboundTunnel = routingPath->outboundTunnel;
// is this outbound tunnel there and established
if (outboundTunnel && outboundTunnel->IsEstablished())
m_InvalidIBGW.push_back(routingPath->remoteLease->tunnelGateway); // yes, let's mark remote lease as dead because the outbound tunnel seems fine
}
// reset the routing path
m_RoutingSession->SetSharedRoutingPath(nullptr);
}
}
std::shared_ptr<const i2p::data::Lease> DatagramSession::GetNextLease()
{
std::shared_ptr<const i2p::data::Lease> next = nullptr;
if(m_RemoteLeaseSet)
{
std::vector<i2p::data::IdentHash> exclude;
for(const auto & ident : m_InvalidIBGW)
exclude.push_back(ident);
// find get all leases that are not in our ban list
auto leases = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( [&exclude] (const i2p::data::Lease & l) -> bool {
if(exclude.size())
{
auto end = std::end(exclude);
return std::find_if(exclude.begin(), end, [l] ( const i2p::data::IdentHash & ident) -> bool {
return ident == l.tunnelGateway;
}) != end;
}
else
return false;
});
if(leases.size())
{
// pick random valid next lease
uint32_t idx = rand() % leases.size();
next = leases[idx];
}
}
return next;
}
void DatagramSession::UpdateLeaseSet(std::shared_ptr<I2NPMessage> msg)
{
LogPrint(eLogInfo, "DatagramSession: updating lease set");
m_LocalDestination->RequestDestination(m_RemoteIdentity, std::bind(&DatagramSession::HandleGotLeaseSet, this, std::placeholders::_1, msg));
}
void DatagramSession::HandleGotLeaseSet(std::shared_ptr<const i2p::data::LeaseSet> remoteIdent, std::shared_ptr<I2NPMessage> msg)
{
if(remoteIdent) {
// update routing session
if(m_RoutingSession)
m_RoutingSession = nullptr;
m_RoutingSession = m_LocalDestination->GetRoutingSession(remoteIdent, true);
// clear invalid IBGW as we have a new lease set
m_InvalidIBGW.clear();
m_RemoteLeaseSet = remoteIdent;
// send the message that was queued if it was provided
if(msg)
HandleSend(msg);
}
}
}
}

View file

@ -9,6 +9,7 @@
#include "Identity.h"
#include "LeaseSet.h"
#include "I2NPProtocol.h"
#include "Garlic.h"
namespace i2p
{
@ -18,7 +19,52 @@ namespace client
}
namespace datagram
{
const size_t MAX_DATAGRAM_SIZE = 32768;
// seconds interval for cleanup timer
const int DATAGRAM_SESSION_CLEANUP_INTERVAL = 30;
// milliseconds for max session idle time (10 minutes)
const uint64_t DATAGRAM_SESSION_MAX_IDLE = 3600 * 1000;
class DatagramSession
{
public:
DatagramSession(i2p::client::ClientDestination * localDestination,
const i2p::data::IdentHash & remoteIdent);
/** send an i2np message to remote endpoint for this session */
void SendMsg(std::shared_ptr<I2NPMessage> msg);
/** get the last time in milliseconds for when we used this datagram session */
uint64_t LastActivity() const { return m_LastUse; }
private:
/** get next usable routing path, try reusing outbound tunnels */
std::shared_ptr<i2p::garlic::GarlicRoutingPath> GetNextRoutingPath();
/**
* mark current routing path as invalid and clear it
* if the outbound tunnel we were using was okay don't use the IBGW in the routing path's lease next time
*/
void ResetRoutingPath();
/** get next usable lease, does not fetch or update if expired or have no lease set */
std::shared_ptr<const i2p::data::Lease> GetNextLease();
void HandleSend(std::shared_ptr<I2NPMessage> msg);
void HandleGotLeaseSet(std::shared_ptr<const i2p::data::LeaseSet> remoteIdent,
std::shared_ptr<I2NPMessage> msg);
void UpdateLeaseSet(std::shared_ptr<I2NPMessage> msg=nullptr);
private:
i2p::client::ClientDestination * m_LocalDestination;
i2p::data::IdentHash m_RemoteIdentity;
std::shared_ptr<i2p::garlic::GarlicRoutingSession> m_RoutingSession;
// Ident hash of IBGW that are invalid
std::vector<i2p::data::IdentHash> m_InvalidIBGW;
std::shared_ptr<const i2p::data::LeaseSet> m_RemoteLeaseSet;
uint64_t m_LastUse;
};
const size_t MAX_DATAGRAM_SIZE = 32768;
class DatagramDestination
{
typedef std::function<void (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)> Receiver;
@ -36,19 +82,26 @@ namespace datagram
void SetReceiver (const Receiver& receiver, uint16_t port) { m_ReceiversByPorts[port] = receiver; };
void ResetReceiver (uint16_t port) { m_ReceiversByPorts.erase (port); };
private:
void HandleLeaseSetRequestComplete (std::shared_ptr<i2p::data::LeaseSet> leaseSet, std::shared_ptr<I2NPMessage> msg);
// clean up after next tick
void ScheduleCleanup();
// clean up stale sessions and expire tags
void HandleCleanUp(const boost::system::error_code & ecode);
std::shared_ptr<DatagramSession> ObtainSession(const i2p::data::IdentHash & ident);
std::shared_ptr<I2NPMessage> CreateDataMessage (const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort);
void SendMsg (std::shared_ptr<I2NPMessage> msg, std::shared_ptr<const i2p::data::LeaseSet> remote);
void HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len);
private:
i2p::client::ClientDestination * m_Owner;
boost::asio::deadline_timer m_CleanupTimer;
Receiver m_Receiver; // default
std::mutex m_SessionsMutex;
std::map<i2p::data::IdentHash, std::shared_ptr<DatagramSession> > m_Sessions;
std::map<uint16_t, Receiver> m_ReceiversByPorts;
i2p::data::GzipInflator m_Inflator;

View file

@ -701,6 +701,8 @@ namespace client
m_StreamingDestination = nullptr;
for (auto& it: m_StreamingDestinationsByPorts)
it.second->Stop ();
if(m_DatagramDestination)
delete m_DatagramDestination;
m_DatagramDestination = nullptr;
return true;
}

View file

@ -107,7 +107,7 @@ namespace garlic
std::shared_ptr<GarlicRoutingPath> GetSharedRoutingPath ();
void SetSharedRoutingPath (std::shared_ptr<GarlicRoutingPath> path);
private:
size_t CreateAESBlock (uint8_t * buf, std::shared_ptr<const I2NPMessage> msg);

View file

@ -169,8 +169,13 @@ namespace data
if (now >= m_ExpirationTime) return true;
return m_ExpirationTime - now <= dlt;
}
const std::vector<std::shared_ptr<const Lease> > LeaseSet::GetNonExpiredLeases (bool withThreshold) const
const std::vector<std::shared_ptr<const Lease> > LeaseSet::GetNonExpiredLeases (bool withThreshold) const
{
return GetNonExpiredLeasesExcluding( [] (const Lease & l) -> bool { return false; }, withThreshold);
}
const std::vector<std::shared_ptr<const Lease> > LeaseSet::GetNonExpiredLeasesExcluding (LeaseInspectFunc exclude, bool withThreshold) const
{
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
std::vector<std::shared_ptr<const Lease> > leases;
@ -181,7 +186,7 @@ namespace data
endDate += LEASE_ENDDATE_THRESHOLD;
else
endDate -= LEASE_ENDDATE_THRESHOLD;
if (ts < endDate)
if (ts < endDate && !exclude(*it))
leases.push_back (it);
}
return leases;

View file

@ -38,6 +38,8 @@ namespace data
};
};
typedef std::function<bool(const Lease & l)> LeaseInspectFunc;
const size_t MAX_LS_BUFFER_SIZE = 3072;
const size_t LEASE_SIZE = 44; // 32 + 4 + 8
const uint8_t MAX_NUM_LEASES = 16;
@ -56,6 +58,7 @@ namespace data
size_t GetBufferLen () const { return m_BufferLen; };
bool IsValid () const { return m_IsValid; };
const std::vector<std::shared_ptr<const Lease> > GetNonExpiredLeases (bool withThreshold = true) const;
const std::vector<std::shared_ptr<const Lease> > GetNonExpiredLeasesExcluding (LeaseInspectFunc exclude, bool withThreshold = true) const;
bool HasExpiredLeases () const;
bool IsExpired () const;
bool IsEmpty () const { return m_Leases.empty (); };

View file

@ -45,7 +45,6 @@ namespace tunnel
std::shared_ptr<OutboundTunnel> GetNextOutboundTunnel (std::shared_ptr<OutboundTunnel> excluded = nullptr) const;
std::shared_ptr<InboundTunnel> GetNextInboundTunnel (std::shared_ptr<InboundTunnel> excluded = nullptr) const;
std::shared_ptr<OutboundTunnel> GetNewOutboundTunnel (std::shared_ptr<OutboundTunnel> old) const;
void TestTunnels ();
void ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg);
void ProcessDeliveryStatus (std::shared_ptr<I2NPMessage> msg);