From dfa1482ab246c6621a3a4cfdd8b5d32fd8ade9d0 Mon Sep 17 00:00:00 2001 From: brain5lug Date: Sun, 1 Oct 2017 00:46:49 +0300 Subject: [PATCH] atomics have been used for threads stop --- daemon/Daemon.h | 2 +- daemon/HTTPServer.cpp | 58 ++++++++++--------- daemon/HTTPServer.h | 3 +- daemon/I2PControl.cpp | 12 ++-- daemon/I2PControl.h | 3 +- daemon/UPnP.cpp | 21 ++++--- daemon/UPnP.h | 5 +- libi2pd/Log.cpp | 38 +++++++------ libi2pd/Log.h | 15 ++--- libi2pd/NetDb.cpp | 107 +++++++++++++++++++---------------- libi2pd/NetDb.hpp | 41 +++++++------- libi2pd/Tunnel.cpp | 24 +++++--- libi2pd/Tunnel.h | 51 +++++++++-------- libi2pd_client/WebSocks.cpp | 43 ++++++++------ libi2pd_client/Websocket.cpp | 48 +++++++++------- 15 files changed, 259 insertions(+), 212 deletions(-) diff --git a/daemon/Daemon.h b/daemon/Daemon.h index 48301e73..1495f0bf 100644 --- a/daemon/Daemon.h +++ b/daemon/Daemon.h @@ -15,7 +15,7 @@ namespace util virtual bool init(int argc, char* argv[]); virtual bool start(); virtual bool stop(); - virtual void run () {}; + virtual void run () {} bool isDaemon; bool running; diff --git a/daemon/HTTPServer.cpp b/daemon/HTTPServer.cpp index 99ed0441..b8eaf0d7 100644 --- a/daemon/HTTPServer.cpp +++ b/daemon/HTTPServer.cpp @@ -943,44 +943,50 @@ namespace http { void HTTPServer::Start () { - bool needAuth; i2p::config::GetOption("http.auth", needAuth); - std::string user; i2p::config::GetOption("http.user", user); - std::string pass; i2p::config::GetOption("http.pass", pass); - /* generate pass if needed */ - if (needAuth && pass == "") { - uint8_t random[16]; - char alnum[] = "0123456789" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz"; - pass.resize(sizeof(random)); - RAND_bytes(random, sizeof(random)); - for (size_t i = 0; i < sizeof(random); i++) { - pass[i] = alnum[random[i] % (sizeof(alnum) - 1)]; + if (!m_IsRunning.load()) + { + bool needAuth; i2p::config::GetOption("http.auth", needAuth); + std::string user; i2p::config::GetOption("http.user", user); + std::string pass; i2p::config::GetOption("http.pass", pass); + /* generate pass if needed */ + if (needAuth && pass == "") { + uint8_t random[16]; + char alnum[] = "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + pass.resize(sizeof(random)); + RAND_bytes(random, sizeof(random)); + for (size_t i = 0; i < sizeof(random); i++) { + pass[i] = alnum[random[i] % (sizeof(alnum) - 1)]; + } + i2p::config::SetOption("http.pass", pass); + LogPrint(eLogInfo, "HTTPServer: password set to ", pass); } - i2p::config::SetOption("http.pass", pass); - LogPrint(eLogInfo, "HTTPServer: password set to ", pass); + m_IsRunning.store(true); + m_Thread = std::unique_ptr(new std::thread (std::bind (&HTTPServer::Run, this))); + m_Acceptor.listen (); + Accept (); } - m_IsRunning = true; - m_Thread = std::unique_ptr(new std::thread (std::bind (&HTTPServer::Run, this))); - m_Acceptor.listen (); - Accept (); } void HTTPServer::Stop () { - m_IsRunning = false; - m_Acceptor.close(); - m_Service.stop (); - if (m_Thread) + if (m_IsRunning.load()) { - m_Thread->join (); - m_Thread = nullptr; + m_IsRunning.store(false); + m_Acceptor.close(); + m_Service.stop (); + if (m_Thread) + { + m_Thread->join (); + m_Thread = nullptr; + } } } void HTTPServer::Run () { - while (m_IsRunning) + while (m_IsRunning.load(std::memory_order_acquire)) { try { diff --git a/daemon/HTTPServer.h b/daemon/HTTPServer.h index ec718532..bb56bb78 100644 --- a/daemon/HTTPServer.h +++ b/daemon/HTTPServer.h @@ -8,6 +8,7 @@ #include #include #include +#include #include "HTTP.h" namespace i2p @@ -70,7 +71,7 @@ namespace http private: - bool m_IsRunning; + std::atomic_bool m_IsRunning; std::unique_ptr m_Thread; boost::asio::io_service m_Service; boost::asio::io_service::work m_Work; diff --git a/daemon/I2PControl.cpp b/daemon/I2PControl.cpp index 3d06b97a..68464703 100644 --- a/daemon/I2PControl.cpp +++ b/daemon/I2PControl.cpp @@ -101,19 +101,19 @@ namespace client void I2PControlService::Start () { - if (!m_IsRunning) + if (!m_IsRunning.load()) { Accept (); - m_IsRunning = true; + m_IsRunning.store(true); m_Thread = new std::thread (std::bind (&I2PControlService::Run, this)); } } void I2PControlService::Stop () { - if (m_IsRunning) + if (m_IsRunning.load()) { - m_IsRunning = false; + m_IsRunning.store(false); m_Acceptor.cancel (); m_Service.stop (); if (m_Thread) @@ -127,7 +127,7 @@ namespace client void I2PControlService::Run () { - while (m_IsRunning) + while (m_IsRunning.load(std::memory_order_acquire)) { try { m_Service.run (); @@ -160,7 +160,7 @@ namespace client void I2PControlService::Handshake (std::shared_ptr socket) { socket->async_handshake(boost::asio::ssl::stream_base::server, - std::bind( &I2PControlService::HandleHandshake, this, std::placeholders::_1, socket)); + std::bind( &I2PControlService::HandleHandshake, this, std::placeholders::_1, socket)); } void I2PControlService::HandleHandshake (const boost::system::error_code& ecode, std::shared_ptr socket) diff --git a/daemon/I2PControl.h b/daemon/I2PControl.h index 5d81c8f6..9cdefad7 100644 --- a/daemon/I2PControl.h +++ b/daemon/I2PControl.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -101,7 +102,7 @@ namespace client private: std::string m_Password; - bool m_IsRunning; + std::atomic_bool m_IsRunning; std::thread * m_Thread; boost::asio::io_service m_Service; diff --git a/daemon/UPnP.cpp b/daemon/UPnP.cpp index d1a190d6..568692eb 100644 --- a/daemon/UPnP.cpp +++ b/daemon/UPnP.cpp @@ -28,10 +28,10 @@ namespace transport void UPnP::Stop () { - if (m_IsRunning) + if (m_IsRunning.load()) { LogPrint(eLogInfo, "UPnP: stopping"); - m_IsRunning = false; + m_IsRunning.store(false); m_Timer.cancel (); m_Service.stop (); if (m_Thread) @@ -46,12 +46,15 @@ namespace transport void UPnP::Start() { - m_IsRunning = true; - LogPrint(eLogInfo, "UPnP: starting"); - m_Service.post (std::bind (&UPnP::Discover, this)); - std::unique_lock l(m_StartedMutex); - m_Thread.reset (new std::thread (std::bind (&UPnP::Run, this))); - m_Started.wait_for (l, std::chrono::seconds (5)); // 5 seconds maximum + if (!m_IsRunning.load()) + { + m_IsRunning.store(true); + LogPrint(eLogInfo, "UPnP: starting"); + m_Service.post (std::bind (&UPnP::Discover, this)); + std::unique_lock l(m_StartedMutex); + m_Thread.reset (new std::thread (std::bind (&UPnP::Run, this))); + m_Started.wait_for (l, std::chrono::seconds (5)); // 5 seconds maximum + } } UPnP::~UPnP () @@ -61,7 +64,7 @@ namespace transport void UPnP::Run () { - while (m_IsRunning) + while (m_IsRunning.load(std::memory_order_acquire)) { try { diff --git a/daemon/UPnP.h b/daemon/UPnP.h index 7d67fdbf..1dab9561 100644 --- a/daemon/UPnP.h +++ b/daemon/UPnP.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -43,8 +44,8 @@ namespace transport private: - bool m_IsRunning; - std::unique_ptr m_Thread; + std::atomic_bool m_IsRunning; + std::unique_ptr m_Thread; std::condition_variable m_Started; std::mutex m_StartedMutex; boost::asio::io_service m_Service; diff --git a/libi2pd/Log.cpp b/libi2pd/Log.cpp index 8ae417c2..a868515d 100644 --- a/libi2pd/Log.cpp +++ b/libi2pd/Log.cpp @@ -10,7 +10,6 @@ namespace i2p { namespace log { - static Log logger; /** * @brief Maps our loglevel to their symbolic name */ @@ -70,17 +69,21 @@ namespace log { void Log::Start () { - if (!m_IsRunning) - { - m_IsRunning = true; + // separate load and store for atomic is valid here + // because only one thread changes m_IsRunning + if (!m_IsRunning.load()) + { + m_IsRunning.store(true); m_Thread = new std::thread (std::bind (&Log::Run, this)); } } void Log::Stop () { - switch (m_Destination) + if (m_IsRunning.load()) { + switch (m_Destination) + { #ifndef _WIN32 case eLogSyslog : closelog(); @@ -88,20 +91,22 @@ namespace log { #endif case eLogFile: case eLogStream: - if (m_LogStream) m_LogStream->flush(); + if (m_LogStream) m_LogStream->flush(); break; default: /* do nothing */ break; + } + + m_IsRunning.store(false); + m_Queue.WakeUp (); + if (m_Thread) + { + m_Thread->join (); + delete m_Thread; + m_Thread = nullptr; + } } - m_IsRunning = false; - m_Queue.WakeUp (); - if (m_Thread) - { - m_Thread->join (); - delete m_Thread; - m_Thread = nullptr; - } } void Log::SetLogLevel (const std::string& level) { @@ -162,13 +167,13 @@ namespace log { void Log::Run () { Reopen (); - while (m_IsRunning) + while (m_IsRunning.load(std::memory_order_acquire)) { std::shared_ptr msg; while (msg = m_Queue.Get ()) Process (msg); if (m_LogStream) m_LogStream->flush(); - if (m_IsRunning) + if (m_IsRunning.load(std::memory_order_acquire)) m_Queue.Wait (); } } @@ -215,6 +220,7 @@ namespace log { } Log & Logger() { + static Log logger; return logger; } } // log diff --git a/libi2pd/Log.h b/libi2pd/Log.h index bec741a8..fc4e0deb 100644 --- a/libi2pd/Log.h +++ b/libi2pd/Log.h @@ -17,6 +17,7 @@ #include #include #include +#include #include "Queue.h" #ifndef _WIN32 @@ -59,7 +60,7 @@ namespace log { i2p::util::Queue > m_Queue; bool m_HasColors; std::string m_TimeFormat; - volatile bool m_IsRunning; + std::atomic_bool m_IsRunning; std::thread * m_Thread; private: @@ -84,8 +85,8 @@ namespace log { Log (); ~Log (); - LogType GetLogType () { return m_Destination; }; - LogLevel GetLogLevel () { return m_MinLevel; }; + LogType GetLogType () { return m_Destination; } + LogLevel GetLogLevel () { return m_MinLevel; } void Start (); void Stop (); @@ -112,7 +113,7 @@ namespace log { * @brief Sets format for timestamps in log * @param format String with timestamp format */ - void SetTimeFormat (std::string format) { m_TimeFormat = format; }; + void SetTimeFormat (std::string format) { m_TimeFormat = format; } #ifndef _WIN32 /** @@ -145,8 +146,8 @@ namespace log { std::string text; /**< message text as single string */ LogLevel level; /**< message level */ std::thread::id tid; /**< id of thread that generated message */ - - LogMsg (LogLevel lvl, std::time_t ts, const std::string & txt): timestamp(ts), text(txt), level(lvl) {}; + + LogMsg (LogLevel lvl, std::time_t ts, const std::string & txt): timestamp(ts), text(txt), level(lvl) {} }; Log & Logger(); @@ -184,7 +185,7 @@ void LogPrint (LogLevel level, TArgs&&... args) noexcept std::stringstream ss(""); LogPrint (ss, std::forward(args)...); - + auto msg = std::make_shared(level, std::time(nullptr), ss.str()); msg->tid = std::this_thread::get_id(); log.Append(msg); diff --git a/libi2pd/NetDb.cpp b/libi2pd/NetDb.cpp index 97ef0280..76fe8c0d 100644 --- a/libi2pd/NetDb.cpp +++ b/libi2pd/NetDb.cpp @@ -22,7 +22,7 @@ using namespace i2p::transport; namespace i2p { namespace data -{ +{ NetDb netdb; NetDb::NetDb (): m_IsRunning (false), m_Thread (nullptr), m_Reseeder (nullptr), m_Storage("netDb", "r", "routerInfo-", "dat"), m_FloodfillBootstrap(nullptr), m_HiddenMode(false) @@ -31,65 +31,70 @@ namespace data NetDb::~NetDb () { - Stop (); + Stop (); delete m_Reseeder; - } + } void NetDb::Start () { - m_Storage.SetPlace(i2p::fs::GetDataDir()); - m_Storage.Init(i2p::data::GetBase64SubstitutionTable(), 64); - InitProfilesStorage (); - m_Families.LoadCertificates (); - Load (); + // separate load and store for atomic is valid here + // because only one thread changes m_IsRunning + if (!m_IsRunning.load()) + { + m_Storage.SetPlace(i2p::fs::GetDataDir()); + m_Storage.Init(i2p::data::GetBase64SubstitutionTable(), 64); + InitProfilesStorage (); + m_Families.LoadCertificates (); + Load (); - uint16_t threshold; i2p::config::GetOption("reseed.threshold", threshold); - if (m_RouterInfos.size () < threshold) // reseed if # of router less than threshold - Reseed (); + uint16_t threshold; i2p::config::GetOption("reseed.threshold", threshold); + if (m_RouterInfos.size () < threshold) // reseed if # of router less than threshold + Reseed (); - m_IsRunning = true; - m_Thread = new std::thread (std::bind (&NetDb::Run, this)); + m_IsRunning.store(true); + m_Thread = new std::thread (std::bind (&NetDb::Run, this)); + } } void NetDb::Stop () { - if (m_IsRunning) - { + if (m_IsRunning.load()) + { for (auto& it: m_RouterInfos) it.second->SaveProfile (); DeleteObsoleteProfiles (); m_RouterInfos.clear (); m_Floodfills.clear (); if (m_Thread) - { - m_IsRunning = false; + { + m_IsRunning.store(false); m_Queue.WakeUp (); m_Thread->join (); delete m_Thread; - m_Thread = 0; + m_Thread = nullptr; } m_LeaseSets.clear(); m_Requests.Stop (); } - } - + } + void NetDb::Run () { - uint32_t lastSave = 0, lastPublish = 0, lastExploratory = 0, lastManageRequest = 0, lastDestinationCleanup = 0; - while (m_IsRunning) - { + uint64_t lastSave = 0, lastPublish = 0, lastExploratory = 0, lastManageRequest = 0, lastDestinationCleanup = 0; + while (m_IsRunning.load(std::memory_order_acquire)) + { try - { + { auto msg = m_Queue.GetNextWithTimeout (15000); // 15 sec if (msg) - { - int numMsgs = 0; + { + int numMsgs = 0; while (msg) { LogPrint(eLogDebug, "NetDb: got request with type ", (int) msg->GetTypeID ()); switch (msg->GetTypeID ()) { - case eI2NPDatabaseStore: + case eI2NPDatabaseStore: HandleDatabaseStoreMsg (msg); break; case eI2NPDatabaseSearchReply: @@ -97,7 +102,7 @@ namespace data break; case eI2NPDatabaseLookup: HandleDatabaseLookupMsg (msg); - break; + break; default: // WTF? LogPrint (eLogError, "NetDb: unexpected message type ", (int) msg->GetTypeID ()); //i2p::HandleI2NPMessage (msg); @@ -105,23 +110,25 @@ namespace data if (numMsgs > 100) break; msg = m_Queue.Get (); numMsgs++; - } - } - if (!m_IsRunning) break; + } + } + + if (!m_IsRunning.load(std::memory_order_acquire)) + break; uint64_t ts = i2p::util::GetSecondsSinceEpoch (); if (ts - lastManageRequest >= 15) // manage requests every 15 seconds { m_Requests.ManageRequests (); lastManageRequest = ts; - } + } if (ts - lastSave >= 60) // save routers, manage leasesets and validate subscriptions every minute { if (lastSave) { SaveUpdated (); ManageLeaseSets (); - } + } lastSave = ts; } if (ts - lastDestinationCleanup >= i2p::garlic::INCOMING_TAGS_EXPIRATION_TIMEOUT) @@ -129,23 +136,23 @@ namespace data i2p::context.CleanupDestination (); lastDestinationCleanup = ts; } - + if (ts - lastPublish >= NETDB_PUBLISH_INTERVAL && !m_HiddenMode) // publish { Publish (); lastPublish = ts; - } + } if (ts - lastExploratory >= 30) // exploratory every 30 seconds - { + { auto numRouters = m_RouterInfos.size (); if (numRouters == 0) { - throw std::runtime_error("No known routers, reseed seems to be totally failed"); + throw std::runtime_error("No known routers, reseed seems to be totally failed"); } else // we have peers now m_FloodfillBootstrap = nullptr; if (numRouters < 2500 || ts - lastExploratory >= 90) - { + { numRouters = 800/numRouters; if (numRouters < 1) numRouters = 1; if (numRouters > 9) numRouters = 9; @@ -153,15 +160,15 @@ namespace data if(!m_HiddenMode) Explore (numRouters); lastExploratory = ts; - } - } + } + } } catch (std::exception& ex) { LogPrint (eLogError, "NetDb: runtime exception: ", ex.what ()); - } - } - } + } + } + } bool NetDb::AddRouterInfo (const uint8_t * buf, int len) { @@ -171,11 +178,11 @@ namespace data return false; } - void NetDb::SetHidden(bool hide) { - // TODO: remove reachable addresses from router info - m_HiddenMode = hide; - } - + void NetDb::SetHidden(bool hide) { + // TODO: remove reachable addresses from router info + m_HiddenMode = hide; + } + bool NetDb::AddRouterInfo (const IdentHash& ident, const uint8_t * buf, int len) { bool updated = true; @@ -330,7 +337,7 @@ namespace data } } - m_Reseeder->Bootstrap (); + m_Reseeder->Bootstrap (); } void NetDb::ReseedFromFloodfill(const RouterInfo & ri, int numRouters, int numFloodfills) @@ -392,7 +399,7 @@ namespace data void NetDb::VisitStoredRouterInfos(RouterInfoVisitor v) { m_Storage.Iterate([v] (const std::string & filename) { - auto ri = std::make_shared(filename); + auto ri = std::make_shared(filename); v(ri); }); } diff --git a/libi2pd/NetDb.hpp b/libi2pd/NetDb.hpp index a5cec84f..45137e47 100644 --- a/libi2pd/NetDb.hpp +++ b/libi2pd/NetDb.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include "Base.h" #include "Gzip.h" @@ -25,7 +26,7 @@ namespace i2p { namespace data -{ +{ const int NETDB_MIN_ROUTERS = 90; const int NETDB_FLOODFILL_EXPIRATION_TIMEOUT = 60*60; // 1 hour, in seconds const int NETDB_INTRODUCEE_EXPIRATION_TIMEOUT = 65*60; @@ -41,7 +42,7 @@ namespace data /** function for visiting a router info and determining if we want to use it */ typedef std::function)> RouterInfoFilter; - + class NetDb { public: @@ -51,7 +52,7 @@ namespace data void Start (); void Stop (); - + bool AddRouterInfo (const uint8_t * buf, int len); bool AddRouterInfo (const IdentHash& ident, const uint8_t * buf, int len); bool AddLeaseSet (const IdentHash& ident, const uint8_t * buf, int len, std::shared_ptr from); @@ -60,7 +61,7 @@ namespace data std::shared_ptr FindRouterProfile (const IdentHash& ident) const; void RequestDestination (const IdentHash& destination, RequestedDestination::RequestComplete requestComplete = nullptr); - void RequestDestinationFrom (const IdentHash& destination, const IdentHash & from, bool exploritory, RequestedDestination::RequestComplete requestComplete = nullptr); + void RequestDestinationFrom (const IdentHash& destination, const IdentHash & from, bool exploritory, RequestedDestination::RequestComplete requestComplete = nullptr); void HandleDatabaseStoreMsg (std::shared_ptr msg); void HandleDatabaseSearchReplyMsg (std::shared_ptr msg); @@ -75,21 +76,21 @@ namespace data std::vector GetClosestFloodfills (const IdentHash& destination, size_t num, std::set& excluded, bool closeThanUsOnly = false) const; std::shared_ptr GetClosestNonFloodfill (const IdentHash& destination, const std::set& excluded) const; - std::shared_ptr GetRandomRouterInFamily(const std::string & fam) const; + std::shared_ptr GetRandomRouterInFamily(const std::string & fam) const; void SetUnreachable (const IdentHash& ident, bool unreachable); void PostI2NPMsg (std::shared_ptr msg); - /** set hidden mode, aka don't publish our RI to netdb and don't explore */ - void SetHidden(bool hide); - + /** set hidden mode, aka don't publish our RI to netdb and don't explore */ + void SetHidden(bool hide); + void Reseed (); - Families& GetFamilies () { return m_Families; }; + Families& GetFamilies () { return m_Families; } // for web interface - int GetNumRouters () const { return m_RouterInfos.size (); }; - int GetNumFloodfills () const { return m_Floodfills.size (); }; - int GetNumLeaseSets () const { return m_LeaseSets.size (); }; + int GetNumRouters () const { return m_RouterInfos.size (); } + int GetNumFloodfills () const { return m_Floodfills.size (); } + int GetNumLeaseSets () const { return m_LeaseSets.size (); } /** visit all lease sets we currently store */ void VisitLeaseSets(LeaseSetVisitor v); @@ -100,7 +101,7 @@ namespace data /** visit N random router that match using filter, then visit them with a visitor, return number of RouterInfos that were visited */ size_t VisitRandomRouterInfos(RouterInfoFilter f, RouterInfoVisitor v, size_t n); - void ClearRouterInfos () { m_RouterInfos.clear (); }; + void ClearRouterInfos () { m_RouterInfos.clear (); } private: @@ -115,8 +116,8 @@ namespace data void ReseedFromFloodfill(const RouterInfo & ri, int numRouters=40, int numFloodfills=20); - template - std::shared_ptr GetRandomRouter (Filter filter) const; + template + std::shared_ptr GetRandomRouter (Filter filter) const; private: @@ -127,9 +128,9 @@ namespace data mutable std::mutex m_FloodfillsMutex; std::list > m_Floodfills; - bool m_IsRunning; + std::atomic_bool m_IsRunning; uint64_t m_LastLoad; - std::thread * m_Thread; + std::thread * m_Thread; i2p::util::Queue > m_Queue; // of I2NPDatabaseStoreMsg GzipInflator m_Inflator; @@ -142,10 +143,10 @@ namespace data /** router info we are bootstrapping from or nullptr if we are not currently doing that*/ std::shared_ptr m_FloodfillBootstrap; - - /** true if in hidden mode */ - bool m_HiddenMode; + + /** true if in hidden mode */ + bool m_HiddenMode; }; extern NetDb netdb; diff --git a/libi2pd/Tunnel.cpp b/libi2pd/Tunnel.cpp index 6ae1e119..70d6da2c 100644 --- a/libi2pd/Tunnel.cpp +++ b/libi2pd/Tunnel.cpp @@ -445,19 +445,25 @@ namespace tunnel void Tunnels::Start () { - m_IsRunning = true; - m_Thread = new std::thread (std::bind (&Tunnels::Run, this)); + if (!m_IsRunning.load()) + { + m_IsRunning.store(true); + m_Thread = new std::thread (std::bind (&Tunnels::Run, this)); + } } void Tunnels::Stop () { - m_IsRunning = false; - m_Queue.WakeUp (); - if (m_Thread) + if (m_IsRunning.load()) { - m_Thread->join (); - delete m_Thread; - m_Thread = 0; + m_IsRunning.store(false); + m_Queue.WakeUp (); + if (m_Thread) + { + m_Thread->join (); + delete m_Thread; + m_Thread = 0; + } } } @@ -466,7 +472,7 @@ namespace tunnel std::this_thread::sleep_for (std::chrono::seconds(1)); // wait for other parts are ready uint64_t lastTs = 0; - while (m_IsRunning) + while (m_IsRunning.load(std::memory_order_acquire)) { try { diff --git a/libi2pd/Tunnel.h b/libi2pd/Tunnel.h index 0c90c36c..fdf5af0e 100644 --- a/libi2pd/Tunnel.h +++ b/libi2pd/Tunnel.h @@ -10,6 +10,7 @@ #include #include #include +#include #include "Queue.h" #include "Crypto.h" #include "TunnelConfig.h" @@ -61,7 +62,7 @@ namespace tunnel #endif } - + const int TUNNEL_EXPIRATION_TIMEOUT = 660; // 11 minutes const int TUNNEL_EXPIRATION_THRESHOLD = 60; // 1 minute const int TUNNEL_RECREATION_THRESHOLD = 90; // 1.5 minutes @@ -99,20 +100,20 @@ namespace tunnel std::shared_ptr GetTunnelConfig () const { return m_Config; } std::vector > GetPeers () const; std::vector > GetInvertedPeers () const; - TunnelState GetState () const { return m_State; }; + TunnelState GetState () const { return m_State; } void SetState (TunnelState state); - bool IsEstablished () const { return m_State == eTunnelStateEstablished; }; - bool IsFailed () const { return m_State == eTunnelStateFailed; }; - bool IsRecreated () const { return m_IsRecreated; }; - void SetIsRecreated () { m_IsRecreated = true; }; + bool IsEstablished () const { return m_State == eTunnelStateEstablished; } + bool IsFailed () const { return m_State == eTunnelStateFailed; } + bool IsRecreated () const { return m_IsRecreated; } + void SetIsRecreated () { m_IsRecreated = true; } virtual bool IsInbound() const = 0; - std::shared_ptr GetTunnelPool () const { return m_Pool; }; - void SetTunnelPool (std::shared_ptr pool) { m_Pool = pool; }; + std::shared_ptr GetTunnelPool () const { return m_Pool; } + void SetTunnelPool (std::shared_ptr pool) { m_Pool = pool; } bool HandleTunnelBuildResponse (uint8_t * msg, size_t len); - virtual void Print (std::stringstream&) const {}; + virtual void Print (std::stringstream&) const {} // implements TunnelBase void SendTunnelDataMsg (std::shared_ptr msg); @@ -145,12 +146,12 @@ namespace tunnel public: OutboundTunnel (std::shared_ptr config): - Tunnel (config), m_Gateway (this), m_EndpointIdentHash (config->GetLastIdentHash ()) {}; + Tunnel (config), m_Gateway (this), m_EndpointIdentHash (config->GetLastIdentHash ()) {} void SendTunnelDataMsg (const uint8_t * gwHash, uint32_t gwTunnel, std::shared_ptr msg); virtual void SendTunnelDataMsg (const std::vector& msgs); // multiple messages - const i2p::data::IdentHash& GetEndpointIdentHash () const { return m_EndpointIdentHash; }; - virtual size_t GetNumSentBytes () const { return m_Gateway.GetNumSentBytes (); }; + const i2p::data::IdentHash& GetEndpointIdentHash () const { return m_EndpointIdentHash; } + virtual size_t GetNumSentBytes () const { return m_Gateway.GetNumSentBytes (); } void Print (std::stringstream& s) const; // implements TunnelBase @@ -169,14 +170,14 @@ namespace tunnel { public: - InboundTunnel (std::shared_ptr config): Tunnel (config), m_Endpoint (true) {}; + InboundTunnel (std::shared_ptr config): Tunnel (config), m_Endpoint (true) {} void HandleTunnelDataMsg (std::shared_ptr msg); - virtual size_t GetNumReceivedBytes () const { return m_Endpoint.GetNumReceivedBytes (); }; + virtual size_t GetNumReceivedBytes () const { return m_Endpoint.GetNumReceivedBytes (); } void Print (std::stringstream& s) const; bool IsInbound() const { return true; } // override TunnelBase - void Cleanup () { m_Endpoint.Cleanup (); }; + void Cleanup () { m_Endpoint.Cleanup (); } private: @@ -190,7 +191,7 @@ namespace tunnel ZeroHopsInboundTunnel (); void SendTunnelDataMsg (std::shared_ptr msg); void Print (std::stringstream& s) const; - size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }; + size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; } private: @@ -204,7 +205,7 @@ namespace tunnel ZeroHopsOutboundTunnel (); void SendTunnelDataMsg (const std::vector& msgs); void Print (std::stringstream& s) const; - size_t GetNumSentBytes () const { return m_NumSentBytes; }; + size_t GetNumSentBytes () const { return m_NumSentBytes; } private: @@ -224,7 +225,7 @@ namespace tunnel std::shared_ptr GetPendingOutboundTunnel (uint32_t replyMsgID); std::shared_ptr GetNextInboundTunnel (); std::shared_ptr GetNextOutboundTunnel (); - std::shared_ptr GetExploratoryPool () const { return m_ExploratoryPool; }; + std::shared_ptr GetExploratoryPool () const { return m_ExploratoryPool; } std::shared_ptr GetTunnel (uint32_t tunnelID); int GetTransitTunnelsExpirationTimeout (); void AddTransitTunnel (std::shared_ptr tunnel); @@ -266,7 +267,7 @@ namespace tunnel private: - bool m_IsRunning; + std::atomic_bool m_IsRunning; std::thread * m_Thread; std::map > m_PendingInboundTunnels; // by replyMsgID std::map > m_PendingOutboundTunnels; // by replyMsgID @@ -285,24 +286,24 @@ namespace tunnel public: // for HTTP only - const decltype(m_OutboundTunnels)& GetOutboundTunnels () const { return m_OutboundTunnels; }; - const decltype(m_InboundTunnels)& GetInboundTunnels () const { return m_InboundTunnels; }; - const decltype(m_TransitTunnels)& GetTransitTunnels () const { return m_TransitTunnels; }; + const decltype(m_OutboundTunnels)& GetOutboundTunnels () const { return m_OutboundTunnels; } + const decltype(m_InboundTunnels)& GetInboundTunnels () const { return m_InboundTunnels; } + const decltype(m_TransitTunnels)& GetTransitTunnels () const { return m_TransitTunnels; } size_t CountTransitTunnels() const; size_t CountInboundTunnels() const; size_t CountOutboundTunnels() const; - int GetQueueSize () { return m_Queue.GetSize (); }; + int GetQueueSize () { return m_Queue.GetSize (); } int GetTunnelCreationSuccessRate () const // in percents { int totalNum = m_NumSuccesiveTunnelCreations + m_NumFailedTunnelCreations; return totalNum ? m_NumSuccesiveTunnelCreations*100/totalNum : 0; } - }; + }; extern Tunnels tunnels; -} +} } #endif diff --git a/libi2pd_client/WebSocks.cpp b/libi2pd_client/WebSocks.cpp index b6faa711..cb43a25b 100644 --- a/libi2pd_client/WebSocks.cpp +++ b/libi2pd_client/WebSocks.cpp @@ -8,6 +8,7 @@ #include "Destination.h" #include "Streaming.h" #include +#include #include #include @@ -100,35 +101,41 @@ namespace client void Start() { - if(m_Run) return; // already started - m_Server.listen(boost::asio::ip::address::from_string(m_Addr), m_Port); - m_Server.start_accept(); - m_Run = true; - m_Thread = new std::thread([&] (){ - while(m_Run) { + if (!m_Run.load()) + { + m_Run.store(true); + m_Server.listen(boost::asio::ip::address::from_string(m_Addr), m_Port); + m_Server.start_accept(); + m_Run = true; + m_Thread = new std::thread([&] (){ + while (m_Run.load(std::memory_order_acquire)) { try { m_Server.run(); } catch( std::exception & ex) { LogPrint(eLogError, "Websocks runtime exception: ", ex.what()); } } - }); - m_Dest->Start(); + }); + m_Dest->Start(); + } } void Stop() { - for(const auto & conn : m_Conns) - conn->Close(); + if (m_Run.load()) + { + for(const auto & conn : m_Conns) + conn->Close(); - m_Dest->Stop(); - m_Run = false; - m_Server.stop(); - if(m_Thread) { - m_Thread->join(); - delete m_Thread; + m_Dest->Stop(); + m_Run.store(false); + m_Server.stop(); + if(m_Thread) { + m_Thread->join(); + delete m_Thread; + } + m_Thread = nullptr; } - m_Thread = nullptr; } boost::asio::ip::tcp::endpoint GetLocalEndpoint() @@ -140,7 +147,7 @@ namespace client private: std::vector m_Conns; - bool m_Run; + std::atomic_bool m_Run; ServerImpl m_Server; std::string m_Addr; int m_Port; diff --git a/libi2pd_client/Websocket.cpp b/libi2pd_client/Websocket.cpp index 3d456655..d98f97c0 100644 --- a/libi2pd_client/Websocket.cpp +++ b/libi2pd_client/Websocket.cpp @@ -30,7 +30,7 @@ namespace i2p public: WebsocketServerImpl(const std::string & addr, int port) : - m_run(false), + m_IsRunning(false), m_ws_thread(nullptr), m_ev_thread(nullptr), m_WebsocketTicker(m_Service) @@ -48,10 +48,12 @@ namespace i2p } void Start() { - m_run = true; - m_server.start_accept(); - m_ws_thread = new std::thread([&] () { - while(m_run) { + if (!m_IsRunning.load()) + { + m_IsRunning.store(true); + m_server.start_accept(); + m_ws_thread = new std::thread([&] () { + while(m_IsRunning.load(std::memory_order_acquire)) { try { m_server.run(); } catch (std::exception & e ) { @@ -59,8 +61,8 @@ namespace i2p } } }); - m_ev_thread = new std::thread([&] () { - while(m_run) { + m_ev_thread = new std::thread([&] () { + while(m_IsRunning.load(std::memory_order_acquire)) { try { m_Service.run(); break; @@ -69,25 +71,29 @@ namespace i2p } } }); - ScheduleTick(); + ScheduleTick(); + } } void Stop() { - m_run = false; - m_Service.stop(); - m_server.stop(); + if (m_IsRunning.load()) + { + m_IsRunning.store(false); + m_Service.stop(); + m_server.stop(); - if(m_ev_thread) { - m_ev_thread->join(); - delete m_ev_thread; - } - m_ev_thread = nullptr; + if(m_ev_thread) { + m_ev_thread->join(); + delete m_ev_thread; + } + m_ev_thread = nullptr; - if(m_ws_thread) { - m_ws_thread->join(); - delete m_ws_thread; + if(m_ws_thread) { + m_ws_thread->join(); + delete m_ws_thread; + } + m_ws_thread = nullptr; } - m_ws_thread = nullptr; } void ConnOpened(ServerConn c) @@ -158,7 +164,7 @@ namespace i2p private: typedef std::set > ConnList; - bool m_run; + std::atomic_bool m_IsRunning; std::thread * m_ws_thread; std::thread * m_ev_thread; std::mutex m_connsMutex;