diff --git a/libi2pd_client/UDPTunnel.cpp b/libi2pd_client/UDPTunnel.cpp index ea3df58e..0edf5518 100644 --- a/libi2pd_client/UDPTunnel.cpp +++ b/libi2pd_client/UDPTunnel.cpp @@ -148,8 +148,18 @@ namespace client return s; } + void UDPConnection::Stop () + { + m_AckTimer.cancel (); + } + void UDPConnection::Acked (uint32_t seqn) { + if (m_AckTimerSeqn && seqn >= m_AckTimerSeqn) + { + m_AckTimerSeqn = 0; + m_AckTimer.cancel (); + } if (m_UnackedDatagrams.empty () && seqn < m_UnackedDatagrams.front ().first) return; auto it = m_UnackedDatagrams.begin (); while (it != m_UnackedDatagrams.end ()) @@ -164,16 +174,42 @@ namespace client } m_UnackedDatagrams.erase (m_UnackedDatagrams.begin (), it); } + + void UDPConnection::ScheduleAckTimer (uint32_t seqn) + { + if (!m_AckTimerSeqn) + { + m_AckTimerSeqn = seqn; + m_AckTimer.expires_from_now (boost::posix_time::milliseconds (m_RTT ? 2*m_RTT : I2P_UDP_MAX_UNACKED_DATAGRAM_TIME)); + m_AckTimer.async_wait ([this](const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + LogPrint (eLogInfo, "UDP Connection: Packet ", m_AckTimerSeqn, " was not acked"); + m_AckTimerSeqn = 0; + m_RTT = 0; + // send empty packet with reset path flag + i2p::util::Mapping options; + options.Put (UDP_SESSION_FLAGS, UDP_SESSION_FLAG_RESET_PATH | UDP_SESSION_FLAG_ACK_REQUESTED); + auto session = GetDatagramSession (); + session->DropSharedRoutingPath (); + GetDatagramDestination ()->SendDatagram (session, nullptr, 0, 0, 0, &options); + } + }); + } + } UDPSession::UDPSession(boost::asio::ip::udp::endpoint localEndpoint, const std::shared_ptr & localDestination, const boost::asio::ip::udp::endpoint& endpoint, const i2p::data::IdentHash& to, uint16_t ourPort, uint16_t theirPort) : + UDPConnection (localDestination->GetService()), m_Destination(localDestination->GetDatagramDestination()), IPSocket(localDestination->GetService(), localEndpoint), Identity (to), SendEndpoint(endpoint), LastActivity(i2p::util::GetMillisecondsSinceEpoch()), LocalPort(ourPort), RemotePort(theirPort) { + Start (); IPSocket.set_option (boost::asio::socket_base::receive_buffer_size (I2P_UDP_MAX_MTU )); IPSocket.non_blocking (true); Receive(); @@ -200,11 +236,12 @@ namespace client if (session->GetVersion () == i2p::datagram::eDatagramV3) { uint8_t flags = 0; - if (!m_RTT || (!m_UnackedDatagrams.empty () && + if (!m_RTT || !m_AckTimerSeqn || (!m_UnackedDatagrams.empty () && ts > m_UnackedDatagrams.back ().second + repliableDatagramInterval)) // last ack request { flags |= UDP_SESSION_FLAG_ACK_REQUESTED; m_UnackedDatagrams.push_back ({ m_NextSendPacketNum, ts }); + ScheduleAckTimer (m_NextSendPacketNum); } i2p::util::Mapping options; options.Put (UDP_SESSION_SEQN, m_NextSendPacketNum); @@ -213,6 +250,7 @@ namespace client if (flags) options.Put (UDP_SESSION_FLAGS, flags); m_Destination->SendDatagram(session, m_Buffer, len, LocalPort, RemotePort, &options); + ScheduleAckTimer (m_NextSendPacketNum); } else m_Destination->SendDatagram(session, m_Buffer, len, LocalPort, RemotePort); @@ -286,6 +324,7 @@ namespace client dgram->ResetReceiver (m_inPort); dgram->ResetRawReceiver (m_inPort); } + m_Sessions.clear (); } std::vector > I2PUDPServerTunnel::GetSessions () @@ -315,10 +354,10 @@ namespace client const boost::asio::ip::udp::endpoint& localEndpoint, std::shared_ptr localDestination, uint16_t remotePort, bool gzip, i2p::datagram::DatagramVersion datagramVersion) : + UDPConnection (localDestination->GetService ()), m_Name (name), m_RemoteDest (remoteDest), m_LocalDest (localDestination), m_LocalEndpoint (localEndpoint), m_ResolveThread (nullptr), m_LocalSocket (nullptr), RemotePort (remotePort), - m_LastPort (0), m_cancel_resolve (false), m_Gzip (gzip), m_DatagramVersion (datagramVersion), - m_AckTimer (localDestination->GetService ()), m_AckTimerSeqn (0) + m_LastPort (0), m_cancel_resolve (false), m_Gzip (gzip), m_DatagramVersion (datagramVersion) { } @@ -329,6 +368,7 @@ namespace client void I2PUDPClientTunnel::Start () { + UDPConnection::Start (); // Reset flag in case of tunnel reload if (m_cancel_resolve) m_cancel_resolve = false; @@ -376,7 +416,7 @@ namespace client m_ResolveThread = nullptr; } m_RemoteAddr = nullptr; - m_AckTimer.cancel (); + UDPConnection::Stop (); } void I2PUDPClientTunnel::RecvFromLocal () @@ -438,6 +478,7 @@ namespace client { flags |= UDP_SESSION_FLAG_ACK_REQUESTED; m_UnackedDatagrams.push_back ({ m_NextSendPacketNum, ts }); + ScheduleAckTimer (m_NextSendPacketNum); } i2p::util::Mapping options; options.Put (UDP_SESSION_SEQN, m_NextSendPacketNum); @@ -446,7 +487,6 @@ namespace client if (flags) options.Put (UDP_SESSION_FLAGS, flags); m_LocalDest->GetDatagramDestination ()->SendDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort, &options); - ScheduleAckTimer (m_NextSendPacketNum); } else m_LocalDest->GetDatagramDestination ()->SendDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort); @@ -567,40 +607,5 @@ namespace client else LogPrint (eLogWarning, "UDP Client: Not tracking udp session using port ", (int) toPort); } - - void I2PUDPClientTunnel::Acked (uint32_t seqn) - { - if (m_AckTimerSeqn && seqn >= m_AckTimerSeqn) - { - m_AckTimerSeqn = 0; - m_AckTimer.cancel (); - } - UDPConnection::Acked (seqn); - } - - void I2PUDPClientTunnel::ScheduleAckTimer (uint32_t seqn) - { - if (!m_AckTimerSeqn) - { - m_AckTimerSeqn = seqn; - m_AckTimer.expires_from_now (boost::posix_time::milliseconds (m_RTT ? 2*m_RTT : I2P_UDP_MAX_UNACKED_DATAGRAM_TIME)); - m_AckTimer.async_wait ([this](const boost::system::error_code& ecode) - { - if (ecode != boost::asio::error::operation_aborted) - { - LogPrint (eLogInfo, "UDP Client: Packet ", m_AckTimerSeqn, " was not acked"); - m_AckTimerSeqn = 0; - m_RTT = 0; - // send empty packet with reset path flag - i2p::util::Mapping options; - options.Put (UDP_SESSION_FLAGS, UDP_SESSION_FLAG_RESET_PATH | UDP_SESSION_FLAG_ACK_REQUESTED); - auto session = GetDatagramSession (); - session->DropSharedRoutingPath (); - m_LocalDest->GetDatagramDestination ()->SendDatagram (session, nullptr, 0, 0, 0, &options); - } - }); - } - } - } } diff --git a/libi2pd_client/UDPTunnel.h b/libi2pd_client/UDPTunnel.h index 9a72eec0..1e3f7ba0 100644 --- a/libi2pd_client/UDPTunnel.h +++ b/libi2pd_client/UDPTunnel.h @@ -41,8 +41,20 @@ namespace client uint32_t m_NextSendPacketNum = 1, m_LastReceivedPacketNum = 0; std::list > m_UnackedDatagrams; // list of sent but not acked repliable datagrams(seqn, timestamp) in ascending order uint64_t m_RTT = 0; // milliseconds + + boost::asio::deadline_timer m_AckTimer; + uint32_t m_AckTimerSeqn = 0; - virtual void Acked (uint32_t seqn); + UDPConnection (boost::asio::io_context& service): m_AckTimer (service) {}; + virtual ~UDPConnection () { Stop (); }; + virtual void Start () {}; + virtual void Stop (); + + void Acked (uint32_t seqn); + void ScheduleAckTimer (uint32_t seqn); + + virtual std::shared_ptr GetDatagramSession () = 0; + virtual i2p::datagram::DatagramDestination * GetDatagramDestination () const = 0; }; struct UDPSession: public UDPConnection // for server side @@ -66,7 +78,8 @@ namespace client uint16_t ourPort, uint16_t theirPort); void HandleReceived(const boost::system::error_code & ecode, std::size_t len); void Receive(); - std::shared_ptr GetDatagramSession (); + std::shared_ptr GetDatagramSession () override; + i2p::datagram::DatagramDestination * GetDatagramDestination () const override { return m_Destination; } }; @@ -149,8 +162,8 @@ namespace client uint16_t remotePort, bool gzip, i2p::datagram::DatagramVersion datagramVersion); ~I2PUDPClientTunnel (); - void Start (); - void Stop (); + void Start () override; + void Stop () override; const char * GetName () const { return m_Name.c_str(); } std::vector > GetSessions (); @@ -175,9 +188,9 @@ namespace client const uint8_t * buf, size_t len, const i2p::util::Mapping * options); void HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); void TryResolving (); - std::shared_ptr GetDatagramSession (); - void Acked (uint32_t seqn) override; - void ScheduleAckTimer (uint32_t seqn); + std::shared_ptr GetDatagramSession () override; + i2p::datagram::DatagramDestination * GetDatagramDestination () const override + { return m_LocalDest ? m_LocalDest->GetDatagramDestination () : nullptr; } private: @@ -199,9 +212,6 @@ namespace client std::shared_ptr m_LastSession; std::weak_ptr m_LastDatagramSession; - boost::asio::deadline_timer m_AckTimer; - uint32_t m_AckTimerSeqn; - public: bool isUpdated; // transient, used during reload only