Merge remote-tracking branch 'purple/openssl'

This commit is contained in:
Jeff Becker 2016-12-30 04:46:55 -05:00
commit be0c1c0912
16 changed files with 154 additions and 149 deletions

View file

@ -636,7 +636,11 @@ namespace client
if (address.length () > 0) if (address.length () > 0)
{ {
// TODO: verify from // TODO: verify from
m_Addresses[address] = buf + 8; i2p::data::IdentHash hash(buf + 8);
if (!hash.IsZero ())
m_Addresses[address] = hash;
else
LogPrint (eLogInfo, "AddressBook: Lookup response: ", address, " not found");
} }
} }

View file

@ -114,6 +114,7 @@ namespace client
const decltype(m_ClientForwards)& GetClientForwards () const { return m_ClientForwards; } const decltype(m_ClientForwards)& GetClientForwards () const { return m_ClientForwards; }
const decltype(m_ServerForwards)& GetServerForwards () const { return m_ServerForwards; } const decltype(m_ServerForwards)& GetServerForwards () const { return m_ServerForwards; }
const i2p::proxy::HTTPProxy * GetHttpProxy () const { return m_HttpProxy; } const i2p::proxy::HTTPProxy * GetHttpProxy () const { return m_HttpProxy; }
const i2p::proxy::SOCKSProxy * GetSocksProxy () const { return m_SocksProxy; }
}; };
extern ClientContext context; extern ClientContext context;

View file

@ -76,12 +76,10 @@ namespace i2p
return false; return false;
} }
#if !defined(__OpenBSD__)
// point std{in,out,err} descriptors to /dev/null // point std{in,out,err} descriptors to /dev/null
stdin = freopen("/dev/null", "r", stdin); freopen("/dev/null", "r", stdin);
stdout = freopen("/dev/null", "w", stdout); freopen("/dev/null", "w", stdout);
stderr = freopen("/dev/null", "w", stderr); freopen("/dev/null", "w", stderr);
#endif
} }
// set proc limits // set proc limits

View file

@ -168,7 +168,8 @@ namespace datagram
const i2p::data::IdentHash & remoteIdent) : const i2p::data::IdentHash & remoteIdent) :
m_LocalDestination(localDestination), m_LocalDestination(localDestination),
m_RemoteIdent(remoteIdent), m_RemoteIdent(remoteIdent),
m_SendQueueTimer(localDestination->GetService()) m_SendQueueTimer(localDestination->GetService()),
m_RequestingLS(false)
{ {
m_LastUse = i2p::util::GetMillisecondsSinceEpoch (); m_LastUse = i2p::util::GetMillisecondsSinceEpoch ();
ScheduleFlushSendQueue(); ScheduleFlushSendQueue();
@ -221,7 +222,10 @@ namespace datagram
} }
if(!m_RemoteLeaseSet) { if(!m_RemoteLeaseSet) {
// no remote lease set // no remote lease set
m_LocalDestination->RequestDestination(m_RemoteIdent, std::bind(&DatagramSession::HandleLeaseSetUpdated, this, std::placeholders::_1)); if(!m_RequestingLS) {
m_RequestingLS = true;
m_LocalDestination->RequestDestination(m_RemoteIdent, std::bind(&DatagramSession::HandleLeaseSetUpdated, this, std::placeholders::_1));
}
return nullptr; return nullptr;
} }
m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true);
@ -290,6 +294,7 @@ namespace datagram
void DatagramSession::HandleLeaseSetUpdated(std::shared_ptr<i2p::data::LeaseSet> ls) void DatagramSession::HandleLeaseSetUpdated(std::shared_ptr<i2p::data::LeaseSet> ls)
{ {
m_RequestingLS = false;
if(!ls) return; if(!ls) return;
// only update lease set if found and newer than previous lease set // only update lease set if found and newer than previous lease set
uint64_t oldExpire = 0; uint64_t oldExpire = 0;

View file

@ -88,7 +88,7 @@ namespace datagram
boost::asio::deadline_timer m_SendQueueTimer; boost::asio::deadline_timer m_SendQueueTimer;
std::vector<std::shared_ptr<I2NPMessage> > m_SendQueue; std::vector<std::shared_ptr<I2NPMessage> > m_SendQueue;
uint64_t m_LastUse; uint64_t m_LastUse;
bool m_RequestingLS;
}; };
const size_t MAX_DATAGRAM_SIZE = 32768; const size_t MAX_DATAGRAM_SIZE = 32768;

View file

@ -844,6 +844,12 @@ namespace client
return false; return false;
} }
void ClientDestination::AcceptOnce (const i2p::stream::StreamingDestination::Acceptor& acceptor)
{
if (m_StreamingDestination)
m_StreamingDestination->AcceptOnce (acceptor);
}
std::shared_ptr<i2p::stream::StreamingDestination> ClientDestination::CreateStreamingDestination (int port, bool gzip) std::shared_ptr<i2p::stream::StreamingDestination> ClientDestination::CreateStreamingDestination (int port, bool gzip)
{ {
auto dest = std::make_shared<i2p::stream::StreamingDestination> (GetSharedFromThis (), port, gzip); auto dest = std::make_shared<i2p::stream::StreamingDestination> (GetSharedFromThis (), port, gzip);

View file

@ -186,7 +186,8 @@ namespace client
void AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor); void AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor);
void StopAcceptingStreams (); void StopAcceptingStreams ();
bool IsAcceptingStreams () const; bool IsAcceptingStreams () const;
void AcceptOnce (const i2p::stream::StreamingDestination::Acceptor& acceptor);
// datagram // datagram
i2p::datagram::DatagramDestination * GetDatagramDestination () const { return m_DatagramDestination; }; i2p::datagram::DatagramDestination * GetDatagramDestination () const { return m_DatagramDestination; };
i2p::datagram::DatagramDestination * CreateDatagramDestination (); i2p::datagram::DatagramDestination * CreateDatagramDestination ();

View file

@ -604,6 +604,15 @@ namespace http {
s << i2p::client::context.GetAddressBook ().ToAddress(ident); s << i2p::client::context.GetAddressBook ().ToAddress(ident);
s << "<br>\r\n"<< std::endl; s << "<br>\r\n"<< std::endl;
} }
auto socksProxy = i2p::client::context.GetSocksProxy ();
if (socksProxy)
{
auto& ident = socksProxy->GetLocalDestination ()->GetIdentHash();
s << "<a href=\"/?page=" << HTTP_PAGE_LOCAL_DESTINATION << "&b32=" << ident.ToBase32 () << "\">";
s << "SOCKS Proxy" << "</a> &#8656; ";
s << i2p::client::context.GetAddressBook ().ToAddress(ident);
s << "<br>\r\n"<< std::endl;
}
s << "<br>\r\n<b>Server Tunnels:</b><br>\r\n<br>\r\n"; s << "<br>\r\n<b>Server Tunnels:</b><br>\r\n<br>\r\n";
for (auto& it: i2p::client::context.GetServerTunnels ()) for (auto& it: i2p::client::context.GetServerTunnels ())
{ {
@ -775,6 +784,7 @@ namespace http {
{ {
uint32_t token; uint32_t token;
RAND_bytes ((uint8_t *)&token, 4); RAND_bytes ((uint8_t *)&token, 4);
token &= 0x7FFFFFFF; // clear first bit
auto ts = i2p::util::GetSecondsSinceEpoch (); auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto it = m_Tokens.begin (); it != m_Tokens.end (); ) for (auto it = m_Tokens.begin (); it != m_Tokens.end (); )
{ {

View file

@ -72,7 +72,6 @@ namespace log {
{ {
if (!m_IsRunning) if (!m_IsRunning)
{ {
Reopen ();
m_IsRunning = true; m_IsRunning = true;
m_Thread = new std::thread (std::bind (&Log::Run, this)); m_Thread = new std::thread (std::bind (&Log::Run, this));
} }
@ -162,6 +161,7 @@ namespace log {
void Log::Run () void Log::Run ()
{ {
Reopen ();
while (m_IsRunning) while (m_IsRunning)
{ {
std::shared_ptr<LogMsg> msg; std::shared_ptr<LogMsg> msg;

128
SAM.cpp
View file

@ -54,7 +54,11 @@ namespace client
case eSAMSocketTypeAcceptor: case eSAMSocketTypeAcceptor:
{ {
if (m_Session) if (m_Session)
{
m_Session->DelSocket (shared_from_this ()); m_Session->DelSocket (shared_from_this ());
if (m_Session->localDestination)
m_Session->localDestination->StopAcceptingStreams ();
}
break; break;
} }
default: default:
@ -104,10 +108,10 @@ namespace client
separator++; separator++;
std::map<std::string, std::string> params; std::map<std::string, std::string> params;
ExtractParams (separator, params); ExtractParams (separator, params);
auto it = params.find (SAM_PARAM_MAX); //auto it = params.find (SAM_PARAM_MAX);
// TODO: check MIN as well // TODO: check MIN as well
if (it != params.end ()) //if (it != params.end ())
version = it->second; // version = it->second;
} }
if (version[0] == '3') // we support v3 (3.0 and 3.1) only if (version[0] == '3') // we support v3 (3.0 and 3.1) only
{ {
@ -285,11 +289,6 @@ namespace client
dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (), dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (),
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5));
} }
else
{
// start accepting streams because we're not a datagram session
m_Session->localDestination->AcceptStreams (std::bind (&SAMSession::AcceptI2P, m_Session, std::placeholders::_1));
}
if (m_Session->localDestination->IsReady ()) if (m_Session->localDestination->IsReady ())
SendSessionCreateReplyOk (); SendSessionCreateReplyOk ();
@ -401,27 +400,17 @@ namespace client
m_ID = id; m_ID = id;
m_Session = m_Owner.FindSession (id); m_Session = m_Owner.FindSession (id);
if (m_Session) if (m_Session)
{ {
m_SocketType = eSAMSocketTypeAcceptor; m_SocketType = eSAMSocketTypeAcceptor;
m_Session->AddSocket (shared_from_this ()); m_Session->AddSocket (shared_from_this ());
if (!m_Session->localDestination->IsAcceptingStreams ())
m_Session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1));
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
} }
else else
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
} }
void SAMSocket::Accept(std::shared_ptr<i2p::stream::Stream> stream)
{
if(stream) {
m_SocketType = eSAMSocketTypeStream;
HandleI2PAccept(stream);
} else {
SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true);
auto s = shared_from_this ();
m_Owner.GetService ().post ([s] { s->Terminate (); });
}
}
size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data) size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data)
{ {
LogPrint (eLogDebug, "SAM: datagram send: ", buf, " ", len); LogPrint (eLogDebug, "SAM: datagram send: ", buf, " ", len);
@ -475,21 +464,21 @@ namespace client
std::string& name = params[SAM_PARAM_NAME]; std::string& name = params[SAM_PARAM_NAME];
std::shared_ptr<const i2p::data::IdentityEx> identity; std::shared_ptr<const i2p::data::IdentityEx> identity;
i2p::data::IdentHash ident; i2p::data::IdentHash ident;
auto dest = m_Session == nullptr ? context.GetSharedLocalDestination() : m_Session->localDestination;
if (name == "ME") if (name == "ME")
SendNamingLookupReply (m_Session->localDestination->GetIdentity ()); SendNamingLookupReply (dest->GetIdentity ());
else if ((identity = context.GetAddressBook ().GetAddress (name)) != nullptr) else if ((identity = context.GetAddressBook ().GetAddress (name)) != nullptr)
SendNamingLookupReply (identity); SendNamingLookupReply (identity);
else if (m_Session && m_Session->localDestination && else if (context.GetAddressBook ().GetIdentHash (name, ident))
context.GetAddressBook ().GetIdentHash (name, ident))
{ {
auto leaseSet = m_Session->localDestination->FindLeaseSet (ident); auto leaseSet = dest->FindLeaseSet (ident);
if (leaseSet) if (leaseSet)
SendNamingLookupReply (leaseSet->GetIdentity ()); SendNamingLookupReply (leaseSet->GetIdentity ());
else else
m_Session->localDestination->RequestDestination (ident, dest->RequestDestination (ident,
std::bind (&SAMSocket::HandleNamingLookupLeaseSetRequestComplete, std::bind (&SAMSocket::HandleNamingLookupLeaseSetRequestComplete,
shared_from_this (), std::placeholders::_1, ident)); shared_from_this (), std::placeholders::_1, ident));
} }
else else
{ {
LogPrint (eLogError, "SAM: naming failed, unknown address ", name); LogPrint (eLogError, "SAM: naming failed, unknown address ", name);
@ -664,8 +653,20 @@ namespace client
if (stream) if (stream)
{ {
LogPrint (eLogDebug, "SAM: incoming I2P connection for session ", m_ID); LogPrint (eLogDebug, "SAM: incoming I2P connection for session ", m_ID);
m_SocketType = eSAMSocketTypeStream;
m_Stream = stream; m_Stream = stream;
context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ()); context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ());
auto session = m_Owner.FindSession (m_ID);
if (session)
{
// find more pending acceptors
for (auto it: session->ListSockets ())
if (it->m_SocketType == eSAMSocketTypeAcceptor)
{
session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, it, std::placeholders::_1));
break;
}
}
if (!m_IsSilent) if (!m_IsSilent)
{ {
// get remote peer address // get remote peer address
@ -707,76 +708,26 @@ namespace client
} }
SAMSession::SAMSession (std::shared_ptr<ClientDestination> dest): SAMSession::SAMSession (std::shared_ptr<ClientDestination> dest):
localDestination (dest), localDestination (dest)
m_BacklogPumper(dest->GetService())
{ {
PumpBacklog();
} }
void SAMSession::AcceptI2P(std::shared_ptr<i2p::stream::Stream> stream) SAMSession::~SAMSession ()
{ {
if(!stream) return; // fail CloseStreams();
std::unique_lock<std::mutex> lock(m_SocketsMutex); i2p::client::context.DeleteLocalDestination (localDestination);
if(m_Backlog.size() > SAM_MAX_ACCEPT_BACKLOG) {
stream->Close();
return;
}
m_Backlog.push_back(stream);
}
void SAMSession::PumpBacklog()
{
// pump backlog every 100ms
boost::posix_time::milliseconds dlt(100);
m_BacklogPumper.expires_from_now(dlt);
m_BacklogPumper.async_wait(std::bind(&SAMSession::HandlePumpBacklog, this, std::placeholders::_1));
}
std::shared_ptr<SAMSocket> SAMSession::FindAcceptor()
{
for (auto & sock : m_Sockets) {
auto t = sock->GetSocketType();
if(t == eSAMSocketTypeAcceptor) {
return sock;
}
}
return nullptr;
}
void SAMSession::HandlePumpBacklog(const boost::system::error_code & ec)
{
if(ec) return;
{
std::unique_lock<std::mutex> lock(m_SocketsMutex);
auto itr = m_Backlog.begin();
while(itr != m_Backlog.end()) {
auto sock = FindAcceptor();
if (sock) {
sock->Accept(*itr);
itr = m_Backlog.erase(itr);
} else {
++itr;
}
}
}
PumpBacklog();
} }
void SAMSession::CloseStreams () void SAMSession::CloseStreams ()
{ {
m_BacklogPumper.cancel(); {
localDestination->GetService().post([&] () {
std::lock_guard<std::mutex> lock(m_SocketsMutex); std::lock_guard<std::mutex> lock(m_SocketsMutex);
for (auto& sock : m_Sockets) { for (auto& sock : m_Sockets) {
sock->CloseStream(); sock->CloseStream();
} }
for(auto & stream : m_Backlog) { }
stream->Close(); // XXX: should this be done inside locked parts?
} m_Sockets.clear();
m_Sockets.clear();
m_Backlog.clear();
i2p::client::context.DeleteLocalDestination (localDestination);
});
} }
SAMBridge::SAMBridge (const std::string& address, int port): SAMBridge::SAMBridge (const std::string& address, int port):
@ -887,9 +838,8 @@ namespace client
auto session = std::make_shared<SAMSession>(localDestination); auto session = std::make_shared<SAMSession>(localDestination);
std::unique_lock<std::mutex> l(m_SessionsMutex); std::unique_lock<std::mutex> l(m_SessionsMutex);
auto ret = m_Sessions.insert (std::make_pair(id, session)); auto ret = m_Sessions.insert (std::make_pair(id, session));
if (!ret.second) { if (!ret.second)
LogPrint (eLogWarning, "SAM: Session ", id, " already exists"); LogPrint (eLogWarning, "SAM: Session ", id, " already exists");
}
return ret.first->second; return ret.first->second;
} }
return nullptr; return nullptr;

19
SAM.h
View file

@ -20,8 +20,7 @@ namespace client
{ {
const size_t SAM_SOCKET_BUFFER_SIZE = 8192; const size_t SAM_SOCKET_BUFFER_SIZE = 8192;
const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds
const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds
const int SAM_MAX_ACCEPT_BACKLOG = 50;
const char SAM_HANDSHAKE[] = "HELLO VERSION"; const char SAM_HANDSHAKE[] = "HELLO VERSION";
const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n"; const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n";
const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n"; const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n";
@ -85,8 +84,6 @@ namespace client
void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; }; void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; };
SAMSocketType GetSocketType () const { return m_SocketType; }; SAMSocketType GetSocketType () const { return m_SocketType; };
void Accept(std::shared_ptr<i2p::stream::Stream> stream);
private: private:
void Terminate (); void Terminate ();
@ -137,8 +134,6 @@ namespace client
struct SAMSession struct SAMSession
{ {
std::shared_ptr<ClientDestination> localDestination; std::shared_ptr<ClientDestination> localDestination;
boost::asio::deadline_timer m_BacklogPumper;
std::list<std::shared_ptr<i2p::stream::Stream> > m_Backlog;
std::list<std::shared_ptr<SAMSocket> > m_Sockets; std::list<std::shared_ptr<SAMSocket> > m_Sockets;
std::mutex m_SocketsMutex; std::mutex m_SocketsMutex;
@ -163,15 +158,9 @@ namespace client
} }
return l; return l;
} }
SAMSession (std::shared_ptr<ClientDestination> dest); SAMSession (std::shared_ptr<ClientDestination> dest);
~SAMSession ();
void AcceptI2P(std::shared_ptr<i2p::stream::Stream> stream);
std::shared_ptr<SAMSocket> FindAcceptor();
void PumpBacklog();
void HandlePumpBacklog(const boost::system::error_code & ec);
void CloseStreams (); void CloseStreams ();
}; };

View file

@ -1037,6 +1037,29 @@ namespace stream
m_Acceptor = nullptr; m_Acceptor = nullptr;
} }
void StreamingDestination::AcceptOnce (const Acceptor& acceptor)
{
m_Owner->GetService ().post([acceptor, this](void)
{
if (!m_PendingIncomingStreams.empty ())
{
acceptor (m_PendingIncomingStreams.front ());
m_PendingIncomingStreams.pop_front ();
if (m_PendingIncomingStreams.empty ())
m_PendingIncomingTimer.cancel ();
}
else // we must save old acceptor and set it back
{
auto oldAcceptor = m_Acceptor;
m_Acceptor = [acceptor, oldAcceptor, this](std::shared_ptr<Stream> stream)
{
acceptor (stream);
m_Acceptor = oldAcceptor;
};
}
});
}
void StreamingDestination::HandlePendingIncomingTimer (const boost::system::error_code& ecode) void StreamingDestination::HandlePendingIncomingTimer (const boost::system::error_code& ecode)
{ {
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)

View file

@ -223,6 +223,8 @@ namespace stream
void SetAcceptor (const Acceptor& acceptor); void SetAcceptor (const Acceptor& acceptor);
void ResetAcceptor (); void ResetAcceptor ();
bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
void AcceptOnce (const Acceptor& acceptor);
std::shared_ptr<i2p::client::ClientDestination> GetOwner () const { return m_Owner; }; std::shared_ptr<i2p::client::ClientDestination> GetOwner () const { return m_Owner; };
void SetOwner (std::shared_ptr<i2p::client::ClientDestination> owner) { m_Owner = owner; }; void SetOwner (std::shared_ptr<i2p::client::ClientDestination> owner) { m_Owner = owner; };
uint16_t GetLocalPort () const { return m_LocalPort; }; uint16_t GetLocalPort () const { return m_LocalPort; };

View file

@ -108,8 +108,8 @@ namespace transport
Transports transports; Transports transports;
Transports::Transports (): Transports::Transports ():
m_IsOnline (true), m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), m_IsOnline (true), m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr),
m_PeerCleanupTimer (m_Service), m_PeerTestTimer (m_Service), m_Work (nullptr), m_PeerCleanupTimer (nullptr), m_PeerTestTimer (nullptr),
m_NTCPServer (nullptr), m_SSUServer (nullptr), m_DHKeysPairSupplier (5), // 5 pre-generated keys m_NTCPServer (nullptr), m_SSUServer (nullptr), m_DHKeysPairSupplier (5), // 5 pre-generated keys
m_TotalSentBytes(0), m_TotalReceivedBytes(0), m_InBandwidth (0), m_OutBandwidth (0), m_TotalSentBytes(0), m_TotalReceivedBytes(0), m_InBandwidth (0), m_OutBandwidth (0),
m_LastInBandwidthUpdateBytes (0), m_LastOutBandwidthUpdateBytes (0), m_LastBandwidthUpdateTime (0) m_LastInBandwidthUpdateBytes (0), m_LastOutBandwidthUpdateBytes (0), m_LastBandwidthUpdateTime (0)
@ -119,10 +119,25 @@ namespace transport
Transports::~Transports () Transports::~Transports ()
{ {
Stop (); Stop ();
if (m_Service)
{
delete m_PeerCleanupTimer; m_PeerCleanupTimer = nullptr;
delete m_PeerTestTimer; m_PeerTestTimer = nullptr;
delete m_Work; m_Work = nullptr;
delete m_Service; m_Service = nullptr;
}
} }
void Transports::Start (bool enableNTCP, bool enableSSU) void Transports::Start (bool enableNTCP, bool enableSSU)
{ {
if (!m_Service)
{
m_Service = new boost::asio::io_service ();
m_Work = new boost::asio::io_service::work (*m_Service);
m_PeerCleanupTimer = new boost::asio::deadline_timer (*m_Service);
m_PeerTestTimer = new boost::asio::deadline_timer (*m_Service);
}
m_DHKeysPairSupplier.Start (); m_DHKeysPairSupplier.Start ();
m_IsRunning = true; m_IsRunning = true;
m_Thread = new std::thread (std::bind (&Transports::Run, this)); m_Thread = new std::thread (std::bind (&Transports::Run, this));
@ -167,16 +182,16 @@ namespace transport
LogPrint (eLogError, "Transports: SSU server already exists"); LogPrint (eLogError, "Transports: SSU server already exists");
} }
} }
m_PeerCleanupTimer.expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT)); m_PeerCleanupTimer->expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT));
m_PeerCleanupTimer.async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1)); m_PeerCleanupTimer->async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1));
m_PeerTestTimer.expires_from_now (boost::posix_time::minutes(PEER_TEST_INTERVAL)); m_PeerTestTimer->expires_from_now (boost::posix_time::minutes(PEER_TEST_INTERVAL));
m_PeerTestTimer.async_wait (std::bind (&Transports::HandlePeerTestTimer, this, std::placeholders::_1)); m_PeerTestTimer->async_wait (std::bind (&Transports::HandlePeerTestTimer, this, std::placeholders::_1));
} }
void Transports::Stop () void Transports::Stop ()
{ {
m_PeerCleanupTimer.cancel (); if (m_PeerCleanupTimer) m_PeerCleanupTimer->cancel ();
m_PeerTestTimer.cancel (); if (m_PeerTestTimer) m_PeerTestTimer->cancel ();
m_Peers.clear (); m_Peers.clear ();
if (m_SSUServer) if (m_SSUServer)
{ {
@ -193,7 +208,7 @@ namespace transport
m_DHKeysPairSupplier.Stop (); m_DHKeysPairSupplier.Stop ();
m_IsRunning = false; m_IsRunning = false;
m_Service.stop (); if (m_Service) m_Service->stop ();
if (m_Thread) if (m_Thread)
{ {
m_Thread->join (); m_Thread->join ();
@ -204,11 +219,11 @@ namespace transport
void Transports::Run () void Transports::Run ()
{ {
while (m_IsRunning) while (m_IsRunning && m_Service)
{ {
try try
{ {
m_Service.run (); m_Service->run ();
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {
@ -251,7 +266,7 @@ namespace transport
#ifdef WITH_EVENTS #ifdef WITH_EVENTS
QueueIntEvent("transport.send", ident.ToBase64(), msgs.size()); QueueIntEvent("transport.send", ident.ToBase64(), msgs.size());
#endif #endif
m_Service.post (std::bind (&Transports::PostMessages, this, ident, msgs)); m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs));
} }
void Transports::PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs) void Transports::PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs)
@ -386,7 +401,7 @@ namespace transport
void Transports::RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident) void Transports::RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident)
{ {
m_Service.post (std::bind (&Transports::HandleRequestComplete, this, r, ident)); m_Service->post (std::bind (&Transports::HandleRequestComplete, this, r, ident));
} }
void Transports::HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, i2p::data::IdentHash ident) void Transports::HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, i2p::data::IdentHash ident)
@ -411,7 +426,7 @@ namespace transport
void Transports::NTCPResolve (const std::string& addr, const i2p::data::IdentHash& ident) void Transports::NTCPResolve (const std::string& addr, const i2p::data::IdentHash& ident)
{ {
auto resolver = std::make_shared<boost::asio::ip::tcp::resolver>(m_Service); auto resolver = std::make_shared<boost::asio::ip::tcp::resolver>(*m_Service);
resolver->async_resolve (boost::asio::ip::tcp::resolver::query (addr, ""), resolver->async_resolve (boost::asio::ip::tcp::resolver::query (addr, ""),
std::bind (&Transports::HandleNTCPResolve, this, std::bind (&Transports::HandleNTCPResolve, this,
std::placeholders::_1, std::placeholders::_2, ident, resolver)); std::placeholders::_1, std::placeholders::_2, ident, resolver));
@ -454,7 +469,7 @@ namespace transport
void Transports::SSUResolve (const std::string& addr, const i2p::data::IdentHash& ident) void Transports::SSUResolve (const std::string& addr, const i2p::data::IdentHash& ident)
{ {
auto resolver = std::make_shared<boost::asio::ip::tcp::resolver>(m_Service); auto resolver = std::make_shared<boost::asio::ip::tcp::resolver>(*m_Service);
resolver->async_resolve (boost::asio::ip::tcp::resolver::query (addr, ""), resolver->async_resolve (boost::asio::ip::tcp::resolver::query (addr, ""),
std::bind (&Transports::HandleSSUResolve, this, std::bind (&Transports::HandleSSUResolve, this,
std::placeholders::_1, std::placeholders::_2, ident, resolver)); std::placeholders::_1, std::placeholders::_2, ident, resolver));
@ -497,7 +512,7 @@ namespace transport
void Transports::CloseSession (std::shared_ptr<const i2p::data::RouterInfo> router) void Transports::CloseSession (std::shared_ptr<const i2p::data::RouterInfo> router)
{ {
if (!router) return; if (!router) return;
m_Service.post (std::bind (&Transports::PostCloseSession, this, router)); m_Service->post (std::bind (&Transports::PostCloseSession, this, router));
} }
void Transports::PostCloseSession (std::shared_ptr<const i2p::data::RouterInfo> router) void Transports::PostCloseSession (std::shared_ptr<const i2p::data::RouterInfo> router)
@ -584,7 +599,7 @@ namespace transport
void Transports::PeerConnected (std::shared_ptr<TransportSession> session) void Transports::PeerConnected (std::shared_ptr<TransportSession> session)
{ {
m_Service.post([session, this]() m_Service->post([session, this]()
{ {
auto remoteIdentity = session->GetRemoteIdentity (); auto remoteIdentity = session->GetRemoteIdentity ();
if (!remoteIdentity) return; if (!remoteIdentity) return;
@ -632,7 +647,7 @@ namespace transport
void Transports::PeerDisconnected (std::shared_ptr<TransportSession> session) void Transports::PeerDisconnected (std::shared_ptr<TransportSession> session)
{ {
m_Service.post([session, this]() m_Service->post([session, this]()
{ {
auto remoteIdentity = session->GetRemoteIdentity (); auto remoteIdentity = session->GetRemoteIdentity ();
if (!remoteIdentity) return; if (!remoteIdentity) return;
@ -690,8 +705,8 @@ namespace transport
UpdateBandwidth (); // TODO: use separate timer(s) for it UpdateBandwidth (); // TODO: use separate timer(s) for it
if (i2p::context.GetStatus () == eRouterStatusTesting) // if still testing, repeat peer test if (i2p::context.GetStatus () == eRouterStatusTesting) // if still testing, repeat peer test
DetectExternalIP (); DetectExternalIP ();
m_PeerCleanupTimer.expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT)); m_PeerCleanupTimer->expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT));
m_PeerCleanupTimer.async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1)); m_PeerCleanupTimer->async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1));
} }
} }
@ -700,8 +715,8 @@ namespace transport
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)
{ {
PeerTest (); PeerTest ();
m_PeerTestTimer.expires_from_now (boost::posix_time::minutes(PEER_TEST_INTERVAL)); m_PeerTestTimer->expires_from_now (boost::posix_time::minutes(PEER_TEST_INTERVAL));
m_PeerTestTimer.async_wait (std::bind (&Transports::HandlePeerTestTimer, this, std::placeholders::_1)); m_PeerTestTimer->async_wait (std::bind (&Transports::HandlePeerTestTimer, this, std::placeholders::_1));
} }
} }

View file

@ -84,7 +84,7 @@ namespace transport
bool IsOnline() const { return m_IsOnline; }; bool IsOnline() const { return m_IsOnline; };
void SetOnline (bool online) { m_IsOnline = online; }; void SetOnline (bool online) { m_IsOnline = online; };
boost::asio::io_service& GetService () { return m_Service; }; boost::asio::io_service& GetService () { return *m_Service; };
std::shared_ptr<i2p::crypto::DHKeys> GetNextDHKeysPair (); std::shared_ptr<i2p::crypto::DHKeys> GetNextDHKeysPair ();
void ReuseDHKeysPair (std::shared_ptr<i2p::crypto::DHKeys> pair); void ReuseDHKeysPair (std::shared_ptr<i2p::crypto::DHKeys> pair);
@ -144,9 +144,9 @@ namespace transport
bool m_IsOnline, m_IsRunning; bool m_IsOnline, m_IsRunning;
std::thread * m_Thread; std::thread * m_Thread;
boost::asio::io_service m_Service; boost::asio::io_service * m_Service;
boost::asio::io_service::work m_Work; boost::asio::io_service::work * m_Work;
boost::asio::deadline_timer m_PeerCleanupTimer, m_PeerTestTimer; boost::asio::deadline_timer * m_PeerCleanupTimer, * m_PeerTestTimer;
NTCPServer * m_NTCPServer; NTCPServer * m_NTCPServer;
SSUServer * m_SSUServer; SSUServer * m_SSUServer;

View file

@ -275,14 +275,15 @@ namespace net
if (cur_ifname == ifname && cur->ifa_addr && cur->ifa_addr->sa_family == af) if (cur_ifname == ifname && cur->ifa_addr && cur->ifa_addr->sa_family == af)
{ {
// match // match
char * addr = new char[INET6_ADDRSTRLEN]; char * addr = new char[INET6_ADDRSTRLEN];
bzero(addr, INET6_ADDRSTRLEN); bzero(addr, INET6_ADDRSTRLEN);
if(af == AF_INET) if(af == AF_INET)
inet_ntop(af, &((sockaddr_in *)cur->ifa_addr)->sin_addr, addr, INET6_ADDRSTRLEN); inet_ntop(af, &((sockaddr_in *)cur->ifa_addr)->sin_addr, addr, INET6_ADDRSTRLEN);
else else
inet_ntop(af, &((sockaddr_in6 *)cur->ifa_addr)->sin6_addr, addr, INET6_ADDRSTRLEN); inet_ntop(af, &((sockaddr_in6 *)cur->ifa_addr)->sin6_addr, addr, INET6_ADDRSTRLEN);
freeifaddrs(addrs); freeifaddrs(addrs);
std::string cur_ifaddr(addr); std::string cur_ifaddr(addr);
delete[] addr;
return boost::asio::ip::address::from_string(cur_ifaddr); return boost::asio::ip::address::from_string(cur_ifaddr);
} }
cur = cur->ifa_next; cur = cur->ifa_next;