atomics have been used for threads stop

This commit is contained in:
brain5lug 2017-10-01 00:46:49 +03:00
parent 8c09a7429c
commit dfa1482ab2
15 changed files with 259 additions and 212 deletions

View file

@ -10,7 +10,6 @@
namespace i2p {
namespace log {
static Log logger;
/**
* @brief Maps our loglevel to their symbolic name
*/
@ -70,17 +69,21 @@ namespace log {
void Log::Start ()
{
if (!m_IsRunning)
{
m_IsRunning = true;
// separate load and store for atomic is valid here
// because only one thread changes m_IsRunning
if (!m_IsRunning.load())
{
m_IsRunning.store(true);
m_Thread = new std::thread (std::bind (&Log::Run, this));
}
}
void Log::Stop ()
{
switch (m_Destination)
if (m_IsRunning.load())
{
switch (m_Destination)
{
#ifndef _WIN32
case eLogSyslog :
closelog();
@ -88,20 +91,22 @@ namespace log {
#endif
case eLogFile:
case eLogStream:
if (m_LogStream) m_LogStream->flush();
if (m_LogStream) m_LogStream->flush();
break;
default:
/* do nothing */
break;
}
m_IsRunning.store(false);
m_Queue.WakeUp ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = nullptr;
}
}
m_IsRunning = false;
m_Queue.WakeUp ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = nullptr;
}
}
void Log::SetLogLevel (const std::string& level) {
@ -162,13 +167,13 @@ namespace log {
void Log::Run ()
{
Reopen ();
while (m_IsRunning)
while (m_IsRunning.load(std::memory_order_acquire))
{
std::shared_ptr<LogMsg> msg;
while (msg = m_Queue.Get ())
Process (msg);
if (m_LogStream) m_LogStream->flush();
if (m_IsRunning)
if (m_IsRunning.load(std::memory_order_acquire))
m_Queue.Wait ();
}
}
@ -215,6 +220,7 @@ namespace log {
}
Log & Logger() {
static Log logger;
return logger;
}
} // log

View file

@ -17,6 +17,7 @@
#include <chrono>
#include <memory>
#include <thread>
#include <atomic>
#include "Queue.h"
#ifndef _WIN32
@ -59,7 +60,7 @@ namespace log {
i2p::util::Queue<std::shared_ptr<LogMsg> > m_Queue;
bool m_HasColors;
std::string m_TimeFormat;
volatile bool m_IsRunning;
std::atomic_bool m_IsRunning;
std::thread * m_Thread;
private:
@ -84,8 +85,8 @@ namespace log {
Log ();
~Log ();
LogType GetLogType () { return m_Destination; };
LogLevel GetLogLevel () { return m_MinLevel; };
LogType GetLogType () { return m_Destination; }
LogLevel GetLogLevel () { return m_MinLevel; }
void Start ();
void Stop ();
@ -112,7 +113,7 @@ namespace log {
* @brief Sets format for timestamps in log
* @param format String with timestamp format
*/
void SetTimeFormat (std::string format) { m_TimeFormat = format; };
void SetTimeFormat (std::string format) { m_TimeFormat = format; }
#ifndef _WIN32
/**
@ -145,8 +146,8 @@ namespace log {
std::string text; /**< message text as single string */
LogLevel level; /**< message level */
std::thread::id tid; /**< id of thread that generated message */
LogMsg (LogLevel lvl, std::time_t ts, const std::string & txt): timestamp(ts), text(txt), level(lvl) {};
LogMsg (LogLevel lvl, std::time_t ts, const std::string & txt): timestamp(ts), text(txt), level(lvl) {}
};
Log & Logger();
@ -184,7 +185,7 @@ void LogPrint (LogLevel level, TArgs&&... args) noexcept
std::stringstream ss("");
LogPrint (ss, std::forward<TArgs>(args)...);
auto msg = std::make_shared<i2p::log::LogMsg>(level, std::time(nullptr), ss.str());
msg->tid = std::this_thread::get_id();
log.Append(msg);

View file

@ -22,7 +22,7 @@ using namespace i2p::transport;
namespace i2p
{
namespace data
{
{
NetDb netdb;
NetDb::NetDb (): m_IsRunning (false), m_Thread (nullptr), m_Reseeder (nullptr), m_Storage("netDb", "r", "routerInfo-", "dat"), m_FloodfillBootstrap(nullptr), m_HiddenMode(false)
@ -31,65 +31,70 @@ namespace data
NetDb::~NetDb ()
{
Stop ();
Stop ();
delete m_Reseeder;
}
}
void NetDb::Start ()
{
m_Storage.SetPlace(i2p::fs::GetDataDir());
m_Storage.Init(i2p::data::GetBase64SubstitutionTable(), 64);
InitProfilesStorage ();
m_Families.LoadCertificates ();
Load ();
// separate load and store for atomic is valid here
// because only one thread changes m_IsRunning
if (!m_IsRunning.load())
{
m_Storage.SetPlace(i2p::fs::GetDataDir());
m_Storage.Init(i2p::data::GetBase64SubstitutionTable(), 64);
InitProfilesStorage ();
m_Families.LoadCertificates ();
Load ();
uint16_t threshold; i2p::config::GetOption("reseed.threshold", threshold);
if (m_RouterInfos.size () < threshold) // reseed if # of router less than threshold
Reseed ();
uint16_t threshold; i2p::config::GetOption("reseed.threshold", threshold);
if (m_RouterInfos.size () < threshold) // reseed if # of router less than threshold
Reseed ();
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&NetDb::Run, this));
m_IsRunning.store(true);
m_Thread = new std::thread (std::bind (&NetDb::Run, this));
}
}
void NetDb::Stop ()
{
if (m_IsRunning)
{
if (m_IsRunning.load())
{
for (auto& it: m_RouterInfos)
it.second->SaveProfile ();
DeleteObsoleteProfiles ();
m_RouterInfos.clear ();
m_Floodfills.clear ();
if (m_Thread)
{
m_IsRunning = false;
{
m_IsRunning.store(false);
m_Queue.WakeUp ();
m_Thread->join ();
delete m_Thread;
m_Thread = 0;
m_Thread = nullptr;
}
m_LeaseSets.clear();
m_Requests.Stop ();
}
}
}
void NetDb::Run ()
{
uint32_t lastSave = 0, lastPublish = 0, lastExploratory = 0, lastManageRequest = 0, lastDestinationCleanup = 0;
while (m_IsRunning)
{
uint64_t lastSave = 0, lastPublish = 0, lastExploratory = 0, lastManageRequest = 0, lastDestinationCleanup = 0;
while (m_IsRunning.load(std::memory_order_acquire))
{
try
{
{
auto msg = m_Queue.GetNextWithTimeout (15000); // 15 sec
if (msg)
{
int numMsgs = 0;
{
int numMsgs = 0;
while (msg)
{
LogPrint(eLogDebug, "NetDb: got request with type ", (int) msg->GetTypeID ());
switch (msg->GetTypeID ())
{
case eI2NPDatabaseStore:
case eI2NPDatabaseStore:
HandleDatabaseStoreMsg (msg);
break;
case eI2NPDatabaseSearchReply:
@ -97,7 +102,7 @@ namespace data
break;
case eI2NPDatabaseLookup:
HandleDatabaseLookupMsg (msg);
break;
break;
default: // WTF?
LogPrint (eLogError, "NetDb: unexpected message type ", (int) msg->GetTypeID ());
//i2p::HandleI2NPMessage (msg);
@ -105,23 +110,25 @@ namespace data
if (numMsgs > 100) break;
msg = m_Queue.Get ();
numMsgs++;
}
}
if (!m_IsRunning) break;
}
}
if (!m_IsRunning.load(std::memory_order_acquire))
break;
uint64_t ts = i2p::util::GetSecondsSinceEpoch ();
if (ts - lastManageRequest >= 15) // manage requests every 15 seconds
{
m_Requests.ManageRequests ();
lastManageRequest = ts;
}
}
if (ts - lastSave >= 60) // save routers, manage leasesets and validate subscriptions every minute
{
if (lastSave)
{
SaveUpdated ();
ManageLeaseSets ();
}
}
lastSave = ts;
}
if (ts - lastDestinationCleanup >= i2p::garlic::INCOMING_TAGS_EXPIRATION_TIMEOUT)
@ -129,23 +136,23 @@ namespace data
i2p::context.CleanupDestination ();
lastDestinationCleanup = ts;
}
if (ts - lastPublish >= NETDB_PUBLISH_INTERVAL && !m_HiddenMode) // publish
{
Publish ();
lastPublish = ts;
}
}
if (ts - lastExploratory >= 30) // exploratory every 30 seconds
{
{
auto numRouters = m_RouterInfos.size ();
if (numRouters == 0)
{
throw std::runtime_error("No known routers, reseed seems to be totally failed");
throw std::runtime_error("No known routers, reseed seems to be totally failed");
}
else // we have peers now
m_FloodfillBootstrap = nullptr;
if (numRouters < 2500 || ts - lastExploratory >= 90)
{
{
numRouters = 800/numRouters;
if (numRouters < 1) numRouters = 1;
if (numRouters > 9) numRouters = 9;
@ -153,15 +160,15 @@ namespace data
if(!m_HiddenMode)
Explore (numRouters);
lastExploratory = ts;
}
}
}
}
}
catch (std::exception& ex)
{
LogPrint (eLogError, "NetDb: runtime exception: ", ex.what ());
}
}
}
}
}
}
bool NetDb::AddRouterInfo (const uint8_t * buf, int len)
{
@ -171,11 +178,11 @@ namespace data
return false;
}
void NetDb::SetHidden(bool hide) {
// TODO: remove reachable addresses from router info
m_HiddenMode = hide;
}
void NetDb::SetHidden(bool hide) {
// TODO: remove reachable addresses from router info
m_HiddenMode = hide;
}
bool NetDb::AddRouterInfo (const IdentHash& ident, const uint8_t * buf, int len)
{
bool updated = true;
@ -330,7 +337,7 @@ namespace data
}
}
m_Reseeder->Bootstrap ();
m_Reseeder->Bootstrap ();
}
void NetDb::ReseedFromFloodfill(const RouterInfo & ri, int numRouters, int numFloodfills)
@ -392,7 +399,7 @@ namespace data
void NetDb::VisitStoredRouterInfos(RouterInfoVisitor v)
{
m_Storage.Iterate([v] (const std::string & filename) {
auto ri = std::make_shared<i2p::data::RouterInfo>(filename);
auto ri = std::make_shared<i2p::data::RouterInfo>(filename);
v(ri);
});
}

View file

@ -8,6 +8,7 @@
#include <string>
#include <thread>
#include <mutex>
#include <atomic>
#include "Base.h"
#include "Gzip.h"
@ -25,7 +26,7 @@
namespace i2p
{
namespace data
{
{
const int NETDB_MIN_ROUTERS = 90;
const int NETDB_FLOODFILL_EXPIRATION_TIMEOUT = 60*60; // 1 hour, in seconds
const int NETDB_INTRODUCEE_EXPIRATION_TIMEOUT = 65*60;
@ -41,7 +42,7 @@ namespace data
/** function for visiting a router info and determining if we want to use it */
typedef std::function<bool(std::shared_ptr<const i2p::data::RouterInfo>)> RouterInfoFilter;
class NetDb
{
public:
@ -51,7 +52,7 @@ namespace data
void Start ();
void Stop ();
bool AddRouterInfo (const uint8_t * buf, int len);
bool AddRouterInfo (const IdentHash& ident, const uint8_t * buf, int len);
bool AddLeaseSet (const IdentHash& ident, const uint8_t * buf, int len, std::shared_ptr<i2p::tunnel::InboundTunnel> from);
@ -60,7 +61,7 @@ namespace data
std::shared_ptr<RouterProfile> FindRouterProfile (const IdentHash& ident) const;
void RequestDestination (const IdentHash& destination, RequestedDestination::RequestComplete requestComplete = nullptr);
void RequestDestinationFrom (const IdentHash& destination, const IdentHash & from, bool exploritory, RequestedDestination::RequestComplete requestComplete = nullptr);
void RequestDestinationFrom (const IdentHash& destination, const IdentHash & from, bool exploritory, RequestedDestination::RequestComplete requestComplete = nullptr);
void HandleDatabaseStoreMsg (std::shared_ptr<const I2NPMessage> msg);
void HandleDatabaseSearchReplyMsg (std::shared_ptr<const I2NPMessage> msg);
@ -75,21 +76,21 @@ namespace data
std::vector<IdentHash> GetClosestFloodfills (const IdentHash& destination, size_t num,
std::set<IdentHash>& excluded, bool closeThanUsOnly = false) const;
std::shared_ptr<const RouterInfo> GetClosestNonFloodfill (const IdentHash& destination, const std::set<IdentHash>& excluded) const;
std::shared_ptr<const RouterInfo> GetRandomRouterInFamily(const std::string & fam) const;
std::shared_ptr<const RouterInfo> GetRandomRouterInFamily(const std::string & fam) const;
void SetUnreachable (const IdentHash& ident, bool unreachable);
void PostI2NPMsg (std::shared_ptr<const I2NPMessage> msg);
/** set hidden mode, aka don't publish our RI to netdb and don't explore */
void SetHidden(bool hide);
/** set hidden mode, aka don't publish our RI to netdb and don't explore */
void SetHidden(bool hide);
void Reseed ();
Families& GetFamilies () { return m_Families; };
Families& GetFamilies () { return m_Families; }
// for web interface
int GetNumRouters () const { return m_RouterInfos.size (); };
int GetNumFloodfills () const { return m_Floodfills.size (); };
int GetNumLeaseSets () const { return m_LeaseSets.size (); };
int GetNumRouters () const { return m_RouterInfos.size (); }
int GetNumFloodfills () const { return m_Floodfills.size (); }
int GetNumLeaseSets () const { return m_LeaseSets.size (); }
/** visit all lease sets we currently store */
void VisitLeaseSets(LeaseSetVisitor v);
@ -100,7 +101,7 @@ namespace data
/** visit N random router that match using filter, then visit them with a visitor, return number of RouterInfos that were visited */
size_t VisitRandomRouterInfos(RouterInfoFilter f, RouterInfoVisitor v, size_t n);
void ClearRouterInfos () { m_RouterInfos.clear (); };
void ClearRouterInfos () { m_RouterInfos.clear (); }
private:
@ -115,8 +116,8 @@ namespace data
void ReseedFromFloodfill(const RouterInfo & ri, int numRouters=40, int numFloodfills=20);
template<typename Filter>
std::shared_ptr<const RouterInfo> GetRandomRouter (Filter filter) const;
template<typename Filter>
std::shared_ptr<const RouterInfo> GetRandomRouter (Filter filter) const;
private:
@ -127,9 +128,9 @@ namespace data
mutable std::mutex m_FloodfillsMutex;
std::list<std::shared_ptr<RouterInfo> > m_Floodfills;
bool m_IsRunning;
std::atomic_bool m_IsRunning;
uint64_t m_LastLoad;
std::thread * m_Thread;
std::thread * m_Thread;
i2p::util::Queue<std::shared_ptr<const I2NPMessage> > m_Queue; // of I2NPDatabaseStoreMsg
GzipInflator m_Inflator;
@ -142,10 +143,10 @@ namespace data
/** router info we are bootstrapping from or nullptr if we are not currently doing that*/
std::shared_ptr<RouterInfo> m_FloodfillBootstrap;
/** true if in hidden mode */
bool m_HiddenMode;
/** true if in hidden mode */
bool m_HiddenMode;
};
extern NetDb netdb;

View file

@ -445,19 +445,25 @@ namespace tunnel
void Tunnels::Start ()
{
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&Tunnels::Run, this));
if (!m_IsRunning.load())
{
m_IsRunning.store(true);
m_Thread = new std::thread (std::bind (&Tunnels::Run, this));
}
}
void Tunnels::Stop ()
{
m_IsRunning = false;
m_Queue.WakeUp ();
if (m_Thread)
if (m_IsRunning.load())
{
m_Thread->join ();
delete m_Thread;
m_Thread = 0;
m_IsRunning.store(false);
m_Queue.WakeUp ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = 0;
}
}
}
@ -466,7 +472,7 @@ namespace tunnel
std::this_thread::sleep_for (std::chrono::seconds(1)); // wait for other parts are ready
uint64_t lastTs = 0;
while (m_IsRunning)
while (m_IsRunning.load(std::memory_order_acquire))
{
try
{

View file

@ -10,6 +10,7 @@
#include <thread>
#include <mutex>
#include <memory>
#include <atomic>
#include "Queue.h"
#include "Crypto.h"
#include "TunnelConfig.h"
@ -61,7 +62,7 @@ namespace tunnel
#endif
}
const int TUNNEL_EXPIRATION_TIMEOUT = 660; // 11 minutes
const int TUNNEL_EXPIRATION_THRESHOLD = 60; // 1 minute
const int TUNNEL_RECREATION_THRESHOLD = 90; // 1.5 minutes
@ -99,20 +100,20 @@ namespace tunnel
std::shared_ptr<const TunnelConfig> GetTunnelConfig () const { return m_Config; }
std::vector<std::shared_ptr<const i2p::data::IdentityEx> > GetPeers () const;
std::vector<std::shared_ptr<const i2p::data::IdentityEx> > GetInvertedPeers () const;
TunnelState GetState () const { return m_State; };
TunnelState GetState () const { return m_State; }
void SetState (TunnelState state);
bool IsEstablished () const { return m_State == eTunnelStateEstablished; };
bool IsFailed () const { return m_State == eTunnelStateFailed; };
bool IsRecreated () const { return m_IsRecreated; };
void SetIsRecreated () { m_IsRecreated = true; };
bool IsEstablished () const { return m_State == eTunnelStateEstablished; }
bool IsFailed () const { return m_State == eTunnelStateFailed; }
bool IsRecreated () const { return m_IsRecreated; }
void SetIsRecreated () { m_IsRecreated = true; }
virtual bool IsInbound() const = 0;
std::shared_ptr<TunnelPool> GetTunnelPool () const { return m_Pool; };
void SetTunnelPool (std::shared_ptr<TunnelPool> pool) { m_Pool = pool; };
std::shared_ptr<TunnelPool> GetTunnelPool () const { return m_Pool; }
void SetTunnelPool (std::shared_ptr<TunnelPool> pool) { m_Pool = pool; }
bool HandleTunnelBuildResponse (uint8_t * msg, size_t len);
virtual void Print (std::stringstream&) const {};
virtual void Print (std::stringstream&) const {}
// implements TunnelBase
void SendTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> msg);
@ -145,12 +146,12 @@ namespace tunnel
public:
OutboundTunnel (std::shared_ptr<const TunnelConfig> config):
Tunnel (config), m_Gateway (this), m_EndpointIdentHash (config->GetLastIdentHash ()) {};
Tunnel (config), m_Gateway (this), m_EndpointIdentHash (config->GetLastIdentHash ()) {}
void SendTunnelDataMsg (const uint8_t * gwHash, uint32_t gwTunnel, std::shared_ptr<i2p::I2NPMessage> msg);
virtual void SendTunnelDataMsg (const std::vector<TunnelMessageBlock>& msgs); // multiple messages
const i2p::data::IdentHash& GetEndpointIdentHash () const { return m_EndpointIdentHash; };
virtual size_t GetNumSentBytes () const { return m_Gateway.GetNumSentBytes (); };
const i2p::data::IdentHash& GetEndpointIdentHash () const { return m_EndpointIdentHash; }
virtual size_t GetNumSentBytes () const { return m_Gateway.GetNumSentBytes (); }
void Print (std::stringstream& s) const;
// implements TunnelBase
@ -169,14 +170,14 @@ namespace tunnel
{
public:
InboundTunnel (std::shared_ptr<const TunnelConfig> config): Tunnel (config), m_Endpoint (true) {};
InboundTunnel (std::shared_ptr<const TunnelConfig> config): Tunnel (config), m_Endpoint (true) {}
void HandleTunnelDataMsg (std::shared_ptr<const I2NPMessage> msg);
virtual size_t GetNumReceivedBytes () const { return m_Endpoint.GetNumReceivedBytes (); };
virtual size_t GetNumReceivedBytes () const { return m_Endpoint.GetNumReceivedBytes (); }
void Print (std::stringstream& s) const;
bool IsInbound() const { return true; }
// override TunnelBase
void Cleanup () { m_Endpoint.Cleanup (); };
void Cleanup () { m_Endpoint.Cleanup (); }
private:
@ -190,7 +191,7 @@ namespace tunnel
ZeroHopsInboundTunnel ();
void SendTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> msg);
void Print (std::stringstream& s) const;
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }
private:
@ -204,7 +205,7 @@ namespace tunnel
ZeroHopsOutboundTunnel ();
void SendTunnelDataMsg (const std::vector<TunnelMessageBlock>& msgs);
void Print (std::stringstream& s) const;
size_t GetNumSentBytes () const { return m_NumSentBytes; };
size_t GetNumSentBytes () const { return m_NumSentBytes; }
private:
@ -224,7 +225,7 @@ namespace tunnel
std::shared_ptr<OutboundTunnel> GetPendingOutboundTunnel (uint32_t replyMsgID);
std::shared_ptr<InboundTunnel> GetNextInboundTunnel ();
std::shared_ptr<OutboundTunnel> GetNextOutboundTunnel ();
std::shared_ptr<TunnelPool> GetExploratoryPool () const { return m_ExploratoryPool; };
std::shared_ptr<TunnelPool> GetExploratoryPool () const { return m_ExploratoryPool; }
std::shared_ptr<TunnelBase> GetTunnel (uint32_t tunnelID);
int GetTransitTunnelsExpirationTimeout ();
void AddTransitTunnel (std::shared_ptr<TransitTunnel> tunnel);
@ -266,7 +267,7 @@ namespace tunnel
private:
bool m_IsRunning;
std::atomic_bool m_IsRunning;
std::thread * m_Thread;
std::map<uint32_t, std::shared_ptr<InboundTunnel> > m_PendingInboundTunnels; // by replyMsgID
std::map<uint32_t, std::shared_ptr<OutboundTunnel> > m_PendingOutboundTunnels; // by replyMsgID
@ -285,24 +286,24 @@ namespace tunnel
public:
// for HTTP only
const decltype(m_OutboundTunnels)& GetOutboundTunnels () const { return m_OutboundTunnels; };
const decltype(m_InboundTunnels)& GetInboundTunnels () const { return m_InboundTunnels; };
const decltype(m_TransitTunnels)& GetTransitTunnels () const { return m_TransitTunnels; };
const decltype(m_OutboundTunnels)& GetOutboundTunnels () const { return m_OutboundTunnels; }
const decltype(m_InboundTunnels)& GetInboundTunnels () const { return m_InboundTunnels; }
const decltype(m_TransitTunnels)& GetTransitTunnels () const { return m_TransitTunnels; }
size_t CountTransitTunnels() const;
size_t CountInboundTunnels() const;
size_t CountOutboundTunnels() const;
int GetQueueSize () { return m_Queue.GetSize (); };
int GetQueueSize () { return m_Queue.GetSize (); }
int GetTunnelCreationSuccessRate () const // in percents
{
int totalNum = m_NumSuccesiveTunnelCreations + m_NumFailedTunnelCreations;
return totalNum ? m_NumSuccesiveTunnelCreations*100/totalNum : 0;
}
};
};
extern Tunnels tunnels;
}
}
}
#endif