diff --git a/libi2pd/ECIESX25519AEADRatchetSession.h b/libi2pd/ECIESX25519AEADRatchetSession.h index fd9cc45d..5fa947b7 100644 --- a/libi2pd/ECIESX25519AEADRatchetSession.h +++ b/libi2pd/ECIESX25519AEADRatchetSession.h @@ -181,6 +181,7 @@ namespace garlic { if (!m_Destination) m_Destination.reset (new i2p::data::IdentHash (dest)); } + const i2p::data::IdentHash * GetDestinationPtr () const { return m_Destination ? m_Destination.get () : nullptr; }; // for pongs bool CheckExpired (uint64_t ts); // true is expired bool CanBeRestarted (uint64_t ts) const { return ts > m_SessionCreatedTimestamp + ECIESX25519_RESTART_TIMEOUT; } bool IsInactive (uint64_t ts) const { return ts > m_LastActivityTimestamp + ECIESX25519_INACTIVITY_TIMEOUT && CanBeRestarted (ts); } diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index 2651fd55..7e8747ad 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -1991,6 +1991,7 @@ namespace stream void StreamingDestination::Stop () { ResetAcceptor (); + ResetPongHandler (); m_PendingIncomingTimer.cancel (); m_PendingIncomingStreams.clear (); { @@ -2037,6 +2038,8 @@ namespace stream { // pong LogPrint (eLogInfo, "Streaming: Pong received rSID=", packet->GetReceiveStreamID ()); + if (m_PongHandler != nullptr) + m_PongHandler (packet->from ? packet->from->GetDestinationPtr () : nullptr); DeletePacket (packet); return; } @@ -2216,6 +2219,16 @@ namespace stream m_Acceptor = nullptr; } + void StreamingDestination::SetPongHandler (const PongHandler& handler) + { + m_PongHandler = handler; + } + + void StreamingDestination::ResetPongHandler () + { + m_PongHandler = nullptr; + } + void StreamingDestination::AcceptOnce (const Acceptor& acceptor) { boost::asio::post (m_Owner->GetService (), [acceptor, this](void) diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 5aeced26..d3bd314e 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -312,6 +312,7 @@ namespace stream public: typedef std::function)> Acceptor; + typedef std::function PongHandler; StreamingDestination (std::shared_ptr owner, uint16_t localPort = 0, bool gzip = false); ~StreamingDestination (); @@ -330,7 +331,9 @@ namespace stream void AcceptOnce (const Acceptor& acceptor); void AcceptOnceAcceptor (std::shared_ptr stream, Acceptor acceptor, Acceptor prev); std::shared_ptr AcceptStream (int timeout = 0); // sync - + void SetPongHandler (const PongHandler& handler); + void ResetPongHandler (); + std::shared_ptr GetOwner () const { return m_Owner; }; void SetOwner (std::shared_ptr owner) { m_Owner = owner; }; uint16_t GetLocalPort () const { return m_LocalPort; }; @@ -358,6 +361,7 @@ namespace stream std::unordered_map > m_IncomingStreams; // receiveStreamID->stream std::shared_ptr m_LastStream; Acceptor m_Acceptor; + PongHandler m_PongHandler; std::list > m_PendingIncomingStreams; boost::asio::deadline_timer m_PendingIncomingTimer; std::unordered_map > m_SavedPackets; // receiveStreamID->packets, arrived before SYN diff --git a/libi2pd_client/BOB.cpp b/libi2pd_client/BOB.cpp index 3f02f779..d8bf76a9 100644 --- a/libi2pd_client/BOB.cpp +++ b/libi2pd_client/BOB.cpp @@ -355,6 +355,17 @@ namespace client Send (); } + void BOBCommandSession::SendReplyOK (const std::vector& strings) + { + std::ostream os(&m_SendBuffer); + os << "OK"; + if (!strings.empty ()) os << " "; + for (auto& it: strings) + os << it; + os << std::endl; + Send (); + } + void BOBCommandSession::SendReplyError (const char * msg) { std::ostream os(&m_SendBuffer); @@ -802,6 +813,64 @@ namespace client SendReplyError ("empty lookup address"); } + void BOBCommandSession::PingCommandHandler (const char * operand, size_t len) + { + LogPrint (eLogDebug, "BOB: ping ", operand); + if (*operand) + { + auto addr = context.GetAddressBook ().GetAddress (operand); + if (!addr) + { + SendReplyError ("Address Not found"); + return; + } + auto localDestination = (m_CurrentDestination && m_CurrentDestination->IsRunning ()) ? + m_CurrentDestination->GetLocalDestination () : i2p::client::context.GetSharedLocalDestination (); + if (!localDestination) + { + SendReplyError ("No local destination"); + return; + } + auto streamingDestination = localDestination->GetStreamingDestination (); + if (!streamingDestination) + { + SendReplyError ("No streaming destination"); + return; + } + auto timer = std::make_shared(localDestination->GetService ()); + timer->expires_from_now (boost::posix_time::milliseconds(BOB_PING_TIMEOUT)); + timer->async_wait ([streamingDestination, s = shared_from_this ()](const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + LogPrint (eLogDebug, "BOB: Pong not received after ", BOB_PING_TIMEOUT, " millliseconds"); + streamingDestination->ResetPongHandler (); + s->SendReplyError ("timeout"); + } + }); + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + streamingDestination->SetPongHandler ( + [streamingDestination, timer, ts, s = shared_from_this ()](const i2p::data::IdentHash * from) + { + int t = i2p::util::GetMillisecondsSinceEpoch () - ts; + if (t < 0) t = 0; + streamingDestination->ResetPongHandler (); + timer->cancel (); + if (from) + s->SendReplyOK ({"pong ", "from ", from->ToBase32(), ".b32.i2p: time=", std::to_string (t), " ms"}); + else + s->SendReplyOK ({"pong: time=", std::to_string (t), " ms"}); + }); + + if (addr->IsIdentHash ()) + localDestination->SendPing (addr->identHash); + else + localDestination->SendPing (addr->blindedPublicKey); + } + else + SendReplyError ("empty ping address"); + } + void BOBCommandSession::ClearCommandHandler (const char * operand, size_t len) { LogPrint (eLogDebug, "BOB: clear"); @@ -949,6 +1018,7 @@ namespace client m_CommandHandlers[BOB_COMMAND_QUIET] = &BOBCommandSession::QuietCommandHandler; m_CommandHandlers[BOB_COMMAND_LOOKUP] = &BOBCommandSession::LookupCommandHandler; m_CommandHandlers[BOB_COMMAND_LOOKUP_LOCAL] = &BOBCommandSession::LookupLocalCommandHandler; + m_CommandHandlers[BOB_COMMAND_PING] = &BOBCommandSession::PingCommandHandler; m_CommandHandlers[BOB_COMMAND_CLEAR] = &BOBCommandSession::ClearCommandHandler; m_CommandHandlers[BOB_COMMAND_LIST] = &BOBCommandSession::ListCommandHandler; m_CommandHandlers[BOB_COMMAND_OPTION] = &BOBCommandSession::OptionCommandHandler; diff --git a/libi2pd_client/BOB.h b/libi2pd_client/BOB.h index 7b92c85d..c0a5da14 100644 --- a/libi2pd_client/BOB.h +++ b/libi2pd_client/BOB.h @@ -14,6 +14,8 @@ #include #include #include +#include +#include #include #include #include "util.h" @@ -29,6 +31,8 @@ namespace i2p namespace client { const size_t BOB_COMMAND_BUFFER_SIZE = 1024; + const int BOB_PING_TIMEOUT = 8000; // in milliseconds + const char BOB_COMMAND_ZAP[] = "zap"; const char BOB_COMMAND_QUIT[] = "quit"; const char BOB_COMMAND_START[] = "start"; @@ -46,6 +50,7 @@ namespace client const char BOB_COMMAND_QUIET[] = "quiet"; const char BOB_COMMAND_LOOKUP[] = "lookup"; const char BOB_COMMAND_LOOKUP_LOCAL[] = "lookuplocal"; + const char BOB_COMMAND_PING[] = "ping"; const char BOB_COMMAND_CLEAR[] = "clear"; const char BOB_COMMAND_LIST[] = "list"; const char BOB_COMMAND_OPTION[] = "option"; @@ -233,6 +238,7 @@ namespace client void QuietCommandHandler (const char * operand, size_t len); void LookupCommandHandler (const char * operand, size_t len); void LookupLocalCommandHandler (const char * operand, size_t len); + void PingCommandHandler (const char * operand, size_t len); void ClearCommandHandler (const char * operand, size_t len); void ListCommandHandler (const char * operand, size_t len); void OptionCommandHandler (const char * operand, size_t len); @@ -249,6 +255,7 @@ namespace client void Send (); void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred); void SendReplyOK (const char * msg = nullptr); + void SendReplyOK (const std::vector& strings); void SendReplyError (const char * msg); void SendRaw (const char * data);