Compare commits

..

1 commit

Author SHA1 Message Date
self-related
ca5115c657
Merge 32a70562c4 into 0d09a8be00 2024-11-02 11:53:58 +08:00
4 changed files with 17 additions and 27 deletions

View file

@ -337,14 +337,13 @@ namespace transport
SetTerminationTimeout (SSU2_TERMINATION_TIMEOUT); SetTerminationTimeout (SSU2_TERMINATION_TIMEOUT);
SendQueue (); SendQueue ();
transports.PeerConnected (shared_from_this ()); transports.PeerConnected (shared_from_this ());
LogPrint(eLogDebug, "SSU2: Session with ", GetRemoteEndpoint (),
" (", i2p::data::GetIdentHashAbbreviation (GetRemoteIdentity ()->GetIdentHash ()), ") established");
if (m_OnEstablished) if (m_OnEstablished)
{ {
m_OnEstablished (); m_OnEstablished ();
m_OnEstablished = nullptr; m_OnEstablished = nullptr;
} }
LogPrint(eLogDebug, "SSU2: Session with ", GetRemoteEndpoint (),
" (", i2p::data::GetIdentHashAbbreviation (GetRemoteIdentity ()->GetIdentHash ()), ") established");
} }
void SSU2Session::Done () void SSU2Session::Done ()
@ -1945,28 +1944,21 @@ namespace transport
void SSU2Session::HandleRelayRequest (const uint8_t * buf, size_t len) void SSU2Session::HandleRelayRequest (const uint8_t * buf, size_t len)
{ {
// we are Bob // we are Bob
auto mts = i2p::util::GetMillisecondsSinceEpoch ();
uint32_t nonce = bufbe32toh (buf + 1); // nonce
uint32_t relayTag = bufbe32toh (buf + 5); // relay tag uint32_t relayTag = bufbe32toh (buf + 5); // relay tag
auto session = m_Server.FindRelaySession (relayTag); auto session = m_Server.FindRelaySession (relayTag);
if (!session) if (!session)
{ {
LogPrint (eLogWarning, "SSU2: RelayRequest session with relay tag ", relayTag, " not found"); LogPrint (eLogWarning, "SSU2: RelayRequest session with relay tag ", relayTag, " not found");
// send relay response back to Alice // send relay response back to Alice
auto packet = m_Server.GetSentPacketsPool ().AcquireShared (); uint8_t payload[SSU2_MAX_PACKET_SIZE];
packet->payloadSize = CreateAckBlock (packet->payload, m_MaxPayloadSize); size_t payloadSize = CreateRelayResponseBlock (payload, m_MaxPayloadSize,
packet->payloadSize += CreateRelayResponseBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize, eSSU2RelayResponseCodeBobRelayTagNotFound, bufbe32toh (buf + 1), 0, false);
eSSU2RelayResponseCodeBobRelayTagNotFound, nonce, 0, false); payloadSize += CreatePaddingBlock (payload + payloadSize, m_MaxPayloadSize - payloadSize);
packet->payloadSize += CreatePaddingBlock (packet->payload + packet->payloadSize, m_MaxPayloadSize - packet->payloadSize); SendData (payload, payloadSize);
uint32_t packetNum = SendData (packet->payload, packet->payloadSize);
if (m_RemoteVersion >= SSU2_MIN_RELAY_RESPONSE_RESEND_VERSION)
{
// sometimes Alice doesn't ack this RelayResponse in older versions
packet->sendTime = mts;
m_SentPackets.emplace (packetNum, packet);
}
return; return;
} }
auto mts = i2p::util::GetMillisecondsSinceEpoch ();
uint32_t nonce = bufbe32toh (buf + 1);
if (session->m_RelaySessions.emplace (nonce, std::make_pair (shared_from_this (), mts/1000)).second) if (session->m_RelaySessions.emplace (nonce, std::make_pair (shared_from_this (), mts/1000)).second)
{ {
// send relay intro to Charlie // send relay intro to Charlie

View file

@ -25,7 +25,7 @@ namespace transport
{ {
template<typename Keys> template<typename Keys>
EphemeralKeysSupplier<Keys>::EphemeralKeysSupplier (int size): EphemeralKeysSupplier<Keys>::EphemeralKeysSupplier (int size):
m_QueueSize (size), m_IsRunning (false) m_QueueSize (size), m_IsRunning (false), m_Thread (nullptr)
{ {
} }
@ -39,7 +39,7 @@ namespace transport
void EphemeralKeysSupplier<Keys>::Start () void EphemeralKeysSupplier<Keys>::Start ()
{ {
m_IsRunning = true; m_IsRunning = true;
m_Thread.reset (new std::thread (std::bind (&EphemeralKeysSupplier<Keys>::Run, this))); m_Thread = new std::thread (std::bind (&EphemeralKeysSupplier<Keys>::Run, this));
} }
template<typename Keys> template<typename Keys>
@ -53,7 +53,8 @@ namespace transport
if (m_Thread) if (m_Thread)
{ {
m_Thread->join (); m_Thread->join ();
m_Thread = nullptr; delete m_Thread;
m_Thread = 0;
} }
} }
@ -77,7 +78,6 @@ namespace transport
} }
else else
{ {
m_KeysPool.CleanUpMt ();
std::unique_lock<std::mutex> l(m_AcquiredMutex); std::unique_lock<std::mutex> l(m_AcquiredMutex);
if (!m_IsRunning) break; if (!m_IsRunning) break;
m_Acquired.wait (l); // wait for element gets acquired m_Acquired.wait (l); // wait for element gets acquired
@ -92,7 +92,7 @@ namespace transport
{ {
for (int i = 0; i < num; i++) for (int i = 0; i < num; i++)
{ {
auto pair = m_KeysPool.AcquireSharedMt (); auto pair = std::make_shared<Keys> ();
pair->GenerateKeys (); pair->GenerateKeys ();
std::unique_lock<std::mutex> l(m_AcquiredMutex); std::unique_lock<std::mutex> l(m_AcquiredMutex);
m_Queue.push (pair); m_Queue.push (pair);
@ -114,7 +114,7 @@ namespace transport
} }
} }
// queue is empty, create new // queue is empty, create new
auto pair = m_KeysPool.AcquireSharedMt (); auto pair = std::make_shared<Keys> ();
pair->GenerateKeys (); pair->GenerateKeys ();
return pair; return pair;
} }

View file

@ -26,7 +26,6 @@
#include "RouterInfo.h" #include "RouterInfo.h"
#include "I2NPProtocol.h" #include "I2NPProtocol.h"
#include "Identity.h" #include "Identity.h"
#include "util.h"
namespace i2p namespace i2p
{ {
@ -54,10 +53,9 @@ namespace transport
const int m_QueueSize; const int m_QueueSize;
std::queue<std::shared_ptr<Keys> > m_Queue; std::queue<std::shared_ptr<Keys> > m_Queue;
i2p::util::MemoryPoolMt<Keys> m_KeysPool;
bool m_IsRunning; bool m_IsRunning;
std::unique_ptr<std::thread> m_Thread; std::thread * m_Thread;
std::condition_variable m_Acquired; std::condition_variable m_Acquired;
std::mutex m_AcquiredMutex; std::mutex m_AcquiredMutex;
}; };