diff --git a/libi2pd/NetDb.cpp b/libi2pd/NetDb.cpp index b136dfd5..0cd74bd4 100644 --- a/libi2pd/NetDb.cpp +++ b/libi2pd/NetDb.cpp @@ -25,7 +25,7 @@ namespace data { NetDb netdb; - NetDb::NetDb (): m_IsRunning (false), m_Thread (nullptr), m_Reseeder (nullptr), m_Storage("netDb", "r", "routerInfo-", "dat"), m_FloodfillBootstrap(nullptr), m_HiddenMode(false) + NetDb::NetDb (): m_IsRunning (false), m_Thread (nullptr), m_Reseeder (nullptr), m_Storage(nullptr), m_FloodfillBootstrap(nullptr), m_HiddenMode(false) { } @@ -37,8 +37,8 @@ namespace data void NetDb::Start () { - m_Storage.SetPlace(i2p::fs::GetDataDir()); - m_Storage.Init(i2p::data::GetBase64SubstitutionTable(), 64); + m_Storage.reset(new FsIdentStorage("netDb", "r", "routerInfo-", "dat")); + m_Storage->Init(); InitProfilesStorage (); m_Families.LoadCertificates (); Load (); @@ -55,9 +55,15 @@ namespace data { if (m_IsRunning) { - for (auto& it: m_RouterInfos) - it.second->SaveProfile (); - DeleteObsoleteProfiles (); + if(BeginProfilesStorageUpdate()) //TODO Error handling + { + for (auto& it: m_RouterInfos) + it.second->SaveProfile (); + DeleteObsoleteProfiles(); + EndProfilesStorageUpdate(); + } + m_Storage->DeInit(); + DeInitProfilesStorage(); m_RouterInfos.clear (); m_Floodfills.clear (); if (m_Thread) @@ -360,9 +366,9 @@ namespace data i2p::transport::transports.SendMessages(ih, requests); } - bool NetDb::LoadRouterInfo (const std::string & path) + bool NetDb::LoadRouterInfo (const char *buffer, size_t len) { - auto r = std::make_shared(path); + auto r = std::make_shared((uint8_t *)buffer, len, false); if (r->GetRouterIdentity () && !r->IsUnreachable () && (!r->UsesIntroducer () || m_LastLoad < r->GetTimestamp () + NETDB_INTRODUCEE_EXPIRATION_TIMEOUT*1000LL)) // 1 hour { @@ -374,8 +380,7 @@ namespace data } else { - LogPrint(eLogWarning, "NetDb: RI from ", path, " is invalid. Delete"); - i2p::fs::Remove(path); + LogPrint(eLogWarning, "NetDb: RI from ", r->GetIdentHash(), " is invalid."); } return true; } @@ -387,14 +392,6 @@ namespace data v(entry.first, entry.second); } - void NetDb::VisitStoredRouterInfos(RouterInfoVisitor v) - { - m_Storage.Iterate([v] (const std::string & filename) { - auto ri = std::make_shared(filename); - v(ri); - }); - } - void NetDb::VisitRouterInfos(RouterInfoVisitor v) { std::unique_lock lock(m_RouterInfosMutex); @@ -455,10 +452,16 @@ namespace data m_Floodfills.clear (); m_LastLoad = i2p::util::GetSecondsSinceEpoch(); - std::vector files; - m_Storage.Traverse(files); - for (const auto& path : files) - LoadRouterInfo(path); + m_Storage->BeginUpdate(); + m_Storage->Iterate([&](const IdentHash &ident, const StorageRecord &record) + { + if (!LoadRouterInfo(record.data.get(), record.len)) + { + if (m_Storage->Remove(ident)) + LogPrint(eLogInfo, "Deleting invalid RI from ", ident.ToBase64()); + } + }); + m_Storage->EndUpdate(); LogPrint (eLogInfo, "NetDb: ", m_RouterInfos.size(), " routers loaded (", m_Floodfills.size (), " floodfils)"); } @@ -475,13 +478,18 @@ namespace data expirationTimeout = i2p::context.IsFloodfill () ? NETDB_FLOODFILL_EXPIRATION_TIMEOUT*1000LL : NETDB_MIN_EXPIRATION_TIMEOUT*1000LL + (NETDB_MAX_EXPIRATION_TIMEOUT - NETDB_MIN_EXPIRATION_TIMEOUT)*1000LL*NETDB_MIN_ROUTERS/total; + m_Storage->BeginUpdate(); for (auto& it: m_RouterInfos) { - std::string ident = it.second->GetIdentHashBase64(); - std::string path = m_Storage.Path(ident); + IdentHash ident = it.second->GetIdentHash(); if (it.second->IsUpdated ()) { - it.second->SaveToFile (path); + if (it.second->GetBuffer()) { + StorageRecord record((char *)it.second->GetBuffer(), it.second->GetBufferLen()); + m_Storage->Store(ident, record); + } else { + LogPrint (eLogError, "RouterInfo: Can't save, m_Buffer == NULL, Bleat!", it.second->GetIdentHashBase64()); + } it.second->SetUpdated (false); it.second->SetUnreachable (false); it.second->DeleteBuffer (); @@ -501,17 +509,19 @@ namespace data if (it.second->IsUnreachable ()) { // delete RI file - m_Storage.Remove(ident); + m_Storage->Remove(ident); deletedCount++; if (total - deletedCount < NETDB_MIN_ROUTERS) checkForExpiration = false; } } // m_RouterInfos iteration + m_Storage->EndUpdate(); if (updatedCount > 0) LogPrint (eLogInfo, "NetDb: saved ", updatedCount, " new/updated routers"); if (deletedCount > 0) { LogPrint (eLogInfo, "NetDb: deleting ", deletedCount, " unreachable routers"); + BeginProfilesStorageUpdate(); //TODO: Error handling // clean up RouterInfos table { std::unique_lock l(m_RouterInfosMutex); @@ -526,6 +536,7 @@ namespace data ++it; } } + EndProfilesStorageUpdate(); //TODO: Error handling // clean up expired floodfiils { std::unique_lock l(m_FloodfillsMutex); @@ -829,7 +840,11 @@ namespace data if (router) { LogPrint (eLogDebug, "NetDb: requested RouterInfo ", key, " found"); - router->LoadBuffer (); + if (!router->GetBuffer()) + { + StorageRecord record = m_Storage->Fetch(router->GetIdentHash()); + router->LoadBuffer((const uint8_t*)record.data.get(), record.len); + } if (router->GetBuffer ()) replyMsg = CreateDatabaseStoreMsg (router); } diff --git a/libi2pd/NetDb.hpp b/libi2pd/NetDb.hpp index 18377b4f..2637602f 100644 --- a/libi2pd/NetDb.hpp +++ b/libi2pd/NetDb.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include "Base.h" @@ -19,6 +20,7 @@ #include "Tunnel.h" #include "TunnelPool.h" #include "Reseed.h" +#include "Storage.h" #include "NetDbRequests.h" #include "Family.h" @@ -93,8 +95,6 @@ namespace data /** visit all lease sets we currently store */ void VisitLeaseSets(LeaseSetVisitor v); - /** visit all router infos we have currently on disk, usually insanely expensive, does not access in memory RI */ - void VisitStoredRouterInfos(RouterInfoVisitor v); /** visit all router infos we have loaded in memory, cheaper than VisitLocalRouterInfos but locks access while visiting */ void VisitRouterInfos(RouterInfoVisitor v); /** visit N random router that match using filter, then visit them with a visitor, return number of RouterInfos that were visited */ @@ -105,7 +105,7 @@ namespace data private: void Load (); - bool LoadRouterInfo (const std::string & path); + bool LoadRouterInfo (const char *buffer, size_t len); void SaveUpdated (); void Run (); // exploratory thread void Explore (int numDestinations); @@ -135,7 +135,7 @@ namespace data GzipInflator m_Inflator; Reseeder * m_Reseeder; Families m_Families; - i2p::fs::HashedStorage m_Storage; + std::unique_ptr m_Storage; friend class NetDbRequests; NetDbRequests m_Requests; diff --git a/libi2pd/Profiling.cpp b/libi2pd/Profiling.cpp index 3840eb32..aab3a6c6 100644 --- a/libi2pd/Profiling.cpp +++ b/libi2pd/Profiling.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include #include "Base.h" @@ -10,7 +10,7 @@ namespace i2p { namespace data { - i2p::fs::HashedStorage m_ProfilesStorage("peerProfiles", "p", "profile-", "txt"); + std::unique_ptr m_ProfilesStorage(nullptr); RouterProfile::RouterProfile (): m_LastUpdateTime (boost::posix_time::second_clock::local_time()), @@ -45,12 +45,15 @@ namespace data pt.put_child (PEER_PROFILE_SECTION_PARTICIPATION, participation); pt.put_child (PEER_PROFILE_SECTION_USAGE, usage); - // save to file + // save to string stream std::string ident = identHash.ToBase64 (); - std::string path = m_ProfilesStorage.Path(ident); + std::ostringstream ss; try { - boost::property_tree::write_ini (path, pt); + boost::property_tree::write_ini (ss, pt); + std::string str = ss.str(); + StorageRecord record(str.c_str(), str.length()); + m_ProfilesStorage->Store(identHash, record); } catch (std::exception& ex) { /* boost exception verbose enough */ LogPrint (eLogError, "Profiling: ", ex.what ()); @@ -59,19 +62,20 @@ namespace data void RouterProfile::Load (const IdentHash& identHash) { - std::string ident = identHash.ToBase64 (); - std::string path = m_ProfilesStorage.Path(ident); - boost::property_tree::ptree pt; + StorageRecord record = m_ProfilesStorage->Fetch(identHash); - if (!i2p::fs::Exists(path)) + if (!record.IsValid()) { - LogPrint(eLogWarning, "Profiling: no profile yet for ", ident); + LogPrint(eLogWarning, "Profiling: no profile yet for ", identHash.ToBase64()); return; } + std::istringstream ss(std::string(record.data.get(), record.data.get() + record.len)); + boost::property_tree::ptree pt; + try { - boost::property_tree::read_ini (path, pt); + boost::property_tree::read_ini (ss, pt); } catch (std::exception& ex) { /* boost exception verbose enough */ @@ -96,7 +100,7 @@ namespace data } catch (boost::property_tree::ptree_bad_path& ex) { - LogPrint (eLogWarning, "Profiling: Missing section ", PEER_PROFILE_SECTION_PARTICIPATION, " in profile for ", ident); + LogPrint (eLogWarning, "Profiling: Missing section ", PEER_PROFILE_SECTION_PARTICIPATION, " in profile for ", identHash.ToBase64()); } try { @@ -107,7 +111,7 @@ namespace data } catch (boost::property_tree::ptree_bad_path& ex) { - LogPrint (eLogWarning, "Missing section ", PEER_PROFILE_SECTION_USAGE, " in profile for ", ident); + LogPrint (eLogWarning, "Missing section ", PEER_PROFILE_SECTION_USAGE, " in profile for ", identHash.ToBase64()); } } else @@ -115,7 +119,7 @@ namespace data } catch (std::exception& ex) { - LogPrint (eLogError, "Profiling: Can't read profile ", ident, " :", ex.what ()); + LogPrint (eLogError, "Profiling: Can't read profile ", identHash.ToBase64(), " :", ex.what ()); } } @@ -169,27 +173,51 @@ namespace data void InitProfilesStorage () { - m_ProfilesStorage.SetPlace(i2p::fs::GetDataDir()); - m_ProfilesStorage.Init(i2p::data::GetBase64SubstitutionTable(), 64); + m_ProfilesStorage.reset(new FsIdentStorage("peerProfiles", "p", "profile-", "txt")); + m_ProfilesStorage->Init(); + } void DeleteObsoleteProfiles () { - struct stat st; - std::time_t now = std::time(nullptr); + boost::posix_time::ptime now = boost::posix_time::from_time_t(std::time(nullptr)); + m_ProfilesStorage->Iterate([now](const IdentHash &ident, const StorageRecord &record) { + std::istringstream ss(std::string(record.data.get(), record.data.get() + record.len)); + boost::property_tree::ptree pt; + try + { + boost::property_tree::read_ini (ss, pt); + } catch (std::exception& ex) + { + /* boost exception verbose enough */ + LogPrint (eLogError, "Deleting obsolete profile: ", ex.what ()); + return; + } - std::vector files; - m_ProfilesStorage.Traverse(files); - for (const auto& path: files) { - if (stat(path.c_str(), &st) != 0) { - LogPrint(eLogWarning, "Profiling: Can't stat(): ", path); - continue; + auto t = pt.get (PEER_PROFILE_LAST_UPDATE_TIME, ""); + boost::posix_time::ptime lastUpdate = now; + if (t.length () > 0) + lastUpdate = boost::posix_time::time_from_string (t); + if((now - lastUpdate).total_seconds()/ 3600 >= PEER_PROFILE_EXPIRATION_TIMEOUT) { + LogPrint(eLogDebug, "Profiling: removing expired peer profile: ", ident.ToBase64()); + m_ProfilesStorage->Remove(ident); } - if (((now - st.st_mtime) / 3600) >= PEER_PROFILE_EXPIRATION_TIMEOUT) { - LogPrint(eLogDebug, "Profiling: removing expired peer profile: ", path); - i2p::fs::Remove(path); - } - } + }); + } + + bool BeginProfilesStorageUpdate() + { + return m_ProfilesStorage->BeginUpdate(); + } + + bool EndProfilesStorageUpdate() + { + return m_ProfilesStorage->EndUpdate(); + } + + void DeInitProfilesStorage() + { + m_ProfilesStorage->DeInit(); } } } diff --git a/libi2pd/Profiling.h b/libi2pd/Profiling.h index 4ba6702f..2a59d7a7 100644 --- a/libi2pd/Profiling.h +++ b/libi2pd/Profiling.h @@ -4,6 +4,7 @@ #include #include #include "Identity.h" +#include "Storage.h" namespace i2p { @@ -61,6 +62,9 @@ namespace data std::shared_ptr GetRouterProfile (const IdentHash& identHash); void InitProfilesStorage (); void DeleteObsoleteProfiles (); + bool BeginProfilesStorageUpdate(); + bool EndProfilesStorageUpdate(); + void DeInitProfilesStorage(); } }