Merge pull request #464 from majestrate/master

Add locking to SAM
This commit is contained in:
orignal 2016-04-01 12:56:30 -04:00
commit 1fc5dacd87
3 changed files with 43 additions and 16 deletions

View file

@ -749,7 +749,7 @@ namespace util
s << "&" << HTTP_PARAM_BASE32_ADDRESS << "=" << ident.ToBase32 () << ">"; s << "&" << HTTP_PARAM_BASE32_ADDRESS << "=" << ident.ToBase32 () << ">";
s << i2p::client::context.GetAddressBook ().ToAddress(ident) << "</a><br>\r\n" << std::endl; s << i2p::client::context.GetAddressBook ().ToAddress(ident) << "</a><br>\r\n" << std::endl;
s << "<b>Streams:</b><br>\r\n"; s << "<b>Streams:</b><br>\r\n";
for (auto it: session->sockets) for (auto it: session->ListSockets())
{ {
switch (it->GetSocketType ()) switch (it->GetSocketType ())
{ {

28
SAM.cpp
View file

@ -47,16 +47,19 @@ namespace client
break; break;
case eSAMSocketTypeStream: case eSAMSocketTypeStream:
{ {
if (m_Session) if (m_Session) {
m_Session->sockets.remove (shared_from_this ()); m_Session->DelSocket (shared_from_this ());
m_Session = nullptr;
}
break; break;
} }
case eSAMSocketTypeAcceptor: case eSAMSocketTypeAcceptor:
{ {
if (m_Session) if (m_Session)
{ {
m_Session->sockets.remove (shared_from_this ()); m_Session->DelSocket (shared_from_this ());
m_Session->localDestination->StopAcceptingStreams (); m_Session->localDestination->StopAcceptingStreams ();
m_Session = nullptr;
} }
break; break;
} }
@ -64,7 +67,7 @@ namespace client
; ;
} }
m_SocketType = eSAMSocketTypeTerminated; m_SocketType = eSAMSocketTypeTerminated;
m_Socket.close (); if (m_Socket.is_open()) m_Socket.close ();
} }
void SAMSocket::ReceiveHandshake () void SAMSocket::ReceiveHandshake ()
@ -369,7 +372,7 @@ namespace client
void SAMSocket::Connect (std::shared_ptr<const i2p::data::LeaseSet> remote) void SAMSocket::Connect (std::shared_ptr<const i2p::data::LeaseSet> remote)
{ {
m_SocketType = eSAMSocketTypeStream; m_SocketType = eSAMSocketTypeStream;
m_Session->sockets.push_back (shared_from_this ()); m_Session->AddSocket (shared_from_this ());
m_Stream = m_Session->localDestination->CreateStream (remote); m_Stream = m_Session->localDestination->CreateStream (remote);
m_Stream->Send ((uint8_t *)m_Buffer, 0); // connect m_Stream->Send ((uint8_t *)m_Buffer, 0); // connect
I2PReceive (); I2PReceive ();
@ -402,7 +405,7 @@ namespace client
if (!m_Session->localDestination->IsAcceptingStreams ()) if (!m_Session->localDestination->IsAcceptingStreams ())
{ {
m_SocketType = eSAMSocketTypeAcceptor; m_SocketType = eSAMSocketTypeAcceptor;
m_Session->sockets.push_back (shared_from_this ()); m_Session->AddSocket (shared_from_this ());
m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1));
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
} }
@ -676,19 +679,20 @@ namespace client
SAMSession::~SAMSession () SAMSession::~SAMSession ()
{ {
for (auto it: sockets) CloseStreams();
it->SetSocketType (eSAMSocketTypeTerminated);
i2p::client::context.DeleteLocalDestination (localDestination); i2p::client::context.DeleteLocalDestination (localDestination);
} }
void SAMSession::CloseStreams () void SAMSession::CloseStreams ()
{ {
for (auto it: sockets)
{ {
it->CloseStream (); std::lock_guard<std::mutex> lock(m_SocketsMutex);
it->SetSocketType (eSAMSocketTypeTerminated); for (auto sock : m_Sockets) {
sock->CloseStream();
} }
sockets.clear (); }
// XXX: should this be done inside locked parts?
m_Sockets.clear();
} }
SAMBridge::SAMBridge (const std::string& address, int port): SAMBridge::SAMBridge (const std::string& address, int port):

25
SAM.h
View file

@ -134,7 +134,30 @@ namespace client
struct SAMSession struct SAMSession
{ {
std::shared_ptr<ClientDestination> localDestination; std::shared_ptr<ClientDestination> localDestination;
std::list<std::shared_ptr<SAMSocket> > sockets; std::list<std::shared_ptr<SAMSocket> > m_Sockets;
std::mutex m_SocketsMutex;
/** safely add a socket to this session */
void AddSocket(std::shared_ptr<SAMSocket> sock) {
std::lock_guard<std::mutex> lock(m_SocketsMutex);
m_Sockets.push_back(sock);
}
/** safely remove a socket from this session */
void DelSocket(std::shared_ptr<SAMSocket> sock) {
std::lock_guard<std::mutex> lock(m_SocketsMutex);
m_Sockets.remove(sock);
}
/** get a list holding a copy of all sam sockets from this session */
std::list<std::shared_ptr<SAMSocket> > ListSockets() {
std::list<std::shared_ptr<SAMSocket> > l;
{
std::lock_guard<std::mutex> lock(m_SocketsMutex);
for( auto & sock : m_Sockets ) l.push_back(sock);
}
return l;
}
SAMSession (std::shared_ptr<ClientDestination> dest); SAMSession (std::shared_ptr<ClientDestination> dest);
~SAMSession (); ~SAMSession ();