From d399ce9a74eb0189d66ff243cbc3badae4ff75cc Mon Sep 17 00:00:00 2001 From: det Date: Sun, 20 Mar 2022 16:57:38 +0000 Subject: [PATCH] Setting up some response to file descriptor exhaustion --- libi2pd/NTCP2.cpp | 57 ++++++++++++++++++++++++++++++++++++++++++----- libi2pd/NTCP2.h | 8 +++++++ 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/libi2pd/NTCP2.cpp b/libi2pd/NTCP2.cpp index 4714506a..28174fe7 100644 --- a/libi2pd/NTCP2.cpp +++ b/libi2pd/NTCP2.cpp @@ -1177,7 +1177,8 @@ namespace transport NTCP2Server::NTCP2Server (): RunnableServiceWithWork ("NTCP2"), m_TerminationTimer (GetService ()), - m_ProxyType(eNoProxy), m_Resolver(GetService ()) + m_ProxyType(eNoProxy), m_Resolver(GetService ()), + m_listen_backlog(boost::asio::socket_base::max_listen_connections) { } @@ -1223,6 +1224,7 @@ namespace transport auto ep = m_Address4 ? boost::asio::ip::tcp::endpoint (m_Address4->address(), address->port): boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), address->port); m_NTCP2Acceptor.reset (new boost::asio::ip::tcp::acceptor (GetService (), ep)); + m_NTCP2Acceptor->listen(this->m_listen_backlog); } catch ( std::exception & ex ) { @@ -1261,7 +1263,7 @@ namespace transport else if (m_YggdrasilAddress && !context.SupportsV6 ()) ep = boost::asio::ip::tcp::endpoint (m_YggdrasilAddress->address(), address->port); m_NTCP2V6Acceptor->bind (ep); - m_NTCP2V6Acceptor->listen (); + m_NTCP2V6Acceptor->listen(this->m_listen_backlog); LogPrint (eLogInfo, "NTCP2: Start listening v6 TCP port ", address->port); auto conn = std::make_shared (*this); @@ -1325,6 +1327,7 @@ namespace transport { if (session && session->GetRemoteIdentity ()) m_NTCP2Sessions.erase (session->GetRemoteIdentity ()->GetIdentHash ()); + this->ResumeAcceptingIfOverloaded(); } std::shared_ptr NTCP2Server::FindNTCP2Session (const i2p::data::IdentHash& ident) @@ -1422,9 +1425,10 @@ namespace transport } else LogPrint (eLogError, "NTCP2: Connected from error ", ec.message ()); - } - else + } else { LogPrint (eLogError, "NTCP2: Accept error ", error.message ()); + this->CheckIfOverloaded(conn, error, true); + } if (error != boost::asio::error::operation_aborted) { @@ -1455,11 +1459,54 @@ namespace transport } else LogPrint (eLogError, "NTCP2: Connected from error ", ec.message ()); + } else { + LogPrint (eLogError, "NTCP2: Accept error ", error.message ()); + this->CheckIfOverloaded(conn, error, false); } if (error != boost::asio::error::operation_aborted) { - conn = std::make_shared (*this); + if (!conn) // connection is used, create new one + conn = std::make_shared (*this); + else // reuse failed + conn->Close (); + m_NTCP2V6Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCP2Server::HandleAcceptV6, this, + conn, std::placeholders::_1)); + } + } + + void NTCP2Server::CheckIfOverloaded (std::shared_ptr conn, const boost::system::error_code& error, bool isv4) + { + // TODO: high water / low water sophistry + if(error == boost::system::errc::too_many_files_open_in_system || + error == boost::system::errc::too_many_files_open) { + // I have to wait until a file closes to accept more. + conn->Close(); + this->m_overloaded = true; + this->m_listen_backlog = (this->m_listen_backlog + 128) >> 1; + m_NTCP2Acceptor->listen(this->m_listen_backlog); + m_NTCP2V6Acceptor->listen(this->m_listen_backlog); + } + if(this->m_overloaded) { + if(isv4) { + this->need_resume_v4 = true; + } else { + this->need_resume_v6 = true; + } + } + } + + void NTCP2Server::ResumeAcceptingIfOverloaded () + { + if(!this->m_overloaded) return; + std::shared_ptr conn = std::make_shared(*this); + if(this->need_resume_v4) { + this->need_resume_v4 = false; + m_NTCP2Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCP2Server::HandleAccept, this, + conn, std::placeholders::_1)); + } + if(this->need_resume_v6) { + this->need_resume_v6 = false; m_NTCP2V6Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCP2Server::HandleAcceptV6, this, conn, std::placeholders::_1)); } diff --git a/libi2pd/NTCP2.h b/libi2pd/NTCP2.h index 46c47756..8561500b 100644 --- a/libi2pd/NTCP2.h +++ b/libi2pd/NTCP2.h @@ -264,6 +264,9 @@ namespace transport void HandleAccept (std::shared_ptr conn, const boost::system::error_code& error); void HandleAcceptV6 (std::shared_ptr conn, const boost::system::error_code& error); + void CheckIfOverloaded (std::shared_ptr conn, const boost::system::error_code& error, bool isv4); + void ResumeAcceptingIfOverloaded(); + void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr conn, std::shared_ptr timer); void HandleProxyConnect(const boost::system::error_code& ecode, std::shared_ptr conn, std::shared_ptr timer); void AfterSocksHandshake(std::shared_ptr conn, std::shared_ptr timer); @@ -286,6 +289,11 @@ namespace transport std::unique_ptr m_ProxyEndpoint; std::shared_ptr m_Address4, m_Address6, m_YggdrasilAddress; + bool m_overloaded; + bool need_resume_v4; + bool need_resume_v6; + int m_listen_backlog; + public: // for HTTP/I2PControl