diff --git a/libi2pd/NetDb.cpp b/libi2pd/NetDb.cpp index 18b6694b..29f5d6ef 100644 --- a/libi2pd/NetDb.cpp +++ b/libi2pd/NetDb.cpp @@ -50,7 +50,6 @@ namespace data { m_Storage.SetPlace(i2p::fs::GetDataDir()); m_Storage.Init(i2p::data::GetBase64SubstitutionTable(), 64); - InitProfilesStorage (); m_Families.LoadCertificates (); Load (); @@ -75,6 +74,9 @@ namespace data m_Floodfills.Insert (i2p::context.GetSharedRouterInfo ()); i2p::config::GetOption("persist.profiles", m_PersistProfiles); + if (m_PersistProfiles) { + LoadProfilesDB (); + } m_IsRunning = true; m_Thread = new std::thread (std::bind (&NetDb::Run, this)); @@ -84,9 +86,11 @@ namespace data { if (m_IsRunning) { - if (m_PersistProfiles) - SaveProfiles (); - DeleteObsoleteProfiles (); + if (m_PersistProfiles) { + PruneExpiredProfiles (); + SaveProfilesDB (); + } + ClearProfilesDB (); m_RouterInfos.clear (); m_Floodfills.Clear (); if (m_Thread) @@ -175,8 +179,8 @@ namespace data if (ts - lastProfilesCleanup >= (uint64_t)(i2p::data::PEER_PROFILE_AUTOCLEAN_TIMEOUT + profilesCleanupVariance) || ts + i2p::data::PEER_PROFILE_AUTOCLEAN_TIMEOUT < lastProfilesCleanup) { - if (m_PersistProfiles) PersistProfiles (); - DeleteObsoleteProfiles (); + PruneExpiredProfiles (); + if (m_PersistProfiles) SaveProfilesDB (); lastProfilesCleanup = ts; profilesCleanupVariance = (rand () % (2 * i2p::data::PEER_PROFILE_AUTOCLEAN_VARIANCE) - i2p::data::PEER_PROFILE_AUTOCLEAN_VARIANCE); } diff --git a/libi2pd/Profiling.cpp b/libi2pd/Profiling.cpp index 754a2ce3..8af5de94 100644 --- a/libi2pd/Profiling.cpp +++ b/libi2pd/Profiling.cpp @@ -11,7 +11,7 @@ #include #include #include -#include +#include #include "Base.h" #include "FS.h" #include "Log.h" @@ -22,161 +22,121 @@ namespace i2p { namespace data { - static i2p::fs::HashedStorage g_ProfilesStorage("peerProfiles", "p", "profile-", "txt"); static std::unordered_map > g_Profiles; static std::mutex g_ProfilesMutex; - static boost::posix_time::ptime GetTime () + static uint64_t GetTime () { - return boost::posix_time::second_clock::local_time(); + return i2p::util::GetSecondsSinceEpoch (); } - + RouterProfile::RouterProfile (): - m_LastUpdateTime (GetTime ()), m_IsUpdated (false), - m_LastDeclineTime (0), m_LastUnreachableTime (0), + m_LastUpdateTime (0), m_LastDeclineTime (0), m_LastUnreachableTime (0), m_NumTunnelsAgreed (0), m_NumTunnelsDeclined (0), m_NumTunnelsNonReplied (0), m_NumTimesTaken (0), m_NumTimesRejected (0), m_HasConnected (false) { } - void RouterProfile::UpdateTime () + std::string RouterProfile::Dump (const std::string& peerid) { - m_LastUpdateTime = GetTime (); - m_IsUpdated = true; - } - - void RouterProfile::Save (const IdentHash& identHash) - { - // fill sections - boost::property_tree::ptree participation; - participation.put (PEER_PROFILE_PARTICIPATION_AGREED, m_NumTunnelsAgreed); - participation.put (PEER_PROFILE_PARTICIPATION_DECLINED, m_NumTunnelsDeclined); - participation.put (PEER_PROFILE_PARTICIPATION_NON_REPLIED, m_NumTunnelsNonReplied); - boost::property_tree::ptree usage; - usage.put (PEER_PROFILE_USAGE_TAKEN, m_NumTimesTaken); - usage.put (PEER_PROFILE_USAGE_REJECTED, m_NumTimesRejected); - usage.put (PEER_PROFILE_USAGE_CONNECTED, m_HasConnected); - // fill property tree boost::property_tree::ptree pt; - pt.put (PEER_PROFILE_LAST_UPDATE_TIME, boost::posix_time::to_simple_string (m_LastUpdateTime)); - if (m_LastUnreachableTime) - pt.put (PEER_PROFILE_LAST_UNREACHABLE_TIME, m_LastUnreachableTime); - pt.put_child (PEER_PROFILE_SECTION_PARTICIPATION, participation); - pt.put_child (PEER_PROFILE_SECTION_USAGE, usage); + std::stringstream ss; - // save to file - std::string ident = identHash.ToBase64 (); - std::string path = g_ProfilesStorage.Path(ident); + pt.put(PEER_PROFILE_PEER_ID, peerid); + /* "times" hash */ + pt.put(PEER_PROFILE_LAST_UPDATE_TIME, m_LastUpdateTime); + pt.put(PEER_PROFILE_LAST_DECLINE_TIME, m_LastDeclineTime); + pt.put(PEER_PROFILE_LAST_UNREACHABLE_TIME, m_LastUnreachableTime); + /* "tunnels" hash */ + pt.put(PEER_PROFILE_PARTICIPATION_AGREED, m_NumTunnelsAgreed); + pt.put(PEER_PROFILE_PARTICIPATION_DECLINED, m_NumTunnelsDeclined); + pt.put(PEER_PROFILE_PARTICIPATION_NON_REPLIED, m_NumTunnelsNonReplied); + /* "usage" hash */ + pt.put(PEER_PROFILE_USAGE_TAKEN, m_NumTimesTaken); + pt.put(PEER_PROFILE_USAGE_REJECTED, m_NumTimesRejected); + pt.put(PEER_PROFILE_USAGE_CONNECTED, m_HasConnected); try { - boost::property_tree::write_ini (path, pt); + /* convert ptree to single line json string */ + boost::property_tree::write_json (ss, pt, false); } catch (std::exception& ex) { /* boost exception verbose enough */ - LogPrint (eLogError, "Profiling: ", ex.what ()); + LogPrint (eLogError, "Profiling: can't serialize data to json -- ", ex.what ()); } + return ss.str(); } - void RouterProfile::Load (const IdentHash& identHash) + std::string RouterProfile::Load (const std::string& jsondata) { - std::string ident = identHash.ToBase64 (); - std::string path = g_ProfilesStorage.Path(ident); boost::property_tree::ptree pt; + std::stringstream ss(jsondata); + std::string peerid = ""; - if (!i2p::fs::Exists(path)) - { - LogPrint(eLogWarning, "Profiling: No profile yet for ", ident); - return; - } - - try - { - boost::property_tree::read_ini (path, pt); - } catch (std::exception& ex) - { + try { + boost::property_tree::read_json (ss, pt); + } catch (std::exception& ex) { /* boost exception verbose enough */ - LogPrint (eLogError, "Profiling: ", ex.what ()); - return; + LogPrint (eLogError, "Profiling: can't parse json data -- ", ex.what ()); + return std::string(""); } - try - { - auto t = pt.get (PEER_PROFILE_LAST_UPDATE_TIME, ""); - if (t.length () > 0) - m_LastUpdateTime = boost::posix_time::time_from_string (t); - if ((GetTime () - m_LastUpdateTime).hours () < PEER_PROFILE_EXPIRATION_TIMEOUT) - { - m_LastUnreachableTime = pt.get (PEER_PROFILE_LAST_UNREACHABLE_TIME, 0); - try - { - // read participations - auto participations = pt.get_child (PEER_PROFILE_SECTION_PARTICIPATION); - m_NumTunnelsAgreed = participations.get (PEER_PROFILE_PARTICIPATION_AGREED, 0); - m_NumTunnelsDeclined = participations.get (PEER_PROFILE_PARTICIPATION_DECLINED, 0); - m_NumTunnelsNonReplied = participations.get (PEER_PROFILE_PARTICIPATION_NON_REPLIED, 0); - } - catch (boost::property_tree::ptree_bad_path& ex) - { - LogPrint (eLogWarning, "Profiling: Missing section ", PEER_PROFILE_SECTION_PARTICIPATION, " in profile for ", ident); - } - try - { - // read usage - auto usage = pt.get_child (PEER_PROFILE_SECTION_USAGE); - m_NumTimesTaken = usage.get (PEER_PROFILE_USAGE_TAKEN, 0); - m_NumTimesRejected = usage.get (PEER_PROFILE_USAGE_REJECTED, 0); - m_HasConnected = usage.get (PEER_PROFILE_USAGE_CONNECTED, false); - } - catch (boost::property_tree::ptree_bad_path& ex) - { - LogPrint (eLogWarning, "Profiling: Missing section ", PEER_PROFILE_SECTION_USAGE, " in profile for ", ident); - } - } - else - *this = RouterProfile (); + try { + peerid = pt.get(PEER_PROFILE_PEER_ID); + } catch (std::exception& ex) { + LogPrint (eLogError, "Profiling: Can't read profile data: missing peerid"); + return std::string(""); } - catch (std::exception& ex) - { - LogPrint (eLogError, "Profiling: Can't read profile ", ident, " :", ex.what ()); + try { + /* "lasttime" hash */ + m_LastUpdateTime = pt.get(PEER_PROFILE_LAST_UPDATE_TIME, 0); + m_LastDeclineTime = pt.get(PEER_PROFILE_LAST_DECLINE_TIME, 0); + m_LastUnreachableTime = pt.get(PEER_PROFILE_LAST_UNREACHABLE_TIME, 0); + /* "tunnels" hash */ + m_NumTunnelsAgreed = pt.get(PEER_PROFILE_PARTICIPATION_AGREED, 0); + m_NumTunnelsDeclined = pt.get(PEER_PROFILE_PARTICIPATION_DECLINED, 0); + m_NumTunnelsNonReplied = pt.get(PEER_PROFILE_PARTICIPATION_NON_REPLIED, 0); + /* "usage" hash */ + m_NumTimesTaken = pt.get(PEER_PROFILE_USAGE_TAKEN, 0); + m_NumTimesRejected = pt.get(PEER_PROFILE_USAGE_REJECTED, 0); + m_HasConnected = pt.get(PEER_PROFILE_USAGE_CONNECTED, false); + } catch (boost::property_tree::ptree_bad_path& ex) { + LogPrint (eLogError, "Profiling: Can't read profile data: ", ex.what()); } + return peerid; } void RouterProfile::TunnelBuildResponse (uint8_t ret) { - UpdateTime (); - if (ret > 0) - { + if (ret > 0) { m_NumTunnelsDeclined++; - m_LastDeclineTime = i2p::util::GetSecondsSinceEpoch (); - } - else - { - m_NumTunnelsAgreed++; + m_LastDeclineTime = GetTime (); + } else { + m_NumTunnelsAgreed++; m_LastDeclineTime = 0; } + m_LastUpdateTime = GetTime (); } void RouterProfile::TunnelNonReplied () { - m_NumTunnelsNonReplied++; - UpdateTime (); + m_NumTunnelsNonReplied++; if (m_NumTunnelsNonReplied > 2*m_NumTunnelsAgreed && m_NumTunnelsNonReplied > 3) - { - m_LastDeclineTime = i2p::util::GetSecondsSinceEpoch (); - } + m_LastDeclineTime = GetTime (); + m_LastUpdateTime = GetTime (); } void RouterProfile::Unreachable () { - m_LastUnreachableTime = i2p::util::GetSecondsSinceEpoch (); - UpdateTime (); + m_LastUnreachableTime = GetTime (); + m_LastUpdateTime = GetTime (); } void RouterProfile::Connected () { m_HasConnected = true; - UpdateTime (); - } - + m_LastUpdateTime = GetTime (); + } + bool RouterProfile::IsLowPartcipationRate () const { return 4*m_NumTunnelsAgreed < m_NumTunnelsDeclined; // < 20% rate @@ -223,7 +183,7 @@ namespace data m_LastUnreachableTime = 0; return (bool)m_LastUnreachableTime; } - + bool RouterProfile::IsUseful() const { return m_NumTunnelsAgreed >= PEER_PROFILE_USEFUL_THRESHOLD || @@ -240,85 +200,102 @@ namespace data auto it = g_Profiles.find (identHash); if (it != g_Profiles.end ()) return it->second; - } + } + LogPrint(eLogDebug, "Profiling: creating new profile for ", identHash.ToBase64()); auto profile = std::make_shared (); - profile->Load (identHash); // if possible std::unique_lock l(g_ProfilesMutex); g_Profiles.emplace (identHash, profile); return profile; } - void InitProfilesStorage () - { - g_ProfilesStorage.SetPlace(i2p::fs::GetDataDir()); - g_ProfilesStorage.Init(i2p::data::GetBase64SubstitutionTable(), 64); - } - - void PersistProfiles () - { - auto ts = GetTime (); - std::list > > tmp; - { - std::unique_lock l(g_ProfilesMutex); - for (auto it = g_Profiles.begin (); it != g_Profiles.end ();) - { - if ((ts - it->second->GetLastUpdateTime ()).total_seconds () > PEER_PROFILE_PERSIST_INTERVAL) - { - if (it->second->IsUpdated ()) - tmp.push_back (std::make_pair (it->first, it->second)); - it = g_Profiles.erase (it); - } - else - it++; - } + void LoadProfilesDB () { + unsigned int loaded = 0, linenum = 0; + static std::unordered_map > new_db; + IdentHash identHash; + std::string oldDBDir = i2p::fs::DataDirPath("peerProfiles"); + auto DBPath = i2p::fs::DataDirPath(PEER_PROFILES_DB_FILENAME); + if (i2p::fs::Exists(oldDBDir)) { + std::string oldDBBak = oldDBDir + ".bak"; + LogPrint(eLogInfo, "Profiling: old peerProfiles/ directory still exists, you may safely remove it"); + std::rename(oldDBDir.c_str(), oldDBBak.c_str()); } - for (auto& it: tmp) - if (it.second) it.second->Save (it.first); - } + if (!i2p::fs::Exists(DBPath)) + return; /* no database yet */ - void SaveProfiles () - { - std::unordered_map > tmp; - { - std::unique_lock l(g_ProfilesMutex); - tmp = g_Profiles; - g_Profiles.clear (); + std::ifstream in (DBPath); + if (!in.is_open()) { + LogPrint (eLogError, "Profiling: can't open profiles database ", DBPath); + return; } - auto ts = GetTime (); - for (auto& it: tmp) - if (it.second->IsUseful() && it.second->IsUpdated () && (ts - it.second->GetLastUpdateTime ()).total_seconds () < PEER_PROFILE_EXPIRATION_TIMEOUT*3600) - it.second->Save (it.first); - } - - void DeleteObsoleteProfiles () - { - { - auto ts = GetTime (); - std::unique_lock l(g_ProfilesMutex); - for (auto it = g_Profiles.begin (); it != g_Profiles.end ();) - { - if ((ts - it->second->GetLastUpdateTime ()).total_seconds () >= PEER_PROFILE_EXPIRATION_TIMEOUT*3600) - it = g_Profiles.erase (it); - else - it++; - } - } - - struct stat st; - std::time_t now = std::time(nullptr); - std::vector files; - g_ProfilesStorage.Traverse(files); - for (const auto& path: files) { - if (stat(path.c_str(), &st) != 0) { - LogPrint(eLogWarning, "Profiling: Can't stat(): ", path); + std::string line; + while (!(in.eof() || in.fail())) { + std::getline(in, line); linenum++; + if (line.empty()) continue; + if (line[0] != '{') { + LogPrint(eLogError, "Profiling: ignore profile data at line ", linenum); continue; } - if (now - st.st_mtime >= PEER_PROFILE_EXPIRATION_TIMEOUT*3600) { - LogPrint(eLogDebug, "Profiling: Removing expired peer profile: ", path); - i2p::fs::Remove(path); + auto profile = std::make_shared (); + std::string peerid = profile->Load(line); + if (peerid.empty()) + continue; /* load failed, errors logged */ + identHash.FromBase64(peerid); + new_db.emplace(identHash, profile); + loaded++; + } + LogPrint (eLogInfo, "Profiling: loaded ", loaded, " profiles"); + + { /* replace exiting database with just loaded */ + std::unique_lock l(g_ProfilesMutex); + g_Profiles.clear (); + g_Profiles = new_db; + } + return; + } + + void PruneExpiredProfiles () { + unsigned int pruned = 0; + auto ts = GetTime (); + std::unique_lock l(g_ProfilesMutex); + for (auto it = g_Profiles.begin (); it != g_Profiles.end (); ) { + if ((ts - it->second->GetLastUpdateTime ()) >= PEER_PROFILE_EXPIRATION_TIMEOUT * 3600) { + it = g_Profiles.erase (it); + pruned++; + } else { + it++; } } + LogPrint(eLogInfo, "Profiling: pruned ", pruned, " expired peer profiles, ", g_Profiles.size(), " remains"); + } + + void SaveProfilesDB () { + unsigned int saved = 0; + auto DBPath = i2p::fs::DataDirPath(PEER_PROFILES_DB_FILENAME); + auto DBPathNew = DBPath + ".new"; + std::ofstream out (DBPathNew); + std::unique_lock l(g_ProfilesMutex); + if (!out.is_open()) { + LogPrint(eLogError, "Profiling: can't open database file ", DBPathNew); + return; + } + auto ts = GetTime (); + /* save "old enough" profiles */ + for (auto& it : g_Profiles) { + if (it.second->IsUseful() && (ts - it.second->GetLastUpdateTime ()) < PEER_PROFILE_PERSIST_INTERVAL) + continue; /* too new */ + out << it.second->Dump(it.first.ToBase64()); + saved++; + } + out.flush(); + out.close(); + LogPrint(eLogDebug, "Profiling: db path is", DBPath); + std::rename(DBPathNew.c_str(), DBPath.c_str()); + LogPrint(eLogInfo, "Profiling: saved ", saved, " peer profiles"); + } + + void ClearProfilesDB () { + g_Profiles.clear(); } } } diff --git a/libi2pd/Profiling.h b/libi2pd/Profiling.h index 4dabffbd..77d2b45d 100644 --- a/libi2pd/Profiling.h +++ b/libi2pd/Profiling.h @@ -16,19 +16,26 @@ namespace i2p { namespace data { - // sections - const char PEER_PROFILE_SECTION_PARTICIPATION[] = "participation"; - const char PEER_PROFILE_SECTION_USAGE[] = "usage"; + const char PEER_PROFILES_DB_FILENAME[] = "peerProfiles.dat"; + /** example json peer profile (pretty-printed): + { + "peerid": "", + "lasttime": { "update": 123456789, "decline": 123456789, "unreachable": 123456789 }, + "tunnels": { "agreed": 17. "declined": 4, "noreply: 2 }, + "usage": { "taken": 10, "rejected": 3 } + } */ // params - const char PEER_PROFILE_LAST_UPDATE_TIME[] = "lastupdatetime"; - const char PEER_PROFILE_LAST_UNREACHABLE_TIME[] = "lastunreachabletime"; - const char PEER_PROFILE_PARTICIPATION_AGREED[] = "agreed"; - const char PEER_PROFILE_PARTICIPATION_DECLINED[] = "declined"; - const char PEER_PROFILE_PARTICIPATION_NON_REPLIED[] = "nonreplied"; - const char PEER_PROFILE_USAGE_TAKEN[] = "taken"; - const char PEER_PROFILE_USAGE_REJECTED[] = "rejected"; - const char PEER_PROFILE_USAGE_CONNECTED[] = "connected"; - + const char PEER_PROFILE_PEER_ID[] = "peerid"; + const char PEER_PROFILE_LAST_UPDATE_TIME[] = "lasttime.update"; + const char PEER_PROFILE_LAST_DECLINE_TIME[] = "lasttime.decline"; + const char PEER_PROFILE_LAST_UNREACHABLE_TIME[] = "lasttime.unreachable"; + const char PEER_PROFILE_PARTICIPATION_AGREED[] = "tunnels.agreed"; + const char PEER_PROFILE_PARTICIPATION_DECLINED[] = "tunnels.declined"; + const char PEER_PROFILE_PARTICIPATION_NON_REPLIED[] = "tunnels.noreply"; + const char PEER_PROFILE_USAGE_TAKEN[] = "usage.taken"; + const char PEER_PROFILE_USAGE_REJECTED[] = "usage.rejected"; + const char PEER_PROFILE_USAGE_CONNECTED[] = "usage.connected"; + const int PEER_PROFILE_EXPIRATION_TIMEOUT = 36; // in hours (1.5 days) const int PEER_PROFILE_AUTOCLEAN_TIMEOUT = 6 * 3600; // in seconds (6 hours) const int PEER_PROFILE_AUTOCLEAN_VARIANCE = 3600; // in seconds (1 hour) @@ -44,11 +51,12 @@ namespace data RouterProfile (); RouterProfile& operator= (const RouterProfile& ) = default; - void Save (const IdentHash& identHash); - void Load (const IdentHash& identHash); + std::string Dump (const std::string& peerid); + std::string Load (const std::string& jsondata); bool IsBad (); bool IsUnreachable (); + bool IsUseful() const; bool IsReal () const { return m_HasConnected || m_NumTunnelsAgreed > 0 || m_NumTunnelsDeclined > 0; } void TunnelBuildResponse (uint8_t ret); @@ -57,15 +65,10 @@ namespace data void Unreachable (); void Connected (); - boost::posix_time::ptime GetLastUpdateTime () const { return m_LastUpdateTime; }; - bool IsUpdated () const { return m_IsUpdated; }; - - bool IsUseful() const; + uint64_t GetLastUpdateTime () const { return m_LastUpdateTime; }; private: - void UpdateTime (); - bool IsAlwaysDeclining () const { return !m_NumTunnelsAgreed && m_NumTunnelsDeclined >= 5; }; bool IsLowPartcipationRate () const; bool IsLowReplyRate () const; @@ -73,9 +76,10 @@ namespace data private: - boost::posix_time::ptime m_LastUpdateTime; // TODO: use std::chrono - bool m_IsUpdated; - uint64_t m_LastDeclineTime, m_LastUnreachableTime; // in seconds + // lasttime + uint64_t m_LastUpdateTime; + uint64_t m_LastDeclineTime; + uint64_t m_LastUnreachableTime; // participation uint32_t m_NumTunnelsAgreed; uint32_t m_NumTunnelsDeclined; @@ -87,10 +91,14 @@ namespace data }; std::shared_ptr GetRouterProfile (const IdentHash& identHash); - void InitProfilesStorage (); - void DeleteObsoleteProfiles (); - void SaveProfiles (); - void PersistProfiles (); + + /** database file operations */ + void LoadProfilesDB (); /*< read saved peer profiles from file to memory */ + void SaveProfilesDB (); /*< serialize and write to file known peer profiles */ + + /** memory database operations */ + void PruneExpiredProfiles (); /*< discard peer profiles inactive for long time */ + void ClearProfilesDB (); /*< discard ALL known peer profiles */ } }