fixed race codition between RouterInfo's buffer persist and update

This commit is contained in:
orignal 2024-05-08 19:09:03 -04:00
parent 425ef2cfe5
commit b98b3a87b0
4 changed files with 52 additions and 32 deletions

View file

@ -698,7 +698,7 @@ namespace data
expirationTimeout = i2p::context.IsFloodfill () ? NETDB_FLOODFILL_EXPIRATION_TIMEOUT*1000LL : expirationTimeout = i2p::context.IsFloodfill () ? NETDB_FLOODFILL_EXPIRATION_TIMEOUT*1000LL :
NETDB_MIN_EXPIRATION_TIMEOUT*1000LL + (NETDB_MAX_EXPIRATION_TIMEOUT - NETDB_MIN_EXPIRATION_TIMEOUT)*1000LL*NETDB_MIN_ROUTERS/total; NETDB_MIN_EXPIRATION_TIMEOUT*1000LL + (NETDB_MAX_EXPIRATION_TIMEOUT - NETDB_MIN_EXPIRATION_TIMEOUT)*1000LL*NETDB_MIN_ROUTERS/total;
std::list<std::pair<std::string, std::shared_ptr<RouterInfo> > > saveToDisk; std::list<std::pair<std::string, std::shared_ptr<RouterInfo::Buffer> > > saveToDisk;
std::list<std::string> removeFromDisk; std::list<std::string> removeFromDisk;
auto own = i2p::context.GetSharedRouterInfo (); auto own = i2p::context.GetSharedRouterInfo ();
@ -711,7 +711,14 @@ namespace data
if (it.second->GetBuffer ()) if (it.second->GetBuffer ())
{ {
// we have something to save // we have something to save
saveToDisk.push_back(std::make_pair(ident, it.second)); std::shared_ptr<RouterInfo::Buffer> buffer;
{
std::lock_guard<std::mutex> l(m_RouterInfosMutex); // possible collision between DeleteBuffer and Update
buffer = it.second->GetSharedBuffer ();
it.second->DeleteBuffer ();
}
if (buffer && !it.second->IsUnreachable ()) // don't save bad router
saveToDisk.push_back(std::make_pair(ident, buffer));
it.second->SetUnreachable (false); it.second->SetUnreachable (false);
} }
it.second->SetUpdated (false); it.second->SetUpdated (false);
@ -800,15 +807,11 @@ namespace data
} }
} }
void NetDb::PersistRouters (std::list<std::pair<std::string, std::shared_ptr<RouterInfo> > >&& update, void NetDb::PersistRouters (std::list<std::pair<std::string, std::shared_ptr<RouterInfo::Buffer> > >&& update,
std::list<std::string>&& remove) std::list<std::string>&& remove)
{ {
for (auto it: update) for (auto it: update)
{ RouterInfo::SaveToFile (m_Storage.Path(it.first), it.second);
it.second->SaveToFile (m_Storage.Path(it.first));
std::lock_guard<std::mutex> l(m_RouterInfosMutex); // possible collision between DeleteBuffer and Update
it.second->DeleteBuffer ();
}
for (auto it: remove) for (auto it: remove)
m_Storage.Remove (it); m_Storage.Remove (it);
} }

View file

@ -146,7 +146,7 @@ namespace data
void Load (); void Load ();
bool LoadRouterInfo (const std::string& path, uint64_t ts); bool LoadRouterInfo (const std::string& path, uint64_t ts);
void SaveUpdated (); void SaveUpdated ();
void PersistRouters (std::list<std::pair<std::string, std::shared_ptr<RouterInfo> > >&& update, void PersistRouters (std::list<std::pair<std::string, std::shared_ptr<RouterInfo::Buffer> > >&& update,
std::list<std::string>&& remove); std::list<std::string>&& remove);
void Run (); // exploratory thread void Run (); // exploratory thread
void Explore (int numDestinations); void Explore (int numDestinations);

View file

@ -34,6 +34,7 @@ namespace data
{ {
if (len > size ()) len = size (); if (len > size ()) len = size ();
memcpy (data (), buf, len); memcpy (data (), buf, len);
m_BufferLen = len;
} }
RouterInfo::RouterInfo (): m_Buffer (nullptr) RouterInfo::RouterInfo (): m_Buffer (nullptr)
@ -60,7 +61,7 @@ namespace data
{ {
m_Addresses = boost::make_shared<Addresses>(); // create empty list m_Addresses = boost::make_shared<Addresses>(); // create empty list
m_Buffer = buf; m_Buffer = buf;
m_BufferLen = len; if (m_Buffer) m_Buffer->SetBufferLen (len);
ReadFromBuffer (true); ReadFromBuffer (true);
} }
else else
@ -129,8 +130,8 @@ namespace data
if (s.is_open ()) if (s.is_open ())
{ {
s.seekg (0,std::ios::end); s.seekg (0,std::ios::end);
m_BufferLen = s.tellg (); size_t bufferLen = s.tellg ();
if (m_BufferLen < 40 || m_BufferLen > MAX_RI_BUFFER_SIZE) if (bufferLen < 40 || bufferLen > MAX_RI_BUFFER_SIZE)
{ {
LogPrint(eLogError, "RouterInfo: File ", fullPath, " is malformed"); LogPrint(eLogError, "RouterInfo: File ", fullPath, " is malformed");
return false; return false;
@ -138,7 +139,8 @@ namespace data
s.seekg(0, std::ios::beg); s.seekg(0, std::ios::beg);
if (!m_Buffer) if (!m_Buffer)
m_Buffer = NewBuffer (); m_Buffer = NewBuffer ();
s.read((char *)m_Buffer->data (), m_BufferLen); s.read((char *)m_Buffer->data (), bufferLen);
m_Buffer->SetBufferLen (bufferLen);
} }
else else
{ {
@ -163,11 +165,12 @@ namespace data
m_IsUnreachable = true; m_IsUnreachable = true;
return; return;
} }
m_RouterIdentity = NewIdentity (m_Buffer->data (), m_BufferLen); size_t bufferLen = m_Buffer->GetBufferLen ();
m_RouterIdentity = NewIdentity (m_Buffer->data (), bufferLen);
size_t identityLen = m_RouterIdentity->GetFullLen (); size_t identityLen = m_RouterIdentity->GetFullLen ();
if (identityLen >= m_BufferLen) if (identityLen >= bufferLen)
{ {
LogPrint (eLogError, "RouterInfo: Identity length ", identityLen, " exceeds buffer size ", m_BufferLen); LogPrint (eLogError, "RouterInfo: Identity length ", identityLen, " exceeds buffer size ", bufferLen);
m_IsUnreachable = true; m_IsUnreachable = true;
return; return;
} }
@ -181,7 +184,7 @@ namespace data
return; return;
} }
// verify signature // verify signature
int l = m_BufferLen - m_RouterIdentity->GetSignatureLen (); int l = bufferLen - m_RouterIdentity->GetSignatureLen ();
if (l < 0 || !m_RouterIdentity->Verify ((uint8_t *)m_Buffer->data (), l, (uint8_t *)m_Buffer->data () + l)) if (l < 0 || !m_RouterIdentity->Verify ((uint8_t *)m_Buffer->data (), l, (uint8_t *)m_Buffer->data () + l))
{ {
LogPrint (eLogError, "RouterInfo: Signature verification failed"); LogPrint (eLogError, "RouterInfo: Signature verification failed");
@ -191,7 +194,7 @@ namespace data
} }
// parse RI // parse RI
std::stringstream str; std::stringstream str;
str.write ((const char *)m_Buffer->data () + identityLen, m_BufferLen - identityLen); str.write ((const char *)m_Buffer->data () + identityLen, bufferLen - identityLen);
ReadFromStream (str); ReadFromStream (str);
if (!str) if (!str)
{ {
@ -625,6 +628,19 @@ namespace data
return m_Buffer->data (); return m_Buffer->data ();
} }
bool RouterInfo::SaveToFile (const std::string& fullPath, std::shared_ptr<Buffer> buf)
{
if (!buf) return false;
std::ofstream f (fullPath, std::ofstream::binary | std::ofstream::out);
if (!f.is_open ())
{
LogPrint (eLogError, "RouterInfo: Can't save to ", fullPath);
return false;
}
f.write ((char *)buf->data (), buf->GetBufferLen ());
return true;
}
bool RouterInfo::SaveToFile (const std::string& fullPath) bool RouterInfo::SaveToFile (const std::string& fullPath)
{ {
if (m_IsUnreachable) return false; // don't save bad router if (m_IsUnreachable) return false; // don't save bad router
@ -633,14 +649,7 @@ namespace data
LogPrint (eLogWarning, "RouterInfo: Can't save, m_Buffer == NULL"); LogPrint (eLogWarning, "RouterInfo: Can't save, m_Buffer == NULL");
return false; return false;
} }
std::ofstream f (fullPath, std::ofstream::binary | std::ofstream::out); return SaveToFile (fullPath, m_Buffer);
if (!f.is_open ())
{
LogPrint (eLogError, "RouterInfo: Can't save to ", fullPath);
return false;
}
f.write ((char *)m_Buffer->data (), m_BufferLen);
return true;
} }
size_t RouterInfo::ReadString (char * str, size_t len, std::istream& s) const size_t RouterInfo::ReadString (char * str, size_t len, std::istream& s) const
@ -1108,7 +1117,7 @@ namespace data
m_Buffer = NewBuffer (); m_Buffer = NewBuffer ();
if (len > m_Buffer->size ()) len = m_Buffer->size (); if (len > m_Buffer->size ()) len = m_Buffer->size ();
memcpy (m_Buffer->data (), buf, len); memcpy (m_Buffer->data (), buf, len);
m_BufferLen = len; m_Buffer->SetBufferLen (len);
} }
std::shared_ptr<RouterInfo::Buffer> RouterInfo::NewBuffer () const std::shared_ptr<RouterInfo::Buffer> RouterInfo::NewBuffer () const

View file

@ -188,6 +188,13 @@ namespace data
Buffer () = default; Buffer () = default;
Buffer (const uint8_t * buf, size_t len); Buffer (const uint8_t * buf, size_t len);
size_t GetBufferLen () const { return m_BufferLen; };
void SetBufferLen (size_t len) { m_BufferLen = len; };
private:
size_t m_BufferLen = 0;
}; };
typedef std::array<std::shared_ptr<Address>, eNumTransports> Addresses; typedef std::array<std::shared_ptr<Address>, eNumTransports> Addresses;
@ -272,18 +279,20 @@ namespace data
const uint8_t * GetBuffer () const { return m_Buffer ? m_Buffer->data () : nullptr; }; const uint8_t * GetBuffer () const { return m_Buffer ? m_Buffer->data () : nullptr; };
const uint8_t * LoadBuffer (const std::string& fullPath); // load if necessary const uint8_t * LoadBuffer (const std::string& fullPath); // load if necessary
size_t GetBufferLen () const { return m_BufferLen; }; size_t GetBufferLen () const { return m_Buffer ? m_Buffer->GetBufferLen () : 0; };
void DeleteBuffer () { m_Buffer = nullptr; };
std::shared_ptr<Buffer> GetSharedBuffer () const { return m_Buffer; };
bool IsUpdated () const { return m_IsUpdated; }; bool IsUpdated () const { return m_IsUpdated; };
void SetUpdated (bool updated) { m_IsUpdated = updated; }; void SetUpdated (bool updated) { m_IsUpdated = updated; };
bool SaveToFile (const std::string& fullPath); bool SaveToFile (const std::string& fullPath);
static bool SaveToFile (const std::string& fullPath, std::shared_ptr<Buffer> buf);
std::shared_ptr<RouterProfile> GetProfile () const; std::shared_ptr<RouterProfile> GetProfile () const;
void DropProfile () { m_Profile = nullptr; }; void DropProfile () { m_Profile = nullptr; };
bool HasProfile () const { return (bool)m_Profile; }; bool HasProfile () const { return (bool)m_Profile; };
bool Update (const uint8_t * buf, size_t len); bool Update (const uint8_t * buf, size_t len);
void DeleteBuffer () { m_Buffer = nullptr; };
bool IsNewer (const uint8_t * buf, size_t len) const; bool IsNewer (const uint8_t * buf, size_t len) const;
/** return true if we are in a router family and the signature is valid */ /** return true if we are in a router family and the signature is valid */
@ -300,7 +309,7 @@ namespace data
RouterInfo (); RouterInfo ();
uint8_t * GetBufferPointer (size_t offset = 0 ) { return m_Buffer->data () + offset; }; uint8_t * GetBufferPointer (size_t offset = 0 ) { return m_Buffer->data () + offset; };
void UpdateBuffer (const uint8_t * buf, size_t len); void UpdateBuffer (const uint8_t * buf, size_t len);
void SetBufferLen (size_t len) { m_BufferLen = len; }; void SetBufferLen (size_t len) { if (m_Buffer) m_Buffer->SetBufferLen (len); };
void RefreshTimestamp (); void RefreshTimestamp ();
CompatibleTransports GetReachableTransports () const { return m_ReachableTransports; }; CompatibleTransports GetReachableTransports () const { return m_ReachableTransports; };
void SetReachableTransports (CompatibleTransports transports) { m_ReachableTransports = transports; }; void SetReachableTransports (CompatibleTransports transports) { m_ReachableTransports = transports; };
@ -328,7 +337,6 @@ namespace data
FamilyID m_FamilyID; FamilyID m_FamilyID;
std::shared_ptr<const IdentityEx> m_RouterIdentity; std::shared_ptr<const IdentityEx> m_RouterIdentity;
std::shared_ptr<Buffer> m_Buffer; std::shared_ptr<Buffer> m_Buffer;
size_t m_BufferLen;
uint64_t m_Timestamp; // in milliseconds uint64_t m_Timestamp; // in milliseconds
boost::shared_ptr<Addresses> m_Addresses; // TODO: use std::shared_ptr and std::atomic_store for gcc >= 4.9 boost::shared_ptr<Addresses> m_Addresses; // TODO: use std::shared_ptr and std::atomic_store for gcc >= 4.9
bool m_IsUpdated, m_IsUnreachable, m_IsFloodfill; bool m_IsUpdated, m_IsUnreachable, m_IsFloodfill;