mirror of
https://github.com/PurpleI2P/i2pd.git
synced 2025-04-27 11:17:49 +02:00
add initial connection timeout for i2ptunnel
This commit is contained in:
parent
416589cc93
commit
1ea6d2016d
6 changed files with 408 additions and 298 deletions
|
@ -2,6 +2,7 @@
|
|||
#include "Identity.h"
|
||||
#include "ClientContext.h"
|
||||
#include "I2PService.h"
|
||||
#include <boost/asio/error.hpp>
|
||||
|
||||
namespace i2p
|
||||
{
|
||||
|
@ -11,37 +12,101 @@ namespace client
|
|||
|
||||
I2PService::I2PService (std::shared_ptr<ClientDestination> localDestination):
|
||||
m_LocalDestination (localDestination ? localDestination :
|
||||
i2p::client::context.CreateNewLocalDestination (false, I2P_SERVICE_DEFAULT_KEY_TYPE)), isUpdated (true)
|
||||
i2p::client::context.CreateNewLocalDestination (false, I2P_SERVICE_DEFAULT_KEY_TYPE)),
|
||||
m_ReadyTimer(m_LocalDestination->GetService()),
|
||||
m_ConnectTimeout(0),
|
||||
isUpdated (true)
|
||||
{
|
||||
m_LocalDestination->Acquire ();
|
||||
m_LocalDestination->Acquire ();
|
||||
}
|
||||
|
||||
|
||||
I2PService::I2PService (i2p::data::SigningKeyType kt):
|
||||
m_LocalDestination (i2p::client::context.CreateNewLocalDestination (false, kt)),
|
||||
m_ReadyTimer(m_LocalDestination->GetService()),
|
||||
m_ConnectTimeout(0),
|
||||
isUpdated (true)
|
||||
{
|
||||
m_LocalDestination->Acquire ();
|
||||
}
|
||||
|
||||
I2PService::~I2PService ()
|
||||
{
|
||||
ClearHandlers ();
|
||||
if (m_LocalDestination) m_LocalDestination->Release ();
|
||||
|
||||
I2PService::~I2PService ()
|
||||
{
|
||||
ClearHandlers ();
|
||||
if (m_LocalDestination) m_LocalDestination->Release ();
|
||||
}
|
||||
|
||||
void I2PService::ClearHandlers ()
|
||||
{
|
||||
if(m_ConnectTimeout)
|
||||
m_ReadyTimer.cancel();
|
||||
std::unique_lock<std::mutex> l(m_HandlersMutex);
|
||||
for (auto it: m_Handlers)
|
||||
it->Terminate ();
|
||||
m_Handlers.clear();
|
||||
}
|
||||
|
||||
|
||||
void I2PService::SetConnectTimeout(uint32_t timeout)
|
||||
{
|
||||
if(timeout && !m_ConnectTimeout)
|
||||
{
|
||||
TriggerReadyCheckTimer();
|
||||
}
|
||||
else if (m_ConnectTimeout && !timeout)
|
||||
{
|
||||
m_ReadyTimer.cancel();
|
||||
}
|
||||
m_ConnectTimeout = timeout;
|
||||
}
|
||||
|
||||
void I2PService::AddReadyCallback(ReadyCallback cb)
|
||||
{
|
||||
uint32_t now = i2p::util::GetSecondsSinceEpoch();
|
||||
uint32_t tm = now + m_ConnectTimeout;
|
||||
LogPrint(eLogDebug, "I2PService::AddReadyCallback() ", tm, " ", now);
|
||||
m_ReadyCallbacks.push_back({cb, tm});
|
||||
}
|
||||
|
||||
void I2PService::TriggerReadyCheckTimer()
|
||||
{
|
||||
m_ReadyTimer.expires_from_now(boost::posix_time::seconds (1));
|
||||
m_ReadyTimer.async_wait(std::bind(&I2PService::HandleReadyCheckTimer, this, std::placeholders::_1));
|
||||
}
|
||||
|
||||
void I2PService::HandleReadyCheckTimer(const boost::system::error_code &ec)
|
||||
{
|
||||
if(ec || m_LocalDestination->IsReady())
|
||||
{
|
||||
for(auto & itr : m_ReadyCallbacks)
|
||||
itr.first(ec);
|
||||
m_ReadyCallbacks.clear();
|
||||
}
|
||||
else if(!m_LocalDestination->IsReady())
|
||||
{
|
||||
// expire timed out requests
|
||||
uint32_t now = i2p::util::GetSecondsSinceEpoch ();
|
||||
auto itr = m_ReadyCallbacks.begin();
|
||||
while(itr != m_ReadyCallbacks.end())
|
||||
{
|
||||
if(itr->second >= now)
|
||||
{
|
||||
itr->first(boost::asio::error::timed_out);
|
||||
itr = m_ReadyCallbacks.erase(itr);
|
||||
}
|
||||
else
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
if(!ec)
|
||||
TriggerReadyCheckTimer();
|
||||
}
|
||||
|
||||
void I2PService::CreateStream (StreamRequestComplete streamRequestComplete, const std::string& dest, int port) {
|
||||
assert(streamRequestComplete);
|
||||
i2p::data::IdentHash identHash;
|
||||
if (i2p::client::context.GetAddressBook ().GetIdentHash (dest, identHash))
|
||||
m_LocalDestination->CreateStream (streamRequestComplete, identHash, port);
|
||||
{
|
||||
CreateStream(streamRequestComplete, identHash, port);
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint (eLogWarning, "I2PService: Remote destination not found: ", dest);
|
||||
|
@ -49,6 +114,29 @@ namespace client
|
|||
}
|
||||
}
|
||||
|
||||
void I2PService::CreateStream(StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash & identHash, int port)
|
||||
{
|
||||
if(m_ConnectTimeout)
|
||||
{
|
||||
if(m_LocalDestination->IsReady())
|
||||
m_LocalDestination->CreateStream (streamRequestComplete, identHash, port);
|
||||
else
|
||||
{
|
||||
AddReadyCallback([this, streamRequestComplete, identHash, port] (const boost::system::error_code & ec) {
|
||||
if(ec)
|
||||
{
|
||||
LogPrint(eLogWarning, "I2PService::CeateStream() ", ec.message());
|
||||
streamRequestComplete(nullptr);
|
||||
}
|
||||
else
|
||||
this->m_LocalDestination->CreateStream(streamRequestComplete, identHash, port);
|
||||
});
|
||||
}
|
||||
}
|
||||
else
|
||||
m_LocalDestination->CreateStream(streamRequestComplete, identHash, port);
|
||||
}
|
||||
|
||||
TCPIPPipe::TCPIPPipe(I2PService * owner, std::shared_ptr<boost::asio::ip::tcp::socket> upstream, std::shared_ptr<boost::asio::ip::tcp::socket> downstream) : I2PServiceHandler(owner), m_up(upstream), m_down(downstream)
|
||||
{
|
||||
boost::asio::socket_base::receive_buffer_size option(TCP_IP_PIPE_BUFFER_SIZE);
|
||||
|
@ -60,7 +148,7 @@ namespace client
|
|||
{
|
||||
Terminate();
|
||||
}
|
||||
|
||||
|
||||
void TCPIPPipe::Start()
|
||||
{
|
||||
AsyncReceiveUpstream();
|
||||
|
@ -84,7 +172,7 @@ namespace client
|
|||
}
|
||||
Done(shared_from_this());
|
||||
}
|
||||
|
||||
|
||||
void TCPIPPipe::AsyncReceiveUpstream()
|
||||
{
|
||||
if (m_up) {
|
||||
|
@ -132,12 +220,12 @@ namespace client
|
|||
shared_from_this(),
|
||||
std::placeholders::_1)
|
||||
);
|
||||
} else {
|
||||
} else {
|
||||
LogPrint(eLogError, "TCPIPPipe: downstream write: no socket");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
void TCPIPPipe::HandleDownstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transfered)
|
||||
{
|
||||
LogPrint(eLogDebug, "TCPIPPipe: downstream: ", (int) bytes_transfered, " bytes received");
|
||||
|
@ -162,7 +250,7 @@ namespace client
|
|||
AsyncReceiveUpstream();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void TCPIPPipe::HandleUpstreamWrite(const boost::system::error_code & ecode) {
|
||||
if (ecode) {
|
||||
LogPrint(eLogError, "TCPIPPipe: upstream write error:" , ecode.message());
|
||||
|
@ -172,7 +260,7 @@ namespace client
|
|||
AsyncReceiveDownstream();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void TCPIPPipe::HandleUpstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transfered)
|
||||
{
|
||||
LogPrint(eLogDebug, "TCPIPPipe: upstream ", (int)bytes_transfered, " bytes received");
|
||||
|
@ -187,7 +275,7 @@ namespace client
|
|||
DownstreamWrite(bytes_transfered);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void TCPIPAcceptor::Start ()
|
||||
{
|
||||
m_Acceptor.reset (new boost::asio::ip::tcp::acceptor (GetService (), m_LocalEndpoint));
|
||||
|
@ -198,10 +286,10 @@ namespace client
|
|||
void TCPIPAcceptor::Stop ()
|
||||
{
|
||||
if (m_Acceptor)
|
||||
{
|
||||
{
|
||||
m_Acceptor->close();
|
||||
m_Acceptor.reset (nullptr);
|
||||
}
|
||||
}
|
||||
m_Timer.cancel ();
|
||||
ClearHandlers();
|
||||
}
|
||||
|
@ -219,12 +307,12 @@ namespace client
|
|||
{
|
||||
LogPrint(eLogDebug, "I2PService: ", GetName(), " accepted");
|
||||
auto handler = CreateHandler(socket);
|
||||
if (handler)
|
||||
if (handler)
|
||||
{
|
||||
AddHandler(handler);
|
||||
handler->Handle();
|
||||
}
|
||||
else
|
||||
}
|
||||
else
|
||||
socket->close();
|
||||
Accept();
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue