moved current outbound tunnel to destination

This commit is contained in:
orignal 2014-10-07 10:33:17 -04:00
parent 3b90aa2fe1
commit 3de29143bc
6 changed files with 47 additions and 39 deletions

View file

@ -11,7 +11,7 @@ namespace i2p
namespace stream namespace stream
{ {
StreamingDestination::StreamingDestination (boost::asio::io_service& service, bool isPublic): StreamingDestination::StreamingDestination (boost::asio::io_service& service, bool isPublic):
m_Service (service), m_LeaseSet (nullptr), m_IsPublic (isPublic) m_Service (service), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{ {
m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); // uncomment for ECDSA m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); // uncomment for ECDSA
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
@ -22,7 +22,7 @@ namespace stream
} }
StreamingDestination::StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic): StreamingDestination::StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic):
m_Service (service), m_LeaseSet (nullptr), m_IsPublic (isPublic) m_Service (service), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{ {
std::ifstream s(fullPath.c_str (), std::ifstream::binary); std::ifstream s(fullPath.c_str (), std::ifstream::binary);
if (s.is_open ()) if (s.is_open ())
@ -56,7 +56,7 @@ namespace stream
} }
StreamingDestination::StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic): StreamingDestination::StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic):
m_Service (service), m_Keys (keys), m_LeaseSet (nullptr), m_IsPublic (isPublic) m_Service (service), m_Keys (keys), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{ {
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey); dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey);
@ -78,6 +78,19 @@ namespace stream
delete m_LeaseSet; delete m_LeaseSet;
} }
void StreamingDestination::SendTunnelDataMsgs (const std::vector<i2p::tunnel::TunnelMessageBlock>& msgs)
{
m_CurrentOutboundTunnel = m_Pool->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
if (m_CurrentOutboundTunnel)
m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs);
else
{
LogPrint ("No outbound tunnels in the pool");
for (auto it: msgs)
DeleteI2NPMessage (it.data);
}
}
void StreamingDestination::HandleNextPacket (Packet * packet) void StreamingDestination::HandleNextPacket (Packet * packet)
{ {
uint32_t sendStreamID = packet->GetSendStreamID (); uint32_t sendStreamID = packet->GetSendStreamID ();

View file

@ -32,6 +32,7 @@ namespace stream
void ResetAcceptor () { m_Acceptor = nullptr; }; void ResetAcceptor () { m_Acceptor = nullptr; };
bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
void HandleNextPacket (Packet * packet); void HandleNextPacket (Packet * packet);
void SendTunnelDataMsgs (const std::vector<i2p::tunnel::TunnelMessageBlock>& msgs);
// implements LocalDestination // implements LocalDestination
const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; }; const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; };
@ -39,6 +40,7 @@ namespace stream
const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionPublicKey; }; const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionPublicKey; };
void SetLeaseSetUpdated (); void SetLeaseSetUpdated ();
void HandleDataMessage (const uint8_t * buf, size_t len); void HandleDataMessage (const uint8_t * buf, size_t len);
void ResetCurrentOutboundTunnel () { m_CurrentOutboundTunnel = nullptr; };
private: private:
@ -54,6 +56,7 @@ namespace stream
uint8_t m_EncryptionPublicKey[256], m_EncryptionPrivateKey[256]; uint8_t m_EncryptionPublicKey[256], m_EncryptionPrivateKey[256];
i2p::tunnel::TunnelPool * m_Pool; i2p::tunnel::TunnelPool * m_Pool;
i2p::tunnel::OutboundTunnel * m_CurrentOutboundTunnel;
i2p::data::LeaseSet * m_LeaseSet; i2p::data::LeaseSet * m_LeaseSet;
bool m_IsPublic; bool m_IsPublic;

View file

@ -14,8 +14,7 @@ namespace stream
const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0),
m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false),
m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (&remote), m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (&remote),
m_RoutingSession (nullptr), m_CurrentOutboundTunnel (nullptr), m_RoutingSession (nullptr), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service)
m_ReceiveTimer (m_Service), m_ResendTimer (m_Service)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease (); UpdateCurrentRemoteLease ();
@ -24,7 +23,7 @@ namespace stream
Stream::Stream (boost::asio::io_service& service, StreamingDestination * local): Stream::Stream (boost::asio::io_service& service, StreamingDestination * local):
m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1),
m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local),
m_RemoteLeaseSet (nullptr), m_RoutingSession (nullptr), m_CurrentOutboundTunnel (nullptr), m_RemoteLeaseSet (nullptr), m_RoutingSession (nullptr),
m_ReceiveTimer (m_Service), m_ResendTimer (m_Service) m_ReceiveTimer (m_Service), m_ResendTimer (m_Service)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
@ -103,7 +102,7 @@ namespace stream
{ {
// we have received duplicate. Most likely our outbound tunnel is dead // we have received duplicate. Most likely our outbound tunnel is dead
LogPrint ("Duplicate message ", receivedSeqn, " received"); LogPrint ("Duplicate message ", receivedSeqn, " received");
m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel m_LocalDestination->ResetCurrentOutboundTunnel (); // pick another outbound tunnel
UpdateCurrentRemoteLease (); // pick another lease UpdateCurrentRemoteLease (); // pick another lease
SendQuickAck (); // resend ack for previous message again SendQuickAck (); // resend ack for previous message again
delete packet; // packet dropped delete packet; // packet dropped
@ -418,35 +417,29 @@ namespace stream
m_LeaseSetUpdated = false; m_LeaseSetUpdated = false;
} }
m_CurrentOutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
if (m_CurrentOutboundTunnel) if (ts >= m_CurrentRemoteLease.endDate)
{ UpdateCurrentRemoteLease ();
auto ts = i2p::util::GetMillisecondsSinceEpoch (); if (ts < m_CurrentRemoteLease.endDate)
if (ts >= m_CurrentRemoteLease.endDate) {
UpdateCurrentRemoteLease (); std::vector<i2p::tunnel::TunnelMessageBlock> msgs;
if (ts < m_CurrentRemoteLease.endDate) for (auto it: packets)
{ {
std::vector<i2p::tunnel::TunnelMessageBlock> msgs; auto msg = m_RoutingSession->WrapSingleMessage (
for (auto it: packets) CreateDataMessage (this, it->GetBuffer (), it->GetLength ()),
{ leaseSet);
auto msg = m_RoutingSession->WrapSingleMessage ( msgs.push_back (i2p::tunnel::TunnelMessageBlock
CreateDataMessage (this, it->GetBuffer (), it->GetLength ()), {
leaseSet); i2p::tunnel::eDeliveryTypeTunnel,
msgs.push_back (i2p::tunnel::TunnelMessageBlock m_CurrentRemoteLease.tunnelGateway, m_CurrentRemoteLease.tunnelID,
{ msg
i2p::tunnel::eDeliveryTypeTunnel, });
m_CurrentRemoteLease.tunnelGateway, m_CurrentRemoteLease.tunnelID, leaseSet = nullptr; // send leaseSet only one time
msg }
}); m_LocalDestination->SendTunnelDataMsgs (msgs);
leaseSet = nullptr; // send leaseSet only one time
}
m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs);
}
else
LogPrint ("All leases are expired");
} }
else else
LogPrint ("No outbound tunnels in the pool"); LogPrint ("All leases are expired");
} }
void Stream::ScheduleResend () void Stream::ScheduleResend ()
@ -476,7 +469,7 @@ namespace stream
} }
if (packets.size () > 0) if (packets.size () > 0)
{ {
m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel m_LocalDestination->ResetCurrentOutboundTunnel (); // pick another outbound tunnel
UpdateCurrentRemoteLease (); // pick another lease UpdateCurrentRemoteLease (); // pick another lease
SendPackets (packets); SendPackets (packets);
} }

View file

@ -129,7 +129,6 @@ namespace stream
const i2p::data::LeaseSet * m_RemoteLeaseSet; const i2p::data::LeaseSet * m_RemoteLeaseSet;
i2p::garlic::GarlicRoutingSession * m_RoutingSession; i2p::garlic::GarlicRoutingSession * m_RoutingSession;
i2p::data::Lease m_CurrentRemoteLease; i2p::data::Lease m_CurrentRemoteLease;
i2p::tunnel::OutboundTunnel * m_CurrentOutboundTunnel;
std::queue<Packet *> m_ReceiveQueue; std::queue<Packet *> m_ReceiveQueue;
std::set<Packet *, PacketCmp> m_SavedPackets; std::set<Packet *, PacketCmp> m_SavedPackets;
std::set<Packet *, PacketCmp> m_SentPackets; std::set<Packet *, PacketCmp> m_SentPackets;

View file

@ -188,7 +188,7 @@ namespace tunnel
m_Gateway.SendTunnelDataMsg (block); m_Gateway.SendTunnelDataMsg (block);
} }
void OutboundTunnel::SendTunnelDataMsg (std::vector<TunnelMessageBlock> msgs) void OutboundTunnel::SendTunnelDataMsg (const std::vector<TunnelMessageBlock>& msgs)
{ {
std::unique_lock<std::mutex> l(m_SendMutex); std::unique_lock<std::mutex> l(m_SendMutex);
for (auto& it : msgs) for (auto& it : msgs)

View file

@ -78,7 +78,7 @@ namespace tunnel
OutboundTunnel (TunnelConfig * config): Tunnel (config), m_Gateway (this) {}; OutboundTunnel (TunnelConfig * config): Tunnel (config), m_Gateway (this) {};
void SendTunnelDataMsg (const uint8_t * gwHash, uint32_t gwTunnel, i2p::I2NPMessage * msg); void SendTunnelDataMsg (const uint8_t * gwHash, uint32_t gwTunnel, i2p::I2NPMessage * msg);
void SendTunnelDataMsg (std::vector<TunnelMessageBlock> msgs); // multiple messages void SendTunnelDataMsg (const std::vector<TunnelMessageBlock>& msgs); // multiple messages
const i2p::data::RouterInfo * GetEndpointRouter () const const i2p::data::RouterInfo * GetEndpointRouter () const
{ return GetTunnelConfig ()->GetLastHop ()->router; }; { return GetTunnelConfig ()->GetLastHop ()->router; };
size_t GetNumSentBytes () const { return m_Gateway.GetNumSentBytes (); }; size_t GetNumSentBytes () const { return m_Gateway.GetNumSentBytes (); };