diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index 5b4468fd..dda74015 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -478,22 +478,39 @@ namespace stream { if (!len) return 0; size_t ret = 0; + volatile bool done = false; std::condition_variable newDataReceived; std::mutex newDataReceivedMutex; std::unique_lock l(newDataReceivedMutex); AsyncReceive (boost::asio::buffer (buf, len), - [&ret, &newDataReceived, &newDataReceivedMutex](const boost::system::error_code& ecode, std::size_t bytes_transferred) + [&ret, &done, &newDataReceived, &newDataReceivedMutex](const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (ecode == boost::asio::error::timed_out) ret = 0; else ret = bytes_transferred; std::unique_lock l(newDataReceivedMutex); + done = true; newDataReceived.notify_all (); }, timeout); - if (newDataReceived.wait_for (l, std::chrono::seconds (timeout)) == std::cv_status::timeout) + if (newDataReceived.wait_for (l, std::chrono::seconds (timeout)) == std::cv_status::timeout) ret = 0; + if (!done) + { + // make sure that AsycReceive complete + auto s = shared_from_this(); + m_Service.post ([s]() + { + s->m_ReceiveTimer.cancel (); + }); + int i = 0; + while (!done && i < 100) // 1 sec + { + std::this_thread::sleep_for (std::chrono::milliseconds(10)); + i++; + } + } return ret; }