parse SAM commands

This commit is contained in:
orignal 2014-09-28 09:05:37 -04:00
parent 05c297cb0b
commit f7325f6c4f

61
SAM.cpp
View file

@ -80,16 +80,16 @@ namespace stream
{ {
m_Buffer[bytes_transferred] = 0; m_Buffer[bytes_transferred] = 0;
LogPrint ("SAM handshake ", m_Buffer); LogPrint ("SAM handshake ", m_Buffer);
if (!memcmp (m_Buffer, SAM_HANDSHAKE, sizeof (SAM_HANDSHAKE))) if (!memcmp (m_Buffer, SAM_HANDSHAKE, strlen (SAM_HANDSHAKE)))
{ {
// TODO: check version // TODO: check version
boost::asio::async_write (m_Socket, boost::asio::buffer (SAM_HANDSHAKE_REPLY, sizeof (SAM_HANDSHAKE_REPLY)), boost::asio::transfer_all (), boost::asio::async_write (m_Socket, boost::asio::buffer (SAM_HANDSHAKE_REPLY, strlen (SAM_HANDSHAKE_REPLY)), boost::asio::transfer_all (),
boost::bind(&SAMSocket::HandleHandshakeReplySent, this, boost::bind(&SAMSocket::HandleHandshakeReplySent, this,
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
} }
else else
{ {
LogPrint ("SAM hannshake mismatch"); LogPrint ("SAM handshake mismatch");
Terminate (); Terminate ();
} }
} }
@ -150,12 +150,18 @@ namespace stream
if (eol) if (eol)
{ {
*eol = 0; *eol = 0;
char * separator = strchr (m_Buffer, ' ');
if (separator)
separator = strchr (separator + 1, ' ');
if (separator)
{
*separator = 0;
if (!strcmp (m_Buffer, SAM_SESSION_CREATE)) if (!strcmp (m_Buffer, SAM_SESSION_CREATE))
ProcessSessionCreate (eol + 1, bytes_transferred - (eol - m_Buffer) - 1); ProcessSessionCreate (separator + 1, bytes_transferred - (separator - m_Buffer) - 1);
else if (!strcmp (m_Buffer, SAM_STREAM_CONNECT)) else if (!strcmp (m_Buffer, SAM_STREAM_CONNECT))
ProcessStreamConnect (eol + 1, bytes_transferred - (eol - m_Buffer) - 1); ProcessStreamConnect (separator + 1, bytes_transferred - (separator - m_Buffer) - 1);
else if (!strcmp (m_Buffer, SAM_STREAM_ACCEPT)) else if (!strcmp (m_Buffer, SAM_STREAM_ACCEPT))
ProcessStreamAccept (eol + 1, bytes_transferred - (eol - m_Buffer) - 1); ProcessStreamAccept (separator + 1, bytes_transferred - (separator - m_Buffer) - 1);
else else
{ {
LogPrint ("SAM unexpected message ", m_Buffer); LogPrint ("SAM unexpected message ", m_Buffer);
@ -168,10 +174,17 @@ namespace stream
Terminate (); Terminate ();
} }
} }
else
{
LogPrint ("SAM malformed message");
Terminate ();
}
}
} }
void SAMSocket::ProcessSessionCreate (char * buf, size_t len) void SAMSocket::ProcessSessionCreate (char * buf, size_t len)
{ {
LogPrint ("SAM session create: ", buf);
std::map<std::string, std::string> params; std::map<std::string, std::string> params;
ExtractParams (buf, len, params); ExtractParams (buf, len, params);
std::string& id = params[SAM_PARAM_ID]; std::string& id = params[SAM_PARAM_ID];
@ -181,20 +194,21 @@ namespace stream
if (session) if (session)
{ {
m_SocketType = eSAMSocketTypeSession; m_SocketType = eSAMSocketTypeSession;
memcpy (m_Buffer, SAM_SESSION_CREATE_REPLY_OK, sizeof (SAM_SESSION_CREATE_REPLY_OK)); size_t l = strlen (SAM_SESSION_CREATE_REPLY_OK);
memcpy (m_Buffer, SAM_SESSION_CREATE_REPLY_OK, l);
uint8_t ident[1024]; uint8_t ident[1024];
size_t l = session->localDestination->GetPrivateKeys ().ToBuffer (ident, 1024); size_t l1 = session->localDestination->GetPrivateKeys ().ToBuffer (ident, 1024);
size_t l1 = i2p::data::ByteStreamToBase64 (ident, l, m_Buffer + sizeof (SAM_SESSION_CREATE_REPLY_OK), size_t l2 = i2p::data::ByteStreamToBase64 (ident, l1, m_Buffer + l, SAM_SOCKET_BUFFER_SIZE - l);
SAM_SOCKET_BUFFER_SIZE - sizeof (SAM_SESSION_CREATE_REPLY_OK)); m_Buffer[l + l2] = '\n';
m_Buffer[sizeof (SAM_SESSION_CREATE_REPLY_OK) + l1] = '\n'; SendMessageReply (m_Buffer, l + l2 + 1, false);
SendMessageReply (m_Buffer, sizeof (SAM_SESSION_CREATE_REPLY_OK) + l1 + 1, false);
} }
else else
SendMessageReply (SAM_SESSION_CREATE_DUPLICATED_ID, sizeof(SAM_SESSION_CREATE_DUPLICATED_ID), true); SendMessageReply (SAM_SESSION_CREATE_DUPLICATED_ID, strlen(SAM_SESSION_CREATE_DUPLICATED_ID), true);
} }
void SAMSocket::ProcessStreamConnect (char * buf, size_t len) void SAMSocket::ProcessStreamConnect (char * buf, size_t len)
{ {
LogPrint ("SAM stream connect: ", buf);
std::map<std::string, std::string> params; std::map<std::string, std::string> params;
ExtractParams (buf, len, params); ExtractParams (buf, len, params);
std::string& id = params[SAM_PARAM_ID]; std::string& id = params[SAM_PARAM_ID];
@ -215,20 +229,21 @@ namespace stream
m_Stream = session->localDestination->CreateNewOutgoingStream (*leaseSet); m_Stream = session->localDestination->CreateNewOutgoingStream (*leaseSet);
m_Stream->Send ((uint8_t *)m_Buffer, 0, 0); // connect m_Stream->Send ((uint8_t *)m_Buffer, 0, 0); // connect
I2PReceive (); I2PReceive ();
SendMessageReply (SAM_STREAM_STATUS_OK, sizeof(SAM_STREAM_STATUS_OK), false); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
} }
else else
{ {
i2p::data::netdb.Subscribe (dest.GetIdentHash ()); i2p::data::netdb.Subscribe (dest.GetIdentHash ());
SendMessageReply (SAM_STREAM_STATUS_CANT_REACH_PEER, sizeof(SAM_STREAM_STATUS_CANT_REACH_PEER), true); SendMessageReply (SAM_STREAM_STATUS_CANT_REACH_PEER, strlen(SAM_STREAM_STATUS_CANT_REACH_PEER), true);
} }
} }
else else
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, sizeof(SAM_STREAM_STATUS_INVALID_ID), true); SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
} }
void SAMSocket::ProcessStreamAccept (char * buf, size_t len) void SAMSocket::ProcessStreamAccept (char * buf, size_t len)
{ {
LogPrint ("SAM stream accept: ", buf);
std::map<std::string, std::string> params; std::map<std::string, std::string> params;
ExtractParams (buf, len, params); ExtractParams (buf, len, params);
std::string& id = params[SAM_PARAM_ID]; std::string& id = params[SAM_PARAM_ID];
@ -241,20 +256,20 @@ namespace stream
m_SocketType = eSAMSocketTypeAcceptor; m_SocketType = eSAMSocketTypeAcceptor;
session->sockets.push_back (this); session->sockets.push_back (this);
session->localDestination->SetAcceptor (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1)); session->localDestination->SetAcceptor (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1));
SendMessageReply (SAM_STREAM_STATUS_OK, sizeof(SAM_STREAM_STATUS_OK), false); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
} }
else else
SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, sizeof(SAM_STREAM_STATUS_I2P_ERROR), true); SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true);
} }
else else
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, sizeof(SAM_STREAM_STATUS_INVALID_ID), true); SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
} }
void SAMSocket::ExtractParams (char * buf, size_t len, std::map<std::string, std::string>& params) void SAMSocket::ExtractParams (char * buf, size_t len, std::map<std::string, std::string>& params)
{ {
while (char * eol = strchr (buf, '\n')) while (char * separator = strchr (buf, ' '))
{ {
*eol = 0; *separator = 0;
char * value = strchr (buf, '='); char * value = strchr (buf, '=');
if (value) if (value)
{ {
@ -262,7 +277,7 @@ namespace stream
value++; value++;
params[buf] = value; params[buf] = value;
} }
buf = eol + 1; buf = separator + 1;
} }
} }
@ -349,6 +364,7 @@ namespace stream
void SAMBridge::Start () void SAMBridge::Start ()
{ {
Accept (); Accept ();
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&SAMBridge::Run, this)); m_Thread = new std::thread (std::bind (&SAMBridge::Run, this));
} }
@ -395,6 +411,7 @@ namespace stream
} }
else else
{ {
LogPrint ("SAM accept error: ", ecode.message ());
delete m_NewSocket; delete m_NewSocket;
m_NewSocket = nullptr; m_NewSocket = nullptr;
} }