I2NPMessagesHandler

This commit is contained in:
orignal 2015-01-22 22:00:41 -05:00
parent 3ed1fee7ce
commit 0c73aff0a2
7 changed files with 64 additions and 3 deletions

View file

@ -579,4 +579,32 @@ namespace i2p
}
}
}
I2NPMessagesHandler::~I2NPMessagesHandler ()
{
Flush ();
}
void I2NPMessagesHandler::PutNextMessage (I2NPMessage * msg)
{
if (msg)
{
if (msg->GetTypeID () == eI2NPTunnelData)
{
LogPrint ("TunnelData");
m_TunnelMsgs.push_back (msg);
}
else
HandleI2NPMessage (msg);
}
}
void I2NPMessagesHandler::Flush ()
{
if (!m_TunnelMsgs.empty ())
{
i2p::tunnel::tunnels.PostTunnelData (m_TunnelMsgs);
m_TunnelMsgs.clear ();
}
}
}

View file

@ -216,6 +216,19 @@ namespace tunnel
size_t GetI2NPMessageLength (const uint8_t * msg);
void HandleI2NPMessage (uint8_t * msg, size_t len);
void HandleI2NPMessage (I2NPMessage * msg);
class I2NPMessagesHandler
{
public:
~I2NPMessagesHandler ();
void PutNextMessage (I2NPMessage * msg);
void Flush ();
private:
std::vector<I2NPMessage *> m_TunnelMsgs;
};
}
#endif

View file

@ -481,6 +481,7 @@ namespace transport
}
if (m_ReceiveBufferOffset > 0)
memcpy (m_ReceiveBuffer, nextBlock, m_ReceiveBufferOffset);
m_Handler.Flush ();
}
ScheduleTermination (); // reset termination timer
@ -529,7 +530,7 @@ namespace transport
if (m_NextMessageOffset >= m_NextMessage->len + 4) // +checksum
{
// we have a complete I2NP message
i2p::HandleI2NPMessage (m_NextMessage);
m_Handler.PutNextMessage (m_NextMessage);
m_NextMessage = nullptr;
}
return true;

View file

@ -42,7 +42,7 @@ namespace transport
#pragma pack()
const size_t NTCP_MAX_MESSAGE_SIZE = 16384;
const size_t NTCP_BUFFER_SIZE = 1040; // fits one tunnel message (1028)
const size_t NTCP_BUFFER_SIZE = 4160; // fits 4 tunnel messages (4*1028)
const int NTCP_TERMINATION_TIMEOUT = 120; // 2 minutes
const size_t NTCP_DEFAULT_PHASE3_SIZE = 2/*size*/ + i2p::data::DEFAULT_IDENTITY_SIZE/*387*/ + 4/*ts*/ + 15/*padding*/ + 40/*signature*/; // 448
@ -131,7 +131,8 @@ namespace transport
i2p::I2NPMessage * m_NextMessage;
size_t m_NextMessageOffset;
i2p::I2NPMessagesHandler m_Handler;
size_t m_NumSentBytes, m_NumReceivedBytes;
};

12
Queue.h
View file

@ -2,6 +2,7 @@
#define QUEUE_H__
#include <queue>
#include <vector>
#include <mutex>
#include <thread>
#include <condition_variable>
@ -23,6 +24,17 @@ namespace util
m_NonEmpty.notify_one ();
}
void Put (const std::vector<Element *>& vec)
{
if (!vec.empty ())
{
std::unique_lock<std::mutex> l(m_QueueMutex);
for (auto it: vec)
m_Queue.push (it);
m_NonEmpty.notify_one ();
}
}
Element * GetNext ()
{
std::unique_lock<std::mutex> l(m_QueueMutex);

View file

@ -576,6 +576,11 @@ namespace tunnel
if (msg) m_Queue.Put (msg);
}
void Tunnels::PostTunnelData (const std::vector<I2NPMessage *>& msgs)
{
m_Queue.Put (msgs);
}
template<class TTunnel>
TTunnel * Tunnels::CreateTunnel (TunnelConfig * config, OutboundTunnel * outboundTunnel)
{

View file

@ -130,6 +130,7 @@ namespace tunnel
void AddOutboundTunnel (OutboundTunnel * newTunnel);
void AddInboundTunnel (InboundTunnel * newTunnel);
void PostTunnelData (I2NPMessage * msg);
void PostTunnelData (const std::vector<I2NPMessage *>& msgs);
template<class TTunnel>
TTunnel * CreateTunnel (TunnelConfig * config, OutboundTunnel * outboundTunnel = 0);
std::shared_ptr<TunnelPool> CreateTunnelPool (i2p::garlic::GarlicDestination * localDestination, int numInboundHops, int numOuboundHops);