initial wack at single threaded sam

This commit is contained in:
Jeff Becker 2019-07-12 11:49:21 -04:00
parent a090114066
commit 0098f5b98c
No known key found for this signature in database
GPG key ID: F357B3B42F6F9B05
6 changed files with 74 additions and 34 deletions

View file

@ -13,10 +13,12 @@ namespace i2p
{ {
namespace client namespace client
{ {
LeaseSetDestination::LeaseSetDestination (bool isPublic, const std::map<std::string, std::string> * params): LeaseSetDestination::LeaseSetDestination (std::shared_ptr<boost::asio::io_service> service, bool isPublic, const std::map<std::string, std::string> * params):
m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic), m_IsRunning (false), m_Thread (nullptr),
m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service), m_Service(std::move(service)),
m_PublishVerificationTimer (m_Service), m_PublishDelayTimer (m_Service), m_CleanupTimer (m_Service), m_IsPublic (isPublic),
m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (*m_Service),
m_PublishVerificationTimer (*m_Service), m_PublishDelayTimer (*m_Service), m_CleanupTimer (*m_Service),
m_LeaseSetType (DEFAULT_LEASESET_TYPE) m_LeaseSetType (DEFAULT_LEASESET_TYPE)
{ {
int inLen = DEFAULT_INBOUND_TUNNEL_LENGTH; int inLen = DEFAULT_INBOUND_TUNNEL_LENGTH;
@ -124,7 +126,7 @@ namespace client
{ {
try try
{ {
m_Service.run (); m_Service->run ();
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {
@ -168,7 +170,7 @@ namespace client
m_Pool->SetLocalDestination (nullptr); m_Pool->SetLocalDestination (nullptr);
i2p::tunnel::tunnels.StopTunnelPool (m_Pool); i2p::tunnel::tunnels.StopTunnelPool (m_Pool);
} }
m_Service.stop (); m_Service->stop ();
if (m_Thread) if (m_Thread)
{ {
m_Thread->join (); m_Thread->join ();
@ -302,7 +304,7 @@ namespace client
if (m_IsPublic) if (m_IsPublic)
{ {
auto s = shared_from_this (); auto s = shared_from_this ();
m_Service.post ([s](void) m_Service->post ([s](void)
{ {
s->m_PublishVerificationTimer.cancel (); s->m_PublishVerificationTimer.cancel ();
s->Publish (); s->Publish ();
@ -326,7 +328,7 @@ namespace client
memcpy (data.k, key, 32); memcpy (data.k, key, 32);
memcpy (data.t, tag, 32); memcpy (data.t, tag, 32);
auto s = shared_from_this (); auto s = shared_from_this ();
m_Service.post ([s,data](void) m_Service->post ([s,data](void)
{ {
s->AddSessionKey (data.k, data.t); s->AddSessionKey (data.k, data.t);
}); });
@ -335,12 +337,12 @@ namespace client
void LeaseSetDestination::ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg) void LeaseSetDestination::ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg)
{ {
m_Service.post (std::bind (&LeaseSetDestination::HandleGarlicMessage, shared_from_this (), msg)); m_Service->post (std::bind (&LeaseSetDestination::HandleGarlicMessage, shared_from_this (), msg));
} }
void LeaseSetDestination::ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg) void LeaseSetDestination::ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg)
{ {
m_Service.post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msg)); m_Service->post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msg));
} }
void LeaseSetDestination::HandleI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr<i2p::tunnel::InboundTunnel> from) void LeaseSetDestination::HandleI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr<i2p::tunnel::InboundTunnel> from)
@ -646,10 +648,10 @@ namespace client
if (!m_Pool || !IsReady ()) if (!m_Pool || !IsReady ())
{ {
if (requestComplete) if (requestComplete)
m_Service.post ([requestComplete](void){requestComplete (nullptr);}); m_Service->post ([requestComplete](void){requestComplete (nullptr);});
return false; return false;
} }
m_Service.post (std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), dest, requestComplete, nullptr)); m_Service->post (std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), dest, requestComplete, nullptr));
return true; return true;
} }
@ -658,7 +660,7 @@ namespace client
if (!dest || !m_Pool || !IsReady ()) if (!dest || !m_Pool || !IsReady ())
{ {
if (requestComplete) if (requestComplete)
m_Service.post ([requestComplete](void){requestComplete (nullptr);}); m_Service->post ([requestComplete](void){requestComplete (nullptr);});
return false; return false;
} }
auto storeHash = dest->GetStoreHash (); auto storeHash = dest->GetStoreHash ();
@ -666,17 +668,17 @@ namespace client
if (leaseSet) if (leaseSet)
{ {
if (requestComplete) if (requestComplete)
m_Service.post ([requestComplete, leaseSet](void){requestComplete (leaseSet);}); m_Service->post ([requestComplete, leaseSet](void){requestComplete (leaseSet);});
return true; return true;
} }
m_Service.post (std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), storeHash, requestComplete, dest)); m_Service->post (std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), storeHash, requestComplete, dest));
return true; return true;
} }
void LeaseSetDestination::CancelDestinationRequest (const i2p::data::IdentHash& dest, bool notify) void LeaseSetDestination::CancelDestinationRequest (const i2p::data::IdentHash& dest, bool notify)
{ {
auto s = shared_from_this (); auto s = shared_from_this ();
m_Service.post ([dest, notify, s](void) m_Service->post ([dest, notify, s](void)
{ {
auto it = s->m_LeaseSetRequests.find (dest); auto it = s->m_LeaseSetRequests.find (dest);
if (it != s->m_LeaseSetRequests.end ()) if (it != s->m_LeaseSetRequests.end ())
@ -700,7 +702,7 @@ namespace client
auto floodfill = i2p::data::netdb.GetClosestFloodfill (dest, excluded); auto floodfill = i2p::data::netdb.GetClosestFloodfill (dest, excluded);
if (floodfill) if (floodfill)
{ {
auto request = std::make_shared<LeaseSetRequest> (m_Service); auto request = std::make_shared<LeaseSetRequest> (GetService());
request->requestedBlindedKey = requestedBlindedKey; // for encrypted LeaseSet2 request->requestedBlindedKey = requestedBlindedKey; // for encrypted LeaseSet2
if (requestComplete) if (requestComplete)
request->requestComplete.push_back (requestComplete); request->requestComplete.push_back (requestComplete);
@ -843,8 +845,8 @@ namespace client
} }
} }
ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map<std::string, std::string> * params): ClientDestination::ClientDestination (std::shared_ptr<boost::asio::io_context> ioc, const i2p::data::PrivateKeys& keys, bool isPublic, const std::map<std::string, std::string> * params):
LeaseSetDestination (isPublic, params), m_Keys (keys), m_StreamingAckDelay (DEFAULT_INITIAL_ACK_DELAY), LeaseSetDestination (std::move(ioc), isPublic, params), m_Keys (keys), m_StreamingAckDelay (DEFAULT_INITIAL_ACK_DELAY),
m_DatagramDestination (nullptr), m_RefCounter (0), m_DatagramDestination (nullptr), m_RefCounter (0),
m_ReadyChecker(GetService()) m_ReadyChecker(GetService())
{ {

View file

@ -95,7 +95,9 @@ namespace client
public: public:
LeaseSetDestination (bool isPublic, const std::map<std::string, std::string> * params = nullptr); LeaseSetDestination (std::shared_ptr<boost::asio::io_service> service, bool isPublic, const std::map<std::string, std::string> * params = nullptr);
LeaseSetDestination (bool isPublic, const std::map<std::string, std::string> * params = nullptr) :
LeaseSetDestination(std::make_shared<boost::asio::io_service>(), isPublic, params) {};
~LeaseSetDestination (); ~LeaseSetDestination ();
const std::string& GetNickname () const { return m_Nickname; }; const std::string& GetNickname () const { return m_Nickname; };
@ -106,7 +108,7 @@ namespace client
virtual bool Reconfigure(std::map<std::string, std::string> i2cpOpts); virtual bool Reconfigure(std::map<std::string, std::string> i2cpOpts);
bool IsRunning () const { return m_IsRunning; }; bool IsRunning () const { return m_IsRunning; };
boost::asio::io_service& GetService () { return m_Service; }; boost::asio::io_service& GetService () { return *m_Service; };
std::shared_ptr<i2p::tunnel::TunnelPool> GetTunnelPool () { return m_Pool; }; std::shared_ptr<i2p::tunnel::TunnelPool> GetTunnelPool () { return m_Pool; };
bool IsReady () const { return m_LeaseSet && !m_LeaseSet->IsExpired () && m_Pool->GetOutboundTunnels ().size () > 0; }; bool IsReady () const { return m_LeaseSet && !m_LeaseSet->IsExpired () && m_Pool->GetOutboundTunnels ().size () > 0; };
std::shared_ptr<i2p::data::LeaseSet> FindLeaseSet (const i2p::data::IdentHash& ident); std::shared_ptr<i2p::data::LeaseSet> FindLeaseSet (const i2p::data::IdentHash& ident);
@ -159,7 +161,7 @@ namespace client
volatile bool m_IsRunning; volatile bool m_IsRunning;
std::thread * m_Thread; std::thread * m_Thread;
boost::asio::io_service m_Service; std::shared_ptr<boost::asio::io_service> m_Service;
mutable std::mutex m_RemoteLeaseSetsMutex; mutable std::mutex m_RemoteLeaseSetsMutex;
std::map<i2p::data::IdentHash, std::shared_ptr<i2p::data::LeaseSet> > m_RemoteLeaseSets; std::map<i2p::data::IdentHash, std::shared_ptr<i2p::data::LeaseSet> > m_RemoteLeaseSets;
std::map<i2p::data::IdentHash, std::shared_ptr<LeaseSetRequest> > m_LeaseSetRequests; std::map<i2p::data::IdentHash, std::shared_ptr<LeaseSetRequest> > m_LeaseSetRequests;
@ -196,8 +198,9 @@ namespace client
// if cancelled before ready, informs promise with nullptr // if cancelled before ready, informs promise with nullptr
void Ready(ReadyPromise & p); void Ready(ReadyPromise & p);
#endif #endif
ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map<std::string, std::string> * params = nullptr) :
ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map<std::string, std::string> * params = nullptr); ClientDestination(std::make_shared<boost::asio::io_service>(), keys, isPublic, params) {};
ClientDestination (std::shared_ptr<boost::asio::io_service> service, const i2p::data::PrivateKeys& keys, bool isPublic, const std::map<std::string, std::string> * params = nullptr);
~ClientDestination (); ~ClientDestination ();
virtual bool Start (); virtual bool Start ();

View file

@ -312,6 +312,18 @@ namespace client
return localDestination; return localDestination;
} }
std::shared_ptr<ClientDestination> ClientContext::CreateNewSAMDestination (std::shared_ptr<boost::asio::io_context> ioctx, bool isPublic,
i2p::data::SigningKeyType sigType, i2p::data::CryptoKeyType cryptoType,
const std::map<std::string, std::string> * params)
{
i2p::data::PrivateKeys keys = i2p::data::PrivateKeys::CreateRandomKeys (sigType, cryptoType);
auto localDestination = std::make_shared<ClientDestination> (keys, isPublic, params);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
localDestination->Start ();
return localDestination;
}
std::shared_ptr<ClientDestination> ClientContext::CreateNewMatchedTunnelDestination(const i2p::data::PrivateKeys &keys, const std::string & name, const std::map<std::string, std::string> * params) std::shared_ptr<ClientDestination> ClientContext::CreateNewMatchedTunnelDestination(const i2p::data::PrivateKeys &keys, const std::string & name, const std::map<std::string, std::string> * params)
{ {
MatchedTunnelDestination * cl = new MatchedTunnelDestination(keys, name, params); MatchedTunnelDestination * cl = new MatchedTunnelDestination(keys, name, params);
@ -337,6 +349,23 @@ namespace client
} }
} }
std::shared_ptr<ClientDestination> ClientContext::CreateNewSAMDestination (std::shared_ptr<boost::asio::io_context> ioctx, const i2p::data::PrivateKeys& keys, bool isPublic,
const std::map<std::string, std::string> * params)
{
auto it = m_Destinations.find (keys.GetPublic ()->GetIdentHash ());
if (it != m_Destinations.end ())
{
LogPrint (eLogWarning, "Clients: Local destination ", m_AddressBook.ToAddress(keys.GetPublic ()->GetIdentHash ()), " exists, removing it");
it->second->Stop();
m_Destinations.erase(it);
}
auto localDestination = std::make_shared<ClientDestination> (ioctx, keys, isPublic, params);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[keys.GetPublic ()->GetIdentHash ()] = localDestination;
localDestination->Start ();
return localDestination;
}
std::shared_ptr<ClientDestination> ClientContext::CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic, std::shared_ptr<ClientDestination> ClientContext::CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic,
const std::map<std::string, std::string> * params) const std::map<std::string, std::string> * params)
{ {

View file

@ -64,10 +64,15 @@ namespace client
void ReloadConfig (); void ReloadConfig ();
std::shared_ptr<ClientDestination> GetSharedLocalDestination () const { return m_SharedLocalDestination; }; std::shared_ptr<ClientDestination> GetSharedLocalDestination () const { return m_SharedLocalDestination; };
std::shared_ptr<ClientDestination> CreateNewLocalDestination (bool isPublic = false, // transient std::shared_ptr<ClientDestination> CreateNewLocalDestination (bool isPublic = false,
i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519,
i2p::data::CryptoKeyType cryptoType = i2p::data::CRYPTO_KEY_TYPE_ELGAMAL, const std::map<std::string, std::string> * params = nullptr);
std::shared_ptr<ClientDestination> CreateNewSAMDestination (std::shared_ptr<boost::asio::io_context> ioctx, bool isPublic = false,
i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519,
i2p::data::CryptoKeyType cryptoType = i2p::data::CRYPTO_KEY_TYPE_ELGAMAL, i2p::data::CryptoKeyType cryptoType = i2p::data::CRYPTO_KEY_TYPE_ELGAMAL,
const std::map<std::string, std::string> * params = nullptr); // used by SAM only const std::map<std::string, std::string> * params = nullptr);
std::shared_ptr<ClientDestination> CreateNewSAMDestination (std::shared_ptr<boost::asio::io_context> ioctx, const i2p::data::PrivateKeys& keys, bool isPublic = true,
const std::map<std::string, std::string> * params = nullptr);
std::shared_ptr<ClientDestination> CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true, std::shared_ptr<ClientDestination> CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true,
const std::map<std::string, std::string> * params = nullptr); const std::map<std::string, std::string> * params = nullptr);
std::shared_ptr<ClientDestination> CreateNewMatchedTunnelDestination(const i2p::data::PrivateKeys &keys, const std::string & name, const std::map<std::string, std::string> * params = nullptr); std::shared_ptr<ClientDestination> CreateNewMatchedTunnelDestination(const i2p::data::PrivateKeys &keys, const std::string & name, const std::map<std::string, std::string> * params = nullptr);

View file

@ -1001,8 +1001,9 @@ namespace client
SAMBridge::SAMBridge (const std::string& address, int port): SAMBridge::SAMBridge (const std::string& address, int port):
m_IsRunning (false), m_Thread (nullptr), m_IsRunning (false), m_Thread (nullptr),
m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(address), port)), m_Service(std::make_shared<boost::asio::io_service>()),
m_DatagramEndpoint (boost::asio::ip::address::from_string(address), port-1), m_DatagramSocket (m_Service, m_DatagramEndpoint), m_Acceptor (*m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(address), port)),
m_DatagramEndpoint (boost::asio::ip::address::from_string(address), port-1), m_DatagramSocket (*m_Service, m_DatagramEndpoint),
m_SignatureTypes m_SignatureTypes
{ {
{"DSA_SHA1", i2p::data::SIGNING_KEY_TYPE_DSA_SHA1}, {"DSA_SHA1", i2p::data::SIGNING_KEY_TYPE_DSA_SHA1},
@ -1046,7 +1047,7 @@ namespace client
for (auto& it: m_Sessions) for (auto& it: m_Sessions)
it.second->CloseStreams (); it.second->CloseStreams ();
m_Sessions.clear (); m_Sessions.clear ();
m_Service.stop (); m_Service->stop ();
if (m_Thread) if (m_Thread)
{ {
m_Thread->join (); m_Thread->join ();
@ -1061,7 +1062,7 @@ namespace client
{ {
try try
{ {
m_Service.run (); m_Service->run ();
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {
@ -1116,7 +1117,7 @@ namespace client
{ {
i2p::data::PrivateKeys keys; i2p::data::PrivateKeys keys;
if (!keys.FromBase64 (destination)) return nullptr; if (!keys.FromBase64 (destination)) return nullptr;
localDestination = i2p::client::context.CreateNewLocalDestination (keys, true, params); localDestination = i2p::client::context.CreateNewSAMDestination (m_Service, keys, true, params);
} }
else // transient else // transient
{ {
@ -1144,7 +1145,7 @@ namespace client
} }
} }
} }
localDestination = i2p::client::context.CreateNewLocalDestination (true, signatureType, cryptoType, params); localDestination = i2p::client::context.CreateNewSAMDestination (m_Service, true, signatureType, cryptoType, params);
} }
if (localDestination) if (localDestination)
{ {

View file

@ -184,7 +184,7 @@ namespace client
void Start (); void Start ();
void Stop (); void Stop ();
boost::asio::io_service& GetService () { return m_Service; }; boost::asio::io_service& GetService () { return *m_Service; };
std::shared_ptr<SAMSession> CreateSession (const std::string& id, SAMSessionType type, const std::string& destination, // empty string means transient std::shared_ptr<SAMSession> CreateSession (const std::string& id, SAMSessionType type, const std::string& destination, // empty string means transient
const std::map<std::string, std::string> * params); const std::map<std::string, std::string> * params);
void CloseSession (const std::string& id); void CloseSession (const std::string& id);
@ -213,7 +213,7 @@ namespace client
bool m_IsRunning; bool m_IsRunning;
std::thread * m_Thread; std::thread * m_Thread;
boost::asio::io_service m_Service; std::shared_ptr<boost::asio::io_service> m_Service;
boost::asio::ip::tcp::acceptor m_Acceptor; boost::asio::ip::tcp::acceptor m_Acceptor;
boost::asio::ip::udp::endpoint m_DatagramEndpoint, m_SenderEndpoint; boost::asio::ip::udp::endpoint m_DatagramEndpoint, m_SenderEndpoint;
boost::asio::ip::udp::socket m_DatagramSocket; boost::asio::ip::udp::socket m_DatagramSocket;