NetDb/NetDbRequests split

This commit is contained in:
orignal 2015-04-09 12:45:00 -04:00
parent 01913d2b14
commit 950f250d66
8 changed files with 256 additions and 182 deletions

162
NetDb.cpp
View file

@ -21,49 +21,6 @@ namespace i2p
{
namespace data
{
I2NPMessage * RequestedDestination::CreateRequestMessage (std::shared_ptr<const RouterInfo> router,
std::shared_ptr<const i2p::tunnel::InboundTunnel> replyTunnel)
{
I2NPMessage * msg = i2p::CreateRouterInfoDatabaseLookupMsg (m_Destination,
replyTunnel->GetNextIdentHash (), replyTunnel->GetNextTunnelID (), m_IsExploratory,
&m_ExcludedPeers);
m_ExcludedPeers.insert (router->GetIdentHash ());
m_CreationTime = i2p::util::GetSecondsSinceEpoch ();
return msg;
}
I2NPMessage * RequestedDestination::CreateRequestMessage (const IdentHash& floodfill)
{
I2NPMessage * msg = i2p::CreateRouterInfoDatabaseLookupMsg (m_Destination,
i2p::context.GetRouterInfo ().GetIdentHash () , 0, false, &m_ExcludedPeers);
m_ExcludedPeers.insert (floodfill);
m_CreationTime = i2p::util::GetSecondsSinceEpoch ();
return msg;
}
void RequestedDestination::ClearExcludedPeers ()
{
m_ExcludedPeers.clear ();
}
void RequestedDestination::Success (std::shared_ptr<RouterInfo> r)
{
if (m_RequestComplete)
{
m_RequestComplete (r);
m_RequestComplete = nullptr;
}
}
void RequestedDestination::Fail ()
{
if (m_RequestComplete)
{
m_RequestComplete (nullptr);
m_RequestComplete = nullptr;
}
}
#ifndef _WIN32
const char NetDb::m_NetDbPath[] = "/netDb";
#else
@ -120,7 +77,7 @@ namespace data
m_Thread = 0;
}
m_LeaseSets.clear();
m_RequestedDestinations.clear ();
m_Requests.Stop ();
}
void NetDb::Run ()
@ -164,7 +121,7 @@ namespace data
uint64_t ts = i2p::util::GetSecondsSinceEpoch ();
if (ts - lastManageRequest >= 15) // manage requests every 15 seconds
{
ManageRequests ();
m_Requests.ManageRequests ();
lastManageRequest = ts;
}
if (ts - lastSave >= 60) // save routers, manage leasesets and validate subscriptions every minute
@ -189,7 +146,7 @@ namespace data
numRouters = 800/numRouters;
if (numRouters < 1) numRouters = 1;
if (numRouters > 9) numRouters = 9;
ManageRequests ();
m_Requests.ManageRequests ();
Explore (numRouters);
lastExploratory = ts;
}
@ -234,13 +191,7 @@ namespace data
}
}
// take care about requested destination
auto it = m_RequestedDestinations.find (ident);
if (it != m_RequestedDestinations.end ())
{
it->second->Success (r);
std::unique_lock<std::mutex> l(m_RequestedDestinationsMutex);
m_RequestedDestinations.erase (it);
}
m_Requests.RequestComplete (ident, r);
}
void NetDb::AddLeaseSet (const IdentHash& ident, const uint8_t * buf, int len,
@ -487,28 +438,20 @@ namespace data
void NetDb::RequestDestination (const IdentHash& destination, RequestedDestination::RequestComplete requestComplete)
{
// request RouterInfo directly
auto dest = new RequestedDestination (destination, false); // non-exploratory
dest->SetRequestComplete (requestComplete);
auto dest = m_Requests.CreateRequest (destination, false, requestComplete); // non-exploratory
if (!dest)
{
std::unique_lock<std::mutex> l(m_RequestedDestinationsMutex);
if (!m_RequestedDestinations.insert (std::make_pair (destination,
std::unique_ptr<RequestedDestination> (dest))).second) // not inserted
{
LogPrint (eLogWarning, "Destination ", destination.ToBase64(), " is requested already");
return;
}
LogPrint (eLogWarning, "Destination ", destination.ToBase64(), " is requested already");
return;
}
auto floodfill = GetClosestFloodfill (destination, dest->GetExcludedPeers ());
auto floodfill = netdb.GetClosestFloodfill (destination, dest->GetExcludedPeers ());
if (floodfill)
transports.SendMessage (floodfill->GetIdentHash (), dest->CreateRequestMessage (floodfill->GetIdentHash ()));
else
{
LogPrint (eLogError, "No floodfills found");
dest->Fail ();
std::unique_lock<std::mutex> l(m_RequestedDestinationsMutex);
m_RequestedDestinations.erase (destination);
m_Requests.RequestComplete (destination, nullptr);
}
}
@ -609,10 +552,10 @@ namespace data
key[l] = 0;
int num = buf[32]; // num
LogPrint ("DatabaseSearchReply for ", key, " num=", num);
auto it = m_RequestedDestinations.find (IdentHash (buf));
if (it != m_RequestedDestinations.end ())
IdentHash ident (buf);
auto dest = m_Requests.FindRequest (ident);
if (dest)
{
auto& dest = it->second;
bool deleteDest = true;
if (num > 0)
{
@ -659,18 +602,12 @@ namespace data
}
if (deleteDest)
{
// no more requests for the destinationation. delete it
it->second->Fail ();
m_RequestedDestinations.erase (it);
}
m_Requests.RequestComplete (ident, nullptr);
}
else
{
// no more requests for detination possible. delete it
it->second->Fail ();
m_RequestedDestinations.erase (it);
}
m_Requests.RequestComplete (ident, nullptr);
}
else
LogPrint ("Requested destination for ", key, " not found");
@ -832,15 +769,11 @@ namespace data
for (int i = 0; i < numDestinations; i++)
{
rnd.GenerateBlock (randomHash, 32);
auto dest = new RequestedDestination (randomHash, true); // exploratory
{
std::unique_lock<std::mutex> l(m_RequestedDestinationsMutex);
if (!m_RequestedDestinations.insert (std::make_pair (randomHash,
std::unique_ptr<RequestedDestination> (dest))).second) // not inserted
{
LogPrint (eLogWarning, "Exploratory destination is requested already");
return;
}
auto dest = m_Requests.CreateRequest (randomHash, true); // exploratory
if (!dest)
{
LogPrint (eLogWarning, "Exploratory destination is requested already");
return;
}
auto floodfill = GetClosestFloodfill (randomHash, dest->GetExcludedPeers ());
if (floodfill && !floodfills.count (floodfill.get ())) // request floodfill only once
@ -867,10 +800,7 @@ namespace data
i2p::transport::transports.SendMessage (floodfill->GetIdentHash (), dest->CreateRequestMessage (floodfill->GetIdentHash ()));
}
else
{
std::unique_lock<std::mutex> l(m_RequestedDestinationsMutex);
m_RequestedDestinations.erase (dest->GetDestination ());
}
m_Requests.RequestComplete (randomHash, nullptr);
}
if (throughTunnels && msgs.size () > 0)
outbound->SendTunnelDataMsg (msgs);
@ -1028,53 +958,5 @@ namespace data
it++;
}
}
void NetDb::ManageRequests ()
{
uint64_t ts = i2p::util::GetSecondsSinceEpoch ();
std::unique_lock<std::mutex> l(m_RequestedDestinationsMutex);
for (auto it = m_RequestedDestinations.begin (); it != m_RequestedDestinations.end ();)
{
auto& dest = it->second;
bool done = false;
if (ts < dest->GetCreationTime () + 60) // request is worthless after 1 minute
{
if (ts > dest->GetCreationTime () + 5) // no response for 5 seconds
{
auto count = dest->GetExcludedPeers ().size ();
if (!dest->IsExploratory () && count < 7)
{
auto pool = i2p::tunnel::tunnels.GetExploratoryPool ();
auto outbound = pool->GetNextOutboundTunnel ();
auto inbound = pool->GetNextInboundTunnel ();
auto nextFloodfill = GetClosestFloodfill (dest->GetDestination (), dest->GetExcludedPeers ());
if (nextFloodfill && outbound && inbound)
outbound->SendTunnelDataMsg (nextFloodfill->GetIdentHash (), 0,
dest->CreateRequestMessage (nextFloodfill, inbound));
else
{
done = true;
if (!inbound) LogPrint (eLogWarning, "No inbound tunnels");
if (!outbound) LogPrint (eLogWarning, "No outbound tunnels");
if (!nextFloodfill) LogPrint (eLogWarning, "No more floodfills");
}
}
else
{
if (!dest->IsExploratory ())
LogPrint (eLogWarning, dest->GetDestination ().ToBase64 (), " not found after 7 attempts");
done = true;
}
}
}
else // delete obsolete request
done = true;
if (done)
it = m_RequestedDestinations.erase (it);
else
it++;
}
}
}
}