i2pd/libi2pd/Datagram.cpp

392 lines
13 KiB
C++
Raw Normal View History

2014-10-23 22:56:50 +02:00
#include <string.h>
#include <vector>
2016-05-11 21:12:38 +02:00
#include "Crypto.h"
2014-10-22 21:30:25 +02:00
#include "Log.h"
2014-10-23 22:56:50 +02:00
#include "TunnelBase.h"
#include "RouterContext.h"
#include "Destination.h"
2014-10-22 21:30:25 +02:00
#include "Datagram.h"
namespace i2p
{
namespace datagram
{
2018-01-06 04:48:51 +01:00
DatagramDestination::DatagramDestination (std::shared_ptr<i2p::client::ClientDestination> owner):
2019-07-10 03:33:55 +02:00
m_Owner (owner), m_Receiver (nullptr), m_RawReceiver (nullptr)
2014-10-23 22:56:50 +02:00
{
}
2018-01-06 04:48:51 +01:00
2015-11-03 15:15:49 +01:00
DatagramDestination::~DatagramDestination ()
{
m_Sessions.clear();
2015-11-03 15:15:49 +01:00
}
2016-12-12 19:40:24 +01:00
void DatagramDestination::SendDatagramTo(const uint8_t * payload, size_t len, const i2p::data::IdentHash & identity, uint16_t fromPort, uint16_t toPort)
2018-01-06 04:48:51 +01:00
{
auto owner = m_Owner;
2020-05-17 22:49:31 +02:00
auto localIdentity = m_Owner->GetIdentity ();
auto identityLen = localIdentity->GetFullLen ();
2019-07-10 03:33:55 +02:00
auto signatureLen = localIdentity->GetSignatureLen ();
size_t headerLen = identityLen + signatureLen;
2020-05-17 22:49:31 +02:00
std::vector<uint8_t> header(headerLen);
localIdentity->ToBuffer (header.data (), identityLen);
uint8_t * signature = header.data () + identityLen;
2019-07-10 03:33:55 +02:00
if (localIdentity->GetSigningKeyType () == i2p::data::SIGNING_KEY_TYPE_DSA_SHA1)
{
2018-01-06 04:48:51 +01:00
uint8_t hash[32];
2020-05-17 22:49:31 +02:00
SHA256(payload, len, hash);
owner->Sign (hash, 32, signature);
}
else
2020-05-17 22:49:31 +02:00
owner->Sign (payload, len, signature);
2015-03-27 02:23:59 +01:00
2017-01-13 17:45:52 +01:00
auto session = ObtainSession(identity);
2020-05-17 22:49:31 +02:00
auto msg = CreateDataMessage ({{header.data (), headerLen}, {payload, len}}, fromPort, toPort, false, !session->IsRatchets ()); // datagram
2017-01-13 17:45:52 +01:00
session->SendMsg(msg);
}
2019-07-10 03:33:55 +02:00
void DatagramDestination::SendRawDatagramTo(const uint8_t * payload, size_t len, const i2p::data::IdentHash & identity, uint16_t fromPort, uint16_t toPort)
{
auto session = ObtainSession(identity);
2020-05-17 22:49:31 +02:00
auto msg = CreateDataMessage ({{payload, len}}, fromPort, toPort, true, !session->IsRatchets ()); // raw
2019-07-10 03:33:55 +02:00
session->SendMsg(msg);
}
2016-10-09 16:55:55 +02:00
void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort,uint8_t * const &buf, size_t len)
2014-10-27 21:30:56 +01:00
{
i2p::data::IdentityEx identity;
size_t identityLen = identity.FromBuffer (buf, len);
const uint8_t * signature = buf + identityLen;
size_t headerLen = identityLen + identity.GetSignatureLen ();
bool verified = false;
if (identity.GetSigningKeyType () == i2p::data::SIGNING_KEY_TYPE_DSA_SHA1)
2015-03-27 03:17:26 +01:00
{
uint8_t hash[32];
2015-11-03 15:15:49 +01:00
SHA256(buf + headerLen, len - headerLen, hash);
2015-03-27 03:17:26 +01:00
verified = identity.Verify (hash, 32, signature);
2018-01-06 04:48:51 +01:00
}
else
2014-10-27 21:30:56 +01:00
verified = identity.Verify (buf + headerLen, len - headerLen, signature);
2018-01-06 04:48:51 +01:00
2014-10-27 21:30:56 +01:00
if (verified)
{
2016-12-12 19:40:24 +01:00
auto h = identity.GetIdentHash();
2017-01-13 17:45:52 +01:00
auto session = ObtainSession(h);
session->Ack();
auto r = FindReceiver(toPort);
if(r)
r(identity, fromPort, toPort, buf + headerLen, len -headerLen);
2014-10-31 21:44:44 +01:00
else
LogPrint (eLogWarning, "DatagramDestination: no receiver for port ", toPort);
2014-10-27 21:30:56 +01:00
}
else
2018-01-06 04:48:51 +01:00
LogPrint (eLogWarning, "Datagram signature verification failed");
2014-10-27 21:30:56 +01:00
}
2019-07-10 03:33:55 +02:00
void DatagramDestination::HandleRawDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
{
if (m_RawReceiver)
m_RawReceiver (fromPort, toPort, buf, len);
else
LogPrint (eLogWarning, "DatagramDestination: no receiver for raw datagram");
}
DatagramDestination::Receiver DatagramDestination::FindReceiver(uint16_t port)
{
std::lock_guard<std::mutex> lock(m_ReceiversMutex);
Receiver r = m_Receiver;
auto itr = m_ReceiversByPorts.find(port);
if (itr != m_ReceiversByPorts.end())
r = itr->second;
return r;
}
2019-07-10 03:33:55 +02:00
void DatagramDestination::HandleDataMessagePayload (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len, bool isRaw)
2014-10-22 21:30:25 +02:00
{
// unzip it
uint8_t uncompressed[MAX_DATAGRAM_SIZE];
2015-11-03 15:15:49 +01:00
size_t uncompressedLen = m_Inflator.Inflate (buf, len, uncompressed, MAX_DATAGRAM_SIZE);
if (uncompressedLen)
2019-07-10 03:33:55 +02:00
{
if (isRaw)
HandleRawDatagram (fromPort, toPort, uncompressed, uncompressedLen);
else
HandleDatagram (fromPort, toPort, uncompressed, uncompressedLen);
}
2017-01-17 18:13:56 +01:00
else
LogPrint (eLogWarning, "Datagram: decompression failed");
2014-10-22 21:30:25 +02:00
}
2019-07-10 03:33:55 +02:00
2020-05-17 22:49:31 +02:00
std::shared_ptr<I2NPMessage> DatagramDestination::CreateDataMessage (
const std::vector<std::pair<const uint8_t *, size_t> >& payloads,
uint16_t fromPort, uint16_t toPort, bool isRaw, bool checksum)
2014-10-23 22:56:50 +02:00
{
2015-11-24 19:09:12 +01:00
auto msg = NewI2NPMessage ();
2014-10-23 22:56:50 +02:00
uint8_t * buf = msg->GetPayload ();
2015-11-03 15:15:49 +01:00
buf += 4; // reserve for length
2020-05-17 22:49:31 +02:00
size_t size = m_Deflator.Deflate (payloads, buf, msg->maxLen - msg->len);
2015-11-03 15:15:49 +01:00
if (size)
{
htobe32buf (msg->GetPayload (), size); // length
htobe16buf (buf + 4, fromPort); // source port
2018-01-06 04:48:51 +01:00
htobe16buf (buf + 6, toPort); // destination port
buf[9] = isRaw ? i2p::client::PROTOCOL_TYPE_RAW : i2p::client::PROTOCOL_TYPE_DATAGRAM; // raw or datagram protocol
2018-01-06 04:48:51 +01:00
msg->len += size + 4;
2020-05-17 22:49:31 +02:00
msg->FillI2NPMessageHeader (eI2NPData, 0, checksum);
2018-01-06 04:48:51 +01:00
}
2015-11-03 15:15:49 +01:00
else
msg = nullptr;
2014-10-23 22:56:50 +02:00
return msg;
}
void DatagramDestination::CleanUp ()
2018-01-06 04:48:51 +01:00
{
2016-09-08 16:56:22 +02:00
if (m_Sessions.empty ()) return;
auto now = i2p::util::GetMillisecondsSinceEpoch();
LogPrint(eLogDebug, "DatagramDestination: clean up sessions");
2016-10-09 16:55:55 +02:00
std::unique_lock<std::mutex> lock(m_SessionsMutex);
// for each session ...
2018-01-06 04:48:51 +01:00
for (auto it = m_Sessions.begin (); it != m_Sessions.end (); )
{
// check if expired
if (now - it->second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE)
{
LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", it->first.ToBase32());
2017-01-17 18:13:56 +01:00
it->second->Stop ();
it = m_Sessions.erase (it); // we are expired
}
else
it++;
}
}
2018-01-06 04:48:51 +01:00
2016-12-12 19:40:24 +01:00
std::shared_ptr<DatagramSession> DatagramDestination::ObtainSession(const i2p::data::IdentHash & identity)
{
std::shared_ptr<DatagramSession> session = nullptr;
std::lock_guard<std::mutex> lock(m_SessionsMutex);
2016-12-12 19:40:24 +01:00
auto itr = m_Sessions.find(identity);
if (itr == m_Sessions.end()) {
// not found, create new session
2016-12-12 19:40:24 +01:00
session = std::make_shared<DatagramSession>(m_Owner, identity);
2017-01-17 18:13:56 +01:00
session->Start ();
2016-12-12 19:40:24 +01:00
m_Sessions[identity] = session;
} else {
session = itr->second;
}
return session;
}
2016-09-03 19:58:34 +02:00
std::shared_ptr<DatagramSession::Info> DatagramDestination::GetInfoForRemote(const i2p::data::IdentHash & remote)
{
std::lock_guard<std::mutex> lock(m_SessionsMutex);
for ( auto & item : m_Sessions)
{
if(item.first == remote) return std::make_shared<DatagramSession::Info>(item.second->GetSessionInfo());
}
return nullptr;
}
2018-01-06 04:48:51 +01:00
2019-07-10 03:33:55 +02:00
DatagramSession::DatagramSession(std::shared_ptr<i2p::client::ClientDestination> localDestination,
2016-12-12 19:40:24 +01:00
const i2p::data::IdentHash & remoteIdent) :
m_LocalDestination(localDestination),
2016-12-12 19:40:24 +01:00
m_RemoteIdent(remoteIdent),
2016-12-27 00:47:47 +01:00
m_SendQueueTimer(localDestination->GetService()),
m_RequestingLS(false)
2017-01-17 18:13:56 +01:00
{
}
void DatagramSession::Start ()
{
2016-12-12 19:40:24 +01:00
m_LastUse = i2p::util::GetMillisecondsSinceEpoch ();
ScheduleFlushSendQueue();
}
2017-01-17 18:13:56 +01:00
void DatagramSession::Stop ()
{
m_SendQueueTimer.cancel ();
}
void DatagramSession::SendMsg(std::shared_ptr<I2NPMessage> msg)
{
// we used this session
m_LastUse = i2p::util::GetMillisecondsSinceEpoch();
// schedule send
2017-01-16 13:54:56 +01:00
auto self = shared_from_this();
m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, self, msg));
}
2016-09-03 19:58:34 +02:00
DatagramSession::Info DatagramSession::GetSessionInfo() const
{
if(!m_RoutingSession)
2016-12-12 19:40:24 +01:00
return DatagramSession::Info(nullptr, nullptr, m_LastUse);
2018-01-06 04:48:51 +01:00
2016-09-03 19:58:34 +02:00
auto routingPath = m_RoutingSession->GetSharedRoutingPath();
if (!routingPath)
2016-12-12 19:40:24 +01:00
return DatagramSession::Info(nullptr, nullptr, m_LastUse);
2016-09-03 19:58:34 +02:00
auto lease = routingPath->remoteLease;
auto tunnel = routingPath->outboundTunnel;
if(lease)
{
if(tunnel)
2016-12-12 19:40:24 +01:00
return DatagramSession::Info(lease->tunnelGateway, tunnel->GetEndpointIdentHash(), m_LastUse);
2016-09-03 19:58:34 +02:00
else
2016-12-12 19:40:24 +01:00
return DatagramSession::Info(lease->tunnelGateway, nullptr, m_LastUse);
2016-09-03 19:58:34 +02:00
}
else if(tunnel)
2016-12-12 19:40:24 +01:00
return DatagramSession::Info(nullptr, tunnel->GetEndpointIdentHash(), m_LastUse);
2016-09-03 19:58:34 +02:00
else
2016-12-12 19:40:24 +01:00
return DatagramSession::Info(nullptr, nullptr, m_LastUse);
}
2016-12-12 19:40:24 +01:00
void DatagramSession::Ack()
{
2016-12-12 19:40:24 +01:00
m_LastUse = i2p::util::GetMillisecondsSinceEpoch();
auto path = GetSharedRoutingPath();
if(path)
path->updateTime = i2p::util::GetSecondsSinceEpoch ();
2020-05-17 22:49:31 +02:00
if (IsRatchets ())
SendMsg (nullptr); // send empty message in case if we have some data to send
}
2016-12-12 19:40:24 +01:00
std::shared_ptr<i2p::garlic::GarlicRoutingPath> DatagramSession::GetSharedRoutingPath ()
{
2016-12-12 19:40:24 +01:00
if(!m_RoutingSession) {
if(!m_RemoteLeaseSet) {
m_RemoteLeaseSet = m_LocalDestination->FindLeaseSet(m_RemoteIdent);
}
2016-12-12 19:40:24 +01:00
if(!m_RemoteLeaseSet) {
// no remote lease set
2016-12-27 00:47:47 +01:00
if(!m_RequestingLS) {
m_RequestingLS = true;
m_LocalDestination->RequestDestination(m_RemoteIdent, std::bind(&DatagramSession::HandleLeaseSetUpdated, this, std::placeholders::_1));
}
2016-12-12 19:40:24 +01:00
return nullptr;
}
m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true);
}
2016-12-12 19:40:24 +01:00
auto path = m_RoutingSession->GetSharedRoutingPath();
if(path) {
if (m_CurrentOutboundTunnel && !m_CurrentOutboundTunnel->IsEstablished()) {
// bad outbound tunnel, switch outbound tunnel
m_CurrentOutboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(m_CurrentOutboundTunnel);
path->outboundTunnel = m_CurrentOutboundTunnel;
}
2016-12-13 00:54:31 +01:00
if(m_CurrentRemoteLease && m_CurrentRemoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW)) {
2016-12-12 19:40:24 +01:00
// bad lease, switch to next one
2017-04-09 14:52:42 +02:00
if(m_RemoteLeaseSet && m_RemoteLeaseSet->IsExpired())
m_RemoteLeaseSet = m_LocalDestination->FindLeaseSet(m_RemoteIdent);
2016-12-12 19:40:24 +01:00
if(m_RemoteLeaseSet) {
auto ls = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding([&](const i2p::data::Lease& l) -> bool {
2017-04-09 14:52:42 +02:00
return l.tunnelID == m_CurrentRemoteLease->tunnelID;
2016-12-12 19:40:24 +01:00
});
auto sz = ls.size();
if (sz) {
auto idx = rand() % sz;
m_CurrentRemoteLease = ls[idx];
}
} else {
// no remote lease set?
LogPrint(eLogWarning, "DatagramSession: no cached remote lease set for ", m_RemoteIdent.ToBase32());
2016-09-03 21:53:28 +02:00
}
2016-12-12 19:40:24 +01:00
path->remoteLease = m_CurrentRemoteLease;
}
2016-12-12 19:40:24 +01:00
} else {
// no current path, make one
path = std::make_shared<i2p::garlic::GarlicRoutingPath>();
// switch outbound tunnel if bad
if(m_CurrentOutboundTunnel == nullptr || ! m_CurrentOutboundTunnel->IsEstablished()) {
m_CurrentOutboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(m_CurrentOutboundTunnel);
}
// switch lease if bad
2017-04-09 14:52:42 +02:00
if(m_CurrentRemoteLease && m_CurrentRemoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW)) {
2016-12-12 19:40:24 +01:00
if(!m_RemoteLeaseSet) {
m_RemoteLeaseSet = m_LocalDestination->FindLeaseSet(m_RemoteIdent);
}
if(m_RemoteLeaseSet) {
// pick random next good lease
auto ls = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding([&] (const i2p::data::Lease & l) -> bool {
if(m_CurrentRemoteLease)
return l.tunnelGateway == m_CurrentRemoteLease->tunnelGateway;
return false;
});
auto sz = ls.size();
if(sz) {
auto idx = rand() % sz;
m_CurrentRemoteLease = ls[idx];
}
} else {
// no remote lease set currently, bail
LogPrint(eLogWarning, "DatagramSession: no remote lease set found for ", m_RemoteIdent.ToBase32());
return nullptr;
}
2017-04-09 14:52:42 +02:00
} else if (!m_CurrentRemoteLease) {
if(!m_RemoteLeaseSet) m_RemoteLeaseSet = m_LocalDestination->FindLeaseSet(m_RemoteIdent);
if (m_RemoteLeaseSet)
{
auto ls = m_RemoteLeaseSet->GetNonExpiredLeases();
auto sz = ls.size();
if (sz) {
auto idx = rand() % sz;
m_CurrentRemoteLease = ls[idx];
}
}
}
2016-12-12 19:40:24 +01:00
path->outboundTunnel = m_CurrentOutboundTunnel;
path->remoteLease = m_CurrentRemoteLease;
m_RoutingSession->SetSharedRoutingPath(path);
}
2016-12-12 19:40:24 +01:00
return path;
2018-01-06 04:48:51 +01:00
}
2016-12-12 19:40:24 +01:00
void DatagramSession::HandleLeaseSetUpdated(std::shared_ptr<i2p::data::LeaseSet> ls)
{
2016-12-27 00:47:47 +01:00
m_RequestingLS = false;
if(!ls) return;
2016-12-12 19:40:24 +01:00
// only update lease set if found and newer than previous lease set
uint64_t oldExpire = 0;
if(m_RemoteLeaseSet) oldExpire = m_RemoteLeaseSet->GetExpirationTime();
if(ls && ls->GetExpirationTime() > oldExpire) m_RemoteLeaseSet = ls;
}
2016-12-12 19:40:24 +01:00
void DatagramSession::HandleSend(std::shared_ptr<I2NPMessage> msg)
{
if (msg || m_SendQueue.empty ())
m_SendQueue.push_back(msg);
2016-12-12 19:40:24 +01:00
// flush queue right away if full
if(m_SendQueue.size() >= DATAGRAM_SEND_QUEUE_MAX_SIZE) FlushSendQueue();
}
void DatagramSession::FlushSendQueue ()
{
std::vector<i2p::tunnel::TunnelMessageBlock> send;
auto routingPath = GetSharedRoutingPath();
// if we don't have a routing path we will drop all queued messages
if(routingPath && routingPath->outboundTunnel && routingPath->remoteLease)
{
2016-12-12 19:40:24 +01:00
for (const auto & msg : m_SendQueue)
{
2016-12-12 19:40:24 +01:00
auto m = m_RoutingSession->WrapSingleMessage(msg);
if (m)
send.push_back(i2p::tunnel::TunnelMessageBlock{i2p::tunnel::eDeliveryTypeTunnel,routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, m});
}
2016-12-12 19:40:24 +01:00
routingPath->outboundTunnel->SendTunnelDataMsg(send);
}
2016-12-12 19:40:24 +01:00
m_SendQueue.clear();
ScheduleFlushSendQueue();
}
2016-12-12 19:40:24 +01:00
void DatagramSession::ScheduleFlushSendQueue()
{
2017-04-09 14:52:42 +02:00
boost::posix_time::milliseconds dlt(10);
2016-12-12 19:40:24 +01:00
m_SendQueueTimer.expires_from_now(dlt);
2017-01-16 13:54:56 +01:00
auto self = shared_from_this();
m_SendQueueTimer.async_wait([self](const boost::system::error_code & ec) { if(ec) return; self->FlushSendQueue(); });
}
2014-10-22 21:30:25 +02:00
}
}