diff --git a/libi2pd/NetDb.cpp b/libi2pd/NetDb.cpp index 48c0a690..eb029f59 100644 --- a/libi2pd/NetDb.cpp +++ b/libi2pd/NetDb.cpp @@ -119,8 +119,9 @@ namespace data i2p::util::SetThreadName("NetDB"); uint64_t lastManage = 0; - uint64_t lastProfilesCleanup = i2p::util::GetMonotonicMilliseconds (), lastObsoleteProfilesCleanup = lastProfilesCleanup; - int16_t profilesCleanupVariance = 0, obsoleteProfilesCleanVariance = 0; + uint64_t lastProfilesCleanup = i2p::util::GetMonotonicMilliseconds (), + lastObsoleteProfilesCleanup = lastProfilesCleanup, lastApplingProfileUpdates = lastProfilesCleanup; + int16_t profilesCleanupVariance = 0, obsoleteProfilesCleanVariance = 0, applingProfileUpdatesVariance = 0; std::list > msgs; while (m_IsRunning) @@ -199,6 +200,19 @@ namespace data lastObsoleteProfilesCleanup = mts; obsoleteProfilesCleanVariance = rand () % i2p::data::PEER_PROFILE_OBSOLETE_PROFILES_CLEAN_VARIANCE; } + if (mts >= lastApplingProfileUpdates + i2p::data::PEER_PROFILE_APPLY_POSTPONED_TIMEOUT + applingProfileUpdatesVariance) + { + bool isAppling = m_ApplingProfileUpdates.valid (); + if (isAppling && m_ApplingProfileUpdates.wait_for(std::chrono::seconds(0)) == std::future_status::ready) // still active? + { + m_ApplingProfileUpdates.get (); + isAppling = false; + } + if (!isAppling) + m_ApplingProfileUpdates = i2p::data::FlushPostponedRouterProfileUpdates (); + lastApplingProfileUpdates = mts; + applingProfileUpdatesVariance = rand () % i2p::data::PEER_PROFILE_APPLY_POSTPONED_TIMEOUT_VARIANCE; + } } catch (std::exception& ex) { diff --git a/libi2pd/NetDb.hpp b/libi2pd/NetDb.hpp index 9d8b875a..700941b8 100644 --- a/libi2pd/NetDb.hpp +++ b/libi2pd/NetDb.hpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2024, The PurpleI2P Project +* Copyright (c) 2013-2025, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -185,7 +185,7 @@ namespace data std::shared_ptr m_Requests; bool m_PersistProfiles; - std::future m_SavingProfiles, m_DeletingProfiles, m_PersistingRouters; + std::future m_SavingProfiles, m_DeletingProfiles, m_ApplingProfileUpdates, m_PersistingRouters; std::vector > m_ExploratorySelection; uint64_t m_LastExploratorySelectionUpdateTime; // in monotonic seconds diff --git a/libi2pd/Profiling.cpp b/libi2pd/Profiling.cpp index 59d60c6d..a48f2112 100644 --- a/libi2pd/Profiling.cpp +++ b/libi2pd/Profiling.cpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2024, The PurpleI2P Project +* Copyright (c) 2013-2025, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -27,7 +27,9 @@ namespace data static i2p::fs::HashedStorage g_ProfilesStorage("peerProfiles", "p", "profile-", "txt"); static std::unordered_map > g_Profiles; static std::mutex g_ProfilesMutex; - + static std::list)> > > g_PostponedUpdates; + static std::mutex g_PostponedUpdatesMutex; + RouterProfile::RouterProfile (): m_IsUpdated (false), m_LastDeclineTime (0), m_LastUnreachableTime (0), m_LastUpdateTime (i2p::util::GetSecondsSinceEpoch ()), @@ -259,14 +261,14 @@ namespace data } auto profile = netdb.NewRouterProfile (); profile->Load (identHash); // if possible - std::unique_lock l(g_ProfilesMutex); + std::lock_guard l(g_ProfilesMutex); g_Profiles.emplace (identHash, profile); return profile; } bool IsRouterBanned (const IdentHash& identHash) { - std::unique_lock l(g_ProfilesMutex); + std::lock_guard l(g_ProfilesMutex); auto it = g_Profiles.find (identHash); if (it != g_Profiles.end ()) return it->second->IsUnreachable (); @@ -278,7 +280,7 @@ namespace data g_ProfilesStorage.SetPlace(i2p::fs::GetDataDir()); g_ProfilesStorage.Init(i2p::data::GetBase64SubstitutionTable(), 64); } - + static void SaveProfilesToDisk (std::list > >&& profiles) { for (auto& it: profiles) @@ -290,7 +292,7 @@ namespace data auto ts = i2p::util::GetSecondsSinceEpoch (); std::list > > tmp; { - std::unique_lock l(g_ProfilesMutex); + std::lock_guard l(g_ProfilesMutex); for (auto it = g_Profiles.begin (); it != g_Profiles.end ();) { if (ts - it->second->GetLastUpdateTime () > PEER_PROFILE_PERSIST_INTERVAL) @@ -312,7 +314,7 @@ namespace data { std::unordered_map > tmp; { - std::unique_lock l(g_ProfilesMutex); + std::lock_guard l(g_ProfilesMutex); std::swap (tmp, g_Profiles); } auto ts = i2p::util::GetSecondsSinceEpoch (); @@ -347,7 +349,7 @@ namespace data { { auto ts = i2p::util::GetSecondsSinceEpoch (); - std::unique_lock l(g_ProfilesMutex); + std::lock_guard l(g_ProfilesMutex); for (auto it = g_Profiles.begin (); it != g_Profiles.end ();) { if (ts - it->second->GetLastUpdateTime () >= PEER_PROFILE_EXPIRATION_TIMEOUT) @@ -359,5 +361,47 @@ namespace data return std::async (std::launch::async, DeleteFilesFromDisk); } + + bool UpdateRouterProfile (const IdentHash& identHash, std::function)> update) + { + if (!update) return true; + std::shared_ptr profile; + { + std::lock_guard l(g_ProfilesMutex); + auto it = g_Profiles.find (identHash); + if (it != g_Profiles.end ()) + profile = it->second; + } + if (profile) + { + update (profile); + return true; + } + // postpone + std::lock_guard l(g_PostponedUpdatesMutex); + g_PostponedUpdates.emplace_back (identHash, update); + return false; + } + + static void ApplyPostponedUpdates (std::list)> > >&& updates) + { + for (const auto& [ident, update] : updates) + { + auto profile = GetRouterProfile (ident); + update (profile); + } + } + + std::future FlushPostponedRouterProfileUpdates () + { + if (g_PostponedUpdates.empty ()) return std::future(); + + std::list)> > > updates; + { + std::lock_guard l(g_PostponedUpdatesMutex); + g_PostponedUpdates.swap (updates); + } + return std::async (std::launch::async, ApplyPostponedUpdates, std::move (updates)); + } } } diff --git a/libi2pd/Profiling.h b/libi2pd/Profiling.h index 5d85cec3..998f9d19 100644 --- a/libi2pd/Profiling.h +++ b/libi2pd/Profiling.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2024, The PurpleI2P Project +* Copyright (c) 2013-2025, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -11,6 +11,7 @@ #include #include +#include #include #include "Identity.h" @@ -44,6 +45,8 @@ namespace data const int PEER_PROFILE_UNREACHABLE_INTERVAL = 480; // in seconds (8 minutes) const int PEER_PROFILE_USEFUL_THRESHOLD = 3; const int PEER_PROFILE_ALWAYS_DECLINING_NUM = 5; // num declines in row to consider always declined + const int PEER_PROFILE_APPLY_POSTPONED_TIMEOUT = 2100; // in milliseconds + const int PEER_PROFILE_APPLY_POSTPONED_TIMEOUT_VARIANCE = 500; // in milliseconds class RouterProfile { @@ -108,6 +111,8 @@ namespace data std::future DeleteObsoleteProfiles (); void SaveProfiles (); std::future PersistProfiles (); + bool UpdateRouterProfile (const IdentHash& identHash, std::function)> update); // return true if updated immediately, and false if postponed + std::future FlushPostponedRouterProfileUpdates (); } } diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 15147985..fd877a9b 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -1206,7 +1206,16 @@ namespace transport return false; } if (!m_Address->published) - ri->GetProfile ()->SetLastEndpoint (m_RemoteEndpoint); + { + if (ri->HasProfile ()) + ri->GetProfile ()->SetLastEndpoint (m_RemoteEndpoint); + else + i2p::data::UpdateRouterProfile (ri->GetIdentHash (), + [ep = m_RemoteEndpoint](std::shared_ptr profile) + { + if (profile) profile->SetLastEndpoint (ep); + }); + } SetRemoteIdentity (ri->GetRouterIdentity ()); AdjustMaxPayloadSize (); m_Server.AddSessionByRouterHash (shared_from_this ()); // we know remote router now