send messages through Peer

This commit is contained in:
orignal 2015-01-13 21:31:39 -05:00
parent a3352ac1dc
commit 3481161616
2 changed files with 79 additions and 55 deletions

View file

@ -193,65 +193,82 @@ namespace transport
i2p::HandleI2NPMessage (msg); i2p::HandleI2NPMessage (msg);
return; return;
} }
std::shared_ptr<TransportSession> session = m_NTCPServer->FindNTCPSession (ident);
if (!session) auto it = m_Peers.find (ident);
if (it == m_Peers.end ())
{ {
auto r = netdb.FindRouter (ident); auto r = netdb.FindRouter (ident);
if (r) it = m_Peers.insert (std::pair<i2p::data::IdentHash, Peer>(ident, { 0, r, nullptr})).first;
{ if (!ConnectToPeer (ident, it->second))
if (m_SSUServer)
session = m_SSUServer->FindSession (r);
if (!session)
{
// existing session not found. create new
// try NTCP first if message size < 16K
auto address = r->GetNTCPAddress (!context.SupportsV6 ());
if (address && !r->UsesIntroducer () && !r->IsUnreachable () && msg->GetLength () < NTCP_MAX_MESSAGE_SIZE)
{
auto s = std::make_shared<NTCPSession> (*m_NTCPServer, r);
session = s;
m_NTCPServer->Connect (address->host, address->port, s);
}
else
{
// then SSU
if (m_SSUServer)
session = m_SSUServer->GetSession (r);
if (!session)
{
LogPrint ("No NTCP and SSU addresses available");
DeleteI2NPMessage (msg);
}
}
}
}
else
{ {
LogPrint ("Router not found. Requested"); DeleteI2NPMessage (msg);
i2p::data::netdb.RequestDestination (ident); return;
auto resendTimer = new boost::asio::deadline_timer (m_Service);
resendTimer->expires_from_now (boost::posix_time::seconds(5)); // 5 seconds
resendTimer->async_wait (boost::bind (&Transports::HandleResendTimer,
this, boost::asio::placeholders::error, resendTimer, ident, msg));
} }
} }
if (session) if (it->second.session)
session->SendI2NPMessage (msg); it->second.session->SendI2NPMessage (msg);
else
it->second.delayedMessages.push_back (msg);
} }
void Transports::HandleResendTimer (const boost::system::error_code& ecode, bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer)
boost::asio::deadline_timer * timer, const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg)
{ {
auto r = netdb.FindRouter (ident); if (peer.router) // we have RI already
if (r) {
{ if (!peer.numAttempts) // NTCP
LogPrint ("Router found. Sending message"); {
PostMessage (ident, msg); peer.numAttempts++;
auto address = peer.router->GetNTCPAddress (!context.SupportsV6 ());
if (address && !peer.router->UsesIntroducer () && !peer.router->IsUnreachable ())
{
auto s = std::make_shared<NTCPSession> (*m_NTCPServer, peer.router);
m_NTCPServer->Connect (address->host, address->port, s);
return true;
}
}
else if (peer.numAttempts == 1)// SSU
{
peer.numAttempts++;
if (m_SSUServer)
{
if (m_SSUServer->GetSession (peer.router))
return true;
}
}
LogPrint (eLogError, "No NTCP and SSU addresses available");
m_Peers.erase (ident);
return false;
} }
else else // otherwise request RI
{ {
LogPrint ("Router not found. Failed to send message"); LogPrint ("Router not found. Requested");
DeleteI2NPMessage (msg); i2p::data::netdb.RequestDestination (ident);
auto resendTimer = new boost::asio::deadline_timer (m_Service);
resendTimer->expires_from_now (boost::posix_time::seconds(5)); // 5 seconds
resendTimer->async_wait (boost::bind (&Transports::HandleResendTimer,
this, boost::asio::placeholders::error, resendTimer, ident));
}
return true;
}
void Transports::HandleResendTimer (const boost::system::error_code& ecode,
boost::asio::deadline_timer * timer, const i2p::data::IdentHash& ident)
{
auto it = m_Peers.find (ident);
if (it != m_Peers.end ())
{
auto r = netdb.FindRouter (ident);
if (r)
{
LogPrint ("Router found. Trying to connect");
it->second.router = r;
ConnectToPeer (ident, it->second);
}
else
{
LogPrint ("Router not found. Failed to send messages");
m_Peers.erase (it);
}
} }
delete timer; delete timer;
} }
@ -304,9 +321,10 @@ namespace transport
it->second.session = session; it->second.session = session;
for (auto it1: it->second.delayedMessages) for (auto it1: it->second.delayedMessages)
session->SendI2NPMessage (it1); session->SendI2NPMessage (it1);
it->second.delayedMessages.clear ();
} }
/* else // incoming connection else // incoming connection
m_Peers[ident] = { nullptr, session };*/ m_Peers[ident] = { 0, nullptr, session };
}); });
} }
@ -317,8 +335,12 @@ namespace transport
auto ident = session->GetRemoteIdentity ().GetIdentHash (); auto ident = session->GetRemoteIdentity ().GetIdentHash ();
auto it = m_Peers.find (ident); auto it = m_Peers.find (ident);
if (it != m_Peers.end ()) if (it != m_Peers.end ())
m_Peers.erase (it); {
// TODO:: check for delayed messages if (it->second.delayedMessages.size () > 0)
ConnectToPeer (ident, it->second);
else
m_Peers.erase (it);
}
}); });
} }
} }

View file

@ -53,6 +53,7 @@ namespace transport
struct Peer struct Peer
{ {
int numAttempts;
std::shared_ptr<const i2p::data::RouterInfo> router; std::shared_ptr<const i2p::data::RouterInfo> router;
std::shared_ptr<TransportSession> session; std::shared_ptr<TransportSession> session;
std::list<i2p::I2NPMessage *> delayedMessages; std::list<i2p::I2NPMessage *> delayedMessages;
@ -88,10 +89,11 @@ namespace transport
void Run (); void Run ();
void HandleResendTimer (const boost::system::error_code& ecode, boost::asio::deadline_timer * timer, void HandleResendTimer (const boost::system::error_code& ecode, boost::asio::deadline_timer * timer,
const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); const i2p::data::IdentHash& ident);
void PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); void PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg);
void PostCloseSession (std::shared_ptr<const i2p::data::RouterInfo> router); void PostCloseSession (std::shared_ptr<const i2p::data::RouterInfo> router);
bool ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer);
void DetectExternalIP (); void DetectExternalIP ();
private: private: