initial implementation of STREAM FORWARD

This commit is contained in:
orignal 2020-10-25 17:20:15 -04:00
parent e9f11e204e
commit b35f43d79e
2 changed files with 92 additions and 9 deletions

View file

@ -54,6 +54,7 @@ namespace client
break;
}
case eSAMSocketTypeAcceptor:
case eSAMSocketTypeForward:
{
if (Session)
{
@ -263,6 +264,8 @@ namespace client
ProcessStreamConnect (separator + 1, bytes_transferred - (separator - m_Buffer) - 1, bytes_transferred - (eol - m_Buffer) - 1);
else if (!strcmp (m_Buffer, SAM_STREAM_ACCEPT))
ProcessStreamAccept (separator + 1, bytes_transferred - (separator - m_Buffer) - 1);
else if (!strcmp (m_Buffer, SAM_STREAM_FORWARD))
ProcessStreamForward (separator + 1, bytes_transferred - (separator - m_Buffer) - 1);
else if (!strcmp (m_Buffer, SAM_DEST_GENERATE))
ProcessDestGenerate (separator + 1, bytes_transferred - (separator - m_Buffer) - 1);
else if (!strcmp (m_Buffer, SAM_NAMING_LOOKUP))
@ -358,12 +361,12 @@ namespace client
std::shared_ptr<boost::asio::ip::udp::endpoint> forward = nullptr;
if ((type == eSAMSessionTypeDatagram || type == eSAMSessionTypeRaw) &&
params.find(SAM_VALUE_HOST) != params.end() && params.find(SAM_VALUE_PORT) != params.end())
params.find(SAM_PARAM_HOST) != params.end() && params.find(SAM_PARAM_PORT) != params.end())
{
// udp forward selected
boost::system::error_code e;
// TODO: support hostnames in udp forward
auto addr = boost::asio::ip::address::from_string(params[SAM_VALUE_HOST], e);
auto addr = boost::asio::ip::address::from_string(params[SAM_PARAM_HOST], e);
if (e)
{
// not an ip address
@ -371,7 +374,7 @@ namespace client
return;
}
auto port = std::stoi(params[SAM_VALUE_PORT]);
auto port = std::stoi(params[SAM_PARAM_PORT]);
if (port == -1)
{
SendI2PError("Invalid port");
@ -565,6 +568,51 @@ namespace client
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
}
void SAMSocket::ProcessStreamForward (char * buf, size_t len)
{
LogPrint (eLogDebug, "SAM: stream forward: ", buf);
std::map<std::string, std::string> params;
ExtractParams (buf, params);
std::string& id = params[SAM_PARAM_ID];
auto session = m_Owner.FindSession (id);
if (!session)
{
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
return;
}
if (session->localDestination->IsAcceptingStreams ())
{
SendI2PError ("Already accepting");
return;
}
auto it = params.find (SAM_PARAM_PORT);
if (it == params.end ())
{
SendI2PError ("PORT is missing");
return;
}
auto port = std::stoi (it->second);
if (port <= 0 || port >= 0xFFFF)
{
SendI2PError ("Invalid PORT");
return;
}
boost::system::error_code ec;
auto ep = m_Socket.remote_endpoint (ec);
if (ec)
{
SendI2PError ("Socket error");
return;
}
ep.port (port);
m_SocketType = eSAMSocketTypeForward;
m_ID = id;
m_IsAccepting = true;
session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PForward,
shared_from_this (), std::placeholders::_1, ep));
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
}
size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data)
{
LogPrint (eLogDebug, "SAM: datagram send: ", buf, " ", len);
@ -917,6 +965,33 @@ namespace client
LogPrint (eLogWarning, "SAM: I2P acceptor has been reset");
}
void SAMSocket::HandleI2PForward (std::shared_ptr<i2p::stream::Stream> stream,
boost::asio::ip::tcp::endpoint ep)
{
if (stream)
{
LogPrint (eLogDebug, "SAM: incoming forward I2P connection for session ", m_ID);
auto newSocket = std::make_shared<SAMSocket>(m_Owner);
newSocket->SetSocketType (eSAMSocketTypeStream);
auto s = shared_from_this ();
newSocket->GetSocket ().async_connect (ep,
[s, newSocket, stream](const boost::system::error_code& ecode)
{
if (!ecode)
{
s->m_Owner.AddSocket (newSocket);
newSocket->Receive ();
newSocket->m_Stream = stream;
newSocket->I2PReceive ();
}
else
stream->AsyncClose ();
});
}
else
LogPrint (eLogWarning, "SAM: I2P forward acceptor has been reset");
}
void SAMSocket::HandleI2PDatagramReceive (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
{
LogPrint (eLogDebug, "SAM: datagram received ", len);
@ -1072,6 +1147,12 @@ namespace client
std::placeholders::_1, newSocket));
}
void SAMBridge::AddSocket(std::shared_ptr<SAMSocket> socket)
{
std::unique_lock<std::mutex> lock(m_OpenSocketsMutex);
m_OpenSockets.push_back(socket);
}
void SAMBridge::RemoveSocket(const std::shared_ptr<SAMSocket> & socket)
{
std::unique_lock<std::mutex> lock(m_OpenSocketsMutex);
@ -1087,10 +1168,7 @@ namespace client
if (!ec)
{
LogPrint (eLogDebug, "SAM: new connection from ", ep);
{
std::unique_lock<std::mutex> l(m_OpenSocketsMutex);
m_OpenSockets.push_back(socket);
}
AddSocket (socket);
socket->ReceiveHandshake ();
}
else