Merge branch 'transport_failsafe' into restricted_routes

This commit is contained in:
Jeff Becker 2016-06-18 09:01:53 -04:00
commit 004a93a841
No known key found for this signature in database
GPG key ID: AB950234D6EA286B
15 changed files with 174 additions and 103 deletions

View file

@ -46,7 +46,7 @@ namespace transport
int num;
while ((num = m_QueueSize - m_Queue.size ()) > 0)
CreateDHKeysPairs (num);
std::unique_lock<std::mutex> l(m_AcquiredMutex);
std::unique_lock<std::mutex> l(m_AcquiredMutex);
m_Acquired.wait (l); // wait for element gets aquired
}
}
@ -60,7 +60,7 @@ namespace transport
{
auto pair = std::make_shared<i2p::crypto::DHKeys> ();
pair->GenerateKeys ();
std::unique_lock<std::mutex> l(m_AcquiredMutex);
std::unique_lock<std::mutex> l(m_AcquiredMutex);
m_Queue.push (pair);
}
}
@ -69,7 +69,7 @@ namespace transport
std::shared_ptr<i2p::crypto::DHKeys> DHKeysPairSupplier::Acquire ()
{
{
std::unique_lock<std::mutex> l(m_AcquiredMutex);
std::unique_lock<std::mutex> l(m_AcquiredMutex);
if (!m_Queue.empty ())
{
auto pair = m_Queue.front ();
@ -86,7 +86,7 @@ namespace transport
void DHKeysPairSupplier::Return (std::shared_ptr<i2p::crypto::DHKeys> pair)
{
std::unique_lock<std::mutex> l(m_AcquiredMutex);
std::unique_lock<std::mutex> l(m_AcquiredMutex);
m_Queue.push (pair);
}
@ -105,7 +105,7 @@ namespace transport
Stop ();
}
void Transports::Start ()
void Transports::Start (bool enableNTCP, bool enableSSU)
{
m_DHKeysPairSupplier.Start ();
m_IsRunning = true;
@ -114,19 +114,33 @@ namespace transport
auto& addresses = context.GetRouterInfo ().GetAddresses ();
for (auto address : addresses)
{
if (!m_NTCPServer)
{
if (!m_NTCPServer && enableNTCP)
{
m_NTCPServer = new NTCPServer ();
m_NTCPServer->Start ();
if (!(m_NTCPServer->IsBoundV6() || m_NTCPServer->IsBoundV4())) {
/** failed to bind to NTCP */
LogPrint(eLogError, "Transports: failed to bind to TCP");
m_NTCPServer->Stop();
delete m_NTCPServer;
m_NTCPServer = nullptr;
}
}
if (address->transportStyle == RouterInfo::eTransportSSU && address->host.is_v4 ())
{
if (!m_SSUServer)
if (!m_SSUServer && enableSSU)
{
m_SSUServer = new SSUServer (address->port);
LogPrint (eLogInfo, "Transports: Start listening UDP port ", address->port);
m_SSUServer->Start ();
try {
m_SSUServer->Start ();
} catch ( std::exception & ex ) {
LogPrint(eLogError, "Transports: Failed to bind to UDP port", address->port);
delete m_SSUServer;
m_SSUServer = nullptr;
continue;
}
DetectExternalIP ();
}
else
@ -206,7 +220,7 @@ namespace transport
void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg)
{
SendMessages (ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > {msg });
SendMessages (ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > {msg });
}
void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs)
@ -231,7 +245,7 @@ namespace transport
{
auto r = netdb.FindRouter (ident);
{
std::unique_lock<std::mutex> l(m_PeersMutex);
std::unique_lock<std::mutex> l(m_PeersMutex);
it = m_Peers.insert (std::pair<i2p::data::IdentHash, Peer>(ident, { 0, r, {},
i2p::util::GetSecondsSinceEpoch (), {} })).first;
}
@ -288,7 +302,7 @@ namespace transport
}
}
else
LogPrint (eLogWarning, "Transports: NTCP address is not present for ", i2p::data::GetIdentHashAbbreviation (ident), ", trying SSU");
LogPrint (eLogDebug, "Transports: NTCP address is not present for ", i2p::data::GetIdentHashAbbreviation (ident), ", trying SSU");
}
if (peer.numAttempts == 1)// SSU
{
@ -320,7 +334,7 @@ namespace transport
}
LogPrint (eLogError, "Transports: No NTCP or SSU addresses available");
peer.Done ();
std::unique_lock<std::mutex> l(m_PeersMutex);
std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.erase (ident);
return false;
}
@ -352,7 +366,7 @@ namespace transport
else
{
LogPrint (eLogError, "Transports: RouterInfo not found, Failed to send messages");
std::unique_lock<std::mutex> l(m_PeersMutex);
std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.erase (it);
}
}
@ -396,7 +410,7 @@ namespace transport
}
}
LogPrint (eLogError, "Transports: Unable to resolve NTCP address: ", ecode.message ());
std::unique_lock<std::mutex> l(m_PeersMutex);
std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.erase (it1);
}
}
@ -438,7 +452,7 @@ namespace transport
}
}
LogPrint (eLogError, "Transports: Unable to resolve SSU address: ", ecode.message ());
std::unique_lock<std::mutex> l(m_PeersMutex);
std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.erase (it1);
}
}
@ -446,7 +460,7 @@ namespace transport
void Transports::CloseSession (std::shared_ptr<const i2p::data::RouterInfo> router)
{
if (!router) return;
m_Service.post (std::bind (&Transports::PostCloseSession, this, router));
m_Service.post (std::bind (&Transports::PostCloseSession, this, router));
}
void Transports::PostCloseSession (std::shared_ptr<const i2p::data::RouterInfo> router)
@ -458,6 +472,12 @@ namespace transport
LogPrint (eLogDebug, "Transports: SSU session closed");
}
// TODO: delete NTCP
auto ntcpSession = m_NTCPServer ? m_NTCPServer->FindNTCPSession(router->GetIdentHash()) : nullptr;
if (ntcpSession)
{
m_NTCPServer->RemoveNTCPSession(ntcpSession);
LogPrint(eLogDebug, "Transports: NTCP session closed");
}
}
void Transports::DetectExternalIP ()
@ -468,14 +488,14 @@ namespace transport
for (int i = 0; i < 5; i++)
{
auto router = i2p::data::netdb.GetRandomPeerTestRouter ();
if (router && router->IsSSU (!context.SupportsV6 ()))
m_SSUServer->CreateSession (router, true); // peer test
if (router && router->IsSSU (!context.SupportsV6 ()))
m_SSUServer->CreateSession (router, true); // peer test
else
{
// if not peer test capable routers found pick any
router = i2p::data::netdb.GetRandomRouter ();
if (router && router->IsSSU ())
m_SSUServer->CreateSession (router); // no peer test
m_SSUServer->CreateSession (router); // no peer test
}
}
}
@ -498,7 +518,7 @@ namespace transport
statusChanged = true;
i2p::context.SetStatus (eRouterStatusTesting); // first time only
}
m_SSUServer->CreateSession (router, true); // peer test
m_SSUServer->CreateSession (router, true); // peer test
}
}
}
@ -517,7 +537,7 @@ namespace transport
void Transports::PeerConnected (std::shared_ptr<TransportSession> session)
{
m_Service.post([session, this]()
{
{
auto remoteIdentity = session->GetRemoteIdentity ();
if (!remoteIdentity) return;
auto ident = remoteIdentity->GetIdentHash ();
@ -530,7 +550,7 @@ namespace transport
// check if first message is our DatabaseStore (publishing)
auto firstMsg = it->second.delayedMessages[0];
if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore &&
i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ())
i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ())
sendDatabaseStore = false; // we have it in the list already
}
if (sendDatabaseStore)
@ -542,7 +562,7 @@ namespace transport
else // incoming connection
{
session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore
std::unique_lock<std::mutex> l(m_PeersMutex);
std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.insert (std::make_pair (ident, Peer{ 0, nullptr, { session }, i2p::util::GetSecondsSinceEpoch (), {} }));
}
});
@ -551,7 +571,7 @@ namespace transport
void Transports::PeerDisconnected (std::shared_ptr<TransportSession> session)
{
m_Service.post([session, this]()
{
{
auto remoteIdentity = session->GetRemoteIdentity ();
if (!remoteIdentity) return;
auto ident = remoteIdentity->GetIdentHash ();
@ -565,7 +585,7 @@ namespace transport
ConnectToPeer (ident, it->second);
else
{
std::unique_lock<std::mutex> l(m_PeersMutex);
std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.erase (it);
}
}
@ -590,14 +610,14 @@ namespace transport
if (it->second.sessions.empty () && ts > it->second.creationTime + SESSION_CREATION_TIMEOUT)
{
LogPrint (eLogWarning, "Transports: Session to peer ", it->first.ToBase64 (), " has not been created in ", SESSION_CREATION_TIMEOUT, " seconds");
std::unique_lock<std::mutex> l(m_PeersMutex);
std::unique_lock<std::mutex> l(m_PeersMutex);
it = m_Peers.erase (it);
}
else
it++;
}
UpdateBandwidth (); // TODO: use separate timer(s) for it
if (i2p::context.GetStatus () == eRouterStatusTesting) // if still testing, repeat peer test
if (i2p::context.GetStatus () == eRouterStatusTesting) // if still testing, repeat peer test
DetectExternalIP ();
m_PeerCleanupTimer.expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT));
m_PeerCleanupTimer.async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1));