diff --git a/Config.cpp b/Config.cpp index 20036b83..cd617f6f 100644 --- a/Config.cpp +++ b/Config.cpp @@ -90,7 +90,8 @@ namespace config { ("httpproxy.inbound.quantity", value()->default_value("5"), "HTTP proxy inbound tunnels quantity") ("httpproxy.outbound.quantity", value()->default_value("5"), "HTTP proxy outbound tunnels quantity") ("httpproxy.latency.min", value()->default_value("0"), "HTTP proxy min latency for tunnels") - ("httpproxy.latency.max", value()->default_value("0"), "HTTP proxy max latency for tunnels") + ("httpproxy.latency.max", value()->default_value("0"), "HTTP proxy max latency for tunnels") + ("httpproxy.outproxy", value()->default_value(""), "HTTP proxy upstream out proxy url") ; options_description socksproxy("SOCKS Proxy options"); diff --git a/HTTPProxy.cpp b/HTTPProxy.cpp index c40989d1..1750e582 100644 --- a/HTTPProxy.cpp +++ b/HTTPProxy.cpp @@ -64,15 +64,37 @@ namespace proxy { void HostNotFound(std::string & host); void SendProxyError(std::string & content); + void ForwardToUpstreamProxy(); + void HandleUpstreamHTTPProxyConnect(const boost::system::error_code & ec); + void HandleUpstreamSocksProxyConnect(const boost::system::error_code & ec); + + void HandleSocksProxySendHandshake(const boost::system::error_code & ec, std::size_t bytes_transfered); + void HandleSocksProxyReply(const boost::system::error_code & ec, std::size_t bytes_transfered); + + typedef std::function ProxyResolvedHandler; + + void HandleUpstreamProxyResolved(const boost::system::error_code & ecode, boost::asio::ip::tcp::resolver::iterator itr, ProxyResolvedHandler handler); + + void SocksProxySuccess(); + void HandoverToUpstreamProxy(); + uint8_t m_recv_chunk[8192]; std::string m_recv_buf; // from client std::string m_send_buf; // to upstream std::shared_ptr m_sock; - + std::shared_ptr m_proxysock; + boost::asio::ip::tcp::resolver m_proxy_resolver; + i2p::http::URL m_RequestURL; + i2p::http::URL m_ProxyURL; + std::string m_HTTPMethod; + uint8_t m_socks_buf[255+8]; // for socks request/response + public: HTTPReqHandler(HTTPProxy * parent, std::shared_ptr sock) : - I2PServiceHandler(parent), m_sock(sock) {} + I2PServiceHandler(parent), m_sock(sock), + m_proxysock(std::make_shared(parent->GetService())), + m_proxy_resolver(parent->GetService()) {} ~HTTPReqHandler() { Terminate(); } void Handle () { AsyncSockRead(); } /* overload */ }; @@ -97,6 +119,12 @@ namespace proxy { m_sock->close(); m_sock = nullptr; } + if(m_proxysock) + { + if(m_proxysock->is_open()) + m_proxysock->close(); + m_proxysock = nullptr; + } Done(shared_from_this()); } @@ -142,7 +170,7 @@ namespace proxy { << "\r\n"; res.body = ss.str(); std::string response = res.to_string(); - boost::asio::async_write(*m_sock, boost::asio::buffer(response), + boost::asio::async_write(*m_sock, boost::asio::buffer(response), boost::asio::transfer_all(), std::bind(&HTTPReqHandler::SentHTTPFailed, shared_from_this(), std::placeholders::_1)); } @@ -199,7 +227,6 @@ namespace proxy { bool HTTPReqHandler::HandleRequest() { i2p::http::HTTPReq req; - i2p::http::URL url; std::string b64; int req_len = 0; @@ -216,14 +243,14 @@ namespace proxy { /* parsing success, now let's look inside request */ LogPrint(eLogDebug, "HTTPProxy: requested: ", req.uri); - url.parse(req.uri); + m_RequestURL.parse(req.uri); - if (ExtractAddressHelper(url, b64)) { - i2p::client::context.GetAddressBook ().InsertAddress (url.host, b64); - LogPrint (eLogInfo, "HTTPProxy: added b64 from addresshelper for ", url.host); - std::string full_url = url.to_string(); + if (ExtractAddressHelper(m_RequestURL, b64)) { + i2p::client::context.GetAddressBook ().InsertAddress (m_RequestURL.host, b64); + LogPrint (eLogInfo, "HTTPProxy: added b64 from addresshelper for ", m_RequestURL.host); + std::string full_url = m_RequestURL.to_string(); std::stringstream ss; - ss << "Host " << url.host << " added to router's addressbook from helper. " + ss << "Host " << m_RequestURL.host << " added to router's addressbook from helper. " << "Click here to proceed."; GenericProxyInfo("Addresshelper found", ss.str().c_str()); return true; /* request processed */ @@ -231,11 +258,11 @@ namespace proxy { SanitizeHTTPRequest(req); - std::string dest_host = url.host; - uint16_t dest_port = url.port; + std::string dest_host = m_RequestURL.host; + uint16_t dest_port = m_RequestURL.port; /* always set port, even if missing in request */ if (!dest_port) { - dest_port = (url.schema == "https") ? 443 : 80; + dest_port = (m_RequestURL.schema == "https") ? 443 : 80; } /* detect dest_host, set proper 'Host' header in upstream request */ auto h = req.headers.find("Host"); @@ -267,16 +294,26 @@ namespace proxy { } /* TODO: outproxy handler here */ } else { - LogPrint (eLogWarning, "HTTPProxy: outproxy failure for ", dest_host, ": not implemented yet"); - std::string message = "Host" + dest_host + "not inside I2P network, but outproxy support not implemented yet"; - GenericProxyError("Outproxy failure", message.c_str()); + std::string outproxyUrl; i2p::config::GetOption("httpproxy.outproxy", outproxyUrl); + if(outproxyUrl.size()) { + m_HTTPMethod = req.method; + LogPrint (eLogDebug, "HTTPProxy: use outproxy ", outproxyUrl); + if(m_ProxyURL.parse(outproxyUrl)) + ForwardToUpstreamProxy(); + else + GenericProxyError("Outproxy failure", "bad outproxy settings"); + } else { + LogPrint (eLogWarning, "HTTPProxy: outproxy failure for ", dest_host, ": not implemented yet"); + std::string message = "Host" + dest_host + "not inside I2P network, but outproxy support not implemented yet"; + GenericProxyError("Outproxy failure", message.c_str()); + } return true; } /* make relative url */ - url.schema = ""; - url.host = ""; - req.uri = url.to_string(); + m_RequestURL.schema = ""; + m_RequestURL.host = ""; + req.uri = m_RequestURL.to_string(); /* drop original request from recv buffer */ m_recv_buf.erase(0, req_len); @@ -290,6 +327,124 @@ namespace proxy { return true; } + void HTTPReqHandler::ForwardToUpstreamProxy() + { + LogPrint(eLogDebug, "HTTPProxy: forward to upstream"); + // assume http if empty schema + if (m_ProxyURL.schema == "" || m_ProxyURL.schema == "http") { + // handle upstream http proxy + if (!m_ProxyURL.port) m_ProxyURL.port = 80; + boost::asio::ip::tcp::resolver::query q(m_ProxyURL.host, std::to_string(m_ProxyURL.port)); + m_proxy_resolver.async_resolve(q, std::bind(&HTTPReqHandler::HandleUpstreamProxyResolved, this, std::placeholders::_1, std::placeholders::_2, [&](boost::asio::ip::tcp::endpoint ep) { + m_proxysock->async_connect(ep, std::bind(&HTTPReqHandler::HandleUpstreamHTTPProxyConnect, this, std::placeholders::_1)); + })); + } else if (m_ProxyURL.schema == "socks") { + // handle upstream socks proxy + if (!m_ProxyURL.port) m_ProxyURL.port = 9050; // default to tor default if not specified + boost::asio::ip::tcp::resolver::query q(m_ProxyURL.host, std::to_string(m_ProxyURL.port)); + m_proxy_resolver.async_resolve(q, std::bind(&HTTPReqHandler::HandleUpstreamProxyResolved, this, std::placeholders::_1, std::placeholders::_2, [&](boost::asio::ip::tcp::endpoint ep) { + m_proxysock->async_connect(ep, std::bind(&HTTPReqHandler::HandleUpstreamSocksProxyConnect, this, std::placeholders::_1)); + })); + } else { + // unknown type, complain + GenericProxyError("unknown outproxy url", m_ProxyURL.to_string().c_str()); + } + } + + void HTTPReqHandler::HandleUpstreamProxyResolved(const boost::system::error_code & ec, boost::asio::ip::tcp::resolver::iterator it, ProxyResolvedHandler handler) + { + if(ec) GenericProxyError("cannot resolve upstream proxy", ec.message().c_str()); + else handler(*it); + } + + void HTTPReqHandler::HandleUpstreamSocksProxyConnect(const boost::system::error_code & ec) + { + if(!ec) { + if(m_RequestURL.host.size() > 255) { + GenericProxyError("hostname too long", m_RequestURL.host.c_str()); + return; + } + uint16_t port = m_RequestURL.port; + LogPrint(eLogDebug, "HTTPProxy: connected to socks upstream"); + if(m_HTTPMethod == "CONNECT") { + std::string host = m_RequestURL.host; + std::size_t reqsize = 0; + m_socks_buf[0] = '\x04'; + m_socks_buf[1] = 1; + htobe16buf(m_socks_buf+2, port); + m_socks_buf[4] = 0; + m_socks_buf[5] = 0; + m_socks_buf[6] = 0; + m_socks_buf[7] = 1; + // user id + m_socks_buf[8] = 'i'; + m_socks_buf[9] = '2'; + m_socks_buf[10] = 'p'; + m_socks_buf[11] = 'd'; + m_socks_buf[12] = 0; + reqsize += 13; + memcpy(m_socks_buf+ reqsize, host.c_str(), host.size()); + reqsize += host.size(); + m_socks_buf[++reqsize] = 0; + m_proxysock->async_write_some(boost::asio::buffer(m_socks_buf, reqsize), std::bind(&HTTPReqHandler::HandleSocksProxySendHandshake, this, std::placeholders::_1, std::placeholders::_2)); + } else { + GenericProxyError("unsupported http method", m_HTTPMethod.c_str()); + } + } else GenericProxyError("cannot connect to upstream socks proxy", ec.message().c_str()); + } + + void HTTPReqHandler::HandleSocksProxySendHandshake(const boost::system::error_code & ec, std::size_t bytes_transferred) + { + if(ec) GenericProxyError("Cannot negotiate with socks proxy", ec.message().c_str()); + else m_proxysock->async_read_some(boost::asio::buffer(m_socks_buf, 8), std::bind(&HTTPReqHandler::HandleSocksProxyReply, this, std::placeholders::_1, std::placeholders::_2)); + } + + void HTTPReqHandler::HandoverToUpstreamProxy() + { + auto connection = std::make_shared(GetOwner(), m_proxysock, m_sock); + m_sock = nullptr; + m_proxysock = nullptr; + GetOwner()->AddHandler(connection); + connection->Start(); + Terminate(); + } + + void HTTPReqHandler::SocksProxySuccess() + { + i2p::http::HTTPRes res; + res.code = 200; + std::string response = res.to_string(); + boost::asio::async_write(*m_sock, boost::asio::buffer(response), boost::asio::transfer_all(), [&] (const boost::system::error_code & ec, std::size_t transferred) { + if(ec) GenericProxyError("socks proxy error", ec.message().c_str()); + else HandoverToUpstreamProxy(); + }); + } + + void HTTPReqHandler::HandleSocksProxyReply(const boost::system::error_code & ec, std::size_t bytes_transferred) + { + if(!ec) + { + if(m_socks_buf[1] == 90) { + // success + SocksProxySuccess(); + } else { + std::stringstream ss; + ss << (int) m_socks_buf[1]; + std::string msg = ss.str(); + GenericProxyError("Socks Proxy error", msg.c_str()); + } + } + else GenericProxyError("No Reply From socks proxy", ec.message().c_str()); + } + + void HTTPReqHandler::HandleUpstreamHTTPProxyConnect(const boost::system::error_code & ec) + { + if(!ec) { + LogPrint(eLogDebug, "HTTPProxy: connected to http upstream"); + GenericProxyError("cannot connect", "http out proxy not implemented"); + } else GenericProxyError("cannot connect to upstream http proxy", ec.message().c_str()); + } + /* will be called after some data received from client */ void HTTPReqHandler::HandleSockRecv(const boost::system::error_code & ecode, std::size_t len) { diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index 93bcff0c..e5c8dc0b 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -540,15 +540,28 @@ namespace client void I2PUDPServerTunnel::ExpireStale(const uint64_t delta) { std::lock_guard lock(m_SessionsMutex); uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); - std::remove_if(m_Sessions.begin(), m_Sessions.end(), [now, delta](const UDPSession * u) -> bool { + std::remove_if(m_Sessions.begin(), m_Sessions.end(), [now, delta](const std::shared_ptr & u) -> bool { return now - u->LastActivity >= delta; }); } - - UDPSession * I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) + + void I2PUDPClientTunnel::ExpireStale(const uint64_t delta) { + std::lock_guard lock(m_SessionsMutex); + uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); + std::vector removePorts; + for (const auto & s : m_Sessions) { + if (now - s.second.second >= delta) + removePorts.push_back(s.first); + } + for(auto port : removePorts) { + m_Sessions.erase(port); + } + } + + std::shared_ptr I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) { auto ih = from.GetIdentHash(); - for ( UDPSession * s : m_Sessions ) + for (auto & s : m_Sessions ) { if ( s->Identity == ih) { @@ -559,8 +572,9 @@ namespace client } /** create new udp session */ boost::asio::ip::udp::endpoint ep(m_LocalAddress, 0); - m_Sessions.push_back(new UDPSession(ep, m_LocalDest, m_RemoteEndpoint, &ih, localPort, remotePort)); - return m_Sessions.back(); + m_Sessions.push_back(std::make_shared(ep, m_LocalDest, m_RemoteEndpoint, &ih, localPort, remotePort)); + auto & back = m_Sessions.back(); + return back; } UDPSession::UDPSession(boost::asio::ip::udp::endpoint localEndpoint, @@ -568,7 +582,6 @@ namespace client boost::asio::ip::udp::endpoint endpoint, const i2p::data::IdentHash * to, uint16_t ourPort, uint16_t theirPort) : m_Destination(localDestination->GetDatagramDestination()), - m_Service(localDestination->GetService()), IPSocket(localDestination->GetService(), localEndpoint), SendEndpoint(endpoint), LastActivity(i2p::util::GetMillisecondsSinceEpoch()), @@ -592,7 +605,7 @@ namespace client { LogPrint(eLogDebug, "UDPSession: forward ", len, "B from ", FromEndpoint); LastActivity = i2p::util::GetMillisecondsSinceEpoch(); - m_Destination->SendDatagramTo(m_Buffer, len, Identity, 0, 0); + m_Destination->SendDatagramTo(m_Buffer, len, Identity, LocalPort, RemotePort); Receive(); } else { LogPrint(eLogError, "UDPSession: ", ecode.message()); @@ -602,9 +615,8 @@ namespace client I2PUDPServerTunnel::I2PUDPServerTunnel(const std::string & name, std::shared_ptr localDestination, - const boost::asio::ip::address& localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port) : + boost::asio::ip::address localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port) : m_Name(name), - LocalPort(port), m_LocalAddress(localAddress), m_RemoteEndpoint(forwardTo) { @@ -630,7 +642,7 @@ namespace client { std::vector > sessions; std::lock_guard lock(m_SessionsMutex); - for ( UDPSession * s : m_Sessions ) + for (auto & s : m_Sessions ) { if (!s->m_Destination) continue; auto info = s->m_Destination->GetInfoForRemote(s->Identity); @@ -652,13 +664,12 @@ namespace client std::shared_ptr localDestination, uint16_t remotePort) : m_Name(name), - m_Session(nullptr), m_RemoteDest(remoteDest), m_LocalDest(localDestination), m_LocalEndpoint(localEndpoint), m_RemoteIdent(nullptr), m_ResolveThread(nullptr), - LocalPort(localEndpoint.port()), + m_LocalSocket(localDestination->GetService(), localEndpoint), RemotePort(remotePort), m_cancel_resolve(false) { @@ -675,29 +686,44 @@ namespace client m_LocalDest->Start(); if (m_ResolveThread == nullptr) m_ResolveThread = new std::thread(std::bind(&I2PUDPClientTunnel::TryResolving, this)); + RecvFromLocal(); } + void I2PUDPClientTunnel::RecvFromLocal() + { + m_LocalSocket.async_receive_from(boost::asio::buffer(m_RecvBuff, I2P_UDP_MAX_MTU), + m_RecvEndpoint, std::bind(&I2PUDPClientTunnel::HandleRecvFromLocal, this, std::placeholders::_1, std::placeholders::_2)); + } + + void I2PUDPClientTunnel::HandleRecvFromLocal(const boost::system::error_code & ec, std::size_t transferred) + { + if(ec) { + LogPrint(eLogError, "UDP Client: ", ec.message()); + return; + } + if(!m_RemoteIdent) { + LogPrint(eLogWarning, "UDP Client: remote endpoint not resolved yet"); + RecvFromLocal(); + return; // drop, remote not resolved + } + auto remotePort = m_RecvEndpoint.port(); + auto itr = m_Sessions.find(remotePort); + if (itr == m_Sessions.end()) { + // track new udp convo + m_Sessions[remotePort] = {boost::asio::ip::udp::endpoint(m_RecvEndpoint), 0}; + } + // send off to remote i2p destination + LogPrint(eLogDebug, "UDP Client: send ", transferred, " to ", m_RemoteIdent->ToBase32(), ":", RemotePort); + m_LocalDest->GetDatagramDestination()->SendDatagramTo(m_RecvBuff, transferred, *m_RemoteIdent, remotePort, RemotePort); + // mark convo as active + m_Sessions[remotePort].second = i2p::util::GetMillisecondsSinceEpoch(); + RecvFromLocal(); + } + std::vector > I2PUDPClientTunnel::GetSessions() { + // TODO: implement std::vector > infos; - if(m_Session && m_LocalDest) - { - auto s = m_Session; - if (s->m_Destination) - { - auto info = m_Session->m_Destination->GetInfoForRemote(s->Identity); - if(info) - { - auto sinfo = std::make_shared(); - sinfo->Name = m_Name; - sinfo->LocalIdent = std::make_shared(m_LocalDest->GetIdentHash().data()); - sinfo->RemoteIdent = std::make_shared(s->Identity.data()); - sinfo->CurrentIBGW = info->IBGW; - sinfo->CurrentOBEP = info->OBEP; - infos.push_back(sinfo); - } - } - } return infos; } @@ -717,26 +743,28 @@ namespace client return; } LogPrint(eLogInfo, "UDP Tunnel: resolved ", m_RemoteDest, " to ", m_RemoteIdent->ToBase32()); - // delete existing session - if(m_Session) delete m_Session; - - boost::asio::ip::udp::endpoint ep(boost::asio::ip::address::from_string("127.0.0.1"), 0); - m_Session = new UDPSession(m_LocalEndpoint, m_LocalDest, ep, m_RemoteIdent, LocalPort, RemotePort); } void I2PUDPClientTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) { if(m_RemoteIdent && from.GetIdentHash() == *m_RemoteIdent) { - // address match - if(m_Session) + auto itr = m_Sessions.find(toPort); + // found convo ? + if(itr != m_Sessions.end()) { - // tell session - LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", from.GetIdentHash().ToBase32()); - m_Session->IPSocket.send_to(boost::asio::buffer(buf, len), m_Session->FromEndpoint); + // found convo + if (len > 0) { + LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", from.GetIdentHash().ToBase32()); + uint8_t sendbuf[len]; + memcpy(sendbuf, buf, len); + m_LocalSocket.send_to(boost::asio::buffer(buf, len), itr->second.first); + // mark convo as active + itr->second.second = i2p::util::GetMillisecondsSinceEpoch(); + } } else - LogPrint(eLogWarning, "UDP Client: no session"); + LogPrint(eLogWarning, "UDP Client: not tracking udp session using port ", (int) toPort); } else LogPrint(eLogWarning, "UDP Client: unwarrented traffic from ", from.GetIdentHash().ToBase32()); @@ -747,7 +775,11 @@ namespace client auto dgram = m_LocalDest->GetDatagramDestination(); if (dgram) dgram->ResetReceiver(); - if (m_Session) delete m_Session; + m_Sessions.clear(); + + if(m_LocalSocket.is_open()) + m_LocalSocket.close(); + m_cancel_resolve = true; if(m_ResolveThread) diff --git a/I2PTunnel.h b/I2PTunnel.h index e6f0e84f..b49efe40 100644 --- a/I2PTunnel.h +++ b/I2PTunnel.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -141,7 +142,6 @@ namespace client struct UDPSession { i2p::datagram::DatagramDestination * m_Destination; - boost::asio::io_service & m_Service; boost::asio::ip::udp::socket IPSocket; i2p::data::IdentHash Identity; boost::asio::ip::udp::endpoint FromEndpoint; @@ -189,7 +189,7 @@ namespace client public: I2PUDPServerTunnel(const std::string & name, std::shared_ptr localDestination, - const boost::asio::ip::address & localAddress, + boost::asio::ip::address localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port); ~I2PUDPServerTunnel(); /** expire stale udp conversations */ @@ -202,15 +202,14 @@ namespace client private: void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); - UDPSession * ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort); + std::shared_ptr ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort); private: const std::string m_Name; - const uint16_t LocalPort; boost::asio::ip::address m_LocalAddress; boost::asio::ip::udp::endpoint m_RemoteEndpoint; std::mutex m_SessionsMutex; - std::vector m_Sessions; + std::vector > m_Sessions; std::shared_ptr m_LocalDest; }; @@ -228,18 +227,25 @@ namespace client bool IsLocalDestination(const i2p::data::IdentHash & destination) const { return destination == m_LocalDest->GetIdentHash(); } std::shared_ptr GetLocalDestination () const { return m_LocalDest; } + void ExpireStale(const uint64_t delta=I2P_UDP_SESSION_TIMEOUT); private: + typedef std::pair UDPConvo; + void RecvFromLocal(); + void HandleRecvFromLocal(const boost::system::error_code & e, std::size_t transferred); void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); void TryResolving(); const std::string m_Name; - UDPSession * m_Session; + std::mutex m_SessionsMutex; + std::map m_Sessions; // maps i2p port -> local udp convo const std::string m_RemoteDest; std::shared_ptr m_LocalDest; const boost::asio::ip::udp::endpoint m_LocalEndpoint; i2p::data::IdentHash * m_RemoteIdent; std::thread * m_ResolveThread; - uint16_t LocalPort; + boost::asio::ip::udp::socket m_LocalSocket; + boost::asio::ip::udp::endpoint m_RecvEndpoint; + uint8_t m_RecvBuff[I2P_UDP_MAX_MTU]; uint16_t RemotePort; bool m_cancel_resolve; }; diff --git a/Tunnel.cpp b/Tunnel.cpp index e1f5c035..e271052e 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -612,6 +612,7 @@ namespace tunnel for (auto it = pendingTunnels.begin (); it != pendingTunnels.end ();) { auto tunnel = it->second; + auto pool = tunnel->GetTunnelPool(); switch (tunnel->GetState ()) { case eTunnelStatePending: @@ -637,6 +638,8 @@ namespace tunnel #ifdef WITH_EVENTS EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed); #endif + // for i2lua + if(pool) pool->OnTunnelBuildResult(tunnel, eBuildResultTimeout); // delete it = pendingTunnels.erase (it); m_NumFailedTunnelCreations++; @@ -649,6 +652,9 @@ namespace tunnel #ifdef WITH_EVENTS EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed); #endif + // for i2lua + if(pool) pool->OnTunnelBuildResult(tunnel, eBuildResultRejected); + it = pendingTunnels.erase (it); m_NumFailedTunnelCreations++; break; diff --git a/TunnelPool.cpp b/TunnelPool.cpp index 07057918..dfdfbf58 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -81,6 +81,8 @@ namespace tunnel } if (m_LocalDestination) m_LocalDestination->SetLeaseSetUpdated (); + + OnTunnelBuildResult(createdTunnel, eBuildResultOkay); } void TunnelPool::TunnelExpired (std::shared_ptr expiredTunnel) @@ -109,6 +111,8 @@ namespace tunnel std::unique_lock l(m_OutboundTunnelsMutex); m_OutboundTunnels.insert (createdTunnel); } + OnTunnelBuildResult(createdTunnel, eBuildResultOkay); + //CreatePairedInboundTunnel (createdTunnel); } @@ -579,5 +583,11 @@ namespace tunnel } return tun; } + + void TunnelPool::OnTunnelBuildResult(std::shared_ptr tunnel, TunnelBuildResult result) + { + auto peers = tunnel->GetPeers(); + if(m_CustomPeerSelector) m_CustomPeerSelector->OnBuildResult(peers, tunnel->IsInbound(), result); + } } } diff --git a/TunnelPool.h b/TunnelPool.h index 6a73bd67..9e2a3e24 100644 --- a/TunnelPool.h +++ b/TunnelPool.h @@ -23,12 +23,21 @@ namespace tunnel class InboundTunnel; class OutboundTunnel; + + enum TunnelBuildResult { + eBuildResultOkay, // tunnel was built okay + eBuildResultRejected, // tunnel build was explicitly rejected + eBuildResultTimeout // tunnel build timed out + }; + /** interface for custom tunnel peer selection algorithm */ struct ITunnelPeerSelector { typedef std::shared_ptr Peer; typedef std::vector TunnelPath; + virtual bool SelectPeers(TunnelPath & peers, int hops, bool isInbound) = 0; + virtual bool OnBuildResult(TunnelPath & peers, bool isInbound, TunnelBuildResult result) = 0; }; typedef std::shared_ptr TunnelPeerSelector; @@ -79,6 +88,8 @@ namespace tunnel /** @brief get the lowest latency tunnel in this tunnel pool regardless of latency requirements */ std::shared_ptr GetLowestLatencyInboundTunnel(std::shared_ptr exclude=nullptr) const; std::shared_ptr GetLowestLatencyOutboundTunnel(std::shared_ptr exclude=nullptr) const; + + void OnTunnelBuildResult(std::shared_ptr tunnel, TunnelBuildResult result); private: