single thread for I2CP

This commit is contained in:
orignal 2020-10-02 13:13:27 -04:00
parent ee84291997
commit 3a2724ec58
4 changed files with 70 additions and 72 deletions

View file

@ -152,6 +152,7 @@ namespace config {
("i2cp.enabled", value<bool>()->default_value(false), "Enable or disable I2CP") ("i2cp.enabled", value<bool>()->default_value(false), "Enable or disable I2CP")
("i2cp.address", value<std::string>()->default_value("127.0.0.1"), "I2CP listen address") ("i2cp.address", value<std::string>()->default_value("127.0.0.1"), "I2CP listen address")
("i2cp.port", value<uint16_t>()->default_value(7654), "I2CP listen port") ("i2cp.port", value<uint16_t>()->default_value(7654), "I2CP listen port")
("i2cp.singlethread", value<bool>()->default_value(true), "Destinations run in the I2CP server's thread")
; ;
options_description i2pcontrol("I2PControl options"); options_description i2pcontrol("I2PControl options");

View file

@ -102,10 +102,11 @@ namespace client
{ {
std::string i2cpAddr; i2p::config::GetOption("i2cp.address", i2cpAddr); std::string i2cpAddr; i2p::config::GetOption("i2cp.address", i2cpAddr);
uint16_t i2cpPort; i2p::config::GetOption("i2cp.port", i2cpPort); uint16_t i2cpPort; i2p::config::GetOption("i2cp.port", i2cpPort);
bool singleThread; i2p::config::GetOption("i2cp.singlethread", singleThread);
LogPrint(eLogInfo, "Clients: starting I2CP at ", i2cpAddr, ":", i2cpPort); LogPrint(eLogInfo, "Clients: starting I2CP at ", i2cpAddr, ":", i2cpPort);
try try
{ {
m_I2CPServer = new I2CPServer (i2cpAddr, i2cpPort); m_I2CPServer = new I2CPServer (i2cpAddr, i2cpPort, singleThread);
m_I2CPServer->Start (); m_I2CPServer->Start ();
} }
catch (std::exception& e) catch (std::exception& e)

View file

@ -23,36 +23,13 @@ namespace i2p
namespace client namespace client
{ {
I2CPDestination::I2CPDestination (std::shared_ptr<I2CPSession> owner, std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, const std::map<std::string, std::string>& params): I2CPDestination::I2CPDestination (boost::asio::io_service& service, std::shared_ptr<I2CPSession> owner,
RunnableService ("I2CP"), LeaseSetDestination (GetIOService (), isPublic, &params), std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, const std::map<std::string, std::string>& params):
LeaseSetDestination (service, isPublic, &params),
m_Owner (owner), m_Identity (identity), m_EncryptionKeyType (m_Identity->GetCryptoKeyType ()) m_Owner (owner), m_Identity (identity), m_EncryptionKeyType (m_Identity->GetCryptoKeyType ())
{ {
} }
I2CPDestination::~I2CPDestination ()
{
if (IsRunning ())
Stop ();
}
void I2CPDestination::Start ()
{
if (!IsRunning ())
{
LeaseSetDestination::Start ();
StartIOService ();
}
}
void I2CPDestination::Stop ()
{
if (IsRunning ())
{
LeaseSetDestination::Stop ();
StopIOService ();
}
}
void I2CPDestination::SetEncryptionPrivateKey (const uint8_t * key) void I2CPDestination::SetEncryptionPrivateKey (const uint8_t * key)
{ {
m_Decryptor = i2p::data::PrivateKeys::CreateDecryptor (m_Identity->GetCryptoKeyType (), key); m_Decryptor = i2p::data::PrivateKeys::CreateDecryptor (m_Identity->GetCryptoKeyType (), key);
@ -217,6 +194,37 @@ namespace client
} }
} }
RunnableI2CPDestination::RunnableI2CPDestination (std::shared_ptr<I2CPSession> owner,
std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, const std::map<std::string, std::string>& params):
RunnableService ("I2CP"),
I2CPDestination (GetIOService (), owner, identity, isPublic, params)
{
}
RunnableI2CPDestination::~RunnableI2CPDestination ()
{
if (IsRunning ())
Stop ();
}
void RunnableI2CPDestination::Start ()
{
if (!IsRunning ())
{
I2CPDestination::Start ();
StartIOService ();
}
}
void RunnableI2CPDestination::Stop ()
{
if (IsRunning ())
{
I2CPDestination::Stop ();
StopIOService ();
}
}
I2CPSession::I2CPSession (I2CPServer& owner, std::shared_ptr<proto::socket> socket): I2CPSession::I2CPSession (I2CPServer& owner, std::shared_ptr<proto::socket> socket):
m_Owner (owner), m_Socket (socket), m_Payload (nullptr), m_Owner (owner), m_Socket (socket), m_Payload (nullptr),
m_SessionID (0xFFFF), m_MessageID (0), m_IsSendAccepted (true) m_SessionID (0xFFFF), m_MessageID (0), m_IsSendAccepted (true)
@ -451,7 +459,9 @@ namespace client
if (params[I2CP_PARAM_DONT_PUBLISH_LEASESET] == "true") isPublic = false; if (params[I2CP_PARAM_DONT_PUBLISH_LEASESET] == "true") isPublic = false;
if (!m_Destination) if (!m_Destination)
{ {
m_Destination = std::make_shared<I2CPDestination>(shared_from_this (), identity, isPublic, params); m_Destination = m_Owner.IsSingleThread () ?
std::make_shared<I2CPDestination>(m_Owner.GetService (), shared_from_this (), identity, isPublic, params):
std::make_shared<RunnableI2CPDestination>(shared_from_this (), identity, isPublic, params);
SendSessionStatusMessage (1); // created SendSessionStatusMessage (1); // created
LogPrint (eLogDebug, "I2CP: session ", m_SessionID, " created"); LogPrint (eLogDebug, "I2CP: session ", m_SessionID, " created");
m_Destination->Start (); m_Destination->Start ();
@ -800,9 +810,9 @@ namespace client
std::placeholders::_1, std::placeholders::_2, buf)); std::placeholders::_1, std::placeholders::_2, buf));
} }
I2CPServer::I2CPServer (const std::string& interface, int port): I2CPServer::I2CPServer (const std::string& interface, int port, bool isSingleThread):
m_IsRunning (false), m_Thread (nullptr), RunnableService ("I2CP"), m_IsSingleThread (isSingleThread),
m_Acceptor (m_Service, m_Acceptor (GetIOService (),
#ifdef ANDROID #ifdef ANDROID
I2CPSession::proto::endpoint(std::string (1, '\0') + interface)) // leading 0 for abstract address I2CPSession::proto::endpoint(std::string (1, '\0') + interface)) // leading 0 for abstract address
#else #else
@ -825,20 +835,18 @@ namespace client
I2CPServer::~I2CPServer () I2CPServer::~I2CPServer ()
{ {
if (m_IsRunning) if (IsRunning ())
Stop (); Stop ();
} }
void I2CPServer::Start () void I2CPServer::Start ()
{ {
Accept (); Accept ();
m_IsRunning = true; StartIOService ();
m_Thread = new std::thread (std::bind (&I2CPServer::Run, this));
} }
void I2CPServer::Stop () void I2CPServer::Stop ()
{ {
m_IsRunning = false;
m_Acceptor.cancel (); m_Acceptor.cancel ();
{ {
auto sessions = m_Sessions; auto sessions = m_Sessions;
@ -846,33 +854,12 @@ namespace client
it.second->Stop (); it.second->Stop ();
} }
m_Sessions.clear (); m_Sessions.clear ();
m_Service.stop (); StopIOService ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = nullptr;
}
}
void I2CPServer::Run ()
{
while (m_IsRunning)
{
try
{
m_Service.run ();
}
catch (std::exception& ex)
{
LogPrint (eLogError, "I2CP: runtime exception: ", ex.what ());
}
}
} }
void I2CPServer::Accept () void I2CPServer::Accept ()
{ {
auto newSocket = std::make_shared<I2CPSession::proto::socket> (m_Service); auto newSocket = std::make_shared<I2CPSession::proto::socket> (GetIOService ());
m_Acceptor.async_accept (*newSocket, std::bind (&I2CPServer::HandleAccept, this, m_Acceptor.async_accept (*newSocket, std::bind (&I2CPServer::HandleAccept, this,
std::placeholders::_1, newSocket)); std::placeholders::_1, newSocket));
} }

View file

@ -63,16 +63,14 @@ namespace client
const char I2CP_PARAM_MESSAGE_RELIABILITY[] = "i2cp.messageReliability"; const char I2CP_PARAM_MESSAGE_RELIABILITY[] = "i2cp.messageReliability";
class I2CPSession; class I2CPSession;
class I2CPDestination: private i2p::util::RunnableService, public LeaseSetDestination class I2CPDestination: public LeaseSetDestination
{ {
public: public:
I2CPDestination (std::shared_ptr<I2CPSession> owner, std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, const std::map<std::string, std::string>& params); I2CPDestination (boost::asio::io_service& service, std::shared_ptr<I2CPSession> owner,
~I2CPDestination (); std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, const std::map<std::string, std::string>& params);
~I2CPDestination () {};
void Start ();
void Stop ();
void SetEncryptionPrivateKey (const uint8_t * key); void SetEncryptionPrivateKey (const uint8_t * key);
void SetEncryptionType (i2p::data::CryptoKeyType keyType) { m_EncryptionKeyType = keyType; }; void SetEncryptionType (i2p::data::CryptoKeyType keyType) { m_EncryptionKeyType = keyType; };
void SetECIESx25519EncryptionPrivateKey (const uint8_t * key); void SetECIESx25519EncryptionPrivateKey (const uint8_t * key);
@ -109,6 +107,18 @@ namespace client
uint64_t m_LeaseSetExpirationTime; uint64_t m_LeaseSetExpirationTime;
}; };
class RunnableI2CPDestination: private i2p::util::RunnableService, public I2CPDestination
{
public:
RunnableI2CPDestination (std::shared_ptr<I2CPSession> owner, std::shared_ptr<const i2p::data::IdentityEx> identity,
bool isPublic, const std::map<std::string, std::string>& params);
~RunnableI2CPDestination ();
void Start ();
void Stop ();
};
class I2CPServer; class I2CPServer;
class I2CPSession: public std::enable_shared_from_this<I2CPSession> class I2CPSession: public std::enable_shared_from_this<I2CPSession>
{ {
@ -179,17 +189,18 @@ namespace client
}; };
typedef void (I2CPSession::*I2CPMessageHandler)(const uint8_t * buf, size_t len); typedef void (I2CPSession::*I2CPMessageHandler)(const uint8_t * buf, size_t len);
class I2CPServer class I2CPServer: private i2p::util::RunnableService
{ {
public: public:
I2CPServer (const std::string& interface, int port); I2CPServer (const std::string& interface, int port, bool isSingleThread);
~I2CPServer (); ~I2CPServer ();
void Start (); void Start ();
void Stop (); void Stop ();
boost::asio::io_service& GetService () { return m_Service; }; boost::asio::io_service& GetService () { return GetIOService (); };
bool IsSingleThread () const { return m_IsSingleThread; };
bool InsertSession (std::shared_ptr<I2CPSession> session); bool InsertSession (std::shared_ptr<I2CPSession> session);
void RemoveSession (uint16_t sessionID); void RemoveSession (uint16_t sessionID);
@ -203,12 +214,10 @@ namespace client
private: private:
bool m_IsSingleThread;
I2CPMessageHandler m_MessagesHandlers[256]; I2CPMessageHandler m_MessagesHandlers[256];
std::map<uint16_t, std::shared_ptr<I2CPSession> > m_Sessions; std::map<uint16_t, std::shared_ptr<I2CPSession> > m_Sessions;
bool m_IsRunning;
std::thread * m_Thread;
boost::asio::io_service m_Service;
I2CPSession::proto::acceptor m_Acceptor; I2CPSession::proto::acceptor m_Acceptor;
public: public: