mirror of
https://github.com/PurpleI2P/i2pd.git
synced 2025-04-30 12:47:48 +02:00
Implement #243, separate core/client (PCH support dropped for now)
This commit is contained in:
parent
bdaf2c16aa
commit
8ac9520dfd
153 changed files with 360 additions and 20020 deletions
936
core/transport/NTCPSession.cpp
Normal file
936
core/transport/NTCPSession.cpp
Normal file
|
@ -0,0 +1,936 @@
|
|||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
#include "util/I2PEndian.h"
|
||||
#include <cryptopp/dh.h>
|
||||
#include <cryptopp/adler32.h>
|
||||
#include "util/base64.h"
|
||||
#include "util/Log.h"
|
||||
#include "util/Timestamp.h"
|
||||
#include "crypto/CryptoConst.h"
|
||||
#include "I2NPProtocol.h"
|
||||
#include "RouterContext.h"
|
||||
#include "Transports.h"
|
||||
#include "NetDb.h"
|
||||
#include "NTCPSession.h"
|
||||
|
||||
using namespace i2p::crypto;
|
||||
|
||||
namespace i2p
|
||||
{
|
||||
namespace transport
|
||||
{
|
||||
NTCPSession::NTCPSession (NTCPServer& server, std::shared_ptr<const i2p::data::RouterInfo> in_RemoteRouter):
|
||||
TransportSession (in_RemoteRouter), m_Server (server), m_Socket (m_Server.GetService ()),
|
||||
m_TerminationTimer (m_Server.GetService ()), m_IsEstablished (false), m_IsTerminated (false),
|
||||
m_ReceiveBufferOffset (0), m_NextMessage (nullptr), m_IsSending (false)
|
||||
{
|
||||
m_DHKeysPair = transports.GetNextDHKeysPair ();
|
||||
m_Establisher = new Establisher;
|
||||
}
|
||||
|
||||
NTCPSession::~NTCPSession ()
|
||||
{
|
||||
delete m_Establisher;
|
||||
}
|
||||
|
||||
void NTCPSession::CreateAESKey (uint8_t * pubKey, i2p::crypto::AESKey& key)
|
||||
{
|
||||
CryptoPP::DH dh (elgp, elgg);
|
||||
uint8_t sharedKey[256];
|
||||
if (!dh.Agree (sharedKey, m_DHKeysPair->privateKey, pubKey))
|
||||
{
|
||||
LogPrint (eLogError, "Couldn't create shared key");
|
||||
Terminate ();
|
||||
return;
|
||||
};
|
||||
|
||||
uint8_t * aesKey = key;
|
||||
if (sharedKey[0] & 0x80)
|
||||
{
|
||||
aesKey[0] = 0;
|
||||
memcpy (aesKey + 1, sharedKey, 31);
|
||||
}
|
||||
else if (sharedKey[0])
|
||||
memcpy (aesKey, sharedKey, 32);
|
||||
else
|
||||
{
|
||||
// find first non-zero byte
|
||||
uint8_t * nonZero = sharedKey + 1;
|
||||
while (!*nonZero)
|
||||
{
|
||||
nonZero++;
|
||||
if (nonZero - sharedKey > 32)
|
||||
{
|
||||
LogPrint (eLogWarning, "First 32 bytes of shared key is all zeros. Ignored");
|
||||
return;
|
||||
}
|
||||
}
|
||||
memcpy (aesKey, nonZero, 32);
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPSession::Done ()
|
||||
{
|
||||
m_Server.GetService ().post (std::bind (&NTCPSession::Terminate, shared_from_this ()));
|
||||
}
|
||||
|
||||
void NTCPSession::Terminate ()
|
||||
{
|
||||
if (!m_IsTerminated)
|
||||
{
|
||||
m_IsTerminated = true;
|
||||
m_IsEstablished = false;
|
||||
m_Socket.close ();
|
||||
transports.PeerDisconnected (shared_from_this ());
|
||||
m_Server.RemoveNTCPSession (shared_from_this ());
|
||||
m_SendQueue.clear ();
|
||||
m_NextMessage = nullptr;
|
||||
m_TerminationTimer.cancel ();
|
||||
LogPrint (eLogInfo, "NTCP session terminated");
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPSession::Connected ()
|
||||
{
|
||||
m_IsEstablished = true;
|
||||
|
||||
delete m_Establisher;
|
||||
m_Establisher = nullptr;
|
||||
|
||||
delete m_DHKeysPair;
|
||||
m_DHKeysPair = nullptr;
|
||||
|
||||
SendTimeSyncMessage ();
|
||||
m_SendQueue.push_back (CreateDatabaseStoreMsg ()); // we tell immediately who we are
|
||||
|
||||
transports.PeerConnected (shared_from_this ());
|
||||
}
|
||||
|
||||
void NTCPSession::ClientLogin ()
|
||||
{
|
||||
if (!m_DHKeysPair)
|
||||
m_DHKeysPair = transports.GetNextDHKeysPair ();
|
||||
// send Phase1
|
||||
const uint8_t * x = m_DHKeysPair->publicKey;
|
||||
memcpy (m_Establisher->phase1.pubKey, x, 256);
|
||||
CryptoPP::SHA256().CalculateDigest(m_Establisher->phase1.HXxorHI, x, 256);
|
||||
const uint8_t * ident = m_RemoteIdentity.GetIdentHash ();
|
||||
for (int i = 0; i < 32; i++)
|
||||
m_Establisher->phase1.HXxorHI[i] ^= ident[i];
|
||||
|
||||
boost::asio::async_write (m_Socket, boost::asio::buffer (&m_Establisher->phase1, sizeof (NTCPPhase1)), boost::asio::transfer_all (),
|
||||
std::bind(&NTCPSession::HandlePhase1Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2));
|
||||
ScheduleTermination ();
|
||||
}
|
||||
|
||||
void NTCPSession::ServerLogin ()
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
auto ep = m_Socket.remote_endpoint(ec);
|
||||
if (!ec)
|
||||
{
|
||||
m_ConnectedFrom = ep.address ();
|
||||
// receive Phase1
|
||||
boost::asio::async_read (m_Socket, boost::asio::buffer(&m_Establisher->phase1, sizeof (NTCPPhase1)), boost::asio::transfer_all (),
|
||||
std::bind(&NTCPSession::HandlePhase1Received, shared_from_this (),
|
||||
std::placeholders::_1, std::placeholders::_2));
|
||||
ScheduleTermination ();
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPSession::HandlePhase1Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
LogPrint (eLogError, "Couldn't send Phase 1 message: ", ecode.message ());
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
Terminate ();
|
||||
}
|
||||
else
|
||||
{
|
||||
boost::asio::async_read (m_Socket, boost::asio::buffer(&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all (),
|
||||
std::bind(&NTCPSession::HandlePhase2Received, shared_from_this (),
|
||||
std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPSession::HandlePhase1Received (const boost::system::error_code& ecode, std::size_t bytes_transferred)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
LogPrint (eLogError, "Phase 1 read error: ", ecode.message ());
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
Terminate ();
|
||||
}
|
||||
else
|
||||
{
|
||||
// verify ident
|
||||
uint8_t digest[32];
|
||||
CryptoPP::SHA256().CalculateDigest(digest, m_Establisher->phase1.pubKey, 256);
|
||||
const uint8_t * ident = i2p::context.GetRouterInfo ().GetIdentHash ();
|
||||
for (int i = 0; i < 32; i++)
|
||||
{
|
||||
if ((m_Establisher->phase1.HXxorHI[i] ^ ident[i]) != digest[i])
|
||||
{
|
||||
LogPrint (eLogError, "Wrong ident");
|
||||
Terminate ();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
SendPhase2 ();
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPSession::SendPhase2 ()
|
||||
{
|
||||
if (!m_DHKeysPair)
|
||||
m_DHKeysPair = transports.GetNextDHKeysPair ();
|
||||
const uint8_t * y = m_DHKeysPair->publicKey;
|
||||
memcpy (m_Establisher->phase2.pubKey, y, 256);
|
||||
uint8_t xy[512];
|
||||
memcpy (xy, m_Establisher->phase1.pubKey, 256);
|
||||
memcpy (xy + 256, y, 256);
|
||||
CryptoPP::SHA256().CalculateDigest(m_Establisher->phase2.encrypted.hxy, xy, 512);
|
||||
uint32_t tsB = htobe32 (i2p::util::GetSecondsSinceEpoch ());
|
||||
m_Establisher->phase2.encrypted.timestamp = tsB;
|
||||
// TODO: fill filler
|
||||
|
||||
i2p::crypto::AESKey aesKey;
|
||||
CreateAESKey (m_Establisher->phase1.pubKey, aesKey);
|
||||
m_Encryption.SetKey (aesKey);
|
||||
m_Encryption.SetIV (y + 240);
|
||||
m_Decryption.SetKey (aesKey);
|
||||
m_Decryption.SetIV (m_Establisher->phase1.HXxorHI + 16);
|
||||
|
||||
m_Encryption.Encrypt ((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted);
|
||||
boost::asio::async_write (m_Socket, boost::asio::buffer (&m_Establisher->phase2, sizeof (NTCPPhase2)), boost::asio::transfer_all (),
|
||||
std::bind(&NTCPSession::HandlePhase2Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsB));
|
||||
|
||||
}
|
||||
|
||||
void NTCPSession::HandlePhase2Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
LogPrint (eLogError, "Couldn't send Phase 2 message: ", ecode.message ());
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
Terminate ();
|
||||
}
|
||||
else
|
||||
{
|
||||
boost::asio::async_read (m_Socket, boost::asio::buffer(m_ReceiveBuffer, NTCP_DEFAULT_PHASE3_SIZE), boost::asio::transfer_all (),
|
||||
std::bind(&NTCPSession::HandlePhase3Received, shared_from_this (),
|
||||
std::placeholders::_1, std::placeholders::_2, tsB));
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPSession::HandlePhase2Received (const boost::system::error_code& ecode, std::size_t bytes_transferred)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
LogPrint (eLogError, "Phase 2 read error: ", ecode.message (), ". Wrong ident assumed");
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
{
|
||||
// this RI is not valid
|
||||
i2p::data::netdb.SetUnreachable (GetRemoteIdentity ().GetIdentHash (), true);
|
||||
transports.ReuseDHKeysPair (m_DHKeysPair);
|
||||
m_DHKeysPair = nullptr;
|
||||
Terminate ();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
i2p::crypto::AESKey aesKey;
|
||||
CreateAESKey (m_Establisher->phase2.pubKey, aesKey);
|
||||
m_Decryption.SetKey (aesKey);
|
||||
m_Decryption.SetIV (m_Establisher->phase2.pubKey + 240);
|
||||
m_Encryption.SetKey (aesKey);
|
||||
m_Encryption.SetIV (m_Establisher->phase1.HXxorHI + 16);
|
||||
|
||||
m_Decryption.Decrypt((uint8_t *)&m_Establisher->phase2.encrypted, sizeof(m_Establisher->phase2.encrypted), (uint8_t *)&m_Establisher->phase2.encrypted);
|
||||
// verify
|
||||
uint8_t xy[512];
|
||||
memcpy (xy, m_DHKeysPair->publicKey, 256);
|
||||
memcpy (xy + 256, m_Establisher->phase2.pubKey, 256);
|
||||
if (!CryptoPP::SHA256().VerifyDigest(m_Establisher->phase2.encrypted.hxy, xy, 512))
|
||||
{
|
||||
LogPrint (eLogError, "Incorrect hash");
|
||||
transports.ReuseDHKeysPair (m_DHKeysPair);
|
||||
m_DHKeysPair = nullptr;
|
||||
Terminate ();
|
||||
return ;
|
||||
}
|
||||
SendPhase3 ();
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPSession::SendPhase3 ()
|
||||
{
|
||||
auto keys = i2p::context.GetPrivateKeys ();
|
||||
uint8_t * buf = m_ReceiveBuffer;
|
||||
htobe16buf (buf, keys.GetPublic ().GetFullLen ());
|
||||
buf += 2;
|
||||
buf += i2p::context.GetIdentity ().ToBuffer (buf, NTCP_BUFFER_SIZE);
|
||||
uint32_t tsA = htobe32 (i2p::util::GetSecondsSinceEpoch ());
|
||||
htobuf32(buf,tsA);
|
||||
buf += 4;
|
||||
size_t signatureLen = keys.GetPublic ().GetSignatureLen ();
|
||||
size_t len = (buf - m_ReceiveBuffer) + signatureLen;
|
||||
size_t paddingSize = len & 0x0F; // %16
|
||||
if (paddingSize > 0)
|
||||
{
|
||||
paddingSize = 16 - paddingSize;
|
||||
// TODO: fill padding with random data
|
||||
buf += paddingSize;
|
||||
len += paddingSize;
|
||||
}
|
||||
|
||||
SignedData s;
|
||||
s.Insert (m_Establisher->phase1.pubKey, 256); // x
|
||||
s.Insert (m_Establisher->phase2.pubKey, 256); // y
|
||||
s.Insert (m_RemoteIdentity.GetIdentHash (), 32); // ident
|
||||
s.Insert (tsA); // tsA
|
||||
s.Insert (m_Establisher->phase2.encrypted.timestamp); // tsB
|
||||
s.Sign (keys, buf);
|
||||
|
||||
m_Encryption.Encrypt(m_ReceiveBuffer, len, m_ReceiveBuffer);
|
||||
boost::asio::async_write (m_Socket, boost::asio::buffer (m_ReceiveBuffer, len), boost::asio::transfer_all (),
|
||||
std::bind(&NTCPSession::HandlePhase3Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, tsA));
|
||||
}
|
||||
|
||||
void NTCPSession::HandlePhase3Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsA)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
LogPrint (eLogError, "Couldn't send Phase 3 message: ", ecode.message ());
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
Terminate ();
|
||||
}
|
||||
else
|
||||
{
|
||||
// wait for phase4
|
||||
auto signatureLen = m_RemoteIdentity.GetSignatureLen ();
|
||||
size_t paddingSize = signatureLen & 0x0F; // %16
|
||||
if (paddingSize > 0) signatureLen += (16 - paddingSize);
|
||||
boost::asio::async_read (m_Socket, boost::asio::buffer(m_ReceiveBuffer, signatureLen), boost::asio::transfer_all (),
|
||||
std::bind(&NTCPSession::HandlePhase4Received, shared_from_this (),
|
||||
std::placeholders::_1, std::placeholders::_2, tsA));
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPSession::HandlePhase3Received (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
LogPrint (eLogError, "Phase 3 read error: ", ecode.message ());
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
Terminate ();
|
||||
}
|
||||
else
|
||||
{
|
||||
m_Decryption.Decrypt (m_ReceiveBuffer, bytes_transferred, m_ReceiveBuffer);
|
||||
uint8_t * buf = m_ReceiveBuffer;
|
||||
uint16_t size = bufbe16toh (buf);
|
||||
m_RemoteIdentity.FromBuffer (buf + 2, size);
|
||||
if (m_Server.FindNTCPSession (m_RemoteIdentity.GetIdentHash ()))
|
||||
{
|
||||
LogPrint (eLogError, "NTCP session already exists");
|
||||
Terminate ();
|
||||
}
|
||||
size_t expectedSize = size + 2/*size*/ + 4/*timestamp*/ + m_RemoteIdentity.GetSignatureLen ();
|
||||
size_t paddingLen = expectedSize & 0x0F;
|
||||
if (paddingLen) paddingLen = (16 - paddingLen);
|
||||
if (expectedSize > NTCP_DEFAULT_PHASE3_SIZE)
|
||||
{
|
||||
// we need more bytes for Phase3
|
||||
expectedSize += paddingLen;
|
||||
boost::asio::async_read (m_Socket, boost::asio::buffer(m_ReceiveBuffer + NTCP_DEFAULT_PHASE3_SIZE, expectedSize), boost::asio::transfer_all (),
|
||||
std::bind(&NTCPSession::HandlePhase3ExtraReceived, shared_from_this (),
|
||||
std::placeholders::_1, std::placeholders::_2, tsB, paddingLen));
|
||||
}
|
||||
else
|
||||
HandlePhase3 (tsB, paddingLen);
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPSession::HandlePhase3ExtraReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB, size_t paddingLen)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
LogPrint (eLogError, "Phase 3 extra read error: ", ecode.message ());
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
Terminate ();
|
||||
}
|
||||
else
|
||||
{
|
||||
m_Decryption.Decrypt (m_ReceiveBuffer + NTCP_DEFAULT_PHASE3_SIZE, bytes_transferred, m_ReceiveBuffer+ NTCP_DEFAULT_PHASE3_SIZE);
|
||||
HandlePhase3 (tsB, paddingLen);
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPSession::HandlePhase3 (uint32_t tsB, size_t paddingLen)
|
||||
{
|
||||
uint8_t * buf = m_ReceiveBuffer + m_RemoteIdentity.GetFullLen () + 2 /*size*/;
|
||||
uint32_t tsA = buf32toh(buf);
|
||||
buf += 4;
|
||||
buf += paddingLen;
|
||||
|
||||
SignedData s;
|
||||
s.Insert (m_Establisher->phase1.pubKey, 256); // x
|
||||
s.Insert (m_Establisher->phase2.pubKey, 256); // y
|
||||
s.Insert (i2p::context.GetRouterInfo ().GetIdentHash (), 32); // ident
|
||||
s.Insert (tsA); // tsA
|
||||
s.Insert (tsB); // tsB
|
||||
if (!s.Verify (m_RemoteIdentity, buf))
|
||||
{
|
||||
LogPrint (eLogError, "signature verification failed");
|
||||
Terminate ();
|
||||
return;
|
||||
}
|
||||
m_RemoteIdentity.DropVerifier ();
|
||||
|
||||
SendPhase4 (tsA, tsB);
|
||||
}
|
||||
|
||||
void NTCPSession::SendPhase4 (uint32_t tsA, uint32_t tsB)
|
||||
{
|
||||
SignedData s;
|
||||
s.Insert (m_Establisher->phase1.pubKey, 256); // x
|
||||
s.Insert (m_Establisher->phase2.pubKey, 256); // y
|
||||
s.Insert (m_RemoteIdentity.GetIdentHash (), 32); // ident
|
||||
s.Insert (tsA); // tsA
|
||||
s.Insert (tsB); // tsB
|
||||
auto keys = i2p::context.GetPrivateKeys ();
|
||||
auto signatureLen = keys.GetPublic ().GetSignatureLen ();
|
||||
s.Sign (keys, m_ReceiveBuffer);
|
||||
size_t paddingSize = signatureLen & 0x0F; // %16
|
||||
if (paddingSize > 0) signatureLen += (16 - paddingSize);
|
||||
m_Encryption.Encrypt (m_ReceiveBuffer, signatureLen, m_ReceiveBuffer);
|
||||
|
||||
boost::asio::async_write (m_Socket, boost::asio::buffer (m_ReceiveBuffer, signatureLen), boost::asio::transfer_all (),
|
||||
std::bind(&NTCPSession::HandlePhase4Sent, shared_from_this (), std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
|
||||
void NTCPSession::HandlePhase4Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
LogPrint (eLogWarning, "Couldn't send Phase 4 message: ", ecode.message ());
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
Terminate ();
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint (eLogInfo, "NTCP server session from ", m_Socket.remote_endpoint (), " connected");
|
||||
m_Server.AddNTCPSession (shared_from_this ());
|
||||
|
||||
Connected ();
|
||||
m_ReceiveBufferOffset = 0;
|
||||
m_NextMessage = nullptr;
|
||||
Receive ();
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPSession::HandlePhase4Received (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsA)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
LogPrint (eLogError, "Phase 4 read error: ", ecode.message (), ". Check your clock");
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
{
|
||||
// this router doesn't like us
|
||||
i2p::data::netdb.SetUnreachable (GetRemoteIdentity ().GetIdentHash (), true);
|
||||
Terminate ();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
m_Decryption.Decrypt(m_ReceiveBuffer, bytes_transferred, m_ReceiveBuffer);
|
||||
|
||||
// verify signature
|
||||
SignedData s;
|
||||
s.Insert (m_Establisher->phase1.pubKey, 256); // x
|
||||
s.Insert (m_Establisher->phase2.pubKey, 256); // y
|
||||
s.Insert (i2p::context.GetRouterInfo ().GetIdentHash (), 32); // ident
|
||||
s.Insert (tsA); // tsA
|
||||
s.Insert (m_Establisher->phase2.encrypted.timestamp); // tsB
|
||||
|
||||
if (!s.Verify (m_RemoteIdentity, m_ReceiveBuffer))
|
||||
{
|
||||
LogPrint (eLogError, "signature verification failed");
|
||||
Terminate ();
|
||||
return;
|
||||
}
|
||||
m_RemoteIdentity.DropVerifier ();
|
||||
LogPrint (eLogInfo, "NTCP session to ", m_Socket.remote_endpoint (), " connected");
|
||||
Connected ();
|
||||
|
||||
m_ReceiveBufferOffset = 0;
|
||||
m_NextMessage = nullptr;
|
||||
Receive ();
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPSession::Receive ()
|
||||
{
|
||||
m_Socket.async_read_some (boost::asio::buffer(m_ReceiveBuffer + m_ReceiveBufferOffset, NTCP_BUFFER_SIZE - m_ReceiveBufferOffset),
|
||||
std::bind(&NTCPSession::HandleReceived, shared_from_this (),
|
||||
std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
|
||||
void NTCPSession::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
LogPrint (eLogError, "Read error: ", ecode.message ());
|
||||
if (!m_NumReceivedBytes) m_Server.Ban (m_ConnectedFrom);
|
||||
//if (ecode != boost::asio::error::operation_aborted)
|
||||
Terminate ();
|
||||
}
|
||||
else
|
||||
{
|
||||
m_NumReceivedBytes += bytes_transferred;
|
||||
i2p::transport::transports.UpdateReceivedBytes (bytes_transferred);
|
||||
m_ReceiveBufferOffset += bytes_transferred;
|
||||
|
||||
if (m_ReceiveBufferOffset >= 16)
|
||||
{
|
||||
int numReloads = 0;
|
||||
do
|
||||
{
|
||||
uint8_t * nextBlock = m_ReceiveBuffer;
|
||||
while (m_ReceiveBufferOffset >= 16)
|
||||
{
|
||||
if (!DecryptNextBlock (nextBlock)) // 16 bytes
|
||||
{
|
||||
Terminate ();
|
||||
return;
|
||||
}
|
||||
nextBlock += 16;
|
||||
m_ReceiveBufferOffset -= 16;
|
||||
}
|
||||
if (m_ReceiveBufferOffset > 0)
|
||||
memcpy (m_ReceiveBuffer, nextBlock, m_ReceiveBufferOffset);
|
||||
|
||||
// try to read more
|
||||
if (numReloads < 5)
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
size_t moreBytes = m_Socket.available(ec);
|
||||
if (moreBytes)
|
||||
{
|
||||
if (moreBytes > NTCP_BUFFER_SIZE - m_ReceiveBufferOffset)
|
||||
moreBytes = NTCP_BUFFER_SIZE - m_ReceiveBufferOffset;
|
||||
moreBytes = m_Socket.read_some (boost::asio::buffer (m_ReceiveBuffer + m_ReceiveBufferOffset, moreBytes));
|
||||
if (ec)
|
||||
{
|
||||
LogPrint (eLogError, "Read more bytes error: ", ec.message ());
|
||||
Terminate ();
|
||||
return;
|
||||
}
|
||||
m_NumReceivedBytes += moreBytes;
|
||||
m_ReceiveBufferOffset += moreBytes;
|
||||
numReloads++;
|
||||
}
|
||||
}
|
||||
}
|
||||
while (m_ReceiveBufferOffset >= 16);
|
||||
m_Handler.Flush ();
|
||||
}
|
||||
|
||||
ScheduleTermination (); // reset termination timer
|
||||
Receive ();
|
||||
}
|
||||
}
|
||||
|
||||
bool NTCPSession::DecryptNextBlock (const uint8_t * encrypted) // 16 bytes
|
||||
{
|
||||
if (!m_NextMessage) // new message, header expected
|
||||
{
|
||||
// descrypt header and extract length
|
||||
uint8_t buf[16];
|
||||
m_Decryption.Decrypt (encrypted, buf);
|
||||
uint16_t dataSize = bufbe16toh (buf);
|
||||
if (dataSize)
|
||||
{
|
||||
// new message
|
||||
if (dataSize > NTCP_MAX_MESSAGE_SIZE)
|
||||
{
|
||||
LogPrint (eLogError, "NTCP data size ", dataSize, " exceeds max size");
|
||||
return false;
|
||||
}
|
||||
auto msg = dataSize <= I2NP_MAX_SHORT_MESSAGE_SIZE - 2 ? NewI2NPShortMessage () : NewI2NPMessage ();
|
||||
m_NextMessage = ToSharedI2NPMessage (msg);
|
||||
memcpy (m_NextMessage->buf, buf, 16);
|
||||
m_NextMessageOffset = 16;
|
||||
m_NextMessage->offset = 2; // size field
|
||||
m_NextMessage->len = dataSize + 2;
|
||||
}
|
||||
else
|
||||
{
|
||||
// timestamp
|
||||
LogPrint ("Timestamp");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
else // message continues
|
||||
{
|
||||
m_Decryption.Decrypt (encrypted, m_NextMessage->buf + m_NextMessageOffset);
|
||||
m_NextMessageOffset += 16;
|
||||
}
|
||||
|
||||
if (m_NextMessageOffset >= m_NextMessage->len + 4) // +checksum
|
||||
{
|
||||
// we have a complete I2NP message
|
||||
if (CryptoPP::Adler32().VerifyDigest (m_NextMessage->buf + m_NextMessageOffset - 4, m_NextMessage->buf, m_NextMessageOffset - 4))
|
||||
m_Handler.PutNextMessage (m_NextMessage);
|
||||
else
|
||||
LogPrint (eLogWarning, "Incorrect adler checksum of NTCP message. Dropped");
|
||||
m_NextMessage = nullptr;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void NTCPSession::Send (std::shared_ptr<i2p::I2NPMessage> msg)
|
||||
{
|
||||
m_IsSending = true;
|
||||
boost::asio::async_write (m_Socket, CreateMsgBuffer (msg), boost::asio::transfer_all (),
|
||||
std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::vector<std::shared_ptr<I2NPMessage> >{ msg }));
|
||||
}
|
||||
|
||||
boost::asio::const_buffers_1 NTCPSession::CreateMsgBuffer (std::shared_ptr<I2NPMessage> msg)
|
||||
{
|
||||
uint8_t * sendBuffer;
|
||||
int len;
|
||||
|
||||
if (msg)
|
||||
{
|
||||
// regular I2NP
|
||||
if (msg->offset < 2)
|
||||
LogPrint (eLogError, "Malformed I2NP message"); // TODO:
|
||||
sendBuffer = msg->GetBuffer () - 2;
|
||||
len = msg->GetLength ();
|
||||
htobe16buf (sendBuffer, len);
|
||||
}
|
||||
else
|
||||
{
|
||||
// prepare timestamp
|
||||
sendBuffer = m_TimeSyncBuffer;
|
||||
len = 4;
|
||||
htobuf16(sendBuffer, 0);
|
||||
htobe32buf (sendBuffer + 2, time (0));
|
||||
}
|
||||
int rem = (len + 6) & 0x0F; // %16
|
||||
int padding = 0;
|
||||
if (rem > 0) padding = 16 - rem;
|
||||
// TODO: fill padding
|
||||
CryptoPP::Adler32().CalculateDigest (sendBuffer + len + 2 + padding, sendBuffer, len + 2+ padding);
|
||||
|
||||
int l = len + padding + 6;
|
||||
m_Encryption.Encrypt(sendBuffer, l, sendBuffer);
|
||||
return boost::asio::buffer ((const uint8_t *)sendBuffer, l);
|
||||
}
|
||||
|
||||
|
||||
void NTCPSession::Send (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
|
||||
{
|
||||
m_IsSending = true;
|
||||
std::vector<boost::asio::const_buffer> bufs;
|
||||
for (auto it: msgs)
|
||||
bufs.push_back (CreateMsgBuffer (it));
|
||||
boost::asio::async_write (m_Socket, bufs, boost::asio::transfer_all (),
|
||||
std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, msgs));
|
||||
}
|
||||
|
||||
void NTCPSession::HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector<std::shared_ptr<I2NPMessage> > msgs)
|
||||
{
|
||||
m_IsSending = false;
|
||||
if (ecode)
|
||||
{
|
||||
LogPrint (eLogWarning, "Couldn't send msgs: ", ecode.message ());
|
||||
// we shouldn't call Terminate () here, because HandleReceive takes care
|
||||
// TODO: 'delete this' statement in Terminate () must be eliminated later
|
||||
// Terminate ();
|
||||
}
|
||||
else
|
||||
{
|
||||
m_NumSentBytes += bytes_transferred;
|
||||
i2p::transport::transports.UpdateSentBytes (bytes_transferred);
|
||||
if (!m_SendQueue.empty())
|
||||
{
|
||||
Send (m_SendQueue);
|
||||
m_SendQueue.clear ();
|
||||
}
|
||||
else
|
||||
ScheduleTermination (); // reset termination timer
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void NTCPSession::SendTimeSyncMessage ()
|
||||
{
|
||||
Send (nullptr);
|
||||
}
|
||||
|
||||
|
||||
void NTCPSession::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
|
||||
{
|
||||
m_Server.GetService ().post (std::bind (&NTCPSession::PostI2NPMessages, shared_from_this (), msgs));
|
||||
}
|
||||
|
||||
void NTCPSession::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs)
|
||||
{
|
||||
if (m_IsTerminated) return;
|
||||
if (m_IsSending)
|
||||
{
|
||||
for (auto it: msgs)
|
||||
m_SendQueue.push_back (it);
|
||||
}
|
||||
else
|
||||
Send (msgs);
|
||||
}
|
||||
|
||||
void NTCPSession::ScheduleTermination ()
|
||||
{
|
||||
m_TerminationTimer.cancel ();
|
||||
m_TerminationTimer.expires_from_now (boost::posix_time::seconds(NTCP_TERMINATION_TIMEOUT));
|
||||
m_TerminationTimer.async_wait (std::bind (&NTCPSession::HandleTerminationTimer,
|
||||
shared_from_this (), std::placeholders::_1));
|
||||
}
|
||||
|
||||
void NTCPSession::HandleTerminationTimer (const boost::system::error_code& ecode)
|
||||
{
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
{
|
||||
LogPrint ("No activity fo ", NTCP_TERMINATION_TIMEOUT, " seconds");
|
||||
//Terminate ();
|
||||
m_Socket.close ();// invoke Terminate () from HandleReceive
|
||||
}
|
||||
}
|
||||
|
||||
//-----------------------------------------
|
||||
NTCPServer::NTCPServer (int port):
|
||||
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service),
|
||||
m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
NTCPServer::~NTCPServer ()
|
||||
{
|
||||
Stop ();
|
||||
}
|
||||
|
||||
void NTCPServer::Start ()
|
||||
{
|
||||
if (!m_IsRunning)
|
||||
{
|
||||
m_IsRunning = true;
|
||||
m_Thread = new std::thread (std::bind (&NTCPServer::Run, this));
|
||||
// create acceptors
|
||||
auto addresses = context.GetRouterInfo ().GetAddresses ();
|
||||
for (auto& address : addresses)
|
||||
{
|
||||
if (address.transportStyle == i2p::data::RouterInfo::eTransportNTCP && address.host.is_v4 ())
|
||||
{
|
||||
m_NTCPAcceptor = new boost::asio::ip::tcp::acceptor (m_Service,
|
||||
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), address.port));
|
||||
|
||||
LogPrint (eLogInfo, "Start listening TCP port ", address.port);
|
||||
auto conn = std::make_shared<NTCPSession>(*this);
|
||||
m_NTCPAcceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAccept, this,
|
||||
conn, std::placeholders::_1));
|
||||
|
||||
if (context.SupportsV6 ())
|
||||
{
|
||||
m_NTCPV6Acceptor = new boost::asio::ip::tcp::acceptor (m_Service);
|
||||
m_NTCPV6Acceptor->open (boost::asio::ip::tcp::v6());
|
||||
m_NTCPV6Acceptor->set_option (boost::asio::ip::v6_only (true));
|
||||
m_NTCPV6Acceptor->bind (boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v6(), address.port));
|
||||
m_NTCPV6Acceptor->listen ();
|
||||
|
||||
LogPrint (eLogInfo, "Start listening V6 TCP port ", address.port);
|
||||
auto conn = std::make_shared<NTCPSession> (*this);
|
||||
m_NTCPV6Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAcceptV6,
|
||||
this, conn, std::placeholders::_1));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPServer::Stop ()
|
||||
{
|
||||
m_NTCPSessions.clear ();
|
||||
|
||||
if (m_IsRunning)
|
||||
{
|
||||
m_IsRunning = false;
|
||||
delete m_NTCPAcceptor;
|
||||
m_NTCPAcceptor = nullptr;
|
||||
delete m_NTCPV6Acceptor;
|
||||
m_NTCPV6Acceptor = nullptr;
|
||||
|
||||
m_Service.stop ();
|
||||
if (m_Thread)
|
||||
{
|
||||
m_Thread->join ();
|
||||
delete m_Thread;
|
||||
m_Thread = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void NTCPServer::Run ()
|
||||
{
|
||||
while (m_IsRunning)
|
||||
{
|
||||
try
|
||||
{
|
||||
m_Service.run ();
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
LogPrint ("NTCP server: ", ex.what ());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPServer::AddNTCPSession (std::shared_ptr<NTCPSession> session)
|
||||
{
|
||||
if (session)
|
||||
{
|
||||
std::unique_lock<std::mutex> l(m_NTCPSessionsMutex);
|
||||
m_NTCPSessions[session->GetRemoteIdentity ().GetIdentHash ()] = session;
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPServer::RemoveNTCPSession (std::shared_ptr<NTCPSession> session)
|
||||
{
|
||||
if (session)
|
||||
{
|
||||
std::unique_lock<std::mutex> l(m_NTCPSessionsMutex);
|
||||
m_NTCPSessions.erase (session->GetRemoteIdentity ().GetIdentHash ());
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<NTCPSession> NTCPServer::FindNTCPSession (const i2p::data::IdentHash& ident)
|
||||
{
|
||||
std::unique_lock<std::mutex> l(m_NTCPSessionsMutex);
|
||||
auto it = m_NTCPSessions.find (ident);
|
||||
if (it != m_NTCPSessions.end ())
|
||||
return it->second;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void NTCPServer::HandleAccept (std::shared_ptr<NTCPSession> conn, const boost::system::error_code& error)
|
||||
{
|
||||
if (!error)
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
auto ep = conn->GetSocket ().remote_endpoint(ec);
|
||||
if (!ec)
|
||||
{
|
||||
LogPrint (eLogInfo, "Connected from ", ep);
|
||||
auto it = m_BanList.find (ep.address ());
|
||||
if (it != m_BanList.end ())
|
||||
{
|
||||
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
|
||||
if (ts < it->second)
|
||||
{
|
||||
LogPrint (eLogInfo, ep.address (), " is banned for ", it->second - ts, " more seconds");
|
||||
conn = nullptr;
|
||||
}
|
||||
else
|
||||
m_BanList.erase (it);
|
||||
}
|
||||
if (conn)
|
||||
conn->ServerLogin ();
|
||||
}
|
||||
else
|
||||
LogPrint (eLogError, "Connected from error ", ec.message ());
|
||||
}
|
||||
|
||||
|
||||
if (error != boost::asio::error::operation_aborted)
|
||||
{
|
||||
conn = std::make_shared<NTCPSession> (*this);
|
||||
m_NTCPAcceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAccept, this,
|
||||
conn, std::placeholders::_1));
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPServer::HandleAcceptV6 (std::shared_ptr<NTCPSession> conn, const boost::system::error_code& error)
|
||||
{
|
||||
if (!error)
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
auto ep = conn->GetSocket ().remote_endpoint(ec);
|
||||
if (!ec)
|
||||
{
|
||||
LogPrint (eLogInfo, "Connected from ", ep);
|
||||
auto it = m_BanList.find (ep.address ());
|
||||
if (it != m_BanList.end ())
|
||||
{
|
||||
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
|
||||
if (ts < it->second)
|
||||
{
|
||||
LogPrint (eLogInfo, ep.address (), " is banned for ", it->second - ts, " more seconds");
|
||||
conn = nullptr;
|
||||
}
|
||||
else
|
||||
m_BanList.erase (it);
|
||||
}
|
||||
if (conn)
|
||||
conn->ServerLogin ();
|
||||
}
|
||||
else
|
||||
LogPrint (eLogError, "Connected from error ", ec.message ());
|
||||
}
|
||||
|
||||
if (error != boost::asio::error::operation_aborted)
|
||||
{
|
||||
conn = std::make_shared<NTCPSession> (*this);
|
||||
m_NTCPV6Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAcceptV6, this,
|
||||
conn, std::placeholders::_1));
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPServer::Connect (const boost::asio::ip::address& address, int port, std::shared_ptr<NTCPSession> conn)
|
||||
{
|
||||
LogPrint (eLogInfo, "Connecting to ", address ,":", port);
|
||||
m_Service.post([conn, this]()
|
||||
{
|
||||
this->AddNTCPSession (conn);
|
||||
});
|
||||
conn->GetSocket ().async_connect (boost::asio::ip::tcp::endpoint (address, port),
|
||||
std::bind (&NTCPServer::HandleConnect, this, std::placeholders::_1, conn));
|
||||
}
|
||||
|
||||
void NTCPServer::HandleConnect (const boost::system::error_code& ecode, std::shared_ptr<NTCPSession> conn)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
LogPrint (eLogError, "Connect error: ", ecode.message ());
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
i2p::data::netdb.SetUnreachable (conn->GetRemoteIdentity ().GetIdentHash (), true);
|
||||
conn->Terminate ();
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint (eLogInfo, "Connected to ", conn->GetSocket ().remote_endpoint ());
|
||||
if (conn->GetSocket ().local_endpoint ().protocol () == boost::asio::ip::tcp::v6()) // ipv6
|
||||
context.UpdateNTCPV6Address (conn->GetSocket ().local_endpoint ().address ());
|
||||
conn->ClientLogin ();
|
||||
}
|
||||
}
|
||||
|
||||
void NTCPServer::Ban (const boost::asio::ip::address& addr)
|
||||
{
|
||||
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
|
||||
m_BanList[addr] = ts + NTCP_BAN_EXPIRATION_TIMEOUT;
|
||||
LogPrint (eLogInfo, addr, " has been banned for ", NTCP_BAN_EXPIRATION_TIMEOUT, " seconds");
|
||||
}
|
||||
}
|
||||
}
|
183
core/transport/NTCPSession.h
Normal file
183
core/transport/NTCPSession.h
Normal file
|
@ -0,0 +1,183 @@
|
|||
#ifndef NTCP_SESSION_H__
|
||||
#define NTCP_SESSION_H__
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <boost/asio.hpp>
|
||||
#include <cryptopp/modes.h>
|
||||
#include <cryptopp/aes.h>
|
||||
#include "crypto/aes.h"
|
||||
#include "Identity.h"
|
||||
#include "RouterInfo.h"
|
||||
#include "I2NPProtocol.h"
|
||||
#include "TransportSession.h"
|
||||
|
||||
namespace i2p
|
||||
{
|
||||
namespace transport
|
||||
{
|
||||
|
||||
#pragma pack(1)
|
||||
struct NTCPPhase1
|
||||
{
|
||||
uint8_t pubKey[256];
|
||||
uint8_t HXxorHI[32];
|
||||
};
|
||||
|
||||
struct NTCPPhase2
|
||||
{
|
||||
uint8_t pubKey[256];
|
||||
struct
|
||||
{
|
||||
uint8_t hxy[32];
|
||||
uint32_t timestamp;
|
||||
uint8_t filler[12];
|
||||
} encrypted;
|
||||
};
|
||||
|
||||
#pragma pack()
|
||||
|
||||
const size_t NTCP_MAX_MESSAGE_SIZE = 16384;
|
||||
const size_t NTCP_BUFFER_SIZE = 4160; // fits 4 tunnel messages (4*1028)
|
||||
const int NTCP_TERMINATION_TIMEOUT = 120; // 2 minutes
|
||||
const size_t NTCP_DEFAULT_PHASE3_SIZE = 2/*size*/ + i2p::data::DEFAULT_IDENTITY_SIZE/*387*/ + 4/*ts*/ + 15/*padding*/ + 40/*signature*/; // 448
|
||||
const int NTCP_BAN_EXPIRATION_TIMEOUT = 70; // in second
|
||||
|
||||
class NTCPServer;
|
||||
class NTCPSession: public TransportSession, public std::enable_shared_from_this<NTCPSession>
|
||||
{
|
||||
public:
|
||||
|
||||
NTCPSession (NTCPServer& server, std::shared_ptr<const i2p::data::RouterInfo> in_RemoteRouter = nullptr);
|
||||
~NTCPSession ();
|
||||
void Terminate ();
|
||||
void Done ();
|
||||
|
||||
boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; };
|
||||
bool IsEstablished () const { return m_IsEstablished; };
|
||||
|
||||
void ClientLogin ();
|
||||
void ServerLogin ();
|
||||
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs);
|
||||
|
||||
private:
|
||||
|
||||
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs);
|
||||
void Connected ();
|
||||
void SendTimeSyncMessage ();
|
||||
void SetIsEstablished (bool isEstablished) { m_IsEstablished = isEstablished; }
|
||||
|
||||
void CreateAESKey (uint8_t * pubKey, i2p::crypto::AESKey& key);
|
||||
|
||||
// client
|
||||
void SendPhase3 ();
|
||||
void HandlePhase1Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred);
|
||||
void HandlePhase2Received (const boost::system::error_code& ecode, std::size_t bytes_transferred);
|
||||
void HandlePhase3Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsA);
|
||||
void HandlePhase4Received (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsA);
|
||||
|
||||
//server
|
||||
void SendPhase2 ();
|
||||
void SendPhase4 (uint32_t tsA, uint32_t tsB);
|
||||
void HandlePhase1Received (const boost::system::error_code& ecode, std::size_t bytes_transferred);
|
||||
void HandlePhase2Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB);
|
||||
void HandlePhase3Received (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB);
|
||||
void HandlePhase3ExtraReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint32_t tsB, size_t paddingLen);
|
||||
void HandlePhase3 (uint32_t tsB, size_t paddingLen);
|
||||
void HandlePhase4Sent (const boost::system::error_code& ecode, std::size_t bytes_transferred);
|
||||
|
||||
// common
|
||||
void Receive ();
|
||||
void HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred);
|
||||
bool DecryptNextBlock (const uint8_t * encrypted);
|
||||
|
||||
void Send (std::shared_ptr<i2p::I2NPMessage> msg);
|
||||
boost::asio::const_buffers_1 CreateMsgBuffer (std::shared_ptr<I2NPMessage> msg);
|
||||
void Send (const std::vector<std::shared_ptr<I2NPMessage> >& msgs);
|
||||
void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector<std::shared_ptr<I2NPMessage> > msgs);
|
||||
|
||||
|
||||
// timer
|
||||
void ScheduleTermination ();
|
||||
void HandleTerminationTimer (const boost::system::error_code& ecode);
|
||||
|
||||
private:
|
||||
|
||||
NTCPServer& m_Server;
|
||||
boost::asio::ip::tcp::socket m_Socket;
|
||||
boost::asio::deadline_timer m_TerminationTimer;
|
||||
bool m_IsEstablished, m_IsTerminated;
|
||||
|
||||
i2p::crypto::CBCDecryption m_Decryption;
|
||||
i2p::crypto::CBCEncryption m_Encryption;
|
||||
|
||||
struct Establisher
|
||||
{
|
||||
NTCPPhase1 phase1;
|
||||
NTCPPhase2 phase2;
|
||||
} * m_Establisher;
|
||||
|
||||
i2p::crypto::AESAlignedBuffer<NTCP_BUFFER_SIZE + 16> m_ReceiveBuffer;
|
||||
i2p::crypto::AESAlignedBuffer<16> m_TimeSyncBuffer;
|
||||
int m_ReceiveBufferOffset;
|
||||
|
||||
std::shared_ptr<I2NPMessage> m_NextMessage;
|
||||
size_t m_NextMessageOffset;
|
||||
i2p::I2NPMessagesHandler m_Handler;
|
||||
|
||||
bool m_IsSending;
|
||||
std::vector<std::shared_ptr<I2NPMessage> > m_SendQueue;
|
||||
|
||||
boost::asio::ip::address m_ConnectedFrom; // for ban
|
||||
};
|
||||
|
||||
// TODO: move to NTCP.h/.cpp
|
||||
class NTCPServer
|
||||
{
|
||||
public:
|
||||
|
||||
NTCPServer (int port);
|
||||
~NTCPServer ();
|
||||
|
||||
void Start ();
|
||||
void Stop ();
|
||||
|
||||
void AddNTCPSession (std::shared_ptr<NTCPSession> session);
|
||||
void RemoveNTCPSession (std::shared_ptr<NTCPSession> session);
|
||||
std::shared_ptr<NTCPSession> FindNTCPSession (const i2p::data::IdentHash& ident);
|
||||
void Connect (const boost::asio::ip::address& address, int port, std::shared_ptr<NTCPSession> conn);
|
||||
|
||||
boost::asio::io_service& GetService () { return m_Service; };
|
||||
void Ban (const boost::asio::ip::address& addr);
|
||||
|
||||
private:
|
||||
|
||||
void Run ();
|
||||
void HandleAccept (std::shared_ptr<NTCPSession> conn, const boost::system::error_code& error);
|
||||
void HandleAcceptV6 (std::shared_ptr<NTCPSession> conn, const boost::system::error_code& error);
|
||||
|
||||
void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr<NTCPSession> conn);
|
||||
|
||||
private:
|
||||
|
||||
bool m_IsRunning;
|
||||
std::thread * m_Thread;
|
||||
boost::asio::io_service m_Service;
|
||||
boost::asio::io_service::work m_Work;
|
||||
boost::asio::ip::tcp::acceptor * m_NTCPAcceptor, * m_NTCPV6Acceptor;
|
||||
std::mutex m_NTCPSessionsMutex;
|
||||
std::map<i2p::data::IdentHash, std::shared_ptr<NTCPSession> > m_NTCPSessions;
|
||||
std::map<boost::asio::ip::address, uint32_t> m_BanList; // IP -> ban expiration time in seconds
|
||||
|
||||
public:
|
||||
|
||||
// for HTTP/I2PControl
|
||||
const decltype(m_NTCPSessions)& GetNTCPSessions () const { return m_NTCPSessions; };
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
556
core/transport/SSU.cpp
Normal file
556
core/transport/SSU.cpp
Normal file
|
@ -0,0 +1,556 @@
|
|||
#include <string.h>
|
||||
#include <boost/bind.hpp>
|
||||
#include "util/Log.h"
|
||||
#include "util/Timestamp.h"
|
||||
#include "RouterContext.h"
|
||||
#include "NetDb.h"
|
||||
#include "SSU.h"
|
||||
|
||||
namespace i2p
|
||||
{
|
||||
namespace transport
|
||||
{
|
||||
SSUServer::SSUServer (int port): m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr),
|
||||
m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService),
|
||||
m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port),
|
||||
m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService),
|
||||
m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service)
|
||||
{
|
||||
m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535));
|
||||
m_Socket.set_option (boost::asio::socket_base::send_buffer_size (65535));
|
||||
if (context.SupportsV6 ())
|
||||
{
|
||||
m_SocketV6.open (boost::asio::ip::udp::v6());
|
||||
m_SocketV6.set_option (boost::asio::ip::v6_only (true));
|
||||
m_SocketV6.set_option (boost::asio::socket_base::receive_buffer_size (65535));
|
||||
m_SocketV6.set_option (boost::asio::socket_base::send_buffer_size (65535));
|
||||
m_SocketV6.bind (m_EndpointV6);
|
||||
}
|
||||
}
|
||||
|
||||
SSUServer::~SSUServer ()
|
||||
{
|
||||
}
|
||||
|
||||
void SSUServer::Start ()
|
||||
{
|
||||
m_IsRunning = true;
|
||||
m_ReceiversThread = new std::thread (std::bind (&SSUServer::RunReceivers, this));
|
||||
m_Thread = new std::thread (std::bind (&SSUServer::Run, this));
|
||||
m_ReceiversService.post (std::bind (&SSUServer::Receive, this));
|
||||
if (context.SupportsV6 ())
|
||||
{
|
||||
m_ThreadV6 = new std::thread (std::bind (&SSUServer::RunV6, this));
|
||||
m_ReceiversService.post (std::bind (&SSUServer::ReceiveV6, this));
|
||||
}
|
||||
SchedulePeerTestsCleanupTimer ();
|
||||
ScheduleIntroducersUpdateTimer (); // wait for 30 seconds and decide if we need introducers
|
||||
}
|
||||
|
||||
void SSUServer::Stop ()
|
||||
{
|
||||
DeleteAllSessions ();
|
||||
m_IsRunning = false;
|
||||
m_Service.stop ();
|
||||
m_Socket.close ();
|
||||
m_ServiceV6.stop ();
|
||||
m_SocketV6.close ();
|
||||
m_ReceiversService.stop ();
|
||||
if (m_ReceiversThread)
|
||||
{
|
||||
m_ReceiversThread->join ();
|
||||
delete m_ReceiversThread;
|
||||
m_ReceiversThread = nullptr;
|
||||
}
|
||||
if (m_Thread)
|
||||
{
|
||||
m_Thread->join ();
|
||||
delete m_Thread;
|
||||
m_Thread = nullptr;
|
||||
}
|
||||
if (m_ThreadV6)
|
||||
{
|
||||
m_ThreadV6->join ();
|
||||
delete m_ThreadV6;
|
||||
m_ThreadV6 = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void SSUServer::Run ()
|
||||
{
|
||||
while (m_IsRunning)
|
||||
{
|
||||
try
|
||||
{
|
||||
m_Service.run ();
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
LogPrint (eLogError, "SSU server: ", ex.what ());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SSUServer::RunV6 ()
|
||||
{
|
||||
while (m_IsRunning)
|
||||
{
|
||||
try
|
||||
{
|
||||
m_ServiceV6.run ();
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
LogPrint (eLogError, "SSU V6 server: ", ex.what ());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SSUServer::RunReceivers ()
|
||||
{
|
||||
while (m_IsRunning)
|
||||
{
|
||||
try
|
||||
{
|
||||
m_ReceiversService.run ();
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
LogPrint (eLogError, "SSU receivers: ", ex.what ());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SSUServer::AddRelay (uint32_t tag, const boost::asio::ip::udp::endpoint& relay)
|
||||
{
|
||||
m_Relays[tag] = relay;
|
||||
}
|
||||
|
||||
std::shared_ptr<SSUSession> SSUServer::FindRelaySession (uint32_t tag)
|
||||
{
|
||||
auto it = m_Relays.find (tag);
|
||||
if (it != m_Relays.end ())
|
||||
return FindSession (it->second);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void SSUServer::Send (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& to)
|
||||
{
|
||||
if (to.protocol () == boost::asio::ip::udp::v4())
|
||||
m_Socket.send_to (boost::asio::buffer (buf, len), to);
|
||||
else
|
||||
m_SocketV6.send_to (boost::asio::buffer (buf, len), to);
|
||||
}
|
||||
|
||||
void SSUServer::Receive ()
|
||||
{
|
||||
SSUPacket * packet = new SSUPacket ();
|
||||
m_Socket.async_receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from,
|
||||
std::bind (&SSUServer::HandleReceivedFrom, this, std::placeholders::_1, std::placeholders::_2, packet));
|
||||
}
|
||||
|
||||
void SSUServer::ReceiveV6 ()
|
||||
{
|
||||
SSUPacket * packet = new SSUPacket ();
|
||||
m_SocketV6.async_receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from,
|
||||
std::bind (&SSUServer::HandleReceivedFromV6, this, std::placeholders::_1, std::placeholders::_2, packet));
|
||||
}
|
||||
|
||||
void SSUServer::HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet)
|
||||
{
|
||||
if (!ecode)
|
||||
{
|
||||
packet->len = bytes_transferred;
|
||||
std::vector<SSUPacket *> packets;
|
||||
packets.push_back (packet);
|
||||
|
||||
boost::system::error_code ec;
|
||||
size_t moreBytes = m_Socket.available(ec);
|
||||
while (moreBytes && packets.size () < 25)
|
||||
{
|
||||
packet = new SSUPacket ();
|
||||
packet->len = m_Socket.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from);
|
||||
packets.push_back (packet);
|
||||
moreBytes = m_Socket.available();
|
||||
}
|
||||
|
||||
m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets));
|
||||
Receive ();
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint ("SSU receive error: ", ecode.message ());
|
||||
delete packet;
|
||||
}
|
||||
}
|
||||
|
||||
void SSUServer::HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet)
|
||||
{
|
||||
if (!ecode)
|
||||
{
|
||||
packet->len = bytes_transferred;
|
||||
std::vector<SSUPacket *> packets;
|
||||
packets.push_back (packet);
|
||||
|
||||
size_t moreBytes = m_SocketV6.available ();
|
||||
while (moreBytes && packets.size () < 25)
|
||||
{
|
||||
packet = new SSUPacket ();
|
||||
packet->len = m_SocketV6.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from);
|
||||
packets.push_back (packet);
|
||||
moreBytes = m_SocketV6.available();
|
||||
}
|
||||
|
||||
m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets));
|
||||
ReceiveV6 ();
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint ("SSU V6 receive error: ", ecode.message ());
|
||||
delete packet;
|
||||
}
|
||||
}
|
||||
|
||||
void SSUServer::HandleReceivedPackets (std::vector<SSUPacket *> packets)
|
||||
{
|
||||
std::shared_ptr<SSUSession> session;
|
||||
for (auto it1: packets)
|
||||
{
|
||||
auto packet = it1;
|
||||
try
|
||||
{
|
||||
if (!session || session->GetRemoteEndpoint () != packet->from) // we received packet for other session than previous
|
||||
{
|
||||
if (session) session->FlushData ();
|
||||
auto it = m_Sessions.find (packet->from);
|
||||
if (it != m_Sessions.end ())
|
||||
session = it->second;
|
||||
if (!session)
|
||||
{
|
||||
session = std::make_shared<SSUSession> (*this, packet->from);
|
||||
session->WaitForConnect ();
|
||||
{
|
||||
std::unique_lock<std::mutex> l(m_SessionsMutex);
|
||||
m_Sessions[packet->from] = session;
|
||||
}
|
||||
LogPrint (eLogInfo, "New SSU session from ", packet->from.address ().to_string (), ":", packet->from.port (), " created");
|
||||
}
|
||||
}
|
||||
session->ProcessNextMessage (packet->buf, packet->len, packet->from);
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
LogPrint (eLogError, "SSU: HandleReceivedPackets ", ex.what ());
|
||||
if (session) session->FlushData ();
|
||||
session = nullptr;
|
||||
}
|
||||
delete packet;
|
||||
}
|
||||
if (session) session->FlushData ();
|
||||
}
|
||||
|
||||
std::shared_ptr<SSUSession> SSUServer::FindSession (std::shared_ptr<const i2p::data::RouterInfo> router) const
|
||||
{
|
||||
if (!router) return nullptr;
|
||||
auto address = router->GetSSUAddress (true); // v4 only
|
||||
if (!address) return nullptr;
|
||||
auto session = FindSession (boost::asio::ip::udp::endpoint (address->host, address->port));
|
||||
if (session || !context.SupportsV6 ())
|
||||
return session;
|
||||
// try v6
|
||||
address = router->GetSSUV6Address ();
|
||||
if (!address) return nullptr;
|
||||
return FindSession (boost::asio::ip::udp::endpoint (address->host, address->port));
|
||||
}
|
||||
|
||||
std::shared_ptr<SSUSession> SSUServer::FindSession (const boost::asio::ip::udp::endpoint& e) const
|
||||
{
|
||||
auto it = m_Sessions.find (e);
|
||||
if (it != m_Sessions.end ())
|
||||
return it->second;
|
||||
else
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::shared_ptr<SSUSession> SSUServer::GetSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest)
|
||||
{
|
||||
std::shared_ptr<SSUSession> session;
|
||||
if (router)
|
||||
{
|
||||
auto address = router->GetSSUAddress (!context.SupportsV6 ());
|
||||
if (address)
|
||||
{
|
||||
boost::asio::ip::udp::endpoint remoteEndpoint (address->host, address->port);
|
||||
auto it = m_Sessions.find (remoteEndpoint);
|
||||
if (it != m_Sessions.end ())
|
||||
session = it->second;
|
||||
else
|
||||
{
|
||||
// otherwise create new session
|
||||
session = std::make_shared<SSUSession> (*this, remoteEndpoint, router, peerTest);
|
||||
{
|
||||
std::unique_lock<std::mutex> l(m_SessionsMutex);
|
||||
m_Sessions[remoteEndpoint] = session;
|
||||
}
|
||||
if (!router->UsesIntroducer ())
|
||||
{
|
||||
// connect directly
|
||||
LogPrint ("Creating new SSU session to [", router->GetIdentHashAbbreviation (), "] ",
|
||||
remoteEndpoint.address ().to_string (), ":", remoteEndpoint.port ());
|
||||
session->Connect ();
|
||||
}
|
||||
else
|
||||
{
|
||||
// connect through introducer
|
||||
int numIntroducers = address->introducers.size ();
|
||||
if (numIntroducers > 0)
|
||||
{
|
||||
std::shared_ptr<SSUSession> introducerSession;
|
||||
const i2p::data::RouterInfo::Introducer * introducer = nullptr;
|
||||
// we might have a session to introducer already
|
||||
for (int i = 0; i < numIntroducers; i++)
|
||||
{
|
||||
introducer = &(address->introducers[i]);
|
||||
it = m_Sessions.find (boost::asio::ip::udp::endpoint (introducer->iHost, introducer->iPort));
|
||||
if (it != m_Sessions.end ())
|
||||
{
|
||||
introducerSession = it->second;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (introducerSession) // session found
|
||||
LogPrint ("Session to introducer already exists");
|
||||
else // create new
|
||||
{
|
||||
LogPrint ("Creating new session to introducer");
|
||||
introducer = &(address->introducers[0]); // TODO:
|
||||
boost::asio::ip::udp::endpoint introducerEndpoint (introducer->iHost, introducer->iPort);
|
||||
introducerSession = std::make_shared<SSUSession> (*this, introducerEndpoint, router);
|
||||
std::unique_lock<std::mutex> l(m_SessionsMutex);
|
||||
m_Sessions[introducerEndpoint] = introducerSession;
|
||||
}
|
||||
// introduce
|
||||
LogPrint ("Introduce new SSU session to [", router->GetIdentHashAbbreviation (),
|
||||
"] through introducer ", introducer->iHost, ":", introducer->iPort);
|
||||
session->WaitForIntroduction ();
|
||||
if (i2p::context.GetRouterInfo ().UsesIntroducer ()) // if we are unreachable
|
||||
{
|
||||
uint8_t buf[1];
|
||||
Send (buf, 0, remoteEndpoint); // send HolePunch
|
||||
}
|
||||
introducerSession->Introduce (introducer->iTag, introducer->iKey);
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint (eLogWarning, "Can't connect to unreachable router. No introducers presented");
|
||||
std::unique_lock<std::mutex> l(m_SessionsMutex);
|
||||
m_Sessions.erase (remoteEndpoint);
|
||||
session.reset ();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
LogPrint (eLogWarning, "Router ", router->GetIdentHashAbbreviation (), " doesn't have SSU address");
|
||||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
void SSUServer::DeleteSession (std::shared_ptr<SSUSession> session)
|
||||
{
|
||||
if (session)
|
||||
{
|
||||
session->Close ();
|
||||
std::unique_lock<std::mutex> l(m_SessionsMutex);
|
||||
m_Sessions.erase (session->GetRemoteEndpoint ());
|
||||
}
|
||||
}
|
||||
|
||||
void SSUServer::DeleteAllSessions ()
|
||||
{
|
||||
std::unique_lock<std::mutex> l(m_SessionsMutex);
|
||||
for (auto it: m_Sessions)
|
||||
it.second->Close ();
|
||||
m_Sessions.clear ();
|
||||
}
|
||||
|
||||
template<typename Filter>
|
||||
std::shared_ptr<SSUSession> SSUServer::GetRandomSession (Filter filter)
|
||||
{
|
||||
std::vector<std::shared_ptr<SSUSession> > filteredSessions;
|
||||
for (auto s :m_Sessions)
|
||||
if (filter (s.second)) filteredSessions.push_back (s.second);
|
||||
if (filteredSessions.size () > 0)
|
||||
{
|
||||
auto ind = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (0, filteredSessions.size ()-1);
|
||||
return filteredSessions[ind];
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::shared_ptr<SSUSession> SSUServer::GetRandomEstablishedSession (std::shared_ptr<const SSUSession> excluded)
|
||||
{
|
||||
return GetRandomSession (
|
||||
[excluded](std::shared_ptr<SSUSession> session)->bool
|
||||
{
|
||||
return session->GetState () == eSessionStateEstablished && !session->IsV6 () &&
|
||||
session != excluded;
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
std::set<SSUSession *> SSUServer::FindIntroducers (int maxNumIntroducers)
|
||||
{
|
||||
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
|
||||
std::set<SSUSession *> ret;
|
||||
for (int i = 0; i < maxNumIntroducers; i++)
|
||||
{
|
||||
auto session = GetRandomSession (
|
||||
[&ret, ts](std::shared_ptr<SSUSession> session)->bool
|
||||
{
|
||||
return session->GetRelayTag () && !ret.count (session.get ()) &&
|
||||
session->GetState () == eSessionStateEstablished &&
|
||||
ts < session->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_DURATION;
|
||||
}
|
||||
);
|
||||
if (session)
|
||||
{
|
||||
ret.insert (session.get ());
|
||||
break;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void SSUServer::ScheduleIntroducersUpdateTimer ()
|
||||
{
|
||||
m_IntroducersUpdateTimer.expires_from_now (boost::posix_time::seconds(SSU_KEEP_ALIVE_INTERVAL));
|
||||
m_IntroducersUpdateTimer.async_wait (std::bind (&SSUServer::HandleIntroducersUpdateTimer,
|
||||
this, std::placeholders::_1));
|
||||
}
|
||||
|
||||
void SSUServer::HandleIntroducersUpdateTimer (const boost::system::error_code& ecode)
|
||||
{
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
{
|
||||
// timeout expired
|
||||
if (i2p::context.GetStatus () == eRouterStatusTesting)
|
||||
{
|
||||
// we still don't know if we need introducers
|
||||
ScheduleIntroducersUpdateTimer ();
|
||||
return;
|
||||
}
|
||||
if (i2p::context.GetStatus () == eRouterStatusOK) return; // we don't need introducers anymore
|
||||
// we are firewalled
|
||||
if (!i2p::context.IsUnreachable ()) i2p::context.SetUnreachable ();
|
||||
std::list<boost::asio::ip::udp::endpoint> newList;
|
||||
size_t numIntroducers = 0;
|
||||
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
|
||||
for (auto it :m_Introducers)
|
||||
{
|
||||
auto session = FindSession (it);
|
||||
if (session && ts < session->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_DURATION)
|
||||
{
|
||||
session->SendKeepAlive ();
|
||||
newList.push_back (it);
|
||||
numIntroducers++;
|
||||
}
|
||||
else
|
||||
i2p::context.RemoveIntroducer (it);
|
||||
}
|
||||
|
||||
if (numIntroducers < SSU_MAX_NUM_INTRODUCERS)
|
||||
{
|
||||
// create new
|
||||
auto introducers = FindIntroducers (SSU_MAX_NUM_INTRODUCERS);
|
||||
if (introducers.size () > 0)
|
||||
{
|
||||
for (auto it1: introducers)
|
||||
{
|
||||
auto router = it1->GetRemoteRouter ();
|
||||
if (router && i2p::context.AddIntroducer (*router, it1->GetRelayTag ()))
|
||||
{
|
||||
newList.push_back (it1->GetRemoteEndpoint ());
|
||||
if (newList.size () >= SSU_MAX_NUM_INTRODUCERS) break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
m_Introducers = newList;
|
||||
if (m_Introducers.empty ())
|
||||
{
|
||||
auto introducer = i2p::data::netdb.GetRandomIntroducer ();
|
||||
if (introducer)
|
||||
GetSession (introducer);
|
||||
}
|
||||
ScheduleIntroducersUpdateTimer ();
|
||||
}
|
||||
}
|
||||
|
||||
void SSUServer::NewPeerTest (uint32_t nonce, PeerTestParticipant role, std::shared_ptr<SSUSession> session)
|
||||
{
|
||||
m_PeerTests[nonce] = { i2p::util::GetMillisecondsSinceEpoch (), role, session };
|
||||
}
|
||||
|
||||
PeerTestParticipant SSUServer::GetPeerTestParticipant (uint32_t nonce)
|
||||
{
|
||||
auto it = m_PeerTests.find (nonce);
|
||||
if (it != m_PeerTests.end ())
|
||||
return it->second.role;
|
||||
else
|
||||
return ePeerTestParticipantUnknown;
|
||||
}
|
||||
|
||||
std::shared_ptr<SSUSession> SSUServer::GetPeerTestSession (uint32_t nonce)
|
||||
{
|
||||
auto it = m_PeerTests.find (nonce);
|
||||
if (it != m_PeerTests.end ())
|
||||
return it->second.session;
|
||||
else
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void SSUServer::UpdatePeerTest (uint32_t nonce, PeerTestParticipant role)
|
||||
{
|
||||
auto it = m_PeerTests.find (nonce);
|
||||
if (it != m_PeerTests.end ())
|
||||
it->second.role = role;
|
||||
}
|
||||
|
||||
void SSUServer::RemovePeerTest (uint32_t nonce)
|
||||
{
|
||||
m_PeerTests.erase (nonce);
|
||||
}
|
||||
|
||||
void SSUServer::SchedulePeerTestsCleanupTimer ()
|
||||
{
|
||||
m_PeerTestsCleanupTimer.expires_from_now (boost::posix_time::seconds(SSU_PEER_TEST_TIMEOUT));
|
||||
m_PeerTestsCleanupTimer.async_wait (std::bind (&SSUServer::HandlePeerTestsCleanupTimer,
|
||||
this, std::placeholders::_1));
|
||||
}
|
||||
|
||||
void SSUServer::HandlePeerTestsCleanupTimer (const boost::system::error_code& ecode)
|
||||
{
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
{
|
||||
int numDeleted = 0;
|
||||
uint64_t ts = i2p::util::GetMillisecondsSinceEpoch ();
|
||||
for (auto it = m_PeerTests.begin (); it != m_PeerTests.end ();)
|
||||
{
|
||||
if (ts > it->second.creationTime + SSU_PEER_TEST_TIMEOUT*1000LL)
|
||||
{
|
||||
numDeleted++;
|
||||
it = m_PeerTests.erase (it);
|
||||
}
|
||||
else
|
||||
it++;
|
||||
}
|
||||
if (numDeleted > 0)
|
||||
LogPrint (eLogInfo, numDeleted, " peer tests have been expired");
|
||||
SchedulePeerTestsCleanupTimer ();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
114
core/transport/SSU.h
Normal file
114
core/transport/SSU.h
Normal file
|
@ -0,0 +1,114 @@
|
|||
#ifndef SSU_H__
|
||||
#define SSU_H__
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <string.h>
|
||||
#include <map>
|
||||
#include <list>
|
||||
#include <set>
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <boost/asio.hpp>
|
||||
#include "crypto/aes.h"
|
||||
#include "util/I2PEndian.h"
|
||||
#include "Identity.h"
|
||||
#include "RouterInfo.h"
|
||||
#include "I2NPProtocol.h"
|
||||
#include "SSUSession.h"
|
||||
|
||||
namespace i2p
|
||||
{
|
||||
namespace transport
|
||||
{
|
||||
const int SSU_KEEP_ALIVE_INTERVAL = 30; // 30 seconds
|
||||
const int SSU_PEER_TEST_TIMEOUT = 60; // 60 seconds
|
||||
const int SSU_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour
|
||||
const size_t SSU_MAX_NUM_INTRODUCERS = 3;
|
||||
|
||||
struct SSUPacket
|
||||
{
|
||||
i2p::crypto::AESAlignedBuffer<1500> buf;
|
||||
boost::asio::ip::udp::endpoint from;
|
||||
size_t len;
|
||||
};
|
||||
|
||||
class SSUServer
|
||||
{
|
||||
public:
|
||||
|
||||
SSUServer (int port);
|
||||
~SSUServer ();
|
||||
void Start ();
|
||||
void Stop ();
|
||||
std::shared_ptr<SSUSession> GetSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest = false);
|
||||
std::shared_ptr<SSUSession> FindSession (std::shared_ptr<const i2p::data::RouterInfo> router) const;
|
||||
std::shared_ptr<SSUSession> FindSession (const boost::asio::ip::udp::endpoint& e) const;
|
||||
std::shared_ptr<SSUSession> GetRandomEstablishedSession (std::shared_ptr<const SSUSession> excluded);
|
||||
void DeleteSession (std::shared_ptr<SSUSession> session);
|
||||
void DeleteAllSessions ();
|
||||
|
||||
boost::asio::io_service& GetService () { return m_Service; };
|
||||
boost::asio::io_service& GetServiceV6 () { return m_ServiceV6; };
|
||||
const boost::asio::ip::udp::endpoint& GetEndpoint () const { return m_Endpoint; };
|
||||
void Send (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& to);
|
||||
void AddRelay (uint32_t tag, const boost::asio::ip::udp::endpoint& relay);
|
||||
std::shared_ptr<SSUSession> FindRelaySession (uint32_t tag);
|
||||
|
||||
void NewPeerTest (uint32_t nonce, PeerTestParticipant role, std::shared_ptr<SSUSession> session = nullptr);
|
||||
PeerTestParticipant GetPeerTestParticipant (uint32_t nonce);
|
||||
std::shared_ptr<SSUSession> GetPeerTestSession (uint32_t nonce);
|
||||
void UpdatePeerTest (uint32_t nonce, PeerTestParticipant role);
|
||||
void RemovePeerTest (uint32_t nonce);
|
||||
|
||||
private:
|
||||
|
||||
void Run ();
|
||||
void RunV6 ();
|
||||
void RunReceivers ();
|
||||
void Receive ();
|
||||
void ReceiveV6 ();
|
||||
void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet);
|
||||
void HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet);
|
||||
void HandleReceivedPackets (std::vector<SSUPacket *> packets);
|
||||
|
||||
template<typename Filter>
|
||||
std::shared_ptr<SSUSession> GetRandomSession (Filter filter);
|
||||
|
||||
std::set<SSUSession *> FindIntroducers (int maxNumIntroducers);
|
||||
void ScheduleIntroducersUpdateTimer ();
|
||||
void HandleIntroducersUpdateTimer (const boost::system::error_code& ecode);
|
||||
|
||||
void SchedulePeerTestsCleanupTimer ();
|
||||
void HandlePeerTestsCleanupTimer (const boost::system::error_code& ecode);
|
||||
|
||||
private:
|
||||
|
||||
struct PeerTest
|
||||
{
|
||||
uint64_t creationTime;
|
||||
PeerTestParticipant role;
|
||||
std::shared_ptr<SSUSession> session; // for Bob to Alice
|
||||
};
|
||||
|
||||
bool m_IsRunning;
|
||||
std::thread * m_Thread, * m_ThreadV6, * m_ReceiversThread;
|
||||
boost::asio::io_service m_Service, m_ServiceV6, m_ReceiversService;
|
||||
boost::asio::io_service::work m_Work, m_WorkV6, m_ReceiversWork;
|
||||
boost::asio::ip::udp::endpoint m_Endpoint, m_EndpointV6;
|
||||
boost::asio::ip::udp::socket m_Socket, m_SocketV6;
|
||||
boost::asio::deadline_timer m_IntroducersUpdateTimer, m_PeerTestsCleanupTimer;
|
||||
std::list<boost::asio::ip::udp::endpoint> m_Introducers; // introducers we are connected to
|
||||
mutable std::mutex m_SessionsMutex;
|
||||
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > m_Sessions;
|
||||
std::map<uint32_t, boost::asio::ip::udp::endpoint> m_Relays; // we are introducer
|
||||
std::map<uint32_t, PeerTest> m_PeerTests; // nonce -> creation time in milliseconds
|
||||
|
||||
public:
|
||||
// for HTTP only
|
||||
const decltype(m_Sessions)& GetSessions () const { return m_Sessions; };
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
507
core/transport/SSUData.cpp
Normal file
507
core/transport/SSUData.cpp
Normal file
|
@ -0,0 +1,507 @@
|
|||
#include <stdlib.h>
|
||||
#include <boost/bind.hpp>
|
||||
#include "util/Log.h"
|
||||
#include "util/Timestamp.h"
|
||||
#include "NetDb.h"
|
||||
#include "SSU.h"
|
||||
#include "SSUData.h"
|
||||
|
||||
namespace i2p
|
||||
{
|
||||
namespace transport
|
||||
{
|
||||
void IncompleteMessage::AttachNextFragment (const uint8_t * fragment, size_t fragmentSize)
|
||||
{
|
||||
if (msg->len + fragmentSize > msg->maxLen)
|
||||
{
|
||||
LogPrint (eLogInfo, "SSU I2NP message size ", msg->maxLen, " is not enough");
|
||||
auto newMsg = ToSharedI2NPMessage(NewI2NPMessage ());
|
||||
*newMsg = *msg;
|
||||
msg = newMsg;
|
||||
}
|
||||
memcpy (msg->buf + msg->len, fragment, fragmentSize);
|
||||
msg->len += fragmentSize;
|
||||
nextFragmentNum++;
|
||||
}
|
||||
|
||||
SSUData::SSUData (SSUSession& session):
|
||||
m_Session (session), m_ResendTimer (session.GetService ()), m_DecayTimer (session.GetService ()),
|
||||
m_IncompleteMessagesCleanupTimer (session.GetService ())
|
||||
{
|
||||
m_MaxPacketSize = session.IsV6 () ? SSU_V6_MAX_PACKET_SIZE : SSU_V4_MAX_PACKET_SIZE;
|
||||
m_PacketSize = m_MaxPacketSize;
|
||||
auto remoteRouter = session.GetRemoteRouter ();
|
||||
if (remoteRouter)
|
||||
AdjustPacketSize (*remoteRouter);
|
||||
}
|
||||
|
||||
SSUData::~SSUData ()
|
||||
{
|
||||
}
|
||||
|
||||
void SSUData::Start ()
|
||||
{
|
||||
ScheduleIncompleteMessagesCleanup ();
|
||||
}
|
||||
|
||||
void SSUData::Stop ()
|
||||
{
|
||||
m_ResendTimer.cancel ();
|
||||
m_DecayTimer.cancel ();
|
||||
m_IncompleteMessagesCleanupTimer.cancel ();
|
||||
}
|
||||
|
||||
void SSUData::AdjustPacketSize (const i2p::data::RouterInfo& remoteRouter)
|
||||
{
|
||||
auto ssuAddress = remoteRouter.GetSSUAddress ();
|
||||
if (ssuAddress && ssuAddress->mtu)
|
||||
{
|
||||
if (m_Session.IsV6 ())
|
||||
m_PacketSize = ssuAddress->mtu - IPV6_HEADER_SIZE - UDP_HEADER_SIZE;
|
||||
else
|
||||
m_PacketSize = ssuAddress->mtu - IPV4_HEADER_SIZE - UDP_HEADER_SIZE;
|
||||
if (m_PacketSize > 0)
|
||||
{
|
||||
// make sure packet size multiple of 16
|
||||
m_PacketSize >>= 4;
|
||||
m_PacketSize <<= 4;
|
||||
if (m_PacketSize > m_MaxPacketSize) m_PacketSize = m_MaxPacketSize;
|
||||
LogPrint ("MTU=", ssuAddress->mtu, " packet size=", m_PacketSize);
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint (eLogWarning, "Unexpected MTU ", ssuAddress->mtu);
|
||||
m_PacketSize = m_MaxPacketSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SSUData::UpdatePacketSize (const i2p::data::IdentHash& remoteIdent)
|
||||
{
|
||||
auto routerInfo = i2p::data::netdb.FindRouter (remoteIdent);
|
||||
if (routerInfo)
|
||||
AdjustPacketSize (*routerInfo);
|
||||
}
|
||||
|
||||
void SSUData::ProcessSentMessageAck (uint32_t msgID)
|
||||
{
|
||||
auto it = m_SentMessages.find (msgID);
|
||||
if (it != m_SentMessages.end ())
|
||||
{
|
||||
m_SentMessages.erase (it);
|
||||
if (m_SentMessages.empty ())
|
||||
m_ResendTimer.cancel ();
|
||||
}
|
||||
}
|
||||
|
||||
void SSUData::ProcessAcks (uint8_t *& buf, uint8_t flag)
|
||||
{
|
||||
if (flag & DATA_FLAG_EXPLICIT_ACKS_INCLUDED)
|
||||
{
|
||||
// explicit ACKs
|
||||
uint8_t numAcks =*buf;
|
||||
buf++;
|
||||
for (int i = 0; i < numAcks; i++)
|
||||
ProcessSentMessageAck (bufbe32toh (buf+i*4));
|
||||
buf += numAcks*4;
|
||||
}
|
||||
if (flag & DATA_FLAG_ACK_BITFIELDS_INCLUDED)
|
||||
{
|
||||
// explicit ACK bitfields
|
||||
uint8_t numBitfields =*buf;
|
||||
buf++;
|
||||
for (int i = 0; i < numBitfields; i++)
|
||||
{
|
||||
uint32_t msgID = bufbe32toh (buf);
|
||||
buf += 4; // msgID
|
||||
auto it = m_SentMessages.find (msgID);
|
||||
// process individual Ack bitfields
|
||||
bool isNonLast = false;
|
||||
int fragment = 0;
|
||||
do
|
||||
{
|
||||
uint8_t bitfield = *buf;
|
||||
isNonLast = bitfield & 0x80;
|
||||
bitfield &= 0x7F; // clear MSB
|
||||
if (bitfield && it != m_SentMessages.end ())
|
||||
{
|
||||
int numSentFragments = it->second->fragments.size ();
|
||||
// process bits
|
||||
uint8_t mask = 0x01;
|
||||
for (int j = 0; j < 7; j++)
|
||||
{
|
||||
if (bitfield & mask)
|
||||
{
|
||||
if (fragment < numSentFragments)
|
||||
it->second->fragments[fragment].reset (nullptr);
|
||||
}
|
||||
fragment++;
|
||||
mask <<= 1;
|
||||
}
|
||||
}
|
||||
buf++;
|
||||
}
|
||||
while (isNonLast);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SSUData::ProcessFragments (uint8_t * buf)
|
||||
{
|
||||
uint8_t numFragments = *buf; // number of fragments
|
||||
buf++;
|
||||
for (int i = 0; i < numFragments; i++)
|
||||
{
|
||||
uint32_t msgID = bufbe32toh (buf); // message ID
|
||||
buf += 4;
|
||||
uint8_t frag[4];
|
||||
frag[0] = 0;
|
||||
memcpy (frag + 1, buf, 3);
|
||||
buf += 3;
|
||||
uint32_t fragmentInfo = bufbe32toh (frag); // fragment info
|
||||
uint16_t fragmentSize = fragmentInfo & 0x1FFF; // bits 0 - 13
|
||||
bool isLast = fragmentInfo & 0x010000; // bit 16
|
||||
uint8_t fragmentNum = fragmentInfo >> 17; // bits 23 - 17
|
||||
if (fragmentSize >= SSU_V4_MAX_PACKET_SIZE)
|
||||
{
|
||||
LogPrint (eLogError, "Fragment size ", fragmentSize, "exceeds max SSU packet size");
|
||||
return;
|
||||
}
|
||||
|
||||
// find message with msgID
|
||||
auto it = m_IncompleteMessages.find (msgID);
|
||||
if (it == m_IncompleteMessages.end ())
|
||||
{
|
||||
// create new message
|
||||
auto msg = ToSharedI2NPMessage (NewI2NPShortMessage ());
|
||||
msg->len -= I2NP_SHORT_HEADER_SIZE;
|
||||
it = m_IncompleteMessages.insert (std::make_pair (msgID,
|
||||
std::unique_ptr<IncompleteMessage>(new IncompleteMessage (msg)))).first;
|
||||
}
|
||||
std::unique_ptr<IncompleteMessage>& incompleteMessage = it->second;
|
||||
|
||||
// handle current fragment
|
||||
if (fragmentNum == incompleteMessage->nextFragmentNum)
|
||||
{
|
||||
// expected fragment
|
||||
incompleteMessage->AttachNextFragment (buf, fragmentSize);
|
||||
if (!isLast && !incompleteMessage->savedFragments.empty ())
|
||||
{
|
||||
// try saved fragments
|
||||
for (auto it1 = incompleteMessage->savedFragments.begin (); it1 != incompleteMessage->savedFragments.end ();)
|
||||
{
|
||||
auto& savedFragment = *it1;
|
||||
if (savedFragment->fragmentNum == incompleteMessage->nextFragmentNum)
|
||||
{
|
||||
incompleteMessage->AttachNextFragment (savedFragment->buf, savedFragment->len);
|
||||
isLast = savedFragment->isLast;
|
||||
incompleteMessage->savedFragments.erase (it1++);
|
||||
}
|
||||
else
|
||||
break;
|
||||
}
|
||||
if (isLast)
|
||||
LogPrint (eLogDebug, "Message ", msgID, " complete");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (fragmentNum < incompleteMessage->nextFragmentNum)
|
||||
// duplicate fragment
|
||||
LogPrint (eLogWarning, "Duplicate fragment ", (int)fragmentNum, " of message ", msgID, ". Ignored");
|
||||
else
|
||||
{
|
||||
// missing fragment
|
||||
LogPrint (eLogWarning, "Missing fragments from ", (int)incompleteMessage->nextFragmentNum, " to ", fragmentNum - 1, " of message ", msgID);
|
||||
auto savedFragment = new Fragment (fragmentNum, buf, fragmentSize, isLast);
|
||||
if (incompleteMessage->savedFragments.insert (std::unique_ptr<Fragment>(savedFragment)).second)
|
||||
incompleteMessage->lastFragmentInsertTime = i2p::util::GetSecondsSinceEpoch ();
|
||||
else
|
||||
LogPrint (eLogWarning, "Fragment ", (int)fragmentNum, " of message ", msgID, " already saved");
|
||||
}
|
||||
isLast = false;
|
||||
}
|
||||
|
||||
if (isLast)
|
||||
{
|
||||
// delete incomplete message
|
||||
auto msg = incompleteMessage->msg;
|
||||
incompleteMessage->msg = nullptr;
|
||||
m_IncompleteMessages.erase (msgID);
|
||||
// process message
|
||||
SendMsgAck (msgID);
|
||||
msg->FromSSU (msgID);
|
||||
if (m_Session.GetState () == eSessionStateEstablished)
|
||||
{
|
||||
if (!m_ReceivedMessages.count (msgID))
|
||||
{
|
||||
if (m_ReceivedMessages.size () > MAX_NUM_RECEIVED_MESSAGES)
|
||||
m_ReceivedMessages.clear ();
|
||||
else
|
||||
ScheduleDecay ();
|
||||
m_ReceivedMessages.insert (msgID);
|
||||
m_Handler.PutNextMessage (msg);
|
||||
}
|
||||
else
|
||||
LogPrint (eLogWarning, "SSU message ", msgID, " already received");
|
||||
}
|
||||
else
|
||||
{
|
||||
// we expect DeliveryStatus
|
||||
if (msg->GetTypeID () == eI2NPDeliveryStatus)
|
||||
{
|
||||
LogPrint ("SSU session established");
|
||||
m_Session.Established ();
|
||||
}
|
||||
else
|
||||
LogPrint (eLogError, "SSU unexpected message ", (int)msg->GetTypeID ());
|
||||
}
|
||||
}
|
||||
else
|
||||
SendFragmentAck (msgID, fragmentNum);
|
||||
buf += fragmentSize;
|
||||
}
|
||||
}
|
||||
|
||||
void SSUData::FlushReceivedMessage ()
|
||||
{
|
||||
m_Handler.Flush ();
|
||||
}
|
||||
|
||||
void SSUData::ProcessMessage (uint8_t * buf, size_t len)
|
||||
{
|
||||
//uint8_t * start = buf;
|
||||
uint8_t flag = *buf;
|
||||
buf++;
|
||||
LogPrint (eLogDebug, "Process SSU data flags=", (int)flag, " len=", len);
|
||||
// process acks if presented
|
||||
if (flag & (DATA_FLAG_ACK_BITFIELDS_INCLUDED | DATA_FLAG_EXPLICIT_ACKS_INCLUDED))
|
||||
ProcessAcks (buf, flag);
|
||||
// extended data if presented
|
||||
if (flag & DATA_FLAG_EXTENDED_DATA_INCLUDED)
|
||||
{
|
||||
uint8_t extendedDataSize = *buf;
|
||||
buf++; // size
|
||||
LogPrint (eLogDebug, "SSU extended data of ", extendedDataSize, " bytes presented");
|
||||
buf += extendedDataSize;
|
||||
}
|
||||
// process data
|
||||
ProcessFragments (buf);
|
||||
}
|
||||
|
||||
void SSUData::Send (std::shared_ptr<i2p::I2NPMessage> msg)
|
||||
{
|
||||
uint32_t msgID = msg->ToSSU ();
|
||||
if (m_SentMessages.count (msgID) > 0)
|
||||
{
|
||||
LogPrint (eLogWarning, "SSU message ", msgID, " already sent");
|
||||
return;
|
||||
}
|
||||
if (m_SentMessages.empty ()) // schedule resend at first message only
|
||||
ScheduleResend ();
|
||||
|
||||
auto ret = m_SentMessages.insert (std::make_pair (msgID, std::unique_ptr<SentMessage>(new SentMessage)));
|
||||
std::unique_ptr<SentMessage>& sentMessage = ret.first->second;
|
||||
if (ret.second)
|
||||
{
|
||||
sentMessage->nextResendTime = i2p::util::GetSecondsSinceEpoch () + RESEND_INTERVAL;
|
||||
sentMessage->numResends = 0;
|
||||
}
|
||||
auto& fragments = sentMessage->fragments;
|
||||
size_t payloadSize = m_PacketSize - sizeof (SSUHeader) - 9; // 9 = flag + #frg(1) + messageID(4) + frag info (3)
|
||||
size_t len = msg->GetLength ();
|
||||
uint8_t * msgBuf = msg->GetSSUHeader ();
|
||||
|
||||
uint32_t fragmentNum = 0;
|
||||
while (len > 0)
|
||||
{
|
||||
Fragment * fragment = new Fragment;
|
||||
fragment->fragmentNum = fragmentNum;
|
||||
uint8_t * buf = fragment->buf;
|
||||
uint8_t * payload = buf + sizeof (SSUHeader);
|
||||
*payload = DATA_FLAG_WANT_REPLY; // for compatibility
|
||||
payload++;
|
||||
*payload = 1; // always 1 message fragment per message
|
||||
payload++;
|
||||
htobe32buf (payload, msgID);
|
||||
payload += 4;
|
||||
bool isLast = (len <= payloadSize);
|
||||
size_t size = isLast ? len : payloadSize;
|
||||
uint32_t fragmentInfo = (fragmentNum << 17);
|
||||
if (isLast)
|
||||
fragmentInfo |= 0x010000;
|
||||
|
||||
fragmentInfo |= size;
|
||||
fragmentInfo = htobe32 (fragmentInfo);
|
||||
memcpy (payload, (uint8_t *)(&fragmentInfo) + 1, 3);
|
||||
payload += 3;
|
||||
memcpy (payload, msgBuf, size);
|
||||
|
||||
size += payload - buf;
|
||||
if (size & 0x0F) // make sure 16 bytes boundary
|
||||
size = ((size >> 4) + 1) << 4; // (/16 + 1)*16
|
||||
fragment->len = size;
|
||||
fragments.push_back (std::unique_ptr<Fragment> (fragment));
|
||||
|
||||
// encrypt message with session key
|
||||
m_Session.FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, size);
|
||||
try
|
||||
{
|
||||
m_Session.Send (buf, size);
|
||||
}
|
||||
catch (boost::system::system_error& ec)
|
||||
{
|
||||
LogPrint (eLogError, "Can't send SSU fragment ", ec.what ());
|
||||
}
|
||||
if (!isLast)
|
||||
{
|
||||
len -= payloadSize;
|
||||
msgBuf += payloadSize;
|
||||
}
|
||||
else
|
||||
len = 0;
|
||||
fragmentNum++;
|
||||
}
|
||||
}
|
||||
|
||||
void SSUData::SendMsgAck (uint32_t msgID)
|
||||
{
|
||||
uint8_t buf[48 + 18]; // actual length is 44 = 37 + 7 but pad it to multiple of 16
|
||||
uint8_t * payload = buf + sizeof (SSUHeader);
|
||||
*payload = DATA_FLAG_EXPLICIT_ACKS_INCLUDED; // flag
|
||||
payload++;
|
||||
*payload = 1; // number of ACKs
|
||||
payload++;
|
||||
*(uint32_t *)(payload) = htobe32 (msgID); // msgID
|
||||
payload += 4;
|
||||
*payload = 0; // number of fragments
|
||||
|
||||
// encrypt message with session key
|
||||
m_Session.FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, 48);
|
||||
m_Session.Send (buf, 48);
|
||||
}
|
||||
|
||||
void SSUData::SendFragmentAck (uint32_t msgID, int fragmentNum)
|
||||
{
|
||||
if (fragmentNum > 64)
|
||||
{
|
||||
LogPrint (eLogWarning, "Fragment number ", fragmentNum, " exceeds 64");
|
||||
return;
|
||||
}
|
||||
uint8_t buf[64 + 18];
|
||||
uint8_t * payload = buf + sizeof (SSUHeader);
|
||||
*payload = DATA_FLAG_ACK_BITFIELDS_INCLUDED; // flag
|
||||
payload++;
|
||||
*payload = 1; // number of ACK bitfields
|
||||
payload++;
|
||||
// one ack
|
||||
*(uint32_t *)(payload) = htobe32 (msgID); // msgID
|
||||
payload += 4;
|
||||
div_t d = div (fragmentNum, 7);
|
||||
memset (payload, 0x80, d.quot); // 0x80 means non-last
|
||||
payload += d.quot;
|
||||
*payload = 0x01 << d.rem; // set corresponding bit
|
||||
payload++;
|
||||
*payload = 0; // number of fragments
|
||||
|
||||
size_t len = d.quot < 4 ? 48 : 64; // 48 = 37 + 7 + 4 (3+1)
|
||||
// encrypt message with session key
|
||||
m_Session.FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, len);
|
||||
m_Session.Send (buf, len);
|
||||
}
|
||||
|
||||
void SSUData::ScheduleResend()
|
||||
{
|
||||
m_ResendTimer.cancel ();
|
||||
m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_INTERVAL));
|
||||
auto s = m_Session.shared_from_this();
|
||||
m_ResendTimer.async_wait ([s](const boost::system::error_code& ecode)
|
||||
{ s->m_Data.HandleResendTimer (ecode); });
|
||||
}
|
||||
|
||||
void SSUData::HandleResendTimer (const boost::system::error_code& ecode)
|
||||
{
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
{
|
||||
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
|
||||
for (auto it = m_SentMessages.begin (); it != m_SentMessages.end ();)
|
||||
{
|
||||
if (ts >= it->second->nextResendTime)
|
||||
{
|
||||
if (it->second->numResends < MAX_NUM_RESENDS)
|
||||
{
|
||||
for (auto& f: it->second->fragments)
|
||||
if (f)
|
||||
{
|
||||
try
|
||||
{
|
||||
m_Session.Send (f->buf, f->len); // resend
|
||||
}
|
||||
catch (boost::system::system_error& ec)
|
||||
{
|
||||
LogPrint (eLogError, "Can't resend SSU fragment ", ec.what ());
|
||||
}
|
||||
}
|
||||
|
||||
it->second->numResends++;
|
||||
it->second->nextResendTime += it->second->numResends*RESEND_INTERVAL;
|
||||
it++;
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint (eLogError, "SSU message has not been ACKed after ", MAX_NUM_RESENDS, " attempts. Deleted");
|
||||
it = m_SentMessages.erase (it);
|
||||
}
|
||||
}
|
||||
else
|
||||
it++;
|
||||
}
|
||||
ScheduleResend ();
|
||||
}
|
||||
}
|
||||
|
||||
void SSUData::ScheduleDecay ()
|
||||
{
|
||||
m_DecayTimer.cancel ();
|
||||
m_DecayTimer.expires_from_now (boost::posix_time::seconds(DECAY_INTERVAL));
|
||||
auto s = m_Session.shared_from_this();
|
||||
m_ResendTimer.async_wait ([s](const boost::system::error_code& ecode)
|
||||
{ s->m_Data.HandleDecayTimer (ecode); });
|
||||
}
|
||||
|
||||
void SSUData::HandleDecayTimer (const boost::system::error_code& ecode)
|
||||
{
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
m_ReceivedMessages.clear ();
|
||||
}
|
||||
|
||||
void SSUData::ScheduleIncompleteMessagesCleanup ()
|
||||
{
|
||||
m_IncompleteMessagesCleanupTimer.cancel ();
|
||||
m_IncompleteMessagesCleanupTimer.expires_from_now (boost::posix_time::seconds(INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT));
|
||||
auto s = m_Session.shared_from_this();
|
||||
m_IncompleteMessagesCleanupTimer.async_wait ([s](const boost::system::error_code& ecode)
|
||||
{ s->m_Data.HandleIncompleteMessagesCleanupTimer (ecode); });
|
||||
}
|
||||
|
||||
void SSUData::HandleIncompleteMessagesCleanupTimer (const boost::system::error_code& ecode)
|
||||
{
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
{
|
||||
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
|
||||
for (auto it = m_IncompleteMessages.begin (); it != m_IncompleteMessages.end ();)
|
||||
{
|
||||
if (ts > it->second->lastFragmentInsertTime + INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT)
|
||||
{
|
||||
LogPrint (eLogError, "SSU message ", it->first, " was not completed in ", INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT, " seconds. Deleted");
|
||||
it = m_IncompleteMessages.erase (it);
|
||||
}
|
||||
else
|
||||
it++;
|
||||
}
|
||||
ScheduleIncompleteMessagesCleanup ();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
128
core/transport/SSUData.h
Normal file
128
core/transport/SSUData.h
Normal file
|
@ -0,0 +1,128 @@
|
|||
#ifndef SSU_DATA_H__
|
||||
#define SSU_DATA_H__
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <string.h>
|
||||
#include <map>
|
||||
#include <vector>
|
||||
#include <set>
|
||||
#include <memory>
|
||||
#include <boost/asio.hpp>
|
||||
#include "I2NPProtocol.h"
|
||||
#include "Identity.h"
|
||||
#include "RouterInfo.h"
|
||||
|
||||
namespace i2p
|
||||
{
|
||||
namespace transport
|
||||
{
|
||||
|
||||
const size_t SSU_MTU_V4 = 1484;
|
||||
const size_t SSU_MTU_V6 = 1472;
|
||||
const size_t IPV4_HEADER_SIZE = 20;
|
||||
const size_t IPV6_HEADER_SIZE = 40;
|
||||
const size_t UDP_HEADER_SIZE = 8;
|
||||
const size_t SSU_V4_MAX_PACKET_SIZE = SSU_MTU_V4 - IPV4_HEADER_SIZE - UDP_HEADER_SIZE; // 1456
|
||||
const size_t SSU_V6_MAX_PACKET_SIZE = SSU_MTU_V6 - IPV6_HEADER_SIZE - UDP_HEADER_SIZE; // 1424
|
||||
const int RESEND_INTERVAL = 3; // in seconds
|
||||
const int MAX_NUM_RESENDS = 5;
|
||||
const int DECAY_INTERVAL = 20; // in seconds
|
||||
const int MAX_NUM_RECEIVED_MESSAGES = 1000; // how many msgID we store for duplicates check
|
||||
const int INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds
|
||||
// data flags
|
||||
const uint8_t DATA_FLAG_EXTENDED_DATA_INCLUDED = 0x02;
|
||||
const uint8_t DATA_FLAG_WANT_REPLY = 0x04;
|
||||
const uint8_t DATA_FLAG_REQUEST_PREVIOUS_ACKS = 0x08;
|
||||
const uint8_t DATA_FLAG_EXPLICIT_CONGESTION_NOTIFICATION = 0x10;
|
||||
const uint8_t DATA_FLAG_ACK_BITFIELDS_INCLUDED = 0x40;
|
||||
const uint8_t DATA_FLAG_EXPLICIT_ACKS_INCLUDED = 0x80;
|
||||
|
||||
struct Fragment
|
||||
{
|
||||
int fragmentNum;
|
||||
size_t len;
|
||||
bool isLast;
|
||||
uint8_t buf[SSU_V4_MAX_PACKET_SIZE + 18]; // use biggest
|
||||
|
||||
Fragment () = default;
|
||||
Fragment (int n, const uint8_t * b, int l, bool last):
|
||||
fragmentNum (n), len (l), isLast (last) { memcpy (buf, b, len); };
|
||||
};
|
||||
|
||||
struct FragmentCmp
|
||||
{
|
||||
bool operator() (const std::unique_ptr<Fragment>& f1, const std::unique_ptr<Fragment>& f2) const
|
||||
{
|
||||
return f1->fragmentNum < f2->fragmentNum;
|
||||
};
|
||||
};
|
||||
|
||||
struct IncompleteMessage
|
||||
{
|
||||
std::shared_ptr<I2NPMessage> msg;
|
||||
int nextFragmentNum;
|
||||
uint32_t lastFragmentInsertTime; // in seconds
|
||||
std::set<std::unique_ptr<Fragment>, FragmentCmp> savedFragments;
|
||||
|
||||
IncompleteMessage (std::shared_ptr<I2NPMessage> m): msg (m), nextFragmentNum (0), lastFragmentInsertTime (0) {};
|
||||
void AttachNextFragment (const uint8_t * fragment, size_t fragmentSize);
|
||||
};
|
||||
|
||||
struct SentMessage
|
||||
{
|
||||
std::vector<std::unique_ptr<Fragment> > fragments;
|
||||
uint32_t nextResendTime; // in seconds
|
||||
int numResends;
|
||||
};
|
||||
|
||||
class SSUSession;
|
||||
class SSUData
|
||||
{
|
||||
public:
|
||||
|
||||
SSUData (SSUSession& session);
|
||||
~SSUData ();
|
||||
|
||||
void Start ();
|
||||
void Stop ();
|
||||
|
||||
void ProcessMessage (uint8_t * buf, size_t len);
|
||||
void FlushReceivedMessage ();
|
||||
void Send (std::shared_ptr<i2p::I2NPMessage> msg);
|
||||
|
||||
void UpdatePacketSize (const i2p::data::IdentHash& remoteIdent);
|
||||
|
||||
private:
|
||||
|
||||
void SendMsgAck (uint32_t msgID);
|
||||
void SendFragmentAck (uint32_t msgID, int fragmentNum);
|
||||
void ProcessAcks (uint8_t *& buf, uint8_t flag);
|
||||
void ProcessFragments (uint8_t * buf);
|
||||
void ProcessSentMessageAck (uint32_t msgID);
|
||||
|
||||
void ScheduleResend ();
|
||||
void HandleResendTimer (const boost::system::error_code& ecode);
|
||||
|
||||
void ScheduleDecay ();
|
||||
void HandleDecayTimer (const boost::system::error_code& ecode);
|
||||
|
||||
void ScheduleIncompleteMessagesCleanup ();
|
||||
void HandleIncompleteMessagesCleanupTimer (const boost::system::error_code& ecode);
|
||||
|
||||
void AdjustPacketSize (const i2p::data::RouterInfo& remoteRouter);
|
||||
|
||||
private:
|
||||
|
||||
SSUSession& m_Session;
|
||||
std::map<uint32_t, std::unique_ptr<IncompleteMessage> > m_IncompleteMessages;
|
||||
std::map<uint32_t, std::unique_ptr<SentMessage> > m_SentMessages;
|
||||
std::set<uint32_t> m_ReceivedMessages;
|
||||
boost::asio::deadline_timer m_ResendTimer, m_DecayTimer, m_IncompleteMessagesCleanupTimer;
|
||||
int m_MaxPacketSize, m_PacketSize;
|
||||
i2p::I2NPMessagesHandler m_Handler;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
1090
core/transport/SSUSession.cpp
Normal file
1090
core/transport/SSUSession.cpp
Normal file
File diff suppressed because it is too large
Load diff
159
core/transport/SSUSession.h
Normal file
159
core/transport/SSUSession.h
Normal file
|
@ -0,0 +1,159 @@
|
|||
#ifndef SSU_SESSION_H__
|
||||
#define SSU_SESSION_H__
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <set>
|
||||
#include <memory>
|
||||
#include "crypto/aes.h"
|
||||
#include "crypto/hmac.h"
|
||||
#include "I2NPProtocol.h"
|
||||
#include "TransportSession.h"
|
||||
#include "SSUData.h"
|
||||
|
||||
namespace i2p
|
||||
{
|
||||
namespace transport
|
||||
{
|
||||
#pragma pack(1)
|
||||
// Warning: do not change the order of these variables
|
||||
// (or fix the unsafe casts in SSU.h)
|
||||
struct SSUHeader
|
||||
{
|
||||
uint8_t mac[16];
|
||||
uint8_t iv[16];
|
||||
uint8_t flag;
|
||||
uint32_t time;
|
||||
|
||||
uint8_t GetPayloadType () const { return flag >> 4; };
|
||||
};
|
||||
#pragma pack()
|
||||
|
||||
const int SSU_CONNECT_TIMEOUT = 5; // 5 seconds
|
||||
const int SSU_TERMINATION_TIMEOUT = 330; // 5.5 minutes
|
||||
|
||||
// payload types (4 bits)
|
||||
const uint8_t PAYLOAD_TYPE_SESSION_REQUEST = 0;
|
||||
const uint8_t PAYLOAD_TYPE_SESSION_CREATED = 1;
|
||||
const uint8_t PAYLOAD_TYPE_SESSION_CONFIRMED = 2;
|
||||
const uint8_t PAYLOAD_TYPE_RELAY_REQUEST = 3;
|
||||
const uint8_t PAYLOAD_TYPE_RELAY_RESPONSE = 4;
|
||||
const uint8_t PAYLOAD_TYPE_RELAY_INTRO = 5;
|
||||
const uint8_t PAYLOAD_TYPE_DATA = 6;
|
||||
const uint8_t PAYLOAD_TYPE_PEER_TEST = 7;
|
||||
const uint8_t PAYLOAD_TYPE_SESSION_DESTROYED = 8;
|
||||
|
||||
enum SessionState
|
||||
{
|
||||
eSessionStateUnknown,
|
||||
eSessionStateIntroduced,
|
||||
eSessionStateEstablished,
|
||||
eSessionStateClosed,
|
||||
eSessionStateFailed
|
||||
};
|
||||
|
||||
enum PeerTestParticipant
|
||||
{
|
||||
ePeerTestParticipantUnknown = 0,
|
||||
ePeerTestParticipantAlice1,
|
||||
ePeerTestParticipantAlice2,
|
||||
ePeerTestParticipantBob,
|
||||
ePeerTestParticipantCharlie
|
||||
};
|
||||
|
||||
class SSUServer;
|
||||
class SSUSession: public TransportSession, public std::enable_shared_from_this<SSUSession>
|
||||
{
|
||||
public:
|
||||
|
||||
SSUSession (SSUServer& server, boost::asio::ip::udp::endpoint& remoteEndpoint,
|
||||
std::shared_ptr<const i2p::data::RouterInfo> router = nullptr, bool peerTest = false);
|
||||
void ProcessNextMessage (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint);
|
||||
~SSUSession ();
|
||||
|
||||
void Connect ();
|
||||
void WaitForConnect ();
|
||||
void Introduce (uint32_t iTag, const uint8_t * iKey);
|
||||
void WaitForIntroduction ();
|
||||
void Close ();
|
||||
void Done ();
|
||||
boost::asio::ip::udp::endpoint& GetRemoteEndpoint () { return m_RemoteEndpoint; };
|
||||
bool IsV6 () const { return m_RemoteEndpoint.address ().is_v6 (); };
|
||||
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs);
|
||||
void SendPeerTest (); // Alice
|
||||
|
||||
SessionState GetState () const { return m_State; };
|
||||
size_t GetNumSentBytes () const { return m_NumSentBytes; };
|
||||
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
|
||||
|
||||
void SendKeepAlive ();
|
||||
uint32_t GetRelayTag () const { return m_RelayTag; };
|
||||
uint32_t GetCreationTime () const { return m_CreationTime; };
|
||||
|
||||
void FlushData ();
|
||||
|
||||
private:
|
||||
|
||||
boost::asio::io_service& GetService ();
|
||||
void CreateAESandMacKey (const uint8_t * pubKey);
|
||||
|
||||
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs);
|
||||
void ProcessMessage (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); // call for established session
|
||||
void ProcessSessionRequest (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint);
|
||||
void SendSessionRequest ();
|
||||
void SendRelayRequest (uint32_t iTag, const uint8_t * iKey);
|
||||
void ProcessSessionCreated (uint8_t * buf, size_t len);
|
||||
void SendSessionCreated (const uint8_t * x);
|
||||
void ProcessSessionConfirmed (uint8_t * buf, size_t len);
|
||||
void SendSessionConfirmed (const uint8_t * y, const uint8_t * ourAddress, size_t ourAddressLen);
|
||||
void ProcessRelayRequest (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& from);
|
||||
void SendRelayResponse (uint32_t nonce, const boost::asio::ip::udp::endpoint& from,
|
||||
const uint8_t * introKey, const boost::asio::ip::udp::endpoint& to);
|
||||
void SendRelayIntro (SSUSession * session, const boost::asio::ip::udp::endpoint& from);
|
||||
void ProcessRelayResponse (uint8_t * buf, size_t len);
|
||||
void ProcessRelayIntro (uint8_t * buf, size_t len);
|
||||
void Established ();
|
||||
void Failed ();
|
||||
void ScheduleConnectTimer ();
|
||||
void HandleConnectTimer (const boost::system::error_code& ecode);
|
||||
void ProcessPeerTest (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint);
|
||||
void SendPeerTest (uint32_t nonce, uint32_t address, uint16_t port, const uint8_t * introKey, bool toAddress = true, bool sendAddress = true);
|
||||
void ProcessData (uint8_t * buf, size_t len);
|
||||
void SendSesionDestroyed ();
|
||||
void Send (uint8_t type, const uint8_t * payload, size_t len); // with session key
|
||||
void Send (const uint8_t * buf, size_t size);
|
||||
|
||||
void FillHeaderAndEncrypt (uint8_t payloadType, uint8_t * buf, size_t len, const uint8_t * aesKey, const uint8_t * iv, const uint8_t * macKey);
|
||||
void FillHeaderAndEncrypt (uint8_t payloadType, uint8_t * buf, size_t len); // with session key
|
||||
void Decrypt (uint8_t * buf, size_t len, const uint8_t * aesKey);
|
||||
void DecryptSessionKey (uint8_t * buf, size_t len);
|
||||
bool Validate (uint8_t * buf, size_t len, const uint8_t * macKey);
|
||||
const uint8_t * GetIntroKey () const;
|
||||
|
||||
void ScheduleTermination ();
|
||||
void HandleTerminationTimer (const boost::system::error_code& ecode);
|
||||
|
||||
private:
|
||||
|
||||
friend class SSUData; // TODO: change in later
|
||||
SSUServer& m_Server;
|
||||
boost::asio::ip::udp::endpoint m_RemoteEndpoint;
|
||||
boost::asio::deadline_timer m_Timer;
|
||||
bool m_PeerTest;
|
||||
SessionState m_State;
|
||||
bool m_IsSessionKey;
|
||||
uint32_t m_RelayTag;
|
||||
i2p::crypto::CBCEncryption m_SessionKeyEncryption;
|
||||
i2p::crypto::CBCDecryption m_SessionKeyDecryption;
|
||||
i2p::crypto::AESKey m_SessionKey;
|
||||
i2p::crypto::MACKey m_MacKey;
|
||||
uint32_t m_CreationTime; // seconds since epoch
|
||||
SSUData m_Data;
|
||||
bool m_IsDataReceived;
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
86
core/transport/TransportSession.h
Normal file
86
core/transport/TransportSession.h
Normal file
|
@ -0,0 +1,86 @@
|
|||
#ifndef TRANSPORT_SESSION_H__
|
||||
#define TRANSPORT_SESSION_H__
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
#include "Identity.h"
|
||||
#include "RouterInfo.h"
|
||||
#include "I2NPProtocol.h"
|
||||
|
||||
namespace i2p
|
||||
{
|
||||
namespace transport
|
||||
{
|
||||
struct DHKeysPair // transient keys for transport sessions
|
||||
{
|
||||
uint8_t publicKey[256];
|
||||
uint8_t privateKey[256];
|
||||
};
|
||||
|
||||
class SignedData
|
||||
{
|
||||
public:
|
||||
|
||||
SignedData () {};
|
||||
void Insert (const uint8_t * buf, size_t len)
|
||||
{
|
||||
m_Stream.write ((char *)buf, len);
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
void Insert (T t)
|
||||
{
|
||||
m_Stream.write ((char *)&t, sizeof (T));
|
||||
}
|
||||
|
||||
bool Verify (const i2p::data::IdentityEx& ident, const uint8_t * signature) const
|
||||
{
|
||||
return ident.Verify ((const uint8_t *)m_Stream.str ().c_str (), m_Stream.str ().size (), signature);
|
||||
}
|
||||
|
||||
void Sign (const i2p::data::PrivateKeys& keys, uint8_t * signature) const
|
||||
{
|
||||
keys.Sign ((const uint8_t *)m_Stream.str ().c_str (), m_Stream.str ().size (), signature);
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
std::stringstream m_Stream;
|
||||
};
|
||||
|
||||
class TransportSession
|
||||
{
|
||||
public:
|
||||
|
||||
TransportSession (std::shared_ptr<const i2p::data::RouterInfo> in_RemoteRouter):
|
||||
m_RemoteRouter (in_RemoteRouter), m_DHKeysPair (nullptr),
|
||||
m_NumSentBytes (0), m_NumReceivedBytes (0)
|
||||
{
|
||||
if (m_RemoteRouter)
|
||||
m_RemoteIdentity = m_RemoteRouter->GetRouterIdentity ();
|
||||
}
|
||||
|
||||
virtual ~TransportSession () { delete m_DHKeysPair; };
|
||||
virtual void Done () = 0;
|
||||
|
||||
std::shared_ptr<const i2p::data::RouterInfo> GetRemoteRouter () { return m_RemoteRouter; };
|
||||
const i2p::data::IdentityEx& GetRemoteIdentity () { return m_RemoteIdentity; };
|
||||
|
||||
size_t GetNumSentBytes () const { return m_NumSentBytes; };
|
||||
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
|
||||
|
||||
virtual void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) = 0;
|
||||
|
||||
protected:
|
||||
|
||||
std::shared_ptr<const i2p::data::RouterInfo> m_RemoteRouter;
|
||||
i2p::data::IdentityEx m_RemoteIdentity;
|
||||
DHKeysPair * m_DHKeysPair; // X - for client and Y - for server
|
||||
size_t m_NumSentBytes, m_NumReceivedBytes;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
505
core/transport/Transports.cpp
Normal file
505
core/transport/Transports.cpp
Normal file
|
@ -0,0 +1,505 @@
|
|||
#include <cryptopp/dh.h>
|
||||
#include "util/Log.h"
|
||||
#include "crypto/CryptoConst.h"
|
||||
#include "RouterContext.h"
|
||||
#include "I2NPProtocol.h"
|
||||
#include "NetDb.h"
|
||||
#include "Transports.h"
|
||||
|
||||
using namespace i2p::data;
|
||||
|
||||
namespace i2p
|
||||
{
|
||||
namespace transport
|
||||
{
|
||||
DHKeysPairSupplier::DHKeysPairSupplier (int size):
|
||||
m_QueueSize (size), m_IsRunning (false), m_Thread (nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
DHKeysPairSupplier::~DHKeysPairSupplier ()
|
||||
{
|
||||
Stop ();
|
||||
}
|
||||
|
||||
void DHKeysPairSupplier::Start ()
|
||||
{
|
||||
m_IsRunning = true;
|
||||
m_Thread = new std::thread (std::bind (&DHKeysPairSupplier::Run, this));
|
||||
}
|
||||
|
||||
void DHKeysPairSupplier::Stop ()
|
||||
{
|
||||
m_IsRunning = false;
|
||||
m_Acquired.notify_one ();
|
||||
if (m_Thread)
|
||||
{
|
||||
m_Thread->join ();
|
||||
delete m_Thread;
|
||||
m_Thread = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void DHKeysPairSupplier::Run ()
|
||||
{
|
||||
while (m_IsRunning)
|
||||
{
|
||||
int num;
|
||||
while ((num = m_QueueSize - m_Queue.size ()) > 0)
|
||||
CreateDHKeysPairs (num);
|
||||
std::unique_lock<std::mutex> l(m_AcquiredMutex);
|
||||
m_Acquired.wait (l); // wait for element gets aquired
|
||||
}
|
||||
}
|
||||
|
||||
void DHKeysPairSupplier::CreateDHKeysPairs (int num)
|
||||
{
|
||||
if (num > 0)
|
||||
{
|
||||
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
|
||||
for (int i = 0; i < num; i++)
|
||||
{
|
||||
i2p::transport::DHKeysPair * pair = new i2p::transport::DHKeysPair ();
|
||||
dh.GenerateKeyPair(m_Rnd, pair->privateKey, pair->publicKey);
|
||||
std::unique_lock<std::mutex> l(m_AcquiredMutex);
|
||||
m_Queue.push (pair);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
DHKeysPair * DHKeysPairSupplier::Acquire ()
|
||||
{
|
||||
if (!m_Queue.empty ())
|
||||
{
|
||||
std::unique_lock<std::mutex> l(m_AcquiredMutex);
|
||||
auto pair = m_Queue.front ();
|
||||
m_Queue.pop ();
|
||||
m_Acquired.notify_one ();
|
||||
return pair;
|
||||
}
|
||||
else // queue is empty, create new
|
||||
{
|
||||
DHKeysPair * pair = new DHKeysPair ();
|
||||
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
|
||||
dh.GenerateKeyPair(m_Rnd, pair->privateKey, pair->publicKey);
|
||||
return pair;
|
||||
}
|
||||
}
|
||||
|
||||
void DHKeysPairSupplier::Return (DHKeysPair * pair)
|
||||
{
|
||||
std::unique_lock<std::mutex> l(m_AcquiredMutex);
|
||||
m_Queue.push (pair);
|
||||
}
|
||||
|
||||
Transports transports;
|
||||
|
||||
Transports::Transports ():
|
||||
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), m_PeerCleanupTimer (m_Service),
|
||||
m_NTCPServer (nullptr), m_SSUServer (nullptr), m_DHKeysPairSupplier (5), // 5 pre-generated keys
|
||||
m_TotalSentBytes(0), m_TotalReceivedBytes(0), m_InBandwidth (0), m_OutBandwidth (0),
|
||||
m_LastInBandwidthUpdateBytes (0), m_LastOutBandwidthUpdateBytes (0), m_LastBandwidthUpdateTime (0)
|
||||
{
|
||||
}
|
||||
|
||||
Transports::~Transports ()
|
||||
{
|
||||
Stop ();
|
||||
}
|
||||
|
||||
void Transports::Start ()
|
||||
{
|
||||
#ifdef USE_UPNP
|
||||
m_UPnP.Start ();
|
||||
LogPrint(eLogInfo, "UPnP started");
|
||||
#endif
|
||||
m_DHKeysPairSupplier.Start ();
|
||||
m_IsRunning = true;
|
||||
m_Thread = new std::thread (std::bind (&Transports::Run, this));
|
||||
// create acceptors
|
||||
auto addresses = context.GetRouterInfo ().GetAddresses ();
|
||||
for (auto& address : addresses)
|
||||
{
|
||||
if (!m_NTCPServer)
|
||||
{
|
||||
m_NTCPServer = new NTCPServer (address.port);
|
||||
m_NTCPServer->Start ();
|
||||
}
|
||||
|
||||
if (address.transportStyle == RouterInfo::eTransportSSU && address.host.is_v4 ())
|
||||
{
|
||||
if (!m_SSUServer)
|
||||
{
|
||||
m_SSUServer = new SSUServer (address.port);
|
||||
LogPrint ("Start listening UDP port ", address.port);
|
||||
m_SSUServer->Start ();
|
||||
DetectExternalIP ();
|
||||
}
|
||||
else
|
||||
LogPrint ("SSU server already exists");
|
||||
}
|
||||
}
|
||||
m_PeerCleanupTimer.expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT));
|
||||
m_PeerCleanupTimer.async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1));
|
||||
}
|
||||
|
||||
void Transports::Stop ()
|
||||
{
|
||||
#ifdef USE_UPNP
|
||||
m_UPnP.Stop ();
|
||||
LogPrint(eLogInfo, "UPnP stopped");
|
||||
#endif
|
||||
m_PeerCleanupTimer.cancel ();
|
||||
m_Peers.clear ();
|
||||
if (m_SSUServer)
|
||||
{
|
||||
m_SSUServer->Stop ();
|
||||
delete m_SSUServer;
|
||||
m_SSUServer = nullptr;
|
||||
}
|
||||
if (m_NTCPServer)
|
||||
{
|
||||
m_NTCPServer->Stop ();
|
||||
delete m_NTCPServer;
|
||||
m_NTCPServer = nullptr;
|
||||
}
|
||||
|
||||
m_DHKeysPairSupplier.Stop ();
|
||||
m_IsRunning = false;
|
||||
m_Service.stop ();
|
||||
if (m_Thread)
|
||||
{
|
||||
m_Thread->join ();
|
||||
delete m_Thread;
|
||||
m_Thread = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void Transports::Run ()
|
||||
{
|
||||
while (m_IsRunning)
|
||||
{
|
||||
try
|
||||
{
|
||||
m_Service.run ();
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
LogPrint ("Transports: ", ex.what ());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Transports::UpdateBandwidth ()
|
||||
{
|
||||
uint64_t ts = i2p::util::GetMillisecondsSinceEpoch ();
|
||||
if (m_LastBandwidthUpdateTime > 0)
|
||||
{
|
||||
auto delta = ts - m_LastBandwidthUpdateTime;
|
||||
if (delta > 0)
|
||||
{
|
||||
m_InBandwidth = (m_TotalReceivedBytes - m_LastInBandwidthUpdateBytes)*1000/delta; // per second
|
||||
m_OutBandwidth = (m_TotalSentBytes - m_LastOutBandwidthUpdateBytes)*1000/delta; // per second
|
||||
}
|
||||
}
|
||||
m_LastBandwidthUpdateTime = ts;
|
||||
m_LastInBandwidthUpdateBytes = m_TotalReceivedBytes;
|
||||
m_LastOutBandwidthUpdateBytes = m_TotalSentBytes;
|
||||
}
|
||||
|
||||
bool Transports::IsBandwidthExceeded () const
|
||||
{
|
||||
if (i2p::context.GetRouterInfo ().IsHighBandwidth ()) return false;
|
||||
return std::max (m_InBandwidth, m_OutBandwidth) > LOW_BANDWIDTH_LIMIT;
|
||||
}
|
||||
|
||||
void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg)
|
||||
{
|
||||
SendMessages (ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > {msg });
|
||||
}
|
||||
|
||||
void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs)
|
||||
{
|
||||
m_Service.post (std::bind (&Transports::PostMessages, this, ident, msgs));
|
||||
}
|
||||
|
||||
void Transports::PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs)
|
||||
{
|
||||
if (ident == i2p::context.GetRouterInfo ().GetIdentHash ())
|
||||
{
|
||||
// we send it to ourself
|
||||
for (auto it: msgs)
|
||||
i2p::HandleI2NPMessage (it);
|
||||
return;
|
||||
}
|
||||
auto it = m_Peers.find (ident);
|
||||
if (it == m_Peers.end ())
|
||||
{
|
||||
bool connected = false;
|
||||
try
|
||||
{
|
||||
auto r = netdb.FindRouter (ident);
|
||||
it = m_Peers.insert (std::pair<i2p::data::IdentHash, Peer>(ident, { 0, r, {},
|
||||
i2p::util::GetSecondsSinceEpoch () })).first;
|
||||
connected = ConnectToPeer (ident, it->second);
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
LogPrint (eLogError, "Transports::PostMessages ", ex.what ());
|
||||
}
|
||||
if (!connected) return;
|
||||
}
|
||||
if (!it->second.sessions.empty ())
|
||||
it->second.sessions.front ()->SendI2NPMessages (msgs);
|
||||
else
|
||||
{
|
||||
for (auto it1: msgs)
|
||||
it->second.delayedMessages.push_back (it1);
|
||||
}
|
||||
}
|
||||
|
||||
bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer)
|
||||
{
|
||||
if (peer.router) // we have RI already
|
||||
{
|
||||
if (!peer.numAttempts) // NTCP
|
||||
{
|
||||
peer.numAttempts++;
|
||||
auto address = peer.router->GetNTCPAddress (!context.SupportsV6 ());
|
||||
if (address)
|
||||
{
|
||||
#if BOOST_VERSION >= 104900
|
||||
if (!address->host.is_unspecified ()) // we have address now
|
||||
#else
|
||||
boost::system::error_code ecode;
|
||||
address->host.to_string (ecode);
|
||||
if (!ecode)
|
||||
#endif
|
||||
{
|
||||
if (!peer.router->UsesIntroducer () && !peer.router->IsUnreachable ())
|
||||
{
|
||||
auto s = std::make_shared<NTCPSession> (*m_NTCPServer, peer.router);
|
||||
m_NTCPServer->Connect (address->host, address->port, s);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
else // we don't have address
|
||||
{
|
||||
if (address->addressString.length () > 0) // trying to resolve
|
||||
{
|
||||
LogPrint (eLogInfo, "Resolving ", address->addressString);
|
||||
NTCPResolve (address->addressString, ident);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (peer.numAttempts == 1)// SSU
|
||||
{
|
||||
peer.numAttempts++;
|
||||
if (m_SSUServer)
|
||||
{
|
||||
if (m_SSUServer->GetSession (peer.router))
|
||||
return true;
|
||||
}
|
||||
}
|
||||
LogPrint (eLogError, "No NTCP and SSU addresses available");
|
||||
peer.Done ();
|
||||
m_Peers.erase (ident);
|
||||
return false;
|
||||
}
|
||||
else // otherwise request RI
|
||||
{
|
||||
LogPrint ("Router not found. Requested");
|
||||
i2p::data::netdb.RequestDestination (ident, std::bind (
|
||||
&Transports::RequestComplete, this, std::placeholders::_1, ident));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void Transports::RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident)
|
||||
{
|
||||
m_Service.post (std::bind (&Transports::HandleRequestComplete, this, r, ident));
|
||||
}
|
||||
|
||||
void Transports::HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident)
|
||||
{
|
||||
auto it = m_Peers.find (ident);
|
||||
if (it != m_Peers.end ())
|
||||
{
|
||||
if (r)
|
||||
{
|
||||
LogPrint ("Router found. Trying to connect");
|
||||
it->second.router = r;
|
||||
ConnectToPeer (ident, it->second);
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint ("Router not found. Failed to send messages");
|
||||
m_Peers.erase (it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Transports::NTCPResolve (const std::string& addr, const i2p::data::IdentHash& ident)
|
||||
{
|
||||
auto resolver = std::make_shared<boost::asio::ip::tcp::resolver>(m_Service);
|
||||
resolver->async_resolve (boost::asio::ip::tcp::resolver::query (addr, ""),
|
||||
std::bind (&Transports::HandleNTCPResolve, this,
|
||||
std::placeholders::_1, std::placeholders::_2, ident, resolver));
|
||||
}
|
||||
|
||||
void Transports::HandleNTCPResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it,
|
||||
i2p::data::IdentHash ident, std::shared_ptr<boost::asio::ip::tcp::resolver> resolver)
|
||||
{
|
||||
auto it1 = m_Peers.find (ident);
|
||||
if (it1 != m_Peers.end ())
|
||||
{
|
||||
auto& peer = it1->second;
|
||||
if (!ecode && peer.router)
|
||||
{
|
||||
auto address = (*it).endpoint ().address ();
|
||||
LogPrint (eLogInfo, (*it).host_name (), " has been resolved to ", address);
|
||||
auto addr = peer.router->GetNTCPAddress ();
|
||||
if (addr)
|
||||
{
|
||||
auto s = std::make_shared<NTCPSession> (*m_NTCPServer, peer.router);
|
||||
m_NTCPServer->Connect (address, addr->port, s);
|
||||
return;
|
||||
}
|
||||
}
|
||||
LogPrint (eLogError, "Unable to resolve NTCP address: ", ecode.message ());
|
||||
m_Peers.erase (it1);
|
||||
}
|
||||
}
|
||||
|
||||
void Transports::CloseSession (std::shared_ptr<const i2p::data::RouterInfo> router)
|
||||
{
|
||||
if (!router) return;
|
||||
m_Service.post (std::bind (&Transports::PostCloseSession, this, router));
|
||||
}
|
||||
|
||||
void Transports::PostCloseSession (std::shared_ptr<const i2p::data::RouterInfo> router)
|
||||
{
|
||||
auto ssuSession = m_SSUServer ? m_SSUServer->FindSession (router) : nullptr;
|
||||
if (ssuSession) // try SSU first
|
||||
{
|
||||
m_SSUServer->DeleteSession (ssuSession);
|
||||
LogPrint ("SSU session closed");
|
||||
}
|
||||
// TODO: delete NTCP
|
||||
}
|
||||
|
||||
void Transports::DetectExternalIP ()
|
||||
{
|
||||
if (m_SSUServer)
|
||||
{
|
||||
i2p::context.SetStatus (eRouterStatusTesting);
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
auto router = i2p::data::netdb.GetRandomPeerTestRouter ();
|
||||
if (router && router->IsSSU ())
|
||||
m_SSUServer->GetSession (router, true); // peer test
|
||||
else
|
||||
{
|
||||
// if not peer test capable routers found pick any
|
||||
router = i2p::data::netdb.GetRandomRouter ();
|
||||
if (router && router->IsSSU ())
|
||||
m_SSUServer->GetSession (router); // no peer test
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
LogPrint (eLogError, "Can't detect external IP. SSU is not available");
|
||||
}
|
||||
|
||||
DHKeysPair * Transports::GetNextDHKeysPair ()
|
||||
{
|
||||
return m_DHKeysPairSupplier.Acquire ();
|
||||
}
|
||||
|
||||
void Transports::ReuseDHKeysPair (DHKeysPair * pair)
|
||||
{
|
||||
m_DHKeysPairSupplier.Return (pair);
|
||||
}
|
||||
|
||||
void Transports::PeerConnected (std::shared_ptr<TransportSession> session)
|
||||
{
|
||||
m_Service.post([session, this]()
|
||||
{
|
||||
auto ident = session->GetRemoteIdentity ().GetIdentHash ();
|
||||
auto it = m_Peers.find (ident);
|
||||
if (it != m_Peers.end ())
|
||||
{
|
||||
it->second.sessions.push_back (session);
|
||||
session->SendI2NPMessages (it->second.delayedMessages);
|
||||
it->second.delayedMessages.clear ();
|
||||
}
|
||||
else // incoming connection
|
||||
m_Peers.insert (std::make_pair (ident, Peer{ 0, nullptr, { session }, i2p::util::GetSecondsSinceEpoch () }));
|
||||
});
|
||||
}
|
||||
|
||||
void Transports::PeerDisconnected (std::shared_ptr<TransportSession> session)
|
||||
{
|
||||
m_Service.post([session, this]()
|
||||
{
|
||||
auto ident = session->GetRemoteIdentity ().GetIdentHash ();
|
||||
auto it = m_Peers.find (ident);
|
||||
if (it != m_Peers.end ())
|
||||
{
|
||||
it->second.sessions.remove (session);
|
||||
if (it->second.sessions.empty ()) // TODO: why?
|
||||
{
|
||||
if (it->second.delayedMessages.size () > 0)
|
||||
ConnectToPeer (ident, it->second);
|
||||
else
|
||||
m_Peers.erase (it);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
bool Transports::IsConnected (const i2p::data::IdentHash& ident) const
|
||||
{
|
||||
auto it = m_Peers.find (ident);
|
||||
return it != m_Peers.end ();
|
||||
}
|
||||
|
||||
void Transports::HandlePeerCleanupTimer (const boost::system::error_code& ecode)
|
||||
{
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
{
|
||||
auto ts = i2p::util::GetSecondsSinceEpoch ();
|
||||
for (auto it = m_Peers.begin (); it != m_Peers.end (); )
|
||||
{
|
||||
if (it->second.sessions.empty () && ts > it->second.creationTime + SESSION_CREATION_TIMEOUT)
|
||||
{
|
||||
LogPrint (eLogError, "Session to peer ", it->first.ToBase64 (), " has not been created in ", SESSION_CREATION_TIMEOUT, " seconds");
|
||||
it = m_Peers.erase (it);
|
||||
}
|
||||
else
|
||||
it++;
|
||||
}
|
||||
UpdateBandwidth (); // TODO: use separate timer(s) for it
|
||||
if (i2p::context.GetStatus () == eRouterStatusTesting) // if still testing, repeat peer test
|
||||
DetectExternalIP ();
|
||||
m_PeerCleanupTimer.expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT));
|
||||
m_PeerCleanupTimer.async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1));
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<const i2p::data::RouterInfo> Transports::GetRandomPeer () const
|
||||
{
|
||||
if(m_Peers.empty()) // ensure m.Peers.size() >= 1
|
||||
return nullptr;
|
||||
|
||||
CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator();
|
||||
auto it = m_Peers.begin();
|
||||
std::advance(it, rnd.GenerateWord32(0, m_Peers.size () - 1));
|
||||
|
||||
return it->second.router;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
160
core/transport/Transports.h
Normal file
160
core/transport/Transports.h
Normal file
|
@ -0,0 +1,160 @@
|
|||
#ifndef TRANSPORTS_H__
|
||||
#define TRANSPORTS_H__
|
||||
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <vector>
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <atomic>
|
||||
#include <cryptopp/osrng.h>
|
||||
#include <boost/asio.hpp>
|
||||
#include "TransportSession.h"
|
||||
#include "NTCPSession.h"
|
||||
#include "SSU.h"
|
||||
#include "RouterInfo.h"
|
||||
#include "I2NPProtocol.h"
|
||||
#include "Identity.h"
|
||||
|
||||
#ifdef USE_UPNP
|
||||
#include "UPnP.h"
|
||||
#endif
|
||||
|
||||
namespace i2p
|
||||
{
|
||||
namespace transport
|
||||
{
|
||||
class DHKeysPairSupplier
|
||||
{
|
||||
public:
|
||||
|
||||
DHKeysPairSupplier (int size);
|
||||
~DHKeysPairSupplier ();
|
||||
void Start ();
|
||||
void Stop ();
|
||||
DHKeysPair * Acquire ();
|
||||
void Return (DHKeysPair * pair);
|
||||
|
||||
private:
|
||||
|
||||
void Run ();
|
||||
void CreateDHKeysPairs (int num);
|
||||
|
||||
private:
|
||||
|
||||
const int m_QueueSize;
|
||||
std::queue<DHKeysPair *> m_Queue;
|
||||
|
||||
bool m_IsRunning;
|
||||
std::thread * m_Thread;
|
||||
std::condition_variable m_Acquired;
|
||||
std::mutex m_AcquiredMutex;
|
||||
CryptoPP::AutoSeededRandomPool m_Rnd;
|
||||
};
|
||||
|
||||
struct Peer
|
||||
{
|
||||
int numAttempts;
|
||||
std::shared_ptr<const i2p::data::RouterInfo> router;
|
||||
std::list<std::shared_ptr<TransportSession> > sessions;
|
||||
uint64_t creationTime;
|
||||
std::vector<std::shared_ptr<i2p::I2NPMessage> > delayedMessages;
|
||||
|
||||
void Done ()
|
||||
{
|
||||
for (auto it: sessions)
|
||||
it->Done ();
|
||||
}
|
||||
};
|
||||
|
||||
const size_t SESSION_CREATION_TIMEOUT = 10; // in seconds
|
||||
const uint32_t LOW_BANDWIDTH_LIMIT = 32*1024; // 32KBs
|
||||
class Transports
|
||||
{
|
||||
public:
|
||||
|
||||
Transports ();
|
||||
~Transports ();
|
||||
|
||||
void Start ();
|
||||
void Stop ();
|
||||
|
||||
boost::asio::io_service& GetService () { return m_Service; };
|
||||
i2p::transport::DHKeysPair * GetNextDHKeysPair ();
|
||||
void ReuseDHKeysPair (DHKeysPair * pair);
|
||||
|
||||
void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg);
|
||||
void SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs);
|
||||
void CloseSession (std::shared_ptr<const i2p::data::RouterInfo> router);
|
||||
|
||||
void PeerConnected (std::shared_ptr<TransportSession> session);
|
||||
void PeerDisconnected (std::shared_ptr<TransportSession> session);
|
||||
bool IsConnected (const i2p::data::IdentHash& ident) const;
|
||||
|
||||
void UpdateSentBytes (uint64_t numBytes) { m_TotalSentBytes += numBytes; };
|
||||
void UpdateReceivedBytes (uint64_t numBytes) { m_TotalReceivedBytes += numBytes; };
|
||||
uint64_t GetTotalSentBytes () const { return m_TotalSentBytes; };
|
||||
uint64_t GetTotalReceivedBytes () const { return m_TotalReceivedBytes; };
|
||||
uint32_t GetInBandwidth () const { return m_InBandwidth; }; // bytes per second
|
||||
uint32_t GetOutBandwidth () const { return m_OutBandwidth; }; // bytes per second
|
||||
bool IsBandwidthExceeded () const;
|
||||
size_t GetNumPeers () const { return m_Peers.size (); };
|
||||
std::shared_ptr<const i2p::data::RouterInfo> GetRandomPeer () const;
|
||||
|
||||
private:
|
||||
|
||||
void Run ();
|
||||
void RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident);
|
||||
void HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident);
|
||||
void PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs);
|
||||
void PostCloseSession (std::shared_ptr<const i2p::data::RouterInfo> router);
|
||||
bool ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer);
|
||||
void HandlePeerCleanupTimer (const boost::system::error_code& ecode);
|
||||
|
||||
void NTCPResolve (const std::string& addr, const i2p::data::IdentHash& ident);
|
||||
void HandleNTCPResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it,
|
||||
i2p::data::IdentHash ident, std::shared_ptr<boost::asio::ip::tcp::resolver> resolver);
|
||||
|
||||
void UpdateBandwidth ();
|
||||
void DetectExternalIP ();
|
||||
|
||||
private:
|
||||
|
||||
bool m_IsRunning;
|
||||
std::thread * m_Thread;
|
||||
boost::asio::io_service m_Service;
|
||||
boost::asio::io_service::work m_Work;
|
||||
boost::asio::deadline_timer m_PeerCleanupTimer;
|
||||
|
||||
NTCPServer * m_NTCPServer;
|
||||
SSUServer * m_SSUServer;
|
||||
std::map<i2p::data::IdentHash, Peer> m_Peers;
|
||||
|
||||
DHKeysPairSupplier m_DHKeysPairSupplier;
|
||||
|
||||
std::atomic<uint64_t> m_TotalSentBytes, m_TotalReceivedBytes;
|
||||
uint32_t m_InBandwidth, m_OutBandwidth;
|
||||
uint64_t m_LastInBandwidthUpdateBytes, m_LastOutBandwidthUpdateBytes;
|
||||
uint64_t m_LastBandwidthUpdateTime;
|
||||
|
||||
#ifdef USE_UPNP
|
||||
UPnP m_UPnP;
|
||||
#endif
|
||||
|
||||
public:
|
||||
|
||||
// for HTTP only
|
||||
const NTCPServer * GetNTCPServer () const { return m_NTCPServer; };
|
||||
const SSUServer * GetSSUServer () const { return m_SSUServer; };
|
||||
const decltype(m_Peers)& GetPeers () const { return m_Peers; };
|
||||
};
|
||||
|
||||
extern Transports transports;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
Loading…
Add table
Add a link
Reference in a new issue