SAM subsessions

This commit is contained in:
orignal 2021-05-04 14:27:06 -04:00
parent 0b9cb4e75b
commit fffa550bb0
4 changed files with 84 additions and 5 deletions

View file

@ -1131,6 +1131,21 @@ namespace client
return dest;
}
std::shared_ptr<i2p::stream::StreamingDestination> ClientDestination::RemoveStreamingDestination (int port)
{
if (port)
{
auto it = m_StreamingDestinationsByPorts.find (port);
if (it != m_StreamingDestinationsByPorts.end ())
{
auto ret = it->second;
m_StreamingDestinationsByPorts.erase (it);
return ret;
}
}
return nullptr;
}
i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination (bool gzip)
{
if (m_DatagramDestination == nullptr)

View file

@ -236,6 +236,7 @@ namespace client
// streaming
std::shared_ptr<i2p::stream::StreamingDestination> CreateStreamingDestination (int port, bool gzip = true); // additional
std::shared_ptr<i2p::stream::StreamingDestination> GetStreamingDestination (int port = 0) const;
std::shared_ptr<i2p::stream::StreamingDestination> RemoveStreamingDestination (int port);
// following methods operate with default streaming destination
void CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port = 0);
void CreateStream (StreamRequestComplete streamRequestComplete, std::shared_ptr<const i2p::data::BlindedPublicKey> dest, int port = 0);

View file

@ -270,6 +270,10 @@ namespace client
ProcessDestGenerate (separator + 1, bytes_transferred - (separator - m_Buffer) - 1);
else if (!strcmp (m_Buffer, SAM_NAMING_LOOKUP))
ProcessNamingLookup (separator + 1, bytes_transferred - (separator - m_Buffer) - 1);
else if (!strcmp (m_Buffer, SAM_SESSION_ADD))
ProcessSessionAdd (separator + 1, bytes_transferred - (separator - m_Buffer) - 1);
else if (!strcmp (m_Buffer, SAM_SESSION_REMOVE))
ProcessSessionRemove (separator + 1, bytes_transferred - (separator - m_Buffer) - 1);
else if (!strcmp (m_Buffer, SAM_DATAGRAM_SEND) || !strcmp (m_Buffer, SAM_RAW_SEND))
{
size_t len = bytes_transferred - (separator - m_Buffer) - 1;
@ -759,6 +763,16 @@ namespace client
}
}
void SAMSocket::ProcessSessionAdd (char * buf, size_t len)
{
// TODO: implement
}
void SAMSocket::ProcessSessionRemove (char * buf, size_t len)
{
// TODO: implement
}
void SAMSocket::SendI2PError(const std::string & msg)
{
LogPrint (eLogError, "SAM: i2p error ", msg);
@ -1133,6 +1147,39 @@ namespace client
i2p::client::context.DeleteLocalDestination (localDestination);
}
void SAMSingleSession::StopLocalDestination ()
{
localDestination->Release ();
localDestination->StopAcceptingStreams ();
}
SAMSubSession::SAMSubSession (std::shared_ptr<SAMMasterSession> master, const std::string& name, SAMSessionType type, int port):
SAMSession (master->m_Bridge, name, type), masterSession (master), inPort (port)
{
if (Type == eSAMSessionTypeStream)
{
auto d = masterSession->GetLocalDestination ()->CreateStreamingDestination (inPort);
if (d) d->Start ();
}
// TODO: implement datagrams
}
std::shared_ptr<ClientDestination> SAMSubSession::GetLocalDestination ()
{
return masterSession ? masterSession->GetLocalDestination () : nullptr;
}
void SAMSubSession::StopLocalDestination ()
{
auto dest = GetLocalDestination ();
if (dest && Type == eSAMSessionTypeStream)
{
auto d = dest->RemoveStreamingDestination (inPort);
if (d) d->Stop ();
}
// TODO: implement datagrams
}
SAMBridge::SAMBridge (const std::string& address, int port, bool singleThread):
RunnableService ("SAM"), m_IsSingleThread (singleThread),
m_Acceptor (GetIOService (), boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(address), port)),
@ -1295,8 +1342,7 @@ namespace client
}
if (session)
{
session->GetLocalDestination ()->Release ();
session->GetLocalDestination ()->StopAcceptingStreams ();
session->StopLocalDestination ();
session->CloseStreams ();
if (m_IsSingleThread)
{

View file

@ -41,6 +41,8 @@ namespace client
const char SAM_SESSION_CREATE_INVALID_ID[] = "SESSION STATUS RESULT=INVALID_ID\n";
const char SAM_SESSION_STATUS_INVALID_KEY[] = "SESSION STATUS RESULT=INVALID_KEY\n";
const char SAM_SESSION_STATUS_I2P_ERROR[] = "SESSION STATUS RESULT=I2P_ERROR MESSAGE=%s\n";
const char SAM_SESSION_ADD[] = "SESSION ADD";
const char SAM_SESSION_REMOVE[] = "SESSION REMOVE";
const char SAM_STREAM_CONNECT[] = "STREAM CONNECT";
const char SAM_STREAM_STATUS_OK[] = "STREAM STATUS RESULT=OK\n";
const char SAM_STREAM_STATUS_INVALID_ID[] = "STREAM STATUS RESULT=INVALID_ID\n";
@ -135,6 +137,8 @@ namespace client
void ProcessStreamForward (char * buf, size_t len);
void ProcessDestGenerate (char * buf, size_t len);
void ProcessNamingLookup (char * buf, size_t len);
void ProcessSessionAdd (char * buf, size_t len);
void ProcessSessionRemove (char * buf, size_t len);
void SendI2PError(const std::string & msg);
size_t ProcessDatagramSend (char * buf, size_t len, const char * data); // from SAM 1.0
void ExtractParams (char * buf, std::map<std::string, std::string>& params);
@ -187,6 +191,7 @@ namespace client
virtual ~SAMSession () {};
virtual std::shared_ptr<ClientDestination> GetLocalDestination () = 0;
virtual void StopLocalDestination () = 0;
void CloseStreams ();
};
@ -199,6 +204,7 @@ namespace client
~SAMSingleSession ();
std::shared_ptr<ClientDestination> GetLocalDestination () { return localDestination; };
void StopLocalDestination ();
};
struct SAMMasterSession: public SAMSingleSession
@ -207,6 +213,17 @@ namespace client
SAMSingleSession (parent, name, eSAMSessionTypeMaster, dest) {};
};
struct SAMSubSession: public SAMSession
{
std::shared_ptr<SAMMasterSession> masterSession;
int inPort;
SAMSubSession (std::shared_ptr<SAMMasterSession> master, const std::string& name, SAMSessionType type, int port);
// implements SAMSession
std::shared_ptr<ClientDestination> GetLocalDestination ();
void StopLocalDestination ();
};
class SAMBridge: private i2p::util::RunnableService
{
public: