From 3f409d0e285489e5b66caf176e038af0e7fe469b Mon Sep 17 00:00:00 2001
From: Jeff Becker <jeff@i2p.rocks>
Date: Thu, 31 Aug 2017 09:53:31 -0400
Subject: [PATCH] add deferred ready checking for destination

---
 libi2pd/Destination.cpp | 396 ++++++++++++++++++++++------------------
 libi2pd/Destination.h   |  14 +-
 2 files changed, 231 insertions(+), 179 deletions(-)

diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp
index d45fdf47..5f17856b 100644
--- a/libi2pd/Destination.cpp
+++ b/libi2pd/Destination.cpp
@@ -13,9 +13,10 @@ namespace i2p
 namespace client
 {
 	LeaseSetDestination::LeaseSetDestination (bool isPublic, const std::map<std::string, std::string> * params):
-		m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic), 
-		m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service), 
-		m_PublishVerificationTimer (m_Service), m_PublishDelayTimer (m_Service), m_CleanupTimer (m_Service)
+		m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic),
+		m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service),
+		m_PublishVerificationTimer (m_Service), m_PublishDelayTimer (m_Service), m_CleanupTimer (m_Service),
+		m_ReadyCheckTimer(m_Service)
 	{
 		int inLen   = DEFAULT_INBOUND_TUNNEL_LENGTH;
 		int inQty   = DEFAULT_INBOUND_TUNNELS_QUANTITY;
@@ -83,77 +84,80 @@ namespace client
 
 	LeaseSetDestination::~LeaseSetDestination ()
 	{
-		if (m_IsRunning)	
+		if (m_IsRunning)
 			Stop ();
 		if (m_Pool)
-			i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool);		
+			i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool);
 		for (auto& it: m_LeaseSetRequests)
 			it.second->Complete (nullptr);
-	}	
+	}
 
 	void LeaseSetDestination::Run ()
 	{
 		while (m_IsRunning)
 		{
 			try
-			{	
+			{
 				m_Service.run ();
 			}
 			catch (std::exception& ex)
 			{
 				LogPrint (eLogError, "Destination: runtime exception: ", ex.what ());
-			}	
-		}	
-	}	
+			}
+		}
+	}
 
 	bool LeaseSetDestination::Start ()
-	{	
+	{
 		if (!m_IsRunning)
-		{	
+		{
 			LoadTags ();
 			m_IsRunning = true;
 			m_Pool->SetLocalDestination (shared_from_this ());
-			m_Pool->SetActive (true);	
+			m_Pool->SetActive (true);
 			m_CleanupTimer.expires_from_now (boost::posix_time::minutes (DESTINATION_CLEANUP_TIMEOUT));
 			m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer,
-				shared_from_this (), std::placeholders::_1));		
+				shared_from_this (), std::placeholders::_1));
 			m_Thread = new std::thread (std::bind (&LeaseSetDestination::Run, shared_from_this ()));
-			
+			m_ReadyCheckTimer.expires_from_now(boost::posix_time::seconds (1));
+			m_ReadyCheckTimer.async_wait(std::bind(&LeaseSetDestination::HandleReadyCheckTimer,
+																						 shared_from_this (), std::placeholders::_1));
 			return true;
-		}	
+		}
 		else
 			return false;
 	}
-		
+
 	bool LeaseSetDestination::Stop ()
-	{	
+	{
 		if (m_IsRunning)
-		{	
+		{
 			m_CleanupTimer.cancel ();
 			m_PublishConfirmationTimer.cancel ();
-			m_PublishVerificationTimer.cancel ();		
+			m_PublishVerificationTimer.cancel ();
+			m_ReadyCheckTimer.cancel ();
 
 			m_IsRunning = false;
 			if (m_Pool)
-			{	
+			{
 				m_Pool->SetLocalDestination (nullptr);
 				i2p::tunnel::tunnels.StopTunnelPool (m_Pool);
-			}	
+			}
 			m_Service.stop ();
 			if (m_Thread)
-			{	
-				m_Thread->join (); 
+			{
+				m_Thread->join ();
 				delete m_Thread;
 				m_Thread = 0;
-			}	
+			}
 			SaveTags ();
 			CleanUp (); // GarlicDestination
 			return true;
-		}	
+		}
 		else
 			return false;
-	}	
-  
+	}
+
 	std::shared_ptr<const i2p::data::LeaseSet> LeaseSetDestination::FindLeaseSet (const i2p::data::IdentHash& ident)
 	{
 		std::shared_ptr<i2p::data::LeaseSet> remoteLS;
@@ -164,7 +168,7 @@ namespace client
 				remoteLS = it->second;
 		}
 
-		if (remoteLS)	
+		if (remoteLS)
 		{
 			if (!remoteLS->IsExpired ())
 			{
@@ -193,9 +197,9 @@ namespace client
 				m_RemoteLeaseSets.erase (ident);
 				return nullptr;
 			}
-		}	
+		}
 		else
-		{	
+		{
 			auto ls = i2p::data::netdb.FindLeaseSet (ident);
 			if (ls && !ls->IsExpired ())
 			{
@@ -203,7 +207,7 @@ namespace client
 				std::lock_guard<std::mutex> _lock(m_RemoteLeaseSetsMutex);
 				m_RemoteLeaseSets[ident] = ls;
 				return ls;
-			}	
+			}
 		}
 		return nullptr;
 	}
@@ -215,11 +219,11 @@ namespace client
 			UpdateLeaseSet ();
 		std::lock_guard<std::mutex> l(m_LeaseSetMutex);
 		return m_LeaseSet;
-	}	
+	}
 
 	void LeaseSetDestination::SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet)
 	{
-		{	
+		{
 			std::lock_guard<std::mutex> l(m_LeaseSetMutex);
 			m_LeaseSet.reset (newLeaseSet);
 		}
@@ -229,21 +233,26 @@ namespace client
 			m_PublishVerificationTimer.cancel ();
 			Publish ();
 		}
-	}	
-		
+	}
+
+	void LeaseSetDestination::AddReadyCallback(ReadyCallback cb)
+	{
+		m_ReadyCallbacks.push_back(cb);
+	}
+
 	void LeaseSetDestination::UpdateLeaseSet ()
 	{
-		int numTunnels = m_Pool->GetNumInboundTunnels () + 2; // 2 backup tunnels 
-		if (numTunnels > i2p::data::MAX_NUM_LEASES) numTunnels = i2p::data::MAX_NUM_LEASES; // 16 tunnels maximum 
+		int numTunnels = m_Pool->GetNumInboundTunnels () + 2; // 2 backup tunnels
+		if (numTunnels > i2p::data::MAX_NUM_LEASES) numTunnels = i2p::data::MAX_NUM_LEASES; // 16 tunnels maximum
 		CreateNewLeaseSet (m_Pool->GetInboundTunnels (numTunnels));
-	}	
+	}
 
 	bool LeaseSetDestination::SubmitSessionKey (const uint8_t * key, const uint8_t * tag)
 	{
 		struct
 		{
 			uint8_t k[32], t[32];
-		} data;	
+		} data;
 		memcpy (data.k, key, 32);
 		memcpy (data.t, tag, 32);
 		auto s = shared_from_this ();
@@ -256,42 +265,42 @@ namespace client
 
 	void LeaseSetDestination::ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg)
 	{
-		m_Service.post (std::bind (&LeaseSetDestination::HandleGarlicMessage, shared_from_this (), msg)); 
+		m_Service.post (std::bind (&LeaseSetDestination::HandleGarlicMessage, shared_from_this (), msg));
 	}
 
 	void LeaseSetDestination::ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg)
 	{
-		m_Service.post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msg)); 
+		m_Service.post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msg));
 	}
 
 	void LeaseSetDestination::HandleI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr<i2p::tunnel::InboundTunnel> from)
 	{
 		uint8_t typeID = buf[I2NP_HEADER_TYPEID_OFFSET];
 		switch (typeID)
-		{	
+		{
 			case eI2NPData:
 				HandleDataMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET));
 			break;
 			case eI2NPDeliveryStatus:
 				// we assume tunnel tests non-encrypted
 				HandleDeliveryStatusMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from));
-			break;	
+			break;
 			case eI2NPDatabaseStore:
 				HandleDatabaseStoreMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET));
 			break;
 			case eI2NPDatabaseSearchReply:
 				HandleDatabaseSearchReplyMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET));
-			break;	
+			break;
 			default:
 				i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from));
-		}		
-	}	
+		}
+	}
 
 	void LeaseSetDestination::HandleDatabaseStoreMessage (const uint8_t * buf, size_t len)
 	{
 		uint32_t replyToken = bufbe32toh (buf + DATABASE_STORE_REPLY_TOKEN_OFFSET);
 		size_t offset = DATABASE_STORE_HEADER_SIZE;
-		if (replyToken) 
+		if (replyToken)
 		{
 			LogPrint (eLogInfo, "Destination: Reply token is ignored for DatabaseStore");
 			offset += 36;
@@ -307,8 +316,8 @@ namespace client
 			{
 				leaseSet = it->second;
 				if (leaseSet->IsNewer (buf + offset, len - offset))
-				{	
-					leaseSet->Update (buf + offset, len - offset); 
+				{
+					leaseSet->Update (buf + offset, len - offset);
 					if (leaseSet->IsValid () && leaseSet->GetIdentHash () == key)
 						LogPrint (eLogDebug, "Destination: Remote LeaseSet updated");
 					else
@@ -322,7 +331,7 @@ namespace client
 					LogPrint (eLogDebug, "Destination: Remote LeaseSet is older. Not updated");
 			}
 			else
-			{	
+			{
 				leaseSet = std::make_shared<i2p::data::LeaseSet> (buf + offset, len - offset);
 				if (leaseSet->IsValid () && leaseSet->GetIdentHash () == key)
 				{
@@ -339,18 +348,18 @@ namespace client
 					LogPrint (eLogError, "Destination: New remote LeaseSet failed");
 					leaseSet = nullptr;
 				}
-			}	
-		}	
+			}
+		}
 		else
 			LogPrint (eLogError, "Destination: Unexpected client's DatabaseStore type ", buf[DATABASE_STORE_TYPE_OFFSET], ", dropped");
-		
+
 		auto it1 = m_LeaseSetRequests.find (key);
 		if (it1 != m_LeaseSetRequests.end ())
 		{
 			it1->second->requestTimeoutTimer.cancel ();
 			if (it1->second) it1->second->Complete (leaseSet);
 			m_LeaseSetRequests.erase (it1);
-		}	
+		}
 	}
 
 	void LeaseSetDestination::HandleDatabaseSearchReplyMessage (const uint8_t * buf, size_t len)
@@ -364,7 +373,7 @@ namespace client
 			auto request = it->second;
 			bool found = false;
 			if (request->excluded.size () < MAX_NUM_FLOODFILLS_PER_REQUEST)
-			{	
+			{
 				for (int i = 0; i < num; i++)
 			{
 				i2p::data::IdentHash peerHash (buf + 33 + i*32);
@@ -372,28 +381,28 @@ namespace client
 				{
 					LogPrint (eLogInfo, "Destination: Found new floodfill, request it"); // TODO: recheck this message
 					i2p::data::netdb.RequestDestination (peerHash);
-				}	
+				}
 			}
-				
+
 				auto floodfill = i2p::data::netdb.GetClosestFloodfill (key, request->excluded);
 				if (floodfill)
 				{
 					LogPrint (eLogInfo, "Destination: Requesting ", key.ToBase64 (), " at ", floodfill->GetIdentHash ().ToBase64 ());
 					if (SendLeaseSetRequest (key, floodfill, request))
 						found = true;
-				}	
-			}	
+				}
+			}
 			if (!found)
-			{	
+			{
 				LogPrint (eLogInfo, "Destination: ", key.ToBase64 (), " was not found on ", MAX_NUM_FLOODFILLS_PER_REQUEST, " floodfills");
 				request->Complete (nullptr);
 				m_LeaseSetRequests.erase (key);
-			}	
-		}	
-		else	
+			}
+		}
+		else
 			LogPrint (eLogWarning, "Destination: Request for ", key.ToBase64 (), " not found");
-	}	
-		
+	}
+
 	void LeaseSetDestination::HandleDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg)
 	{
 		uint32_t msgID = bufbe32toh (msg->GetPayload () + DELIVERY_STATUS_MSGID_OFFSET);
@@ -405,20 +414,20 @@ namespace client
 			// schedule verification
 			m_PublishVerificationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_VERIFICATION_TIMEOUT));
 			m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer,
-			shared_from_this (), std::placeholders::_1));	
+			shared_from_this (), std::placeholders::_1));
 		}
 		else
 			i2p::garlic::GarlicDestination::HandleDeliveryStatusMessage (msg);
-	}	
+	}
 
 	void LeaseSetDestination::SetLeaseSetUpdated ()
-	{	
+	{
 		UpdateLeaseSet ();
 	}
-		
+
 	void LeaseSetDestination::Publish ()
-	{	
-		if (!m_LeaseSet || !m_Pool) 
+	{
+		if (!m_LeaseSet || !m_Pool)
 		{
 			LogPrint (eLogError, "Destination: Can't publish non-existing LeaseSet");
 			return;
@@ -435,9 +444,9 @@ namespace client
 			m_PublishDelayTimer.cancel ();
 			m_PublishDelayTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_MIN_INTERVAL));
 			m_PublishDelayTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishDelayTimer,
-				shared_from_this (), std::placeholders::_1));	
+				shared_from_this (), std::placeholders::_1));
 			return;
-		}	
+		}
 		auto outbound = m_Pool->GetNextOutboundTunnel ();
 		if (!outbound)
 		{
@@ -450,28 +459,28 @@ namespace client
 			LogPrint (eLogError, "Destination: Can't publish LeaseSet. No inbound tunnels");
 			return;
 		}
-		auto floodfill = i2p::data::netdb.GetClosestFloodfill (m_LeaseSet->GetIdentHash (), m_ExcludedFloodfills);	
+		auto floodfill = i2p::data::netdb.GetClosestFloodfill (m_LeaseSet->GetIdentHash (), m_ExcludedFloodfills);
 		if (!floodfill)
 		{
 			LogPrint (eLogError, "Destination: Can't publish LeaseSet, no more floodfills found");
 			m_ExcludedFloodfills.clear ();
 			return;
-		}	
+		}
 		m_ExcludedFloodfills.insert (floodfill->GetIdentHash ());
 		LogPrint (eLogDebug, "Destination: Publish LeaseSet of ", GetIdentHash ().ToBase32 ());
 		RAND_bytes ((uint8_t *)&m_PublishReplyToken, 4);
-		auto msg = WrapMessage (floodfill, i2p::CreateDatabaseStoreMsg (m_LeaseSet, m_PublishReplyToken, inbound));			
+		auto msg = WrapMessage (floodfill, i2p::CreateDatabaseStoreMsg (m_LeaseSet, m_PublishReplyToken, inbound));
 		m_PublishConfirmationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_CONFIRMATION_TIMEOUT));
 		m_PublishConfirmationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishConfirmationTimer,
-			shared_from_this (), std::placeholders::_1));	
-		outbound->SendTunnelDataMsg (floodfill->GetIdentHash (), 0, msg);	
+			shared_from_this (), std::placeholders::_1));
+		outbound->SendTunnelDataMsg (floodfill->GetIdentHash (), 0, msg);
 		m_LastSubmissionTime = ts;
 	}
 
 	void LeaseSetDestination::HandlePublishConfirmationTimer (const boost::system::error_code& ecode)
 	{
 		if (ecode != boost::asio::error::operation_aborted)
-		{	
+		{
 			if (m_PublishReplyToken)
 			{
 				LogPrint (eLogWarning, "Destination: Publish confirmation was not received in ", PUBLISH_CONFIRMATION_TIMEOUT,  " seconds, will try again");
@@ -484,25 +493,25 @@ namespace client
 	void LeaseSetDestination::HandlePublishVerificationTimer (const boost::system::error_code& ecode)
 	{
 		if (ecode != boost::asio::error::operation_aborted)
-		{	
+		{
 			auto s = shared_from_this ();
-			RequestLeaseSet (GetIdentHash (), 
+			RequestLeaseSet (GetIdentHash (),
 				// "this" added due to bug in gcc 4.7-4.8
 				[s,this](std::shared_ptr<i2p::data::LeaseSet> leaseSet)
 				{
-					if (leaseSet)		
+					if (leaseSet)
 					{
 						if (s->m_LeaseSet && *s->m_LeaseSet == *leaseSet)
 						{
 							// we got latest LeasetSet
 							LogPrint (eLogDebug, "Destination: published LeaseSet verified for ", GetIdentHash().ToBase32());
 							s->m_PublishVerificationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_REGULAR_VERIFICATION_INTERNAL));
-							s->m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, s, std::placeholders::_1));	
+							s->m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, s, std::placeholders::_1));
 							return;
-						}	
+						}
 						else
 							LogPrint (eLogDebug, "Destination: LeaseSet is different than just published for ", GetIdentHash().ToBase32());
-					}	
+					}
 					else
 						LogPrint (eLogWarning, "Destination: couldn't find published LeaseSet for ", GetIdentHash().ToBase32());
 					// we have to publish again
@@ -515,16 +524,16 @@ namespace client
 	{
 		if (ecode != boost::asio::error::operation_aborted)
 			Publish ();
-	}	
-		
+	}
+
 	bool LeaseSetDestination::RequestDestination (const i2p::data::IdentHash& dest, RequestComplete requestComplete)
 	{
-		if (!m_Pool || !IsReady ()) 
-		{	
-			if (requestComplete) 
+		if (!m_Pool || !IsReady ())
+		{
+			if (requestComplete)
 				m_Service.post ([requestComplete](void){requestComplete (nullptr);});
 			return false;
-		}	
+		}
 		m_Service.post (std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), dest, requestComplete));
 		return true;
 	}
@@ -536,14 +545,14 @@ namespace client
 			{
 				auto it = s->m_LeaseSetRequests.find (dest);
 				if (it != s->m_LeaseSetRequests.end ())
-				{	
-					auto requestComplete = it->second; 
+				{
+					auto requestComplete = it->second;
 					s->m_LeaseSetRequests.erase (it);
 					if (notify && requestComplete) requestComplete->Complete (nullptr);
-				}	
-			});				
+				}
+			});
 	}
-		
+
 	void LeaseSetDestination::RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete)
 	{
 		std::set<i2p::data::IdentHash> excluded;
@@ -564,28 +573,28 @@ namespace client
 					m_LeaseSetRequests.erase (ret.first);
 					if (requestComplete) requestComplete (nullptr);
 				}
-			}	
+			}
 			else // duplicate
 			{
 				LogPrint (eLogInfo, "Destination: Request of LeaseSet ", dest.ToBase64 (), " is pending already");
 				if (ts > ret.first->second->requestTime + MAX_LEASESET_REQUEST_TIMEOUT)
-				{	
+				{
 					// something went wrong
 					m_LeaseSetRequests.erase (ret.first);
 					if (requestComplete) requestComplete (nullptr);
 				}
 				else if (requestComplete)
 					ret.first->second->requestComplete.push_back (requestComplete);
-			}	
-		}	
+			}
+		}
 		else
-		{	
+		{
 			LogPrint (eLogError, "Destination: Can't request LeaseSet, no floodfills found");
 			if (requestComplete) requestComplete (nullptr);
-		}	
-	}	
-		
-	bool LeaseSetDestination::SendLeaseSetRequest (const i2p::data::IdentHash& dest, 
+		}
+	}
+
+	bool LeaseSetDestination::SendLeaseSetRequest (const i2p::data::IdentHash& dest,
 		std::shared_ptr<const i2p::data::RouterInfo>  nextFloodfill, std::shared_ptr<LeaseSetRequest> request)
 	{
 		if (!request->replyTunnel || !request->replyTunnel->IsEstablished ())
@@ -594,36 +603,36 @@ namespace client
 		if (!request->outboundTunnel || !request->outboundTunnel->IsEstablished ())
 			request->outboundTunnel = m_Pool->GetNextOutboundTunnel ();
 		if (!request->outboundTunnel) LogPrint (eLogError, "Destination: Can't send LeaseSet request, no outbound tunnels found");
-			
+
 		if (request->replyTunnel && request->outboundTunnel)
-		{	
+		{
 			request->excluded.insert (nextFloodfill->GetIdentHash ());
 			request->requestTimeoutTimer.cancel ();
 
 			uint8_t replyKey[32], replyTag[32];
-			RAND_bytes (replyKey, 32); // random session key 
+			RAND_bytes (replyKey, 32); // random session key
 			RAND_bytes (replyTag, 32); // random session tag
 			AddSessionKey (replyKey, replyTag);
 
 			auto msg = WrapMessage (nextFloodfill,
-				CreateLeaseSetDatabaseLookupMsg (dest, request->excluded, 
+				CreateLeaseSetDatabaseLookupMsg (dest, request->excluded,
 					request->replyTunnel, replyKey, replyTag));
 			request->outboundTunnel->SendTunnelDataMsg (
 				{
-					i2p::tunnel::TunnelMessageBlock 
-					{ 
+					i2p::tunnel::TunnelMessageBlock
+					{
 						i2p::tunnel::eDeliveryTypeRouter,
 						nextFloodfill->GetIdentHash (), 0, msg
 					}
-				});	
+				});
 			request->requestTimeoutTimer.expires_from_now (boost::posix_time::seconds(LEASESET_REQUEST_TIMEOUT));
 			request->requestTimeoutTimer.async_wait (std::bind (&LeaseSetDestination::HandleRequestTimoutTimer,
 				shared_from_this (), std::placeholders::_1, dest));
-		}	
+		}
 		else
 			return false;
 		return true;
-	}	
+	}
 
 	void LeaseSetDestination::HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest)
 	{
@@ -641,26 +650,26 @@ namespace client
 					{
 						// reset tunnels, because one them might fail
 						it->second->outboundTunnel = nullptr;
-						it->second->replyTunnel = nullptr;		
+						it->second->replyTunnel = nullptr;
 						done = !SendLeaseSetRequest (dest, floodfill, it->second);
 					}
 					else
 						done = true;
 				}
 				else
-				{	
+				{
 					LogPrint (eLogWarning, "Destination: ", dest.ToBase64 (), " was not found within ",  MAX_LEASESET_REQUEST_TIMEOUT, " seconds");
 					done = true;
 				}
-				
+
 				if (done)
 				{
-					auto requestComplete = it->second; 
+					auto requestComplete = it->second;
 					m_LeaseSetRequests.erase (it);
 					if (requestComplete) requestComplete->Complete (nullptr);
-				}	
-			}	
-		}	
+				}
+			}
+		}
 	}
 
 	void LeaseSetDestination::HandleCleanupTimer (const boost::system::error_code& ecode)
@@ -674,7 +683,31 @@ namespace client
 			m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer,
 				shared_from_this (), std::placeholders::_1));
 		}
-	}	
+	}
+
+	void LeaseSetDestination::HandleReadyCheckTimer(const boost::system::error_code & ec)
+	{
+		if (ec != boost::asio::error::operation_aborted)
+		{
+			// TODO: locking ?
+			if(IsReady())
+			{
+				for (auto & itr : m_ReadyCallbacks)
+				{
+					itr(ec);
+				}
+				m_ReadyCallbacks.clear();
+			}
+		}
+		else
+		{
+			for (auto & itr : m_ReadyCallbacks)
+			{
+					itr(ec);
+			}
+			m_ReadyCallbacks.clear();
+		}
+	}
 
 	void LeaseSetDestination::CleanupRemoteLeaseSets ()
 	{
@@ -686,11 +719,11 @@ namespace client
 			{
 				LogPrint (eLogWarning, "Destination: Remote LeaseSet ", it->second->GetIdentHash ().ToBase64 (), " expired");
 				it = m_RemoteLeaseSets.erase (it);
-			}	
-			else 
+			}
+			else
 				++it;
 		}
-	}	
+	}
 
 	ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map<std::string, std::string> * params):
 		LeaseSetDestination (isPublic, params),
@@ -703,26 +736,26 @@ namespace client
 			i2p::crypto::GenerateElGamalKeyPair(m_EncryptionPrivateKey, m_EncryptionPublicKey);
 		if (isPublic)
 			LogPrint (eLogInfo, "Destination: Local address ", GetIdentHash().ToBase32 (), " created");
-	}	
+	}
 
-	ClientDestination::~ClientDestination ()	
+	ClientDestination::~ClientDestination ()
 	{
-	}	
-		
+	}
+
 	bool ClientDestination::Start ()
 	{
 		if (LeaseSetDestination::Start ())
-		{	
+		{
 			m_StreamingDestination = std::make_shared<i2p::stream::StreamingDestination> (GetSharedFromThis ()); // TODO:
-			m_StreamingDestination->Start ();	
+			m_StreamingDestination->Start ();
 			for (auto& it: m_StreamingDestinationsByPorts)
 				it.second->Start ();
 			return true;
-		}	
+		}
 		else
 			return false;
-	}	
-		
+	}
+
 	bool ClientDestination::Stop ()
 	{
 		if (LeaseSetDestination::Stop ())
@@ -732,21 +765,21 @@ namespace client
 			//m_StreamingDestination->SetOwner (nullptr);
 			m_StreamingDestination = nullptr;
 			for (auto& it: m_StreamingDestinationsByPorts)
-			{	
+			{
 				it.second->Stop ();
 				//it.second->SetOwner (nullptr);
 			}
 			m_StreamingDestinationsByPorts.clear ();
 			if (m_DatagramDestination)
-			{	
+			{
 				delete m_DatagramDestination;
 				m_DatagramDestination = nullptr;
-			}	
+			}
 		  	return true;
 		}
 		else
 			return false;
-	}	
+	}
 
 #ifdef I2LUA
 	void ClientDestination::Ready(ReadyPromise & p)
@@ -773,14 +806,14 @@ namespace client
 			ScheduleCheckForReady(p);
 	}
 #endif
-	
+
 	void ClientDestination::HandleDataMessage (const uint8_t * buf, size_t len)
 	{
 		uint32_t length = bufbe32toh (buf);
 		buf += 4;
 		// we assume I2CP payload
 		uint16_t fromPort = bufbe16toh (buf + 4), // source
-			toPort = bufbe16toh (buf + 6); // destination 
+			toPort = bufbe16toh (buf + 6); // destination
 		switch (buf[9])
 		{
 			case PROTOCOL_TYPE_STREAMING:
@@ -804,28 +837,41 @@ namespace client
 				LogPrint (eLogError, "Destination: Data: unexpected protocol ", buf[9]);
 		}
 	}
-		
-	void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) 
+
+	void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port)
 	{
-		if (!streamRequestComplete) 
+		if (!streamRequestComplete)
 		{
 			LogPrint (eLogError, "Destination: request callback is not specified in CreateStream");
 			return;
-		}	
-		auto leaseSet = FindLeaseSet (dest);
-		if (leaseSet)
-			streamRequestComplete(CreateStream (leaseSet, port));
+		}
+		if(IsReady())
+		{
+			auto leaseSet = FindLeaseSet (dest);
+			if (leaseSet)
+				streamRequestComplete(CreateStream (leaseSet, port));
+			else
+			{
+				auto s = GetSharedFromThis ();
+				RequestDestination (dest,
+					[s, streamRequestComplete, port](std::shared_ptr<i2p::data::LeaseSet> ls)
+					{
+						if (ls)
+							streamRequestComplete(s->CreateStream (ls, port));
+						else
+							streamRequestComplete (nullptr);
+					});
+			}
+		}
 		else
 		{
-			auto s = GetSharedFromThis ();
-			RequestDestination (dest,
-				[s, streamRequestComplete, port](std::shared_ptr<i2p::data::LeaseSet> ls)
-				{
-					if (ls)
-						streamRequestComplete(s->CreateStream (ls, port));
+			// call if tunnel is not ready
+			AddReadyCallback([&](const boost::system::error_code & ec) {
+					if(ec)
+						streamRequestComplete(nullptr);
 					else
-						streamRequestComplete (nullptr);
-				});
+						CreateStream(streamRequestComplete, dest, port);
+			});
 		}
 	}
 
@@ -837,18 +883,18 @@ namespace client
 			return nullptr;
 	}
 
-	std::shared_ptr<i2p::stream::StreamingDestination> ClientDestination::GetStreamingDestination (int port) const 
-	{ 
-		if (port) 
+	std::shared_ptr<i2p::stream::StreamingDestination> ClientDestination::GetStreamingDestination (int port) const
+	{
+		if (port)
 		{
 			auto it = m_StreamingDestinationsByPorts.find (port);
 			if (it != m_StreamingDestinationsByPorts.end ())
 				return it->second;
-		}	
+		}
 		// if port is zero or not found, use default destination
-		return m_StreamingDestination; 
+		return m_StreamingDestination;
 	}
-		
+
 	void ClientDestination::AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor)
 	{
 		if (m_StreamingDestination)
@@ -860,35 +906,35 @@ namespace client
 		if (m_StreamingDestination)
 			m_StreamingDestination->ResetAcceptor ();
 	}
-		
+
 	bool ClientDestination::IsAcceptingStreams () const
 	{
 		if (m_StreamingDestination)
 			return m_StreamingDestination->IsAcceptorSet ();
 		return false;
-	}	
+	}
 
 	void ClientDestination::AcceptOnce (const i2p::stream::StreamingDestination::Acceptor& acceptor)
 	{
 		if (m_StreamingDestination)
 			m_StreamingDestination->AcceptOnce (acceptor);
-	}	
-		
+	}
+
 	std::shared_ptr<i2p::stream::StreamingDestination> ClientDestination::CreateStreamingDestination (int port, bool gzip)
 	{
-		auto dest = std::make_shared<i2p::stream::StreamingDestination> (GetSharedFromThis (), port, gzip); 
+		auto dest = std::make_shared<i2p::stream::StreamingDestination> (GetSharedFromThis (), port, gzip);
 		if (port)
 			m_StreamingDestinationsByPorts[port] = dest;
-		else // update default 
+		else // update default
 			m_StreamingDestination = dest;
 		return dest;
-	}	
-		
+	}
+
   i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination ()
 	{
 		if (m_DatagramDestination == nullptr)
 			m_DatagramDestination = new i2p::datagram::DatagramDestination (GetSharedFromThis ());
-		return m_DatagramDestination;	
+		return m_DatagramDestination;
 	}
 
 	std::vector<std::shared_ptr<const i2p::stream::Stream> > ClientDestination::GetAllStreams () const
@@ -898,12 +944,12 @@ namespace client
 		{
 			for (auto& it: m_StreamingDestination->GetStreams ())
 				ret.push_back (it.second);
-		}	
+		}
 		for (auto& it: m_StreamingDestinationsByPorts)
 			for (auto& it1: it.second->GetStreams ())
 				ret.push_back (it1.second);
 		return ret;
-	}	
+	}
 
 	void ClientDestination::PersistTemporaryKeys ()
 	{
@@ -927,7 +973,7 @@ namespace client
 			return;
 		}
 		LogPrint(eLogError, "Destinations: Can't save keys to ", path);
-	}	
+	}
 
 	void ClientDestination::CreateNewLeaseSet (std::vector<std::shared_ptr<i2p::tunnel::InboundTunnel> > tunnels)
 	{
@@ -935,7 +981,7 @@ namespace client
 		// sign
 		Sign (leaseSet->GetBuffer (), leaseSet->GetBufferLen () - leaseSet->GetSignatureLen (), leaseSet->GetSignature ()); // TODO
 		SetLeaseSet (leaseSet);
-	}	
+	}
 
 	void ClientDestination::CleanupDestination ()
 	{
diff --git a/libi2pd/Destination.h b/libi2pd/Destination.h
index ef7437fb..4e299322 100644
--- a/libi2pd/Destination.h
+++ b/libi2pd/Destination.h
@@ -63,6 +63,7 @@ namespace client
 		public std::enable_shared_from_this<LeaseSetDestination>
 	{
 		typedef std::function<void (std::shared_ptr<i2p::data::LeaseSet> leaseSet)> RequestComplete;
+    typedef std::function<void (const boost::system::error_code &)> ReadyCallback;
 		// leaseSet = nullptr means not found
 		struct LeaseSetRequest
 		{
@@ -108,6 +109,8 @@ namespace client
 			void ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg);
 			void SetLeaseSetUpdated ();
 
+    void AddReadyCallback(ReadyCallback cb);
+
 		protected:
 
 			void SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet);
@@ -131,7 +134,8 @@ namespace client
 			void RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete);
 			bool SendLeaseSetRequest (const i2p::data::IdentHash& dest, std::shared_ptr<const i2p::data::RouterInfo>  nextFloodfill, std::shared_ptr<LeaseSetRequest> request);
 			void HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest);
-			void HandleCleanupTimer (const boost::system::error_code& ecode);
+    void HandleCleanupTimer (const boost::system::error_code& ecode);
+    void HandleReadyCheckTimer (const boost::system::error_code& ecode);
 			void CleanupRemoteLeaseSets ();
 
 		private:
@@ -152,7 +156,9 @@ namespace client
 			std::set<i2p::data::IdentHash> m_ExcludedFloodfills; // for publishing
 
 			boost::asio::deadline_timer m_PublishConfirmationTimer, m_PublishVerificationTimer,
-				m_PublishDelayTimer, m_CleanupTimer;
+				m_PublishDelayTimer, m_CleanupTimer, m_ReadyCheckTimer;
+
+    std::vector<ReadyCallback> m_ReadyCallbacks;
 
 		public:
 
@@ -182,9 +188,9 @@ namespace client
 			void Sign (const uint8_t * buf, int len, uint8_t * signature) const { m_Keys.Sign (buf, len, signature); };
 
 			// ref counter
-			int Acquire () { return ++m_RefCounter; }; 
+			int Acquire () { return ++m_RefCounter; };
 			int Release () { return --m_RefCounter; };
-			int GetRefCounter () const { return m_RefCounter; }; 
+			int GetRefCounter () const { return m_RefCounter; };
 
 			// streaming
 			std::shared_ptr<i2p::stream::StreamingDestination> CreateStreamingDestination (int port, bool gzip = true); // additional