Making NetDb and Profiling use generic storage

This commit is contained in:
hakunamtu 2018-03-05 14:02:20 +03:00
parent 8cf18fd2e3
commit 262d22b903
4 changed files with 107 additions and 60 deletions

View file

@ -25,7 +25,7 @@ namespace data
{ {
NetDb netdb; NetDb netdb;
NetDb::NetDb (): m_IsRunning (false), m_Thread (nullptr), m_Reseeder (nullptr), m_Storage("netDb", "r", "routerInfo-", "dat"), m_FloodfillBootstrap(nullptr), m_HiddenMode(false) NetDb::NetDb (): m_IsRunning (false), m_Thread (nullptr), m_Reseeder (nullptr), m_Storage(nullptr), m_FloodfillBootstrap(nullptr), m_HiddenMode(false)
{ {
} }
@ -37,8 +37,8 @@ namespace data
void NetDb::Start () void NetDb::Start ()
{ {
m_Storage.SetPlace(i2p::fs::GetDataDir()); m_Storage.reset(new FsIdentStorage("netDb", "r", "routerInfo-", "dat"));
m_Storage.Init(i2p::data::GetBase64SubstitutionTable(), 64); m_Storage->Init();
InitProfilesStorage (); InitProfilesStorage ();
m_Families.LoadCertificates (); m_Families.LoadCertificates ();
Load (); Load ();
@ -54,10 +54,16 @@ namespace data
void NetDb::Stop () void NetDb::Stop ()
{ {
if (m_IsRunning) if (m_IsRunning)
{
if(BeginProfilesStorageUpdate()) //TODO Error handling
{ {
for (auto& it: m_RouterInfos) for (auto& it: m_RouterInfos)
it.second->SaveProfile (); it.second->SaveProfile ();
DeleteObsoleteProfiles (); DeleteObsoleteProfiles();
EndProfilesStorageUpdate();
}
m_Storage->DeInit();
DeInitProfilesStorage();
m_RouterInfos.clear (); m_RouterInfos.clear ();
m_Floodfills.clear (); m_Floodfills.clear ();
if (m_Thread) if (m_Thread)
@ -360,9 +366,9 @@ namespace data
i2p::transport::transports.SendMessages(ih, requests); i2p::transport::transports.SendMessages(ih, requests);
} }
bool NetDb::LoadRouterInfo (const std::string & path) bool NetDb::LoadRouterInfo (const char *buffer, size_t len)
{ {
auto r = std::make_shared<RouterInfo>(path); auto r = std::make_shared<RouterInfo>((uint8_t *)buffer, len, false);
if (r->GetRouterIdentity () && !r->IsUnreachable () && if (r->GetRouterIdentity () && !r->IsUnreachable () &&
(!r->UsesIntroducer () || m_LastLoad < r->GetTimestamp () + NETDB_INTRODUCEE_EXPIRATION_TIMEOUT*1000LL)) // 1 hour (!r->UsesIntroducer () || m_LastLoad < r->GetTimestamp () + NETDB_INTRODUCEE_EXPIRATION_TIMEOUT*1000LL)) // 1 hour
{ {
@ -374,8 +380,7 @@ namespace data
} }
else else
{ {
LogPrint(eLogWarning, "NetDb: RI from ", path, " is invalid. Delete"); LogPrint(eLogWarning, "NetDb: RI from ", r->GetIdentHash(), " is invalid.");
i2p::fs::Remove(path);
} }
return true; return true;
} }
@ -387,14 +392,6 @@ namespace data
v(entry.first, entry.second); v(entry.first, entry.second);
} }
void NetDb::VisitStoredRouterInfos(RouterInfoVisitor v)
{
m_Storage.Iterate([v] (const std::string & filename) {
auto ri = std::make_shared<i2p::data::RouterInfo>(filename);
v(ri);
});
}
void NetDb::VisitRouterInfos(RouterInfoVisitor v) void NetDb::VisitRouterInfos(RouterInfoVisitor v)
{ {
std::unique_lock<std::mutex> lock(m_RouterInfosMutex); std::unique_lock<std::mutex> lock(m_RouterInfosMutex);
@ -455,10 +452,16 @@ namespace data
m_Floodfills.clear (); m_Floodfills.clear ();
m_LastLoad = i2p::util::GetSecondsSinceEpoch(); m_LastLoad = i2p::util::GetSecondsSinceEpoch();
std::vector<std::string> files; m_Storage->BeginUpdate();
m_Storage.Traverse(files); m_Storage->Iterate([&](const IdentHash &ident, const StorageRecord &record)
for (const auto& path : files) {
LoadRouterInfo(path); if (!LoadRouterInfo(record.data.get(), record.len))
{
if (m_Storage->Remove(ident))
LogPrint(eLogInfo, "Deleting invalid RI from ", ident.ToBase64());
}
});
m_Storage->EndUpdate();
LogPrint (eLogInfo, "NetDb: ", m_RouterInfos.size(), " routers loaded (", m_Floodfills.size (), " floodfils)"); LogPrint (eLogInfo, "NetDb: ", m_RouterInfos.size(), " routers loaded (", m_Floodfills.size (), " floodfils)");
} }
@ -475,13 +478,18 @@ namespace data
expirationTimeout = i2p::context.IsFloodfill () ? NETDB_FLOODFILL_EXPIRATION_TIMEOUT*1000LL : expirationTimeout = i2p::context.IsFloodfill () ? NETDB_FLOODFILL_EXPIRATION_TIMEOUT*1000LL :
NETDB_MIN_EXPIRATION_TIMEOUT*1000LL + (NETDB_MAX_EXPIRATION_TIMEOUT - NETDB_MIN_EXPIRATION_TIMEOUT)*1000LL*NETDB_MIN_ROUTERS/total; NETDB_MIN_EXPIRATION_TIMEOUT*1000LL + (NETDB_MAX_EXPIRATION_TIMEOUT - NETDB_MIN_EXPIRATION_TIMEOUT)*1000LL*NETDB_MIN_ROUTERS/total;
m_Storage->BeginUpdate();
for (auto& it: m_RouterInfos) for (auto& it: m_RouterInfos)
{ {
std::string ident = it.second->GetIdentHashBase64(); IdentHash ident = it.second->GetIdentHash();
std::string path = m_Storage.Path(ident);
if (it.second->IsUpdated ()) if (it.second->IsUpdated ())
{ {
it.second->SaveToFile (path); if (it.second->GetBuffer()) {
StorageRecord record((char *)it.second->GetBuffer(), it.second->GetBufferLen());
m_Storage->Store(ident, record);
} else {
LogPrint (eLogError, "RouterInfo: Can't save, m_Buffer == NULL, Bleat!", it.second->GetIdentHashBase64());
}
it.second->SetUpdated (false); it.second->SetUpdated (false);
it.second->SetUnreachable (false); it.second->SetUnreachable (false);
it.second->DeleteBuffer (); it.second->DeleteBuffer ();
@ -501,17 +509,19 @@ namespace data
if (it.second->IsUnreachable ()) if (it.second->IsUnreachable ())
{ {
// delete RI file // delete RI file
m_Storage.Remove(ident); m_Storage->Remove(ident);
deletedCount++; deletedCount++;
if (total - deletedCount < NETDB_MIN_ROUTERS) checkForExpiration = false; if (total - deletedCount < NETDB_MIN_ROUTERS) checkForExpiration = false;
} }
} // m_RouterInfos iteration } // m_RouterInfos iteration
m_Storage->EndUpdate();
if (updatedCount > 0) if (updatedCount > 0)
LogPrint (eLogInfo, "NetDb: saved ", updatedCount, " new/updated routers"); LogPrint (eLogInfo, "NetDb: saved ", updatedCount, " new/updated routers");
if (deletedCount > 0) if (deletedCount > 0)
{ {
LogPrint (eLogInfo, "NetDb: deleting ", deletedCount, " unreachable routers"); LogPrint (eLogInfo, "NetDb: deleting ", deletedCount, " unreachable routers");
BeginProfilesStorageUpdate(); //TODO: Error handling
// clean up RouterInfos table // clean up RouterInfos table
{ {
std::unique_lock<std::mutex> l(m_RouterInfosMutex); std::unique_lock<std::mutex> l(m_RouterInfosMutex);
@ -526,6 +536,7 @@ namespace data
++it; ++it;
} }
} }
EndProfilesStorageUpdate(); //TODO: Error handling
// clean up expired floodfiils // clean up expired floodfiils
{ {
std::unique_lock<std::mutex> l(m_FloodfillsMutex); std::unique_lock<std::mutex> l(m_FloodfillsMutex);
@ -829,7 +840,11 @@ namespace data
if (router) if (router)
{ {
LogPrint (eLogDebug, "NetDb: requested RouterInfo ", key, " found"); LogPrint (eLogDebug, "NetDb: requested RouterInfo ", key, " found");
router->LoadBuffer (); if (!router->GetBuffer())
{
StorageRecord record = m_Storage->Fetch(router->GetIdentHash());
router->LoadBuffer((const uint8_t*)record.data.get(), record.len);
}
if (router->GetBuffer ()) if (router->GetBuffer ())
replyMsg = CreateDatabaseStoreMsg (router); replyMsg = CreateDatabaseStoreMsg (router);
} }

View file

@ -7,6 +7,7 @@
#include <list> #include <list>
#include <string> #include <string>
#include <thread> #include <thread>
#include <memory>
#include <mutex> #include <mutex>
#include "Base.h" #include "Base.h"
@ -19,6 +20,7 @@
#include "Tunnel.h" #include "Tunnel.h"
#include "TunnelPool.h" #include "TunnelPool.h"
#include "Reseed.h" #include "Reseed.h"
#include "Storage.h"
#include "NetDbRequests.h" #include "NetDbRequests.h"
#include "Family.h" #include "Family.h"
@ -93,8 +95,6 @@ namespace data
/** visit all lease sets we currently store */ /** visit all lease sets we currently store */
void VisitLeaseSets(LeaseSetVisitor v); void VisitLeaseSets(LeaseSetVisitor v);
/** visit all router infos we have currently on disk, usually insanely expensive, does not access in memory RI */
void VisitStoredRouterInfos(RouterInfoVisitor v);
/** visit all router infos we have loaded in memory, cheaper than VisitLocalRouterInfos but locks access while visiting */ /** visit all router infos we have loaded in memory, cheaper than VisitLocalRouterInfos but locks access while visiting */
void VisitRouterInfos(RouterInfoVisitor v); void VisitRouterInfos(RouterInfoVisitor v);
/** visit N random router that match using filter, then visit them with a visitor, return number of RouterInfos that were visited */ /** visit N random router that match using filter, then visit them with a visitor, return number of RouterInfos that were visited */
@ -105,7 +105,7 @@ namespace data
private: private:
void Load (); void Load ();
bool LoadRouterInfo (const std::string & path); bool LoadRouterInfo (const char *buffer, size_t len);
void SaveUpdated (); void SaveUpdated ();
void Run (); // exploratory thread void Run (); // exploratory thread
void Explore (int numDestinations); void Explore (int numDestinations);
@ -135,7 +135,7 @@ namespace data
GzipInflator m_Inflator; GzipInflator m_Inflator;
Reseeder * m_Reseeder; Reseeder * m_Reseeder;
Families m_Families; Families m_Families;
i2p::fs::HashedStorage m_Storage; std::unique_ptr<IdentStorage> m_Storage;
friend class NetDbRequests; friend class NetDbRequests;
NetDbRequests m_Requests; NetDbRequests m_Requests;

View file

@ -1,4 +1,4 @@
#include <sys/stat.h> #include <memory>
#include <boost/property_tree/ptree.hpp> #include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp> #include <boost/property_tree/ini_parser.hpp>
#include "Base.h" #include "Base.h"
@ -10,7 +10,7 @@ namespace i2p
{ {
namespace data namespace data
{ {
i2p::fs::HashedStorage m_ProfilesStorage("peerProfiles", "p", "profile-", "txt"); std::unique_ptr<IdentStorage> m_ProfilesStorage(nullptr);
RouterProfile::RouterProfile (): RouterProfile::RouterProfile ():
m_LastUpdateTime (boost::posix_time::second_clock::local_time()), m_LastUpdateTime (boost::posix_time::second_clock::local_time()),
@ -45,12 +45,15 @@ namespace data
pt.put_child (PEER_PROFILE_SECTION_PARTICIPATION, participation); pt.put_child (PEER_PROFILE_SECTION_PARTICIPATION, participation);
pt.put_child (PEER_PROFILE_SECTION_USAGE, usage); pt.put_child (PEER_PROFILE_SECTION_USAGE, usage);
// save to file // save to string stream
std::string ident = identHash.ToBase64 (); std::string ident = identHash.ToBase64 ();
std::string path = m_ProfilesStorage.Path(ident); std::ostringstream ss;
try { try {
boost::property_tree::write_ini (path, pt); boost::property_tree::write_ini (ss, pt);
std::string str = ss.str();
StorageRecord record(str.c_str(), str.length());
m_ProfilesStorage->Store(identHash, record);
} catch (std::exception& ex) { } catch (std::exception& ex) {
/* boost exception verbose enough */ /* boost exception verbose enough */
LogPrint (eLogError, "Profiling: ", ex.what ()); LogPrint (eLogError, "Profiling: ", ex.what ());
@ -59,19 +62,20 @@ namespace data
void RouterProfile::Load (const IdentHash& identHash) void RouterProfile::Load (const IdentHash& identHash)
{ {
std::string ident = identHash.ToBase64 (); StorageRecord record = m_ProfilesStorage->Fetch(identHash);
std::string path = m_ProfilesStorage.Path(ident);
boost::property_tree::ptree pt;
if (!i2p::fs::Exists(path)) if (!record.IsValid())
{ {
LogPrint(eLogWarning, "Profiling: no profile yet for ", ident); LogPrint(eLogWarning, "Profiling: no profile yet for ", identHash.ToBase64());
return; return;
} }
std::istringstream ss(std::string(record.data.get(), record.data.get() + record.len));
boost::property_tree::ptree pt;
try try
{ {
boost::property_tree::read_ini (path, pt); boost::property_tree::read_ini (ss, pt);
} catch (std::exception& ex) } catch (std::exception& ex)
{ {
/* boost exception verbose enough */ /* boost exception verbose enough */
@ -96,7 +100,7 @@ namespace data
} }
catch (boost::property_tree::ptree_bad_path& ex) catch (boost::property_tree::ptree_bad_path& ex)
{ {
LogPrint (eLogWarning, "Profiling: Missing section ", PEER_PROFILE_SECTION_PARTICIPATION, " in profile for ", ident); LogPrint (eLogWarning, "Profiling: Missing section ", PEER_PROFILE_SECTION_PARTICIPATION, " in profile for ", identHash.ToBase64());
} }
try try
{ {
@ -107,7 +111,7 @@ namespace data
} }
catch (boost::property_tree::ptree_bad_path& ex) catch (boost::property_tree::ptree_bad_path& ex)
{ {
LogPrint (eLogWarning, "Missing section ", PEER_PROFILE_SECTION_USAGE, " in profile for ", ident); LogPrint (eLogWarning, "Missing section ", PEER_PROFILE_SECTION_USAGE, " in profile for ", identHash.ToBase64());
} }
} }
else else
@ -115,7 +119,7 @@ namespace data
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {
LogPrint (eLogError, "Profiling: Can't read profile ", ident, " :", ex.what ()); LogPrint (eLogError, "Profiling: Can't read profile ", identHash.ToBase64(), " :", ex.what ());
} }
} }
@ -169,27 +173,51 @@ namespace data
void InitProfilesStorage () void InitProfilesStorage ()
{ {
m_ProfilesStorage.SetPlace(i2p::fs::GetDataDir()); m_ProfilesStorage.reset(new FsIdentStorage("peerProfiles", "p", "profile-", "txt"));
m_ProfilesStorage.Init(i2p::data::GetBase64SubstitutionTable(), 64); m_ProfilesStorage->Init();
} }
void DeleteObsoleteProfiles () void DeleteObsoleteProfiles ()
{ {
struct stat st; boost::posix_time::ptime now = boost::posix_time::from_time_t(std::time(nullptr));
std::time_t now = std::time(nullptr); m_ProfilesStorage->Iterate([now](const IdentHash &ident, const StorageRecord &record) {
std::istringstream ss(std::string(record.data.get(), record.data.get() + record.len));
boost::property_tree::ptree pt;
try
{
boost::property_tree::read_ini (ss, pt);
} catch (std::exception& ex)
{
/* boost exception verbose enough */
LogPrint (eLogError, "Deleting obsolete profile: ", ex.what ());
return;
}
std::vector<std::string> files; auto t = pt.get (PEER_PROFILE_LAST_UPDATE_TIME, "");
m_ProfilesStorage.Traverse(files); boost::posix_time::ptime lastUpdate = now;
for (const auto& path: files) { if (t.length () > 0)
if (stat(path.c_str(), &st) != 0) { lastUpdate = boost::posix_time::time_from_string (t);
LogPrint(eLogWarning, "Profiling: Can't stat(): ", path); if((now - lastUpdate).total_seconds()/ 3600 >= PEER_PROFILE_EXPIRATION_TIMEOUT) {
continue; LogPrint(eLogDebug, "Profiling: removing expired peer profile: ", ident.ToBase64());
m_ProfilesStorage->Remove(ident);
} }
if (((now - st.st_mtime) / 3600) >= PEER_PROFILE_EXPIRATION_TIMEOUT) { });
LogPrint(eLogDebug, "Profiling: removing expired peer profile: ", path);
i2p::fs::Remove(path);
} }
bool BeginProfilesStorageUpdate()
{
return m_ProfilesStorage->BeginUpdate();
} }
bool EndProfilesStorageUpdate()
{
return m_ProfilesStorage->EndUpdate();
}
void DeInitProfilesStorage()
{
m_ProfilesStorage->DeInit();
} }
} }
} }

View file

@ -4,6 +4,7 @@
#include <memory> #include <memory>
#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/date_time/posix_time/posix_time.hpp>
#include "Identity.h" #include "Identity.h"
#include "Storage.h"
namespace i2p namespace i2p
{ {
@ -61,6 +62,9 @@ namespace data
std::shared_ptr<RouterProfile> GetRouterProfile (const IdentHash& identHash); std::shared_ptr<RouterProfile> GetRouterProfile (const IdentHash& identHash);
void InitProfilesStorage (); void InitProfilesStorage ();
void DeleteObsoleteProfiles (); void DeleteObsoleteProfiles ();
bool BeginProfilesStorageUpdate();
bool EndProfilesStorageUpdate();
void DeInitProfilesStorage();
} }
} }