move buffer when insert to buffer queue. clean entire queue in one call
Some checks are pending
Build Debian packages / bookworm (push) Waiting to run
Build Debian packages / bullseye (push) Waiting to run
Build Debian packages / buster (push) Waiting to run
Build on FreeBSD / with UPnP (push) Waiting to run
Build on OSX / With USE_UPNP=no (push) Waiting to run
Build on OSX / With USE_UPNP=yes (push) Waiting to run
Build on Windows / clang-x86_64 (push) Waiting to run
Build on Windows / i686 (push) Waiting to run
Build on Windows / ucrt-x86_64 (push) Waiting to run
Build on Windows / x86_64 (push) Waiting to run
Build on Windows / CMake clang-x86_64 (push) Waiting to run
Build on Windows / CMake i686 (push) Waiting to run
Build on Windows / CMake ucrt-x86_64 (push) Waiting to run
Build on Windows / CMake x86_64 (push) Waiting to run
Build on Windows / XP (push) Waiting to run
Build on Ubuntu / Make with USE_UPNP=no (push) Waiting to run
Build on Ubuntu / Make with USE_UPNP=yes (push) Waiting to run
Build on Ubuntu / CMake with -DWITH_UPNP=OFF (push) Waiting to run
Build on Ubuntu / CMake with -DWITH_UPNP=ON (push) Waiting to run
Build containers / Building container for linux/amd64 (push) Waiting to run
Build containers / Building container for linux/arm64 (push) Waiting to run
Build containers / Building container for linux/arm/v7 (push) Waiting to run
Build containers / Building container for linux/386 (push) Waiting to run
Build containers / Pushing merged manifest (push) Blocked by required conditions

This commit is contained in:
orignal 2025-04-03 18:42:34 -04:00
parent 78d97179b8
commit 3be4c7217f
3 changed files with 41 additions and 25 deletions

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2024, The PurpleI2P Project * Copyright (c) 2013-2025, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -19,39 +19,55 @@ namespace i2p
{ {
namespace stream namespace stream
{ {
void SendBufferQueue::Add (std::shared_ptr<SendBuffer> buf) void SendBufferQueue::Add (std::shared_ptr<SendBuffer>&& buf)
{ {
if (buf) if (buf)
{ {
m_Buffers.push_back (buf);
m_Size += buf->len; m_Size += buf->len;
m_Buffers.push_back (std::move (buf));
} }
} }
size_t SendBufferQueue::Get (uint8_t * buf, size_t len) size_t SendBufferQueue::Get (uint8_t * buf, size_t len)
{ {
if (!m_Size) return 0;
size_t offset = 0; size_t offset = 0;
while (!m_Buffers.empty () && offset < len) if (len >= m_Size)
{ {
auto nextBuffer = m_Buffers.front (); for (auto& it: m_Buffers)
auto rem = nextBuffer->GetRemainingSize ();
if (offset + rem <= len)
{ {
// whole buffer auto rem = it->GetRemainingSize ();
memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), rem); memcpy (buf + offset, it->GetRemaningBuffer (), rem);
offset += rem; offset += rem;
m_Buffers.pop_front (); // delete it
} }
else m_Buffers.clear ();
m_Size = 0;
return offset;
}
else
{
while (!m_Buffers.empty () && offset < len)
{ {
// partially auto nextBuffer = m_Buffers.front ();
rem = len - offset; auto rem = nextBuffer->GetRemainingSize ();
memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), rem); if (offset + rem <= len)
nextBuffer->offset += rem; {
offset = len; // break // whole buffer
memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), rem);
offset += rem;
m_Buffers.pop_front (); // delete it
}
else
{
// partially
rem = len - offset;
memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), rem);
nextBuffer->offset += rem;
offset = len; // break
}
} }
} m_Size -= offset;
m_Size -= offset; }
return offset; return offset;
} }
@ -59,7 +75,7 @@ namespace stream
{ {
if (!m_Buffers.empty ()) if (!m_Buffers.empty ())
{ {
for (auto it: m_Buffers) for (auto& it: m_Buffers)
it->Cancel (); it->Cancel ();
m_Buffers.clear (); m_Buffers.clear ();
m_Size = 0; m_Size = 0;
@ -717,10 +733,10 @@ namespace stream
else if (handler) else if (handler)
handler(boost::system::error_code ()); handler(boost::system::error_code ());
auto s = shared_from_this (); auto s = shared_from_this ();
boost::asio::post (m_Service, [s, buffer]() boost::asio::post (m_Service, [s, buffer = std::move(buffer)]() mutable
{ {
if (buffer) if (buffer)
s->m_SendBuffer.Add (buffer); s->m_SendBuffer.Add (std::move(buffer));
s->SendBuffer (); s->SendBuffer ();
}); });
} }

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2024, The PurpleI2P Project * Copyright (c) 2013-2025, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -150,7 +150,7 @@ namespace stream
SendBufferQueue (): m_Size (0) {}; SendBufferQueue (): m_Size (0) {};
~SendBufferQueue () { CleanUp (); }; ~SendBufferQueue () { CleanUp (); };
void Add (std::shared_ptr<SendBuffer> buf); void Add (std::shared_ptr<SendBuffer>&& buf);
size_t Get (uint8_t * buf, size_t len); size_t Get (uint8_t * buf, size_t len);
size_t GetSize () const { return m_Size; }; size_t GetSize () const { return m_Size; };
bool IsEmpty () const { return m_Buffers.empty (); }; bool IsEmpty () const { return m_Buffers.empty (); };

View file

@ -533,7 +533,7 @@ namespace client
if (sendBuf) if (sendBuf)
{ {
if (m_SendQueue.GetSize () < I2CP_MAX_SEND_QUEUE_SIZE) if (m_SendQueue.GetSize () < I2CP_MAX_SEND_QUEUE_SIZE)
m_SendQueue.Add (sendBuf); m_SendQueue.Add (std::move(sendBuf));
else else
{ {
LogPrint (eLogWarning, "I2CP: Send queue size exceeds ", I2CP_MAX_SEND_QUEUE_SIZE); LogPrint (eLogWarning, "I2CP: Send queue size exceeds ", I2CP_MAX_SEND_QUEUE_SIZE);
@ -1079,7 +1079,7 @@ namespace client
if (sendBuf) if (sendBuf)
{ {
if (m_SendQueue.GetSize () < I2CP_MAX_SEND_QUEUE_SIZE) if (m_SendQueue.GetSize () < I2CP_MAX_SEND_QUEUE_SIZE)
m_SendQueue.Add (sendBuf); m_SendQueue.Add (std::move(sendBuf));
else else
{ {
LogPrint (eLogWarning, "I2CP: Send queue size exceeds ", I2CP_MAX_SEND_QUEUE_SIZE); LogPrint (eLogWarning, "I2CP: Send queue size exceeds ", I2CP_MAX_SEND_QUEUE_SIZE);