From 539cd5a656d937e3bab94744c74ed718f91af2ec Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 2 May 2025 13:36:59 -0400 Subject: [PATCH] max i2p stream buffer size --- libi2pd_client/SAM.cpp | 45 ++++++++++++++++++++++++------------------ libi2pd_client/SAM.h | 7 +++++-- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index 2c0f8d92..7ec94dcb 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -27,7 +27,7 @@ namespace client m_Owner (owner), m_Socket(owner.GetService()), m_Timer (m_Owner.GetService ()), m_BufferOffset (0), m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false), - m_IsAccepting (false), m_Stream (nullptr) + m_IsAccepting (false), m_IsReceiving (false) { } @@ -944,13 +944,23 @@ namespace client void SAMSocket::Receive () { - m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset), - std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage, - shared_from_this (), std::placeholders::_1, std::placeholders::_2)); + if (m_SocketType == eSAMSocketTypeStream) + { + if (m_IsReceiving) return; + size_t unsentSize = m_Stream ? m_Stream->GetSendBufferSize () : 0; + if (unsentSize >= SAM_STREAM_MAX_SEND_BUFFER_SIZE) return; // buffer is full + m_IsReceiving = true; + m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), + std::bind(&SAMSocket::HandleReceived, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); + } + else + m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset), + std::bind(&SAMSocket::HandleMessage, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) { + m_IsReceiving = false; if (ecode) { LogPrint (eLogError, "SAM: Read error: ", ecode.message ()); @@ -960,16 +970,13 @@ namespace client else { if (m_Stream) - { - bytes_transferred += m_BufferOffset; - m_BufferOffset = 0; + { m_Stream->AsyncSend ((uint8_t *)m_Buffer, bytes_transferred, std::bind(&SAMSocket::HandleStreamSend, shared_from_this(), std::placeholders::_1)); - } + Receive (); + } else - { Terminate("No Stream Remaining"); - } } } @@ -980,16 +987,16 @@ namespace client if (m_Stream->GetStatus () == i2p::stream::eStreamStatusNew || m_Stream->GetStatus () == i2p::stream::eStreamStatusOpen) // regular { - m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE), + m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_STREAM_BUFFER_SIZE), std::bind (&SAMSocket::HandleI2PReceive, shared_from_this(), std::placeholders::_1, std::placeholders::_2), SAM_SOCKET_CONNECTION_MAX_IDLE); } else // closed by peer { - uint8_t * buff = new uint8_t[SAM_SOCKET_BUFFER_SIZE]; + uint8_t * buff = new uint8_t[SAM_STREAM_BUFFER_SIZE]; // get remaining data - auto len = m_Stream->ReadSome (buff, SAM_SOCKET_BUFFER_SIZE); + auto len = m_Stream->ReadSome (buff, SAM_STREAM_BUFFER_SIZE); if (len > 0) // still some data { WriteI2PDataImmediate(buff, len); @@ -1186,11 +1193,11 @@ namespace client else { #ifdef _MSC_VER - size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len); + size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_STREAM_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len); #else - size_t l = snprintf ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len); + size_t l = snprintf ((char *)m_StreamBuffer, SAM_STREAM_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len); #endif - if (len < SAM_SOCKET_BUFFER_SIZE - l) + if (len < SAM_STREAM_BUFFER_SIZE - l) { memcpy (m_StreamBuffer + l, buf, len); WriteI2PData(len + l); @@ -1214,11 +1221,11 @@ namespace client else { #ifdef _MSC_VER - size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_RAW_RECEIVED, (long unsigned int)len); + size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_STREAM_BUFFER_SIZE, SAM_RAW_RECEIVED, (long unsigned int)len); #else - size_t l = snprintf ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_RAW_RECEIVED, (long unsigned int)len); + size_t l = snprintf ((char *)m_StreamBuffer, SAM_STREAM_BUFFER_SIZE, SAM_RAW_RECEIVED, (long unsigned int)len); #endif - if (len < SAM_SOCKET_BUFFER_SIZE - l) + if (len < SAM_STREAM_BUFFER_SIZE - l) { memcpy (m_StreamBuffer + l, buf, len); WriteI2PData(len + l); diff --git a/libi2pd_client/SAM.h b/libi2pd_client/SAM.h index 1886324a..cd619678 100644 --- a/libi2pd_client/SAM.h +++ b/libi2pd_client/SAM.h @@ -29,6 +29,8 @@ namespace i2p namespace client { const size_t SAM_SOCKET_BUFFER_SIZE = 8192; + const size_t SAM_STREAM_BUFFER_SIZE = 16384; + const size_t SAM_STREAM_MAX_SEND_BUFFER_SIZE = 8*SAM_SOCKET_BUFFER_SIZE; const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds const int SAM_SESSION_READINESS_CHECK_INTERVAL = 3; // in seconds const size_t SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE = 50; @@ -170,12 +172,13 @@ namespace client Socket_t m_Socket; boost::asio::deadline_timer m_Timer; char m_Buffer[SAM_SOCKET_BUFFER_SIZE + 1]; - size_t m_BufferOffset; - uint8_t m_StreamBuffer[SAM_SOCKET_BUFFER_SIZE]; + size_t m_BufferOffset; // for session only + uint8_t m_StreamBuffer[SAM_STREAM_BUFFER_SIZE]; SAMSocketType m_SocketType; std::string m_ID; // nickname bool m_IsSilent; bool m_IsAccepting; // for eSAMSocketTypeAcceptor only + bool m_IsReceiving; // for eSAMSocketTypeStream only std::shared_ptr m_Stream; };