diff --git a/libi2pd_client/I2PTunnel.cpp b/libi2pd_client/I2PTunnel.cpp index d6436c78..ae9849db 100644 --- a/libi2pd_client/I2PTunnel.cpp +++ b/libi2pd_client/I2PTunnel.cpp @@ -32,7 +32,8 @@ namespace client I2PTunnelConnection::I2PTunnelConnection (I2PService * owner, std::shared_ptr socket, std::shared_ptr leaseSet, uint16_t port): - I2PServiceHandler(owner), m_Socket (socket), m_RemoteEndpoint (socket->remote_endpoint ()) + I2PServiceHandler(owner), m_Socket (socket), m_RemoteEndpoint (socket->remote_endpoint ()), + m_IsReceiving (false) { m_Stream = GetOwner()->GetLocalDestination ()->CreateStream (leaseSet, port); } @@ -40,13 +41,13 @@ namespace client I2PTunnelConnection::I2PTunnelConnection (I2PService * owner, std::shared_ptr socket, std::shared_ptr stream): I2PServiceHandler(owner), m_Socket (socket), m_Stream (stream), - m_RemoteEndpoint (socket->remote_endpoint ()) + m_RemoteEndpoint (socket->remote_endpoint ()), m_IsReceiving (false) { } I2PTunnelConnection::I2PTunnelConnection (I2PService * owner, std::shared_ptr stream, const boost::asio::ip::tcp::endpoint& target,std::shared_ptr sslCtx): - I2PServiceHandler(owner), m_Stream (stream), m_RemoteEndpoint (target) + I2PServiceHandler(owner), m_Stream (stream), m_RemoteEndpoint (target), m_IsReceiving (false) { m_Socket = std::make_shared (owner->GetService ()); if (sslCtx) @@ -149,18 +150,23 @@ namespace client void I2PTunnelConnection::Receive () { + if (m_IsReceiving) return; // already receiving + size_t unsentSize = m_Stream ? m_Stream->GetSendBufferSize () : 0; + if (unsentSize >= I2P_TUNNEL_CONNECTION_BUFFER_SIZE) return; // buffer is full + m_IsReceiving = true; if (m_SSL) - m_SSL->async_read_some (boost::asio::buffer(m_Buffer, I2P_TUNNEL_CONNECTION_BUFFER_SIZE), + m_SSL->async_read_some (boost::asio::buffer(m_Buffer, I2P_TUNNEL_CONNECTION_BUFFER_SIZE - unsentSize), std::bind(&I2PTunnelConnection::HandleReceive, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); else - m_Socket->async_read_some (boost::asio::buffer(m_Buffer, I2P_TUNNEL_CONNECTION_BUFFER_SIZE), + m_Socket->async_read_some (boost::asio::buffer(m_Buffer, I2P_TUNNEL_CONNECTION_BUFFER_SIZE - unsentSize), std::bind(&I2PTunnelConnection::HandleReceive, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } void I2PTunnelConnection::HandleReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) { + m_IsReceiving = false; if (ecode) { if (ecode != boost::asio::error::operation_aborted) @@ -170,16 +176,18 @@ namespace client } } else + { WriteToStream (m_Buffer, bytes_transferred); + Receive (); // try to receive more while being sent to stream + } } void I2PTunnelConnection::WriteToStream (const uint8_t * buf, size_t len) { if (m_Stream) { - auto s = shared_from_this (); m_Stream->AsyncSend (buf, len, - [s](const boost::system::error_code& ecode) + [s = shared_from_this ()](const boost::system::error_code& ecode) { if (!ecode) s->Receive (); diff --git a/libi2pd_client/I2PTunnel.h b/libi2pd_client/I2PTunnel.h index 7d4c3400..be35cfec 100644 --- a/libi2pd_client/I2PTunnel.h +++ b/libi2pd_client/I2PTunnel.h @@ -82,6 +82,7 @@ namespace client std::shared_ptr > m_SSL; std::shared_ptr m_Stream; boost::asio::ip::tcp::endpoint m_RemoteEndpoint; + bool m_IsReceiving; }; class I2PClientTunnelConnectionHTTP: public I2PTunnelConnection