diff --git a/daemon/HTTPServer.cpp b/daemon/HTTPServer.cpp
index 12edf276..657e6d43 100644
--- a/daemon/HTTPServer.cpp
+++ b/daemon/HTTPServer.cpp
@@ -660,7 +660,7 @@ namespace http {
s << i2p::client::context.GetAddressBook ().ToAddress(ident) << "
\r\n";
s << "
\r\n";
s << "Streams:
\r\n";
- for (const auto& it: session->ListSockets())
+ for (const auto& it: sam->ListSockets(id))
{
switch (it->GetSocketType ())
{
diff --git a/daemon/I2PControl.cpp b/daemon/I2PControl.cpp
index fcff78cd..6ac87cbb 100644
--- a/daemon/I2PControl.cpp
+++ b/daemon/I2PControl.cpp
@@ -727,7 +727,7 @@ namespace client
sam_session.put("name", name);
sam_session.put("address", i2p::client::context.GetAddressBook ().ToAddress(ident));
- for (const auto& socket: it.second->ListSockets())
+ for (const auto& socket: sam->ListSockets(it.first))
{
boost::property_tree::ptree stream;
stream.put("type", socket->GetSocketType ());
diff --git a/libi2pd/HTTP.cpp b/libi2pd/HTTP.cpp
index 41d7903b..985e1c22 100644
--- a/libi2pd/HTTP.cpp
+++ b/libi2pd/HTTP.cpp
@@ -96,7 +96,7 @@ namespace http {
pos_c = url.find('@', pos_p); /* find end of 'user' or 'user:pass' part */
if (pos_c != std::string::npos && (pos_s == std::string::npos || pos_s > pos_c)) {
std::size_t delim = url.find(':', pos_p);
- if (delim != std::string::npos && delim < pos_c) {
+ if (delim && delim != std::string::npos && delim < pos_c) {
user = url.substr(pos_p, delim - pos_p);
delim += 1;
pass = url.substr(delim, pos_c - delim);
diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp
index dd8e3634..e8386b61 100644
--- a/libi2pd/Streaming.cpp
+++ b/libi2pd/Streaming.cpp
@@ -578,9 +578,7 @@ namespace stream
if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) // nothing to send
{
m_Status = eStreamStatusClosed;
- // close could be called from another thread so do SendClose from the destination thread
- // this is so m_LocalDestination.NewPacket () does not trigger a race condition
- m_Service.post(std::bind(&Stream::SendClose, shared_from_this()));
+ SendClose();
}
break;
case eStreamStatusClosed:
diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h
index 47f99833..3db8d760 100644
--- a/libi2pd/Streaming.h
+++ b/libi2pd/Streaming.h
@@ -165,6 +165,9 @@ namespace stream
void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0);
size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); };
+ void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); };
+
+ /** only call close from destination thread, use Stream::AsyncClose for other threads */
void Close ();
void Cancel () { m_ReceiveTimer.cancel (); };
diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp
index 05943981..ac2dd853 100644
--- a/libi2pd_client/SAM.cpp
+++ b/libi2pd_client/SAM.cpp
@@ -15,8 +15,8 @@ namespace i2p
{
namespace client
{
- SAMSocket::SAMSocket (SAMBridge& owner, std::shared_ptr socket):
- m_Owner (owner), m_Socket(socket), m_Timer (m_Owner.GetService ()),
+ SAMSocket::SAMSocket (SAMBridge& owner):
+ m_Owner (owner), m_Socket(owner.GetService()), m_Timer (m_Owner.GetService ()),
m_BufferOffset (0),
m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false),
m_IsAccepting (false), m_Stream (nullptr)
@@ -25,51 +25,17 @@ namespace client
SAMSocket::~SAMSocket ()
{
- if(m_Stream)
- {
- m_Stream->Close ();
- m_Stream.reset ();
- }
- auto Session = m_Owner.FindSession(m_ID);
-
- switch (m_SocketType)
- {
- case eSAMSocketTypeSession:
- m_Owner.CloseSession (m_ID);
- break;
- case eSAMSocketTypeStream:
- {
- if (Session)
- Session->DelSocket (this);
- break;
- }
- case eSAMSocketTypeAcceptor:
- {
- if (Session)
- {
- Session->DelSocket (this);
- if (m_IsAccepting && Session->localDestination)
- Session->localDestination->StopAcceptingStreams ();
- }
- break;
- }
- default:
- ;
- }
- m_SocketType = eSAMSocketTypeTerminated;
- if (m_Socket && m_Socket->is_open()) m_Socket->close ();
- m_Socket.reset ();
+ m_Stream = nullptr;
}
void SAMSocket::Terminate (const char* reason)
{
if(m_Stream)
{
- m_Stream->Close ();
- m_Stream.reset ();
+ m_Stream->AsyncClose ();
+ m_Stream = nullptr;
}
auto Session = m_Owner.FindSession(m_ID);
-
switch (m_SocketType)
{
case eSAMSocketTypeSession:
@@ -77,15 +43,12 @@ namespace client
break;
case eSAMSocketTypeStream:
{
- if (Session)
- Session->DelSocket (this);
break;
}
case eSAMSocketTypeAcceptor:
{
if (Session)
{
- Session->DelSocket (this);
if (m_IsAccepting && Session->localDestination)
Session->localDestination->StopAcceptingStreams ();
}
@@ -95,16 +58,20 @@ namespace client
;
}
m_SocketType = eSAMSocketTypeTerminated;
- if (m_Socket && m_Socket->is_open()) m_Socket->close ();
- m_Socket.reset ();
+ if (m_Socket.is_open ())
+ {
+ boost::system::error_code ec;
+ m_Socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
+ m_Socket.close ();
+ }
+ m_Owner.RemoveSocket(shared_from_this());
}
void SAMSocket::ReceiveHandshake ()
- {
- if(m_Socket)
- m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
- std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (),
- std::placeholders::_1, std::placeholders::_2));
+ {
+ m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
+ std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (),
+ std::placeholders::_1, std::placeholders::_2));
}
static bool SAMVersionAcceptable(const std::string & ver)
@@ -125,7 +92,7 @@ namespace client
void SAMSocket::HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred)
{
if (ecode)
- {
+ {
LogPrint (eLogError, "SAM: handshake read error: ", ecode.message ());
if (ecode != boost::asio::error::operation_aborted)
Terminate ("SAM: handshake read error");
@@ -184,7 +151,7 @@ namespace client
#else
size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ());
#endif
- boost::asio::async_write (*m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (),
+ boost::asio::async_write (m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (),
std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (),
std::placeholders::_1, std::placeholders::_2));
}
@@ -199,17 +166,22 @@ namespace client
}
}
+ bool SAMSocket::IsSession(const std::string & id) const
+ {
+ return id == m_ID;
+ }
+
void SAMSocket::HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred)
{
if (ecode)
- {
+ {
LogPrint (eLogError, "SAM: handshake reply send error: ", ecode.message ());
if (ecode != boost::asio::error::operation_aborted)
Terminate ("SAM: handshake reply send error");
}
- else if(m_Socket)
+ else
{
- m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
+ m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
std::bind(&SAMSocket::HandleMessage, shared_from_this (),
std::placeholders::_1, std::placeholders::_2));
}
@@ -220,7 +192,7 @@ namespace client
LogPrint (eLogDebug, "SAMSocket::SendMessageReply, close=",close?"true":"false", " reason: ", msg);
if (!m_IsSilent)
- boost::asio::async_write (*m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (),
+ boost::asio::async_write (m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (),
std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (),
std::placeholders::_1, std::placeholders::_2, close));
else
@@ -235,7 +207,7 @@ namespace client
void SAMSocket::HandleMessageReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred, bool close)
{
if (ecode)
- {
+ {
LogPrint (eLogError, "SAM: reply send error: ", ecode.message ());
if (ecode != boost::asio::error::operation_aborted)
Terminate ("SAM: reply send error");
@@ -252,7 +224,7 @@ namespace client
void SAMSocket::HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred)
{
if (ecode)
- {
+ {
LogPrint (eLogError, "SAM: read error: ", ecode.message ());
if (ecode != boost::asio::error::operation_aborted)
Terminate ("SAM: read error");
@@ -501,7 +473,6 @@ namespace client
if(session)
{
m_SocketType = eSAMSocketTypeStream;
- session->AddSocket (shared_from_this ());
m_Stream = session->localDestination->CreateStream (remote);
m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send
m_BufferOffset = 0;
@@ -534,7 +505,6 @@ namespace client
if (session)
{
m_SocketType = eSAMSocketTypeAcceptor;
- session->AddSocket (shared_from_this ());
if (!session->localDestination->IsAcceptingStreams ())
{
m_IsAccepting = true;
@@ -599,7 +569,7 @@ namespace client
keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ());
#else
size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY,
- keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ());
+ keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ());
#endif
SendMessageReply (m_Buffer, l, false);
}
@@ -704,17 +674,9 @@ namespace client
void SAMSocket::Receive ()
{
- if (m_BufferOffset >= SAM_SOCKET_BUFFER_SIZE)
- {
- LogPrint (eLogError, "SAM: Buffer is full, terminate");
- Terminate ("Buffer is full");
- return;
- } else if (m_Socket)
- m_Socket->async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset),
- std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage,
- shared_from_this (), std::placeholders::_1, std::placeholders::_2));
- else
- LogPrint(eLogError, "SAM: receive with no native socket");
+ m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset),
+ std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage,
+ shared_from_this (), std::placeholders::_1, std::placeholders::_2));
}
void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred)
@@ -731,15 +693,12 @@ namespace client
{
bytes_transferred += m_BufferOffset;
m_BufferOffset = 0;
- auto s = shared_from_this ();
m_Stream->AsyncSend ((uint8_t *)m_Buffer, bytes_transferred,
- [s](const boost::system::error_code& ecode)
- {
- if (!ecode)
- s->m_Owner.GetService ().post ([s] { s->Receive (); });
- else
- s->m_Owner.GetService ().post ([s] { s->Terminate ("AsyncSend failed"); });
- });
+ std::bind(&SAMSocket::HandleStreamSend, shared_from_this(), std::placeholders::_1));
+ }
+ else
+ {
+ Terminate("No Stream Remaining");
}
}
}
@@ -766,21 +725,21 @@ namespace client
WriteI2PDataImmediate(buff, len);
}
else // no more data
+ {
+ delete [] buff;
Terminate ("no more data");
+ }
}
}
}
void SAMSocket::WriteI2PDataImmediate(uint8_t * buff, size_t sz)
{
- if(m_Socket)
- boost::asio::async_write (
- *m_Socket,
- boost::asio::buffer (buff, sz),
- boost::asio::transfer_all(),
- std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination
- else
- LogPrint(eLogError, "SAM: no native socket");
+ boost::asio::async_write (
+ m_Socket,
+ boost::asio::buffer (buff, sz),
+ boost::asio::transfer_all(),
+ std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination
}
void SAMSocket::HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff)
@@ -790,9 +749,11 @@ namespace client
void SAMSocket::WriteI2PData(size_t sz)
{
- uint8_t * sendbuff = new uint8_t[sz];
- memcpy(sendbuff, m_StreamBuffer, sz);
- WriteI2PDataImmediate(sendbuff, sz);
+ boost::asio::async_write (
+ m_Socket,
+ boost::asio::buffer (m_StreamBuffer, sz),
+ boost::asio::transfer_all(),
+ std::bind(&SAMSocket::HandleWriteI2PData, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}
void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred)
@@ -826,7 +787,8 @@ namespace client
{
WriteI2PData(bytes_transferred);
}
- I2PReceive();
+ else
+ I2PReceive();
}
}
}
@@ -858,7 +820,7 @@ namespace client
if (session)
{
// find more pending acceptors
- for (auto it: session->ListSockets ())
+ for (auto & it: m_Owner.ListSockets (m_ID))
if (it->m_SocketType == eSAMSocketTypeAcceptor)
{
it->m_IsAccepting = true;
@@ -930,29 +892,30 @@ namespace client
}
}
- SAMSession::SAMSession (std::shared_ptr dest):
+ void SAMSocket::HandleStreamSend(const boost::system::error_code & ec)
+ {
+ m_Owner.GetService ().post (std::bind( !ec ? &SAMSocket::Receive : &SAMSocket::TerminateClose, shared_from_this()));
+ }
+
+ SAMSession::SAMSession (SAMBridge & parent, const std::string & id, std::shared_ptr dest):
+ m_Bridge(parent),
localDestination (dest),
- UDPEndpoint(nullptr)
+ UDPEndpoint(nullptr),
+ Name(id)
{
}
-
+
SAMSession::~SAMSession ()
{
- CloseStreams();
i2p::client::context.DeleteLocalDestination (localDestination);
}
void SAMSession::CloseStreams ()
{
- std::vector > socks;
+ for(const auto & itr : m_Bridge.ListSockets(Name))
{
- std::lock_guard lock(m_SocketsMutex);
- for (const auto& sock : m_Sockets) {
- socks.push_back(sock);
- }
+ itr->Terminate(nullptr);
}
- for (auto & sock : socks ) sock->Terminate("SAMSession::CloseStreams()");
- m_Sockets.clear();
}
SAMBridge::SAMBridge (const std::string& address, int port):
@@ -1009,12 +972,17 @@ namespace client
void SAMBridge::Accept ()
{
- auto native = std::make_shared(m_Service);
- auto newSocket = std::make_shared (*this, native);
- m_Acceptor.async_accept (*native, std::bind (&SAMBridge::HandleAccept, this,
+ auto newSocket = std::make_shared(*this);
+ m_Acceptor.async_accept (newSocket->GetSocket(), std::bind (&SAMBridge::HandleAccept, this,
std::placeholders::_1, newSocket));
}
+ void SAMBridge::RemoveSocket(const std::shared_ptr & socket)
+ {
+ std::unique_lock lock(m_OpenSocketsMutex);
+ m_OpenSockets.remove_if([socket](const std::shared_ptr & item) -> bool { return item == socket; });
+ }
+
void SAMBridge::HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket)
{
if (!ecode)
@@ -1024,6 +992,10 @@ namespace client
if (!ec)
{
LogPrint (eLogDebug, "SAM: new connection from ", ep);
+ {
+ std::unique_lock l(m_OpenSocketsMutex);
+ m_OpenSockets.push_back(socket);
+ }
socket->ReceiveHandshake ();
}
else
@@ -1066,7 +1038,7 @@ namespace client
if (localDestination)
{
localDestination->Acquire ();
- auto session = std::make_shared(localDestination);
+ auto session = std::make_shared(*this, id, localDestination);
std::unique_lock l(m_SessionsMutex);
auto ret = m_Sessions.insert (std::make_pair(id, session));
if (!ret.second)
@@ -1105,6 +1077,18 @@ namespace client
return nullptr;
}
+ std::list > SAMBridge::ListSockets(const std::string & id) const
+ {
+ std::list > list;
+ {
+ std::unique_lock l(m_OpenSocketsMutex);
+ for (const auto & itr : m_OpenSockets)
+ if (itr->IsSession(id))
+ list.push_back(itr);
+ }
+ return list;
+ }
+
void SAMBridge::SendTo(const uint8_t * buf, size_t len, std::shared_ptr remote)
{
if(remote)
@@ -1127,33 +1111,38 @@ namespace client
{
m_DatagramReceiveBuffer[bytes_transferred] = 0;
char * eol = strchr ((char *)m_DatagramReceiveBuffer, '\n');
- *eol = 0; eol++;
- size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer);
- LogPrint (eLogDebug, "SAM: datagram received ", m_DatagramReceiveBuffer," size=", payloadLen);
- char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' ');
- if (sessionID)
+ if(eol)
{
- sessionID++;
- char * destination = strchr (sessionID, ' ');
- if (destination)
+ *eol = 0; eol++;
+ size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer);
+ LogPrint (eLogDebug, "SAM: datagram received ", m_DatagramReceiveBuffer," size=", payloadLen);
+ char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' ');
+ if (sessionID)
{
- *destination = 0; destination++;
- auto session = FindSession (sessionID);
- if (session)
+ sessionID++;
+ char * destination = strchr (sessionID, ' ');
+ if (destination)
{
- i2p::data::IdentityEx dest;
- dest.FromBase64 (destination);
- session->localDestination->GetDatagramDestination ()->
- SendDatagramTo ((uint8_t *)eol, payloadLen, dest.GetIdentHash ());
+ *destination = 0; destination++;
+ auto session = FindSession (sessionID);
+ if (session)
+ {
+ i2p::data::IdentityEx dest;
+ dest.FromBase64 (destination);
+ session->localDestination->GetDatagramDestination ()->
+ SendDatagramTo ((uint8_t *)eol, payloadLen, dest.GetIdentHash ());
+ }
+ else
+ LogPrint (eLogError, "SAM: Session ", sessionID, " not found");
}
else
- LogPrint (eLogError, "SAM: Session ", sessionID, " not found");
+ LogPrint (eLogError, "SAM: Missing destination key");
}
else
- LogPrint (eLogError, "SAM: Missing destination key");
+ LogPrint (eLogError, "SAM: Missing sessionID");
}
else
- LogPrint (eLogError, "SAM: Missing sessionID");
+ LogPrint(eLogError, "SAM: invalid datagram");
ReceiveDatagram ();
}
else
diff --git a/libi2pd_client/SAM.h b/libi2pd_client/SAM.h
index 6ecd14a4..953af1cd 100644
--- a/libi2pd_client/SAM.h
+++ b/libi2pd_client/SAM.h
@@ -82,18 +82,21 @@ namespace client
public:
typedef boost::asio::ip::tcp::socket Socket_t;
- SAMSocket (SAMBridge& owner, std::shared_ptr socket);
+ SAMSocket (SAMBridge& owner);
~SAMSocket ();
- boost::asio::ip::tcp::socket& GetSocket () { return *m_Socket; };
+ Socket_t& GetSocket () { return m_Socket; };
void ReceiveHandshake ();
void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; };
SAMSocketType GetSocketType () const { return m_SocketType; };
void Terminate (const char* reason);
+ bool IsSession(const std::string & id) const;
+
private:
-
+ void TerminateClose() { Terminate(nullptr); }
+
void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred);
@@ -128,10 +131,12 @@ namespace client
void WriteI2PDataImmediate(uint8_t * ptr, size_t sz);
void HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff);
+ void HandleStreamSend(const boost::system::error_code & ec);
+
private:
SAMBridge& m_Owner;
- std::shared_ptr m_Socket;
+ Socket_t m_Socket;
boost::asio::deadline_timer m_Timer;
char m_Buffer[SAM_SOCKET_BUFFER_SIZE + 1];
size_t m_BufferOffset;
@@ -145,34 +150,12 @@ namespace client
struct SAMSession
{
+ SAMBridge & m_Bridge;
std::shared_ptr localDestination;
- std::list > m_Sockets;
std::shared_ptr UDPEndpoint;
- std::mutex m_SocketsMutex;
+ std::string Name;
- /** safely add a socket to this session */
- void AddSocket(std::shared_ptr sock) {
- std::lock_guard lock(m_SocketsMutex);
- m_Sockets.push_back(sock);
- }
-
- /** safely remove a socket from this session */
- void DelSocket(SAMSocket * sock) {
- std::lock_guard lock(m_SocketsMutex);
- m_Sockets.remove_if([sock](const std::shared_ptr s) -> bool { return s.get() == sock; });
- }
-
- /** get a list holding a copy of all sam sockets from this session */
- std::list > ListSockets() {
- std::list > l;
- {
- std::lock_guard lock(m_SocketsMutex);
- for(const auto& sock : m_Sockets ) l.push_back(sock);
- }
- return l;
- }
-
- SAMSession (std::shared_ptr dest);
+ SAMSession (SAMBridge & parent, const std::string & name, std::shared_ptr dest);
~SAMSession ();
void CloseStreams ();
@@ -189,14 +172,18 @@ namespace client
void Stop ();
boost::asio::io_service& GetService () { return m_Service; };
- std::shared_ptr CreateSession (const std::string& id, const std::string& destination, // empty string means transient
+ std::shared_ptr CreateSession (const std::string& id, const std::string& destination, // empty string means transient
const std::map * params);
void CloseSession (const std::string& id);
std::shared_ptr FindSession (const std::string& id) const;
+ std::list > ListSockets(const std::string & id) const;
+
/** send raw data to remote endpoint from our UDP Socket */
void SendTo(const uint8_t * buf, size_t len, std::shared_ptr remote);
+ void RemoveSocket(const std::shared_ptr & socket);
+
private:
void Run ();
@@ -217,6 +204,8 @@ namespace client
boost::asio::ip::udp::socket m_DatagramSocket;
mutable std::mutex m_SessionsMutex;
std::map > m_Sessions;
+ mutable std::mutex m_OpenSocketsMutex;
+ std::list > m_OpenSockets;
uint8_t m_DatagramReceiveBuffer[i2p::datagram::MAX_DATAGRAM_SIZE+1];
public: