separate thread per local destination

This commit is contained in:
orignal 2014-10-09 10:05:28 -04:00
parent 82814dcc26
commit c5c930bb72
2 changed files with 60 additions and 42 deletions

View file

@ -10,8 +10,9 @@ namespace i2p
{
namespace stream
{
StreamingDestination::StreamingDestination (boost::asio::io_service& service, bool isPublic):
m_Service (service), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic)
StreamingDestination::StreamingDestination (bool isPublic):
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service),
m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{
m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); // uncomment for ECDSA
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
@ -21,8 +22,9 @@ namespace stream
LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created");
}
StreamingDestination::StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic):
m_Service (service), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic)
StreamingDestination::StreamingDestination (const std::string& fullPath, bool isPublic):
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service),
m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{
std::ifstream s(fullPath.c_str (), std::ifstream::binary);
if (s.is_open ())
@ -55,8 +57,9 @@ namespace stream
m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel
}
StreamingDestination::StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic):
m_Service (service), m_Keys (keys), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic)
StreamingDestination::StreamingDestination (const i2p::data::PrivateKeys& keys, bool isPublic):
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service),
m_Keys (keys), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey);
@ -73,11 +76,35 @@ namespace stream
delete it.second;
m_Streams.clear ();
}
Stop ();
if (m_Pool)
i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool);
delete m_LeaseSet;
}
void StreamingDestination::Run ()
{
m_Service.run ();
}
void StreamingDestination::Start ()
{
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&StreamingDestination::Run, this));
}
void StreamingDestination::Stop ()
{
m_IsRunning = false;
m_Service.stop ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = 0;
}
}
void StreamingDestination::SendTunnelDataMsgs (const std::vector<i2p::tunnel::TunnelMessageBlock>& msgs)
{
m_CurrentOutboundTunnel = m_Pool->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
@ -246,35 +273,21 @@ namespace stream
{
if (!m_SharedLocalDestination)
{
m_SharedLocalDestination = new StreamingDestination (m_Service, false); // non-public
m_SharedLocalDestination = new StreamingDestination (false); // non-public
m_Destinations[m_SharedLocalDestination->GetIdentity ().GetIdentHash ()] = m_SharedLocalDestination;
m_SharedLocalDestination->Start ();
}
// LoadLocalDestinations ();
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&StreamingDestinations::Run, this));
}
void StreamingDestinations::Stop ()
{
for (auto it: m_Destinations)
{
it.second->Stop ();
delete it.second;
}
m_Destinations.clear ();
m_SharedLocalDestination = 0; // deleted through m_Destination
m_IsRunning = false;
m_Service.stop ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = 0;
}
}
void StreamingDestinations::Run ()
{
m_Service.run ();
}
void StreamingDestinations::LoadLocalDestinations ()
@ -292,7 +305,7 @@ namespace stream
#else
it->path();
#endif
auto localDestination = new StreamingDestination (m_Service, fullPath, true);
auto localDestination = new StreamingDestination (fullPath, true);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
numDestinations++;
}
@ -303,17 +316,19 @@ namespace stream
StreamingDestination * StreamingDestinations::LoadLocalDestination (const std::string& filename, bool isPublic)
{
auto localDestination = new StreamingDestination (m_Service, i2p::util::filesystem::GetFullPath (filename), isPublic);
auto localDestination = new StreamingDestination (i2p::util::filesystem::GetFullPath (filename), isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
localDestination->Start ();
return localDestination;
}
StreamingDestination * StreamingDestinations::CreateNewLocalDestination (bool isPublic)
{
auto localDestination = new StreamingDestination (m_Service, isPublic);
auto localDestination = new StreamingDestination (isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
localDestination->Start ();
return localDestination;
}
@ -328,6 +343,7 @@ namespace stream
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations.erase (it);
}
d->Stop ();
delete d;
}
}
@ -340,9 +356,10 @@ namespace stream
LogPrint ("Local destination ", keys.GetPublic ().GetIdentHash ().ToBase32 (), ".b32.i2p exists");
return nullptr;
}
auto localDestination = new StreamingDestination (m_Service, keys, isPublic);
auto localDestination = new StreamingDestination (keys, isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[keys.GetPublic ().GetIdentHash ()] = localDestination;
localDestination->Start ();
return localDestination;
}

View file

@ -18,11 +18,14 @@ namespace stream
{
public:
StreamingDestination (boost::asio::io_service& service, bool isPublic);
StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic);
StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic);
StreamingDestination (bool isPublic);
StreamingDestination (const std::string& fullPath, bool isPublic);
StreamingDestination (const i2p::data::PrivateKeys& keys, bool isPublic);
~StreamingDestination ();
void Start ();
void Stop ();
i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; };
Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote);
@ -52,12 +55,17 @@ namespace stream
private:
void Run ();
Stream * CreateNewIncomingStream ();
void UpdateLeaseSet ();
private:
boost::asio::io_service& m_Service;
bool m_IsRunning;
std::thread * m_Thread;
boost::asio::io_service m_Service;
boost::asio::io_service::work m_Work;
std::mutex m_StreamsMutex;
std::map<uint32_t, Stream *> m_Streams;
i2p::data::PrivateKeys m_Keys;
@ -75,8 +83,7 @@ namespace stream
{
public:
StreamingDestinations (): m_IsRunning (false), m_Thread (nullptr),
m_Work (m_Service), m_SharedLocalDestination (nullptr) {};
StreamingDestinations (): m_SharedLocalDestination (nullptr) {};
~StreamingDestinations () {};
void Start ();
@ -93,16 +100,10 @@ namespace stream
private:
void Run ();
void LoadLocalDestinations ();
private:
bool m_IsRunning;
std::thread * m_Thread;
boost::asio::io_service m_Service;
boost::asio::io_service::work m_Work;
std::mutex m_DestinationsMutex;
std::map<i2p::data::IdentHash, StreamingDestination *> m_Destinations;
StreamingDestination * m_SharedLocalDestination;