sync Receive from stream

This commit is contained in:
orignal 2022-11-08 18:34:59 -05:00
parent c88638afe4
commit c6a6a4e0e8
3 changed files with 38 additions and 19 deletions

View file

@ -474,6 +474,29 @@ namespace stream
Close (); // check is all outgoing messages have been sent and we can send close Close (); // check is all outgoing messages have been sent and we can send close
} }
size_t Stream::Receive (uint8_t * buf, size_t len, int timeout)
{
if (!len) return 0;
size_t ret = 0;
std::condition_variable newDataReceived;
std::mutex newDataReceivedMutex;
std::unique_lock<std::mutex> l(newDataReceivedMutex);
AsyncReceive (boost::asio::buffer (buf, len),
[&ret, &newDataReceived, &newDataReceivedMutex](const boost::system::error_code& ecode, std::size_t bytes_transferred)
{
if (ecode == boost::asio::error::timed_out)
ret = 0;
else
ret = bytes_transferred;
std::unique_lock<std::mutex> l(newDataReceivedMutex);
newDataReceived.notify_all ();
},
timeout);
if (newDataReceived.wait_for (l, std::chrono::seconds (timeout)) == std::cv_status::timeout)
ret = 0;
return ret;
}
size_t Stream::Send (const uint8_t * buf, size_t len) size_t Stream::Send (const uint8_t * buf, size_t len)
{ {
AsyncSend (buf, len, nullptr); AsyncSend (buf, len, nullptr);

View file

@ -185,7 +185,8 @@ namespace stream
template<typename Buffer, typename ReceiveHandler> template<typename Buffer, typename ReceiveHandler>
void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0); void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0);
size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); }; size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); };
size_t Receive (uint8_t * buf, size_t len, int timeout);
void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); }; void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); };
/** only call close from destination thread, use Stream::AsyncClose for other threads */ /** only call close from destination thread, use Stream::AsyncClose for other threads */
@ -336,11 +337,10 @@ namespace stream
int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout; int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout;
s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(t)); s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(t));
int left = timeout - t; int left = timeout - t;
auto self = s->shared_from_this(); s->m_ReceiveTimer.async_wait (
self->m_ReceiveTimer.async_wait ( [s, buffer, handler, left](const boost::system::error_code & ec)
[self, buffer, handler, left](const boost::system::error_code & ec)
{ {
self->HandleReceiveTimer(ec, buffer, handler, left); s->HandleReceiveTimer(ec, buffer, handler, left);
}); });
} }
}); });

View file

@ -892,24 +892,20 @@ namespace client
int numAttempts = 0; int numAttempts = 0;
while (!end) while (!end)
{ {
stream->AsyncReceive (boost::asio::buffer (recv_buf, 4096), size_t received = stream->Receive (recv_buf, 4096, SUBSCRIPTION_REQUEST_TIMEOUT);
[&](const boost::system::error_code& ecode, std::size_t bytes_transferred) if (received)
{ {
if (bytes_transferred) response.append ((char *)recv_buf, received);
response.append ((char *)recv_buf, bytes_transferred); if (!stream->IsOpen ()) end = true;
if (ecode == boost::asio::error::timed_out || !stream->IsOpen ()) }
end = true; else if (!stream->IsOpen ())
newDataReceived.notify_all (); end = true;
}, else
SUBSCRIPTION_REQUEST_TIMEOUT);
std::unique_lock<std::mutex> l(newDataReceivedMutex);
// wait 1 more second
if (newDataReceived.wait_for (l, std::chrono::seconds (SUBSCRIPTION_REQUEST_TIMEOUT + 1)) == std::cv_status::timeout)
{ {
LogPrint (eLogError, "Addressbook: Subscriptions request timeout expired"); LogPrint (eLogError, "Addressbook: Subscriptions request timeout expired");
numAttempts++; numAttempts++;
if (numAttempts > 5) end = true; if (numAttempts > 5) end = true;
} }
} }
// process remaining buffer // process remaining buffer
while (size_t len = stream->ReadSome (recv_buf, sizeof(recv_buf))) while (size_t len = stream->ReadSome (recv_buf, sizeof(recv_buf)))