From a906d7f02fc7ec2cf210fbdeb744d88b1979c13a Mon Sep 17 00:00:00 2001 From: "Francisco Blas (klondike) Izquierdo Riera" Date: Fri, 2 Jan 2015 13:35:38 +0100 Subject: [PATCH] Allow for asynchronous creation of streams --- Destination.cpp | 51 +++++++++++++++++++++++++------------------------ Destination.h | 6 ++++-- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/Destination.cpp b/Destination.cpp index 148d9429..544d2b4c 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include #include #include "Log.h" @@ -384,47 +384,48 @@ namespace client } } - std::shared_ptr ClientDestination::CreateStream (const std::string& dest, int port) { + void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const std::string& dest, int port) { + assert(streamRequestComplete); i2p::data::IdentHash identHash; if (i2p::client::context.GetAddressBook ().GetIdentHash (dest, identHash)) - return CreateStream (identHash, port); + CreateStream (streamRequestComplete, identHash, port); else { LogPrint (eLogWarning, "Remote destination ", dest, " not found"); - return nullptr; + streamRequestComplete (nullptr); } } - std::shared_ptr ClientDestination::CreateStream (const i2p::data::IdentHash& dest, int port) { + void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) { + assert(streamRequestComplete); const i2p::data::LeaseSet * leaseSet = FindLeaseSet (dest); - if (!leaseSet) - { - bool found = false; - std::condition_variable newDataReceived; - std::mutex newDataReceivedMutex; - std::unique_lock l(newDataReceivedMutex); - RequestDestination (dest, - [&newDataReceived, &found](bool success) - { - found = success; - newDataReceived.notify_all (); - }); - if (newDataReceived.wait_for (l, std::chrono::seconds (STREAM_REQUEST_TIMEOUT)) == std::cv_status::timeout) - LogPrint (eLogError, "Subscription LeseseSet request timeout expired"); - if (found) - leaseSet = FindLeaseSet (dest); - } if (leaseSet) - return CreateStream (*leaseSet, port); + streamRequestComplete(CreateStream (*leaseSet, port)); else - return nullptr; + { + RequestDestination (dest, + [this, streamRequestComplete, dest, port](bool success) + { + if (!success) + streamRequestComplete (nullptr); + else + { + const i2p::data::LeaseSet * leaseSet = FindLeaseSet (dest); + if (leaseSet) + streamRequestComplete(CreateStream (*leaseSet, port)); + else + streamRequestComplete (nullptr); + } + }); + } } std::shared_ptr ClientDestination::CreateStream (const i2p::data::LeaseSet& remote, int port) { if (m_StreamingDestination) return m_StreamingDestination->CreateNewOutgoingStream (remote, port); - return nullptr; + else + return nullptr; } void ClientDestination::AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor) diff --git a/Destination.h b/Destination.h index 1f42dba3..e10cd41e 100644 --- a/Destination.h +++ b/Destination.h @@ -49,6 +49,8 @@ namespace client RequestComplete requestComplete; }; + typedef std::function stream)> StreamRequestComplete; + public: ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params = nullptr); @@ -65,8 +67,8 @@ namespace client // streaming i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; }; - std::shared_ptr CreateStream (const std::string& dest, int port = 0); - std::shared_ptr CreateStream (const i2p::data::IdentHash& dest, int port = 0); + void CreateStream (StreamRequestComplete streamRequestComplete, const std::string& dest, int port = 0); + void CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port = 0); std::shared_ptr CreateStream (const i2p::data::LeaseSet& remote, int port = 0); void AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor); void StopAcceptingStreams ();