postpone reading from file and updating router profile
Some checks are pending
Build Debian packages / ${{ matrix.dist }} (bookworm) (push) Waiting to run
Build Debian packages / ${{ matrix.dist }} (bullseye) (push) Waiting to run
Build Debian packages / ${{ matrix.dist }} (buster) (push) Waiting to run
Build on FreeBSD / with UPnP (push) Waiting to run
Build on OSX / With USE_UPNP=${{ matrix.with_upnp }} (no) (push) Waiting to run
Build on OSX / With USE_UPNP=${{ matrix.with_upnp }} (yes) (push) Waiting to run
Build on Windows / ${{ matrix.arch }} (clang-x86_64, x64-clang, clang, CLANG64) (push) Waiting to run
Build on Windows / ${{ matrix.arch }} (i686, x86, gcc, MINGW32) (push) Waiting to run
Build on Windows / ${{ matrix.arch }} (ucrt-x86_64, x64-ucrt, gcc, UCRT64) (push) Waiting to run
Build on Windows / ${{ matrix.arch }} (x86_64, x64, gcc, MINGW64) (push) Waiting to run
Build on Windows / CMake ${{ matrix.arch }} (clang-x86_64, x64-clang, clang, CLANG64) (push) Waiting to run
Build on Windows / CMake ${{ matrix.arch }} (i686, x86, gcc, MINGW32) (push) Waiting to run
Build on Windows / CMake ${{ matrix.arch }} (ucrt-x86_64, x64-ucrt, gcc, UCRT64) (push) Waiting to run
Build on Windows / CMake ${{ matrix.arch }} (x86_64, x64, gcc, MINGW64) (push) Waiting to run
Build on Windows / XP (push) Waiting to run
Build on Ubuntu / Make with USE_UPNP=${{ matrix.with_upnp }} (no) (push) Waiting to run
Build on Ubuntu / Make with USE_UPNP=${{ matrix.with_upnp }} (yes) (push) Waiting to run
Build on Ubuntu / CMake with -DWITH_UPNP=${{ matrix.with_upnp }} (OFF) (push) Waiting to run
Build on Ubuntu / CMake with -DWITH_UPNP=${{ matrix.with_upnp }} (ON) (push) Waiting to run
Build containers / Building container for ${{ matrix.platform }} (amd64, linux/amd64) (push) Waiting to run
Build containers / Building container for ${{ matrix.platform }} (arm64, linux/arm64) (push) Waiting to run
Build containers / Building container for ${{ matrix.platform }} (armv7, linux/arm/v7) (push) Waiting to run
Build containers / Building container for ${{ matrix.platform }} (i386, linux/386) (push) Waiting to run
Build containers / Pushing merged manifest (push) Blocked by required conditions

This commit is contained in:
orignal 2025-01-18 18:26:16 -05:00
parent b6319d78bf
commit c600b834e3
5 changed files with 86 additions and 14 deletions

View file

@ -119,8 +119,9 @@ namespace data
i2p::util::SetThreadName("NetDB"); i2p::util::SetThreadName("NetDB");
uint64_t lastManage = 0; uint64_t lastManage = 0;
uint64_t lastProfilesCleanup = i2p::util::GetMonotonicMilliseconds (), lastObsoleteProfilesCleanup = lastProfilesCleanup; uint64_t lastProfilesCleanup = i2p::util::GetMonotonicMilliseconds (),
int16_t profilesCleanupVariance = 0, obsoleteProfilesCleanVariance = 0; lastObsoleteProfilesCleanup = lastProfilesCleanup, lastApplingProfileUpdates = lastProfilesCleanup;
int16_t profilesCleanupVariance = 0, obsoleteProfilesCleanVariance = 0, applingProfileUpdatesVariance = 0;
std::list<std::shared_ptr<const I2NPMessage> > msgs; std::list<std::shared_ptr<const I2NPMessage> > msgs;
while (m_IsRunning) while (m_IsRunning)
@ -199,6 +200,19 @@ namespace data
lastObsoleteProfilesCleanup = mts; lastObsoleteProfilesCleanup = mts;
obsoleteProfilesCleanVariance = rand () % i2p::data::PEER_PROFILE_OBSOLETE_PROFILES_CLEAN_VARIANCE; obsoleteProfilesCleanVariance = rand () % i2p::data::PEER_PROFILE_OBSOLETE_PROFILES_CLEAN_VARIANCE;
} }
if (mts >= lastApplingProfileUpdates + i2p::data::PEER_PROFILE_APPLY_POSTPONED_TIMEOUT + applingProfileUpdatesVariance)
{
bool isAppling = m_ApplingProfileUpdates.valid ();
if (isAppling && m_ApplingProfileUpdates.wait_for(std::chrono::seconds(0)) == std::future_status::ready) // still active?
{
m_ApplingProfileUpdates.get ();
isAppling = false;
}
if (!isAppling)
m_ApplingProfileUpdates = i2p::data::FlushPostponedRouterProfileUpdates ();
lastApplingProfileUpdates = mts;
applingProfileUpdatesVariance = rand () % i2p::data::PEER_PROFILE_APPLY_POSTPONED_TIMEOUT_VARIANCE;
}
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2024, The PurpleI2P Project * Copyright (c) 2013-2025, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -185,7 +185,7 @@ namespace data
std::shared_ptr<NetDbRequests> m_Requests; std::shared_ptr<NetDbRequests> m_Requests;
bool m_PersistProfiles; bool m_PersistProfiles;
std::future<void> m_SavingProfiles, m_DeletingProfiles, m_PersistingRouters; std::future<void> m_SavingProfiles, m_DeletingProfiles, m_ApplingProfileUpdates, m_PersistingRouters;
std::vector<std::shared_ptr<const RouterInfo> > m_ExploratorySelection; std::vector<std::shared_ptr<const RouterInfo> > m_ExploratorySelection;
uint64_t m_LastExploratorySelectionUpdateTime; // in monotonic seconds uint64_t m_LastExploratorySelectionUpdateTime; // in monotonic seconds

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2024, The PurpleI2P Project * Copyright (c) 2013-2025, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -27,7 +27,9 @@ namespace data
static i2p::fs::HashedStorage g_ProfilesStorage("peerProfiles", "p", "profile-", "txt"); static i2p::fs::HashedStorage g_ProfilesStorage("peerProfiles", "p", "profile-", "txt");
static std::unordered_map<i2p::data::IdentHash, std::shared_ptr<RouterProfile> > g_Profiles; static std::unordered_map<i2p::data::IdentHash, std::shared_ptr<RouterProfile> > g_Profiles;
static std::mutex g_ProfilesMutex; static std::mutex g_ProfilesMutex;
static std::list<std::pair<i2p::data::IdentHash, std::function<void (std::shared_ptr<RouterProfile>)> > > g_PostponedUpdates;
static std::mutex g_PostponedUpdatesMutex;
RouterProfile::RouterProfile (): RouterProfile::RouterProfile ():
m_IsUpdated (false), m_LastDeclineTime (0), m_LastUnreachableTime (0), m_IsUpdated (false), m_LastDeclineTime (0), m_LastUnreachableTime (0),
m_LastUpdateTime (i2p::util::GetSecondsSinceEpoch ()), m_LastUpdateTime (i2p::util::GetSecondsSinceEpoch ()),
@ -259,14 +261,14 @@ namespace data
} }
auto profile = netdb.NewRouterProfile (); auto profile = netdb.NewRouterProfile ();
profile->Load (identHash); // if possible profile->Load (identHash); // if possible
std::unique_lock<std::mutex> l(g_ProfilesMutex); std::lock_guard<std::mutex> l(g_ProfilesMutex);
g_Profiles.emplace (identHash, profile); g_Profiles.emplace (identHash, profile);
return profile; return profile;
} }
bool IsRouterBanned (const IdentHash& identHash) bool IsRouterBanned (const IdentHash& identHash)
{ {
std::unique_lock<std::mutex> l(g_ProfilesMutex); std::lock_guard<std::mutex> l(g_ProfilesMutex);
auto it = g_Profiles.find (identHash); auto it = g_Profiles.find (identHash);
if (it != g_Profiles.end ()) if (it != g_Profiles.end ())
return it->second->IsUnreachable (); return it->second->IsUnreachable ();
@ -278,7 +280,7 @@ namespace data
g_ProfilesStorage.SetPlace(i2p::fs::GetDataDir()); g_ProfilesStorage.SetPlace(i2p::fs::GetDataDir());
g_ProfilesStorage.Init(i2p::data::GetBase64SubstitutionTable(), 64); g_ProfilesStorage.Init(i2p::data::GetBase64SubstitutionTable(), 64);
} }
static void SaveProfilesToDisk (std::list<std::pair<i2p::data::IdentHash, std::shared_ptr<RouterProfile> > >&& profiles) static void SaveProfilesToDisk (std::list<std::pair<i2p::data::IdentHash, std::shared_ptr<RouterProfile> > >&& profiles)
{ {
for (auto& it: profiles) for (auto& it: profiles)
@ -290,7 +292,7 @@ namespace data
auto ts = i2p::util::GetSecondsSinceEpoch (); auto ts = i2p::util::GetSecondsSinceEpoch ();
std::list<std::pair<i2p::data::IdentHash, std::shared_ptr<RouterProfile> > > tmp; std::list<std::pair<i2p::data::IdentHash, std::shared_ptr<RouterProfile> > > tmp;
{ {
std::unique_lock<std::mutex> l(g_ProfilesMutex); std::lock_guard<std::mutex> l(g_ProfilesMutex);
for (auto it = g_Profiles.begin (); it != g_Profiles.end ();) for (auto it = g_Profiles.begin (); it != g_Profiles.end ();)
{ {
if (ts - it->second->GetLastUpdateTime () > PEER_PROFILE_PERSIST_INTERVAL) if (ts - it->second->GetLastUpdateTime () > PEER_PROFILE_PERSIST_INTERVAL)
@ -312,7 +314,7 @@ namespace data
{ {
std::unordered_map<i2p::data::IdentHash, std::shared_ptr<RouterProfile> > tmp; std::unordered_map<i2p::data::IdentHash, std::shared_ptr<RouterProfile> > tmp;
{ {
std::unique_lock<std::mutex> l(g_ProfilesMutex); std::lock_guard<std::mutex> l(g_ProfilesMutex);
std::swap (tmp, g_Profiles); std::swap (tmp, g_Profiles);
} }
auto ts = i2p::util::GetSecondsSinceEpoch (); auto ts = i2p::util::GetSecondsSinceEpoch ();
@ -347,7 +349,7 @@ namespace data
{ {
{ {
auto ts = i2p::util::GetSecondsSinceEpoch (); auto ts = i2p::util::GetSecondsSinceEpoch ();
std::unique_lock<std::mutex> l(g_ProfilesMutex); std::lock_guard<std::mutex> l(g_ProfilesMutex);
for (auto it = g_Profiles.begin (); it != g_Profiles.end ();) for (auto it = g_Profiles.begin (); it != g_Profiles.end ();)
{ {
if (ts - it->second->GetLastUpdateTime () >= PEER_PROFILE_EXPIRATION_TIMEOUT) if (ts - it->second->GetLastUpdateTime () >= PEER_PROFILE_EXPIRATION_TIMEOUT)
@ -359,5 +361,47 @@ namespace data
return std::async (std::launch::async, DeleteFilesFromDisk); return std::async (std::launch::async, DeleteFilesFromDisk);
} }
bool UpdateRouterProfile (const IdentHash& identHash, std::function<void (std::shared_ptr<RouterProfile>)> update)
{
if (!update) return true;
std::shared_ptr<RouterProfile> profile;
{
std::lock_guard<std::mutex> l(g_ProfilesMutex);
auto it = g_Profiles.find (identHash);
if (it != g_Profiles.end ())
profile = it->second;
}
if (profile)
{
update (profile);
return true;
}
// postpone
std::lock_guard<std::mutex> l(g_PostponedUpdatesMutex);
g_PostponedUpdates.emplace_back (identHash, update);
return false;
}
static void ApplyPostponedUpdates (std::list<std::pair<i2p::data::IdentHash, std::function<void (std::shared_ptr<RouterProfile>)> > >&& updates)
{
for (const auto& [ident, update] : updates)
{
auto profile = GetRouterProfile (ident);
update (profile);
}
}
std::future<void> FlushPostponedRouterProfileUpdates ()
{
if (g_PostponedUpdates.empty ()) return std::future<void>();
std::list<std::pair<i2p::data::IdentHash, std::function<void (std::shared_ptr<RouterProfile>)> > > updates;
{
std::lock_guard<std::mutex> l(g_PostponedUpdatesMutex);
g_PostponedUpdates.swap (updates);
}
return std::async (std::launch::async, ApplyPostponedUpdates, std::move (updates));
}
} }
} }

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2024, The PurpleI2P Project * Copyright (c) 2013-2025, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -11,6 +11,7 @@
#include <memory> #include <memory>
#include <future> #include <future>
#include <functional>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include "Identity.h" #include "Identity.h"
@ -44,6 +45,8 @@ namespace data
const int PEER_PROFILE_UNREACHABLE_INTERVAL = 480; // in seconds (8 minutes) const int PEER_PROFILE_UNREACHABLE_INTERVAL = 480; // in seconds (8 minutes)
const int PEER_PROFILE_USEFUL_THRESHOLD = 3; const int PEER_PROFILE_USEFUL_THRESHOLD = 3;
const int PEER_PROFILE_ALWAYS_DECLINING_NUM = 5; // num declines in row to consider always declined const int PEER_PROFILE_ALWAYS_DECLINING_NUM = 5; // num declines in row to consider always declined
const int PEER_PROFILE_APPLY_POSTPONED_TIMEOUT = 2100; // in milliseconds
const int PEER_PROFILE_APPLY_POSTPONED_TIMEOUT_VARIANCE = 500; // in milliseconds
class RouterProfile class RouterProfile
{ {
@ -108,6 +111,8 @@ namespace data
std::future<void> DeleteObsoleteProfiles (); std::future<void> DeleteObsoleteProfiles ();
void SaveProfiles (); void SaveProfiles ();
std::future<void> PersistProfiles (); std::future<void> PersistProfiles ();
bool UpdateRouterProfile (const IdentHash& identHash, std::function<void (std::shared_ptr<RouterProfile>)> update); // return true if updated immediately, and false if postponed
std::future<void> FlushPostponedRouterProfileUpdates ();
} }
} }

View file

@ -1206,7 +1206,16 @@ namespace transport
return false; return false;
} }
if (!m_Address->published) if (!m_Address->published)
ri->GetProfile ()->SetLastEndpoint (m_RemoteEndpoint); {
if (ri->HasProfile ())
ri->GetProfile ()->SetLastEndpoint (m_RemoteEndpoint);
else
i2p::data::UpdateRouterProfile (ri->GetIdentHash (),
[ep = m_RemoteEndpoint](std::shared_ptr<i2p::data::RouterProfile> profile)
{
if (profile) profile->SetLastEndpoint (ep);
});
}
SetRemoteIdentity (ri->GetRouterIdentity ()); SetRemoteIdentity (ri->GetRouterIdentity ());
AdjustMaxPayloadSize (); AdjustMaxPayloadSize ();
m_Server.AddSessionByRouterHash (shared_from_this ()); // we know remote router now m_Server.AddSessionByRouterHash (shared_from_this ()); // we know remote router now