separate SSU sessions lists for V4 and V6

This commit is contained in:
orignal 2015-11-30 15:53:07 -05:00
parent 2b8e662f81
commit c5308e3f2f
4 changed files with 46 additions and 38 deletions

View file

@ -772,7 +772,6 @@ namespace util
s << "<br>SSU<br>"; s << "<br>SSU<br>";
for (auto it: ssuServer->GetSessions ()) for (auto it: ssuServer->GetSessions ())
{ {
// incoming connections don't have remote router
auto endpoint = it.second->GetRemoteEndpoint (); auto endpoint = it.second->GetRemoteEndpoint ();
if (it.second->IsOutgoing ()) s << "-->"; if (it.second->IsOutgoing ()) s << "-->";
s << endpoint.address ().to_string () << ":" << endpoint.port (); s << endpoint.address ().to_string () << ":" << endpoint.port ();
@ -780,8 +779,17 @@ namespace util
s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]"; s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]";
if (it.second->GetRelayTag ()) if (it.second->GetRelayTag ())
s << " [itag:" << it.second->GetRelayTag () << "]"; s << " [itag:" << it.second->GetRelayTag () << "]";
s << "<br>"; s << "<br>" << std::endl;
s << std::endl; }
s << "<br>SSU6<br>";
for (auto it: ssuServer->GetSessionsV6 ())
{
auto endpoint = it.second->GetRemoteEndpoint ();
if (it.second->IsOutgoing ()) s << "-->";
s << endpoint.address ().to_string () << ":" << endpoint.port ();
if (!it.second->IsOutgoing ()) s << "-->";
s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]";
s << "<br>" << std::endl;
} }
} }
} }

57
SSU.cpp
View file

@ -174,7 +174,7 @@ namespace transport
moreBytes = m_Socket.available(); moreBytes = m_Socket.available();
} }
m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets)); m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, m_Sessions));
Receive (); Receive ();
} }
else else
@ -201,7 +201,7 @@ namespace transport
moreBytes = m_SocketV6.available(); moreBytes = m_SocketV6.available();
} }
m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets)); m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, m_SessionsV6));
ReceiveV6 (); ReceiveV6 ();
} }
else else
@ -211,7 +211,8 @@ namespace transport
} }
} }
void SSUServer::HandleReceivedPackets (std::vector<SSUPacket *> packets) void SSUServer::HandleReceivedPackets (std::vector<SSUPacket *> packets,
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> >& sessions)
{ {
std::shared_ptr<SSUSession> session; std::shared_ptr<SSUSession> session;
for (auto it1: packets) for (auto it1: packets)
@ -222,17 +223,14 @@ namespace transport
if (!session || session->GetRemoteEndpoint () != packet->from) // we received packet for other session than previous if (!session || session->GetRemoteEndpoint () != packet->from) // we received packet for other session than previous
{ {
if (session) session->FlushData (); if (session) session->FlushData ();
auto it = m_Sessions.find (packet->from); auto it = sessions.find (packet->from);
if (it != m_Sessions.end ()) if (it != sessions.end ())
session = it->second; session = it->second;
if (!session) if (!session)
{ {
session = std::make_shared<SSUSession> (*this, packet->from); session = std::make_shared<SSUSession> (*this, packet->from);
session->WaitForConnect (); session->WaitForConnect ();
{ sessions[packet->from] = session;
std::unique_lock<std::mutex> l(m_SessionsMutex);
m_Sessions[packet->from] = session;
}
LogPrint (eLogInfo, "New SSU session from ", packet->from.address ().to_string (), ":", packet->from.port (), " created"); LogPrint (eLogInfo, "New SSU session from ", packet->from.address ().to_string (), ":", packet->from.port (), " created");
} }
} }
@ -265,8 +263,9 @@ namespace transport
std::shared_ptr<SSUSession> SSUServer::FindSession (const boost::asio::ip::udp::endpoint& e) const std::shared_ptr<SSUSession> SSUServer::FindSession (const boost::asio::ip::udp::endpoint& e) const
{ {
auto it = m_Sessions.find (e); auto& sessions = e.address ().is_v6 () ? m_SessionsV6 : m_Sessions;
if (it != m_Sessions.end ()) auto it = sessions.find (e);
if (it != sessions.end ())
return it->second; return it->second;
else else
return nullptr; return nullptr;
@ -295,8 +294,9 @@ namespace transport
void SSUServer::CreateDirectSession (std::shared_ptr<const i2p::data::RouterInfo> router, boost::asio::ip::udp::endpoint remoteEndpoint, bool peerTest) void SSUServer::CreateDirectSession (std::shared_ptr<const i2p::data::RouterInfo> router, boost::asio::ip::udp::endpoint remoteEndpoint, bool peerTest)
{ {
auto it = m_Sessions.find (remoteEndpoint); auto& sessions = remoteEndpoint.address ().is_v6 () ? m_SessionsV6 : m_Sessions;
if (it != m_Sessions.end ()) auto it = sessions.find (remoteEndpoint);
if (it != sessions.end ())
{ {
auto session = it->second; auto session = it->second;
if (peerTest && session->GetState () == eSessionStateEstablished) if (peerTest && session->GetState () == eSessionStateEstablished)
@ -306,10 +306,7 @@ namespace transport
{ {
// otherwise create new session // otherwise create new session
auto session = std::make_shared<SSUSession> (*this, remoteEndpoint, router, peerTest); auto session = std::make_shared<SSUSession> (*this, remoteEndpoint, router, peerTest);
{ sessions[remoteEndpoint] = session;
std::unique_lock<std::mutex> l(m_SessionsMutex);
m_Sessions[remoteEndpoint] = session;
}
// connect // connect
LogPrint ("Creating new SSU session to [", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()), "] ", LogPrint ("Creating new SSU session to [", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()), "] ",
remoteEndpoint.address ().to_string (), ":", remoteEndpoint.port ()); remoteEndpoint.address ().to_string (), ":", remoteEndpoint.port ());
@ -360,15 +357,11 @@ namespace transport
introducer = &(address->introducers[0]); // TODO: introducer = &(address->introducers[0]); // TODO:
boost::asio::ip::udp::endpoint introducerEndpoint (introducer->iHost, introducer->iPort); boost::asio::ip::udp::endpoint introducerEndpoint (introducer->iHost, introducer->iPort);
introducerSession = std::make_shared<SSUSession> (*this, introducerEndpoint, router); introducerSession = std::make_shared<SSUSession> (*this, introducerEndpoint, router);
std::unique_lock<std::mutex> l(m_SessionsMutex);
m_Sessions[introducerEndpoint] = introducerSession; m_Sessions[introducerEndpoint] = introducerSession;
} }
// create session // create session
auto session = std::make_shared<SSUSession> (*this, remoteEndpoint, router, peerTest); auto session = std::make_shared<SSUSession> (*this, remoteEndpoint, router, peerTest);
{ m_Sessions[remoteEndpoint] = session;
std::unique_lock<std::mutex> l(m_SessionsMutex);
m_Sessions[remoteEndpoint] = session;
}
// introduce // introduce
LogPrint ("Introduce new SSU session to [", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()), LogPrint ("Introduce new SSU session to [", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()),
"] through introducer ", introducer->iHost, ":", introducer->iPort); "] through introducer ", introducer->iHost, ":", introducer->iPort);
@ -393,21 +386,27 @@ namespace transport
if (session) if (session)
{ {
session->Close (); session->Close ();
std::unique_lock<std::mutex> l(m_SessionsMutex); auto& ep = session->GetRemoteEndpoint ();
m_Sessions.erase (session->GetRemoteEndpoint ()); if (ep.address ().is_v6 ())
m_SessionsV6.erase (ep);
else
m_Sessions.erase (ep);
} }
} }
void SSUServer::DeleteAllSessions () void SSUServer::DeleteAllSessions ()
{ {
std::unique_lock<std::mutex> l(m_SessionsMutex);
for (auto it: m_Sessions) for (auto it: m_Sessions)
it.second->Close (); it.second->Close ();
m_Sessions.clear (); m_Sessions.clear ();
for (auto it: m_SessionsV6)
it.second->Close ();
m_SessionsV6.clear ();
} }
template<typename Filter> template<typename Filter>
std::shared_ptr<SSUSession> SSUServer::GetRandomSession (Filter filter) std::shared_ptr<SSUSession> SSUServer::GetRandomV4Session (Filter filter) // v4 only
{ {
std::vector<std::shared_ptr<SSUSession> > filteredSessions; std::vector<std::shared_ptr<SSUSession> > filteredSessions;
for (auto s :m_Sessions) for (auto s :m_Sessions)
@ -420,9 +419,9 @@ namespace transport
return nullptr; return nullptr;
} }
std::shared_ptr<SSUSession> SSUServer::GetRandomEstablishedSession (std::shared_ptr<const SSUSession> excluded) std::shared_ptr<SSUSession> SSUServer::GetRandomEstablishedV4Session (std::shared_ptr<const SSUSession> excluded) // v4 only
{ {
return GetRandomSession ( return GetRandomV4Session (
[excluded](std::shared_ptr<SSUSession> session)->bool [excluded](std::shared_ptr<SSUSession> session)->bool
{ {
return session->GetState () == eSessionStateEstablished && !session->IsV6 () && return session->GetState () == eSessionStateEstablished && !session->IsV6 () &&
@ -437,7 +436,7 @@ namespace transport
std::set<SSUSession *> ret; std::set<SSUSession *> ret;
for (int i = 0; i < maxNumIntroducers; i++) for (int i = 0; i < maxNumIntroducers; i++)
{ {
auto session = GetRandomSession ( auto session = GetRandomV4Session (
[&ret, ts](std::shared_ptr<SSUSession> session)->bool [&ret, ts](std::shared_ptr<SSUSession> session)->bool
{ {
return session->GetRelayTag () && !ret.count (session.get ()) && return session->GetRelayTag () && !ret.count (session.get ()) &&

11
SSU.h
View file

@ -43,7 +43,7 @@ namespace transport
void CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest = false); void CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest = false);
std::shared_ptr<SSUSession> FindSession (std::shared_ptr<const i2p::data::RouterInfo> router) const; std::shared_ptr<SSUSession> FindSession (std::shared_ptr<const i2p::data::RouterInfo> router) const;
std::shared_ptr<SSUSession> FindSession (const boost::asio::ip::udp::endpoint& e) const; std::shared_ptr<SSUSession> FindSession (const boost::asio::ip::udp::endpoint& e) const;
std::shared_ptr<SSUSession> GetRandomEstablishedSession (std::shared_ptr<const SSUSession> excluded); std::shared_ptr<SSUSession> GetRandomEstablishedV4Session (std::shared_ptr<const SSUSession> excluded);
void DeleteSession (std::shared_ptr<SSUSession> session); void DeleteSession (std::shared_ptr<SSUSession> session);
void DeleteAllSessions (); void DeleteAllSessions ();
@ -69,12 +69,13 @@ namespace transport
void ReceiveV6 (); void ReceiveV6 ();
void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet); void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet);
void HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet); void HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet);
void HandleReceivedPackets (std::vector<SSUPacket *> packets); void HandleReceivedPackets (std::vector<SSUPacket *> packets,
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> >& sessions);
void CreateSessionThroughIntroducer (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest = false); void CreateSessionThroughIntroducer (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest = false);
void CreateDirectSession (std::shared_ptr<const i2p::data::RouterInfo> router, boost::asio::ip::udp::endpoint remoteEndpoint, bool peerTest); void CreateDirectSession (std::shared_ptr<const i2p::data::RouterInfo> router, boost::asio::ip::udp::endpoint remoteEndpoint, bool peerTest);
template<typename Filter> template<typename Filter>
std::shared_ptr<SSUSession> GetRandomSession (Filter filter); std::shared_ptr<SSUSession> GetRandomV4Session (Filter filter);
std::set<SSUSession *> FindIntroducers (int maxNumIntroducers); std::set<SSUSession *> FindIntroducers (int maxNumIntroducers);
void ScheduleIntroducersUpdateTimer (); void ScheduleIntroducersUpdateTimer ();
@ -100,14 +101,14 @@ namespace transport
boost::asio::ip::udp::socket m_Socket, m_SocketV6; boost::asio::ip::udp::socket m_Socket, m_SocketV6;
boost::asio::deadline_timer m_IntroducersUpdateTimer, m_PeerTestsCleanupTimer; boost::asio::deadline_timer m_IntroducersUpdateTimer, m_PeerTestsCleanupTimer;
std::list<boost::asio::ip::udp::endpoint> m_Introducers; // introducers we are connected to std::list<boost::asio::ip::udp::endpoint> m_Introducers; // introducers we are connected to
mutable std::mutex m_SessionsMutex; std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > m_Sessions, m_SessionsV6;
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > m_Sessions;
std::map<uint32_t, boost::asio::ip::udp::endpoint> m_Relays; // we are introducer std::map<uint32_t, boost::asio::ip::udp::endpoint> m_Relays; // we are introducer
std::map<uint32_t, PeerTest> m_PeerTests; // nonce -> creation time in milliseconds std::map<uint32_t, PeerTest> m_PeerTests; // nonce -> creation time in milliseconds
public: public:
// for HTTP only // for HTTP only
const decltype(m_Sessions)& GetSessions () const { return m_Sessions; }; const decltype(m_Sessions)& GetSessions () const { return m_Sessions; };
const decltype(m_SessionsV6)& GetSessionsV6 () const { return m_SessionsV6; };
}; };
} }
} }

View file

@ -931,7 +931,7 @@ namespace transport
else else
{ {
LogPrint (eLogDebug, "SSU peer test from Alice. We are Bob"); LogPrint (eLogDebug, "SSU peer test from Alice. We are Bob");
auto session = m_Server.GetRandomEstablishedSession (shared_from_this ()); // Charlie auto session = m_Server.GetRandomEstablishedV4Session (shared_from_this ()); // Charlie, TODO: implement v6 support
if (session) if (session)
{ {
m_Server.NewPeerTest (nonce, ePeerTestParticipantBob, shared_from_this ()); m_Server.NewPeerTest (nonce, ePeerTestParticipantBob, shared_from_this ());