mirror of
				https://github.com/PurpleI2P/i2pd.git
				synced 2025-11-04 08:30:46 +00:00 
			
		
		
		
	
						commit
						0df68872ab
					
				
					 7 changed files with 140 additions and 161 deletions
				
			
		| 
						 | 
				
			
			@ -660,7 +660,7 @@ namespace http {
 | 
			
		|||
		s << i2p::client::context.GetAddressBook ().ToAddress(ident) << "</a><br>\r\n";
 | 
			
		||||
		s << "<br>\r\n";
 | 
			
		||||
		s << "<b>Streams:</b><br>\r\n";
 | 
			
		||||
		for (const auto& it: session->ListSockets())
 | 
			
		||||
		for (const auto& it: sam->ListSockets(id))
 | 
			
		||||
		{
 | 
			
		||||
			switch (it->GetSocketType ())
 | 
			
		||||
			{
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -727,7 +727,7 @@ namespace client
 | 
			
		|||
				sam_session.put("name", name);
 | 
			
		||||
				sam_session.put("address", i2p::client::context.GetAddressBook ().ToAddress(ident));
 | 
			
		||||
 | 
			
		||||
				for (const auto& socket: it.second->ListSockets())
 | 
			
		||||
				for (const auto& socket: sam->ListSockets(it.first))
 | 
			
		||||
				{
 | 
			
		||||
					boost::property_tree::ptree stream;
 | 
			
		||||
					stream.put("type", socket->GetSocketType ());
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -96,7 +96,7 @@ namespace http {
 | 
			
		|||
      pos_c = url.find('@', pos_p); /* find end of 'user' or 'user:pass' part */
 | 
			
		||||
      if (pos_c != std::string::npos && (pos_s == std::string::npos || pos_s > pos_c)) {
 | 
			
		||||
        std::size_t delim = url.find(':', pos_p);
 | 
			
		||||
        if (delim != std::string::npos && delim < pos_c) {
 | 
			
		||||
        if (delim && delim != std::string::npos && delim < pos_c) {
 | 
			
		||||
          user = url.substr(pos_p, delim - pos_p);
 | 
			
		||||
          delim += 1;
 | 
			
		||||
          pass = url.substr(delim, pos_c - delim);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -578,9 +578,7 @@ namespace stream
 | 
			
		|||
				if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) // nothing to send
 | 
			
		||||
				{
 | 
			
		||||
					m_Status = eStreamStatusClosed;
 | 
			
		||||
					// close could be called from another thread so do SendClose from the destination thread
 | 
			
		||||
					// this is so m_LocalDestination.NewPacket () does not trigger a race condition
 | 
			
		||||
					m_Service.post(std::bind(&Stream::SendClose, shared_from_this()));
 | 
			
		||||
					SendClose();
 | 
			
		||||
				}
 | 
			
		||||
			break;
 | 
			
		||||
			case eStreamStatusClosed:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -165,6 +165,9 @@ namespace stream
 | 
			
		|||
			void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0);
 | 
			
		||||
			size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); };
 | 
			
		||||
 | 
			
		||||
			void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); };
 | 
			
		||||
 | 
			
		||||
			/** only call close from destination thread, use Stream::AsyncClose for other threads */
 | 
			
		||||
			void Close ();
 | 
			
		||||
			void Cancel () { m_ReceiveTimer.cancel (); };
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,8 +15,8 @@ namespace i2p
 | 
			
		|||
{
 | 
			
		||||
namespace client
 | 
			
		||||
{
 | 
			
		||||
	SAMSocket::SAMSocket (SAMBridge& owner, std::shared_ptr<Socket_t> socket):
 | 
			
		||||
		m_Owner (owner), m_Socket(socket), m_Timer (m_Owner.GetService ()),
 | 
			
		||||
	SAMSocket::SAMSocket (SAMBridge& owner):
 | 
			
		||||
		m_Owner (owner), m_Socket(owner.GetService()), m_Timer (m_Owner.GetService ()),
 | 
			
		||||
		m_BufferOffset (0), 
 | 
			
		||||
		m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false),
 | 
			
		||||
		m_IsAccepting (false), m_Stream (nullptr)
 | 
			
		||||
| 
						 | 
				
			
			@ -25,51 +25,17 @@ namespace client
 | 
			
		|||
 | 
			
		||||
	SAMSocket::~SAMSocket ()
 | 
			
		||||
	{
 | 
			
		||||
		if(m_Stream)
 | 
			
		||||
		{
 | 
			
		||||
			m_Stream->Close ();
 | 
			
		||||
			m_Stream.reset ();
 | 
			
		||||
		}
 | 
			
		||||
		auto Session = m_Owner.FindSession(m_ID);
 | 
			
		||||
		
 | 
			
		||||
		switch (m_SocketType)
 | 
			
		||||
		{
 | 
			
		||||
			case eSAMSocketTypeSession:
 | 
			
		||||
				m_Owner.CloseSession (m_ID);
 | 
			
		||||
			break;
 | 
			
		||||
			case eSAMSocketTypeStream:
 | 
			
		||||
			{
 | 
			
		||||
				if (Session)
 | 
			
		||||
					Session->DelSocket (this);
 | 
			
		||||
				break;
 | 
			
		||||
			}
 | 
			
		||||
			case eSAMSocketTypeAcceptor:
 | 
			
		||||
			{
 | 
			
		||||
				if (Session)
 | 
			
		||||
				{
 | 
			
		||||
					Session->DelSocket (this);
 | 
			
		||||
					if (m_IsAccepting && Session->localDestination)
 | 
			
		||||
						Session->localDestination->StopAcceptingStreams ();
 | 
			
		||||
				}
 | 
			
		||||
				break;
 | 
			
		||||
			}
 | 
			
		||||
			default:
 | 
			
		||||
				;
 | 
			
		||||
		}
 | 
			
		||||
		m_SocketType = eSAMSocketTypeTerminated;
 | 
			
		||||
		if (m_Socket && m_Socket->is_open()) m_Socket->close ();
 | 
			
		||||
		m_Socket.reset ();
 | 
			
		||||
		m_Stream = nullptr;
 | 
			
		||||
	}	
 | 
			
		||||
 | 
			
		||||
	void SAMSocket::Terminate (const char* reason)
 | 
			
		||||
	{
 | 
			
		||||
		if(m_Stream)
 | 
			
		||||
		{
 | 
			
		||||
			m_Stream->Close ();
 | 
			
		||||
			m_Stream.reset ();
 | 
			
		||||
			m_Stream->AsyncClose ();
 | 
			
		||||
			m_Stream = nullptr;
 | 
			
		||||
		}
 | 
			
		||||
		auto Session = m_Owner.FindSession(m_ID);
 | 
			
		||||
		
 | 
			
		||||
		switch (m_SocketType)
 | 
			
		||||
		{
 | 
			
		||||
			case eSAMSocketTypeSession:
 | 
			
		||||
| 
						 | 
				
			
			@ -77,15 +43,12 @@ namespace client
 | 
			
		|||
			break;
 | 
			
		||||
			case eSAMSocketTypeStream:
 | 
			
		||||
			{
 | 
			
		||||
				if (Session)
 | 
			
		||||
					Session->DelSocket (this);
 | 
			
		||||
				break;
 | 
			
		||||
			}
 | 
			
		||||
			case eSAMSocketTypeAcceptor:
 | 
			
		||||
			{
 | 
			
		||||
				if (Session)
 | 
			
		||||
				{
 | 
			
		||||
					Session->DelSocket (this);
 | 
			
		||||
					if (m_IsAccepting && Session->localDestination)
 | 
			
		||||
						Session->localDestination->StopAcceptingStreams ();
 | 
			
		||||
				}
 | 
			
		||||
| 
						 | 
				
			
			@ -95,16 +58,20 @@ namespace client
 | 
			
		|||
				;
 | 
			
		||||
		}
 | 
			
		||||
		m_SocketType = eSAMSocketTypeTerminated;
 | 
			
		||||
		if (m_Socket && m_Socket->is_open()) m_Socket->close ();
 | 
			
		||||
		m_Socket.reset ();
 | 
			
		||||
		if (m_Socket.is_open ())
 | 
			
		||||
		{
 | 
			
		||||
			boost::system::error_code ec;
 | 
			
		||||
			m_Socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
 | 
			
		||||
			m_Socket.close ();
 | 
			
		||||
		}
 | 
			
		||||
		m_Owner.RemoveSocket(shared_from_this());
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void SAMSocket::ReceiveHandshake ()
 | 
			
		||||
	{
 | 
			
		||||
		if(m_Socket)
 | 
			
		||||
			m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
 | 
			
		||||
				std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (),
 | 
			
		||||
				std::placeholders::_1, std::placeholders::_2));
 | 
			
		||||
	{		
 | 
			
		||||
		m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
 | 
			
		||||
			std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (),
 | 
			
		||||
			std::placeholders::_1, std::placeholders::_2));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	static bool SAMVersionAcceptable(const std::string & ver)
 | 
			
		||||
| 
						 | 
				
			
			@ -125,7 +92,7 @@ namespace client
 | 
			
		|||
	void SAMSocket::HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred)
 | 
			
		||||
	{
 | 
			
		||||
		if (ecode)
 | 
			
		||||
        {
 | 
			
		||||
		{
 | 
			
		||||
			LogPrint (eLogError, "SAM: handshake read error: ", ecode.message ());
 | 
			
		||||
			if (ecode != boost::asio::error::operation_aborted)
 | 
			
		||||
				Terminate ("SAM: handshake read error");
 | 
			
		||||
| 
						 | 
				
			
			@ -184,7 +151,7 @@ namespace client
 | 
			
		|||
#else
 | 
			
		||||
					size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ());
 | 
			
		||||
#endif
 | 
			
		||||
					boost::asio::async_write (*m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (),
 | 
			
		||||
					boost::asio::async_write (m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (),
 | 
			
		||||
								std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (),
 | 
			
		||||
						std::placeholders::_1, std::placeholders::_2));
 | 
			
		||||
				}
 | 
			
		||||
| 
						 | 
				
			
			@ -199,17 +166,22 @@ namespace client
 | 
			
		|||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	bool SAMSocket::IsSession(const std::string & id) const
 | 
			
		||||
	{
 | 
			
		||||
		return id == m_ID;
 | 
			
		||||
	}
 | 
			
		||||
	
 | 
			
		||||
	void SAMSocket::HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred)
 | 
			
		||||
	{
 | 
			
		||||
		if (ecode)
 | 
			
		||||
        {
 | 
			
		||||
		{
 | 
			
		||||
			LogPrint (eLogError, "SAM: handshake reply send error: ", ecode.message ());
 | 
			
		||||
			if (ecode != boost::asio::error::operation_aborted)
 | 
			
		||||
				Terminate ("SAM: handshake reply send error");
 | 
			
		||||
		}
 | 
			
		||||
		else if(m_Socket)
 | 
			
		||||
		else
 | 
			
		||||
		{
 | 
			
		||||
			m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
 | 
			
		||||
			m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
 | 
			
		||||
				std::bind(&SAMSocket::HandleMessage, shared_from_this (),
 | 
			
		||||
				std::placeholders::_1, std::placeholders::_2));
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			@ -220,7 +192,7 @@ namespace client
 | 
			
		|||
		LogPrint (eLogDebug, "SAMSocket::SendMessageReply, close=",close?"true":"false", " reason: ", msg);
 | 
			
		||||
 | 
			
		||||
		if (!m_IsSilent)
 | 
			
		||||
			boost::asio::async_write (*m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (),
 | 
			
		||||
			boost::asio::async_write (m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (),
 | 
			
		||||
				std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (),
 | 
			
		||||
				std::placeholders::_1, std::placeholders::_2, close));
 | 
			
		||||
		else
 | 
			
		||||
| 
						 | 
				
			
			@ -235,7 +207,7 @@ namespace client
 | 
			
		|||
	void SAMSocket::HandleMessageReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred, bool close)
 | 
			
		||||
	{
 | 
			
		||||
		if (ecode)
 | 
			
		||||
        {
 | 
			
		||||
		{
 | 
			
		||||
			LogPrint (eLogError, "SAM: reply send error: ", ecode.message ());
 | 
			
		||||
			if (ecode != boost::asio::error::operation_aborted)
 | 
			
		||||
				Terminate ("SAM: reply send error");
 | 
			
		||||
| 
						 | 
				
			
			@ -252,7 +224,7 @@ namespace client
 | 
			
		|||
	void SAMSocket::HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred)
 | 
			
		||||
	{
 | 
			
		||||
		if (ecode)
 | 
			
		||||
        {
 | 
			
		||||
		{
 | 
			
		||||
			LogPrint (eLogError, "SAM: read error: ", ecode.message ());
 | 
			
		||||
			if (ecode != boost::asio::error::operation_aborted)
 | 
			
		||||
				Terminate ("SAM: read error");
 | 
			
		||||
| 
						 | 
				
			
			@ -501,7 +473,6 @@ namespace client
 | 
			
		|||
		if(session)
 | 
			
		||||
		{
 | 
			
		||||
			m_SocketType = eSAMSocketTypeStream;
 | 
			
		||||
			session->AddSocket (shared_from_this ());
 | 
			
		||||
			m_Stream = session->localDestination->CreateStream (remote);
 | 
			
		||||
			m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send
 | 
			
		||||
			m_BufferOffset = 0;
 | 
			
		||||
| 
						 | 
				
			
			@ -534,7 +505,6 @@ namespace client
 | 
			
		|||
		if (session)
 | 
			
		||||
		{
 | 
			
		||||
			m_SocketType = eSAMSocketTypeAcceptor;
 | 
			
		||||
			session->AddSocket (shared_from_this ());
 | 
			
		||||
			if (!session->localDestination->IsAcceptingStreams ())
 | 
			
		||||
			{
 | 
			
		||||
				m_IsAccepting = true;	
 | 
			
		||||
| 
						 | 
				
			
			@ -599,7 +569,7 @@ namespace client
 | 
			
		|||
			keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ());
 | 
			
		||||
#else
 | 
			
		||||
		size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY,
 | 
			
		||||
		    keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ());
 | 
			
		||||
			keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ());
 | 
			
		||||
#endif
 | 
			
		||||
		SendMessageReply (m_Buffer, l, false);
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -704,17 +674,9 @@ namespace client
 | 
			
		|||
 | 
			
		||||
	void SAMSocket::Receive ()
 | 
			
		||||
	{
 | 
			
		||||
		if (m_BufferOffset >= SAM_SOCKET_BUFFER_SIZE)
 | 
			
		||||
		{
 | 
			
		||||
			LogPrint (eLogError, "SAM: Buffer is full, terminate");
 | 
			
		||||
			Terminate ("Buffer is full");
 | 
			
		||||
			return;
 | 
			
		||||
		} else if (m_Socket)
 | 
			
		||||
			m_Socket->async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset),
 | 
			
		||||
				std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage,
 | 
			
		||||
				shared_from_this (), std::placeholders::_1, std::placeholders::_2));
 | 
			
		||||
		else
 | 
			
		||||
			LogPrint(eLogError, "SAM: receive with no native socket");
 | 
			
		||||
		m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset),
 | 
			
		||||
			std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage,
 | 
			
		||||
			shared_from_this (), std::placeholders::_1, std::placeholders::_2));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred)
 | 
			
		||||
| 
						 | 
				
			
			@ -731,15 +693,12 @@ namespace client
 | 
			
		|||
			{
 | 
			
		||||
				bytes_transferred += m_BufferOffset;
 | 
			
		||||
				m_BufferOffset = 0;
 | 
			
		||||
				auto s = shared_from_this ();
 | 
			
		||||
				m_Stream->AsyncSend ((uint8_t *)m_Buffer, bytes_transferred,
 | 
			
		||||
					[s](const boost::system::error_code& ecode)
 | 
			
		||||
				    {
 | 
			
		||||
						if (!ecode)
 | 
			
		||||
							s->m_Owner.GetService ().post ([s] { s->Receive (); });
 | 
			
		||||
						else	
 | 
			
		||||
							s->m_Owner.GetService ().post ([s] { s->Terminate ("AsyncSend failed"); });
 | 
			
		||||
					});
 | 
			
		||||
					std::bind(&SAMSocket::HandleStreamSend, shared_from_this(), std::placeholders::_1));
 | 
			
		||||
			}
 | 
			
		||||
			else
 | 
			
		||||
			{
 | 
			
		||||
				Terminate("No Stream Remaining");
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -766,21 +725,21 @@ namespace client
 | 
			
		|||
					WriteI2PDataImmediate(buff, len);
 | 
			
		||||
				}
 | 
			
		||||
				else // no more data
 | 
			
		||||
				{
 | 
			
		||||
					delete [] buff;
 | 
			
		||||
					Terminate ("no more data");
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void SAMSocket::WriteI2PDataImmediate(uint8_t * buff, size_t sz)
 | 
			
		||||
	{
 | 
			
		||||
		if(m_Socket)
 | 
			
		||||
			boost::asio::async_write (
 | 
			
		||||
				*m_Socket,
 | 
			
		||||
				boost::asio::buffer (buff, sz),
 | 
			
		||||
				boost::asio::transfer_all(),
 | 
			
		||||
				std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination
 | 
			
		||||
		else
 | 
			
		||||
			LogPrint(eLogError, "SAM: no native socket");
 | 
			
		||||
		boost::asio::async_write (
 | 
			
		||||
			m_Socket,
 | 
			
		||||
			boost::asio::buffer (buff, sz),
 | 
			
		||||
			boost::asio::transfer_all(),
 | 
			
		||||
			std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void SAMSocket::HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff)
 | 
			
		||||
| 
						 | 
				
			
			@ -790,9 +749,11 @@ namespace client
 | 
			
		|||
	
 | 
			
		||||
	void SAMSocket::WriteI2PData(size_t sz)
 | 
			
		||||
	{
 | 
			
		||||
		uint8_t * sendbuff = new uint8_t[sz];
 | 
			
		||||
		memcpy(sendbuff, m_StreamBuffer, sz);
 | 
			
		||||
		WriteI2PDataImmediate(sendbuff, sz);
 | 
			
		||||
		boost::asio::async_write (
 | 
			
		||||
			m_Socket,
 | 
			
		||||
			boost::asio::buffer (m_StreamBuffer, sz),
 | 
			
		||||
			boost::asio::transfer_all(),
 | 
			
		||||
			std::bind(&SAMSocket::HandleWriteI2PData, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
 | 
			
		||||
	}
 | 
			
		||||
	
 | 
			
		||||
	void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred)
 | 
			
		||||
| 
						 | 
				
			
			@ -826,7 +787,8 @@ namespace client
 | 
			
		|||
				{
 | 
			
		||||
					WriteI2PData(bytes_transferred);
 | 
			
		||||
				}
 | 
			
		||||
				I2PReceive();
 | 
			
		||||
				else
 | 
			
		||||
					I2PReceive();
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -858,7 +820,7 @@ namespace client
 | 
			
		|||
			if (session)
 | 
			
		||||
			{
 | 
			
		||||
				// find more pending acceptors
 | 
			
		||||
				for (auto it: session->ListSockets ())
 | 
			
		||||
				for (auto & it: m_Owner.ListSockets (m_ID))
 | 
			
		||||
					if (it->m_SocketType == eSAMSocketTypeAcceptor)
 | 
			
		||||
					{
 | 
			
		||||
						it->m_IsAccepting = true;
 | 
			
		||||
| 
						 | 
				
			
			@ -930,29 +892,30 @@ namespace client
 | 
			
		|||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	SAMSession::SAMSession (std::shared_ptr<ClientDestination> dest):
 | 
			
		||||
	void SAMSocket::HandleStreamSend(const boost::system::error_code & ec)
 | 
			
		||||
	{
 | 
			
		||||
		m_Owner.GetService ().post (std::bind( !ec ? &SAMSocket::Receive : &SAMSocket::TerminateClose, shared_from_this()));
 | 
			
		||||
	}
 | 
			
		||||
	
 | 
			
		||||
	SAMSession::SAMSession (SAMBridge & parent, const std::string & id, std::shared_ptr<ClientDestination> dest):
 | 
			
		||||
		m_Bridge(parent),
 | 
			
		||||
		localDestination (dest),
 | 
			
		||||
		UDPEndpoint(nullptr)
 | 
			
		||||
		UDPEndpoint(nullptr),
 | 
			
		||||
		Name(id)
 | 
			
		||||
	{
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	
 | 
			
		||||
	SAMSession::~SAMSession ()
 | 
			
		||||
	{
 | 
			
		||||
		CloseStreams();
 | 
			
		||||
		i2p::client::context.DeleteLocalDestination (localDestination);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void SAMSession::CloseStreams ()
 | 
			
		||||
	{
 | 
			
		||||
		std::vector<std::shared_ptr<SAMSocket> > socks;
 | 
			
		||||
		for(const auto & itr : m_Bridge.ListSockets(Name))
 | 
			
		||||
		{
 | 
			
		||||
			std::lock_guard<std::mutex> lock(m_SocketsMutex);
 | 
			
		||||
			for (const auto& sock : m_Sockets) {
 | 
			
		||||
				socks.push_back(sock);
 | 
			
		||||
			}
 | 
			
		||||
			itr->Terminate(nullptr);
 | 
			
		||||
		}
 | 
			
		||||
                for (auto & sock : socks ) sock->Terminate("SAMSession::CloseStreams()");
 | 
			
		||||
		m_Sockets.clear();
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	SAMBridge::SAMBridge (const std::string& address, int port):
 | 
			
		||||
| 
						 | 
				
			
			@ -1009,12 +972,17 @@ namespace client
 | 
			
		|||
 | 
			
		||||
	void SAMBridge::Accept ()
 | 
			
		||||
	{
 | 
			
		||||
		auto native = std::make_shared<boost::asio::ip::tcp::socket>(m_Service);
 | 
			
		||||
		auto newSocket = std::make_shared<SAMSocket> (*this, native);
 | 
			
		||||
		m_Acceptor.async_accept (*native, std::bind (&SAMBridge::HandleAccept, this,
 | 
			
		||||
		auto newSocket = std::make_shared<SAMSocket>(*this);
 | 
			
		||||
		m_Acceptor.async_accept (newSocket->GetSocket(), std::bind (&SAMBridge::HandleAccept, this,
 | 
			
		||||
			std::placeholders::_1, newSocket));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	void SAMBridge::RemoveSocket(const std::shared_ptr<SAMSocket> & socket)
 | 
			
		||||
	{
 | 
			
		||||
		std::unique_lock<std::mutex> lock(m_OpenSocketsMutex);
 | 
			
		||||
		m_OpenSockets.remove_if([socket](const std::shared_ptr<SAMSocket> & item) -> bool { return item == socket; });
 | 
			
		||||
	}
 | 
			
		||||
	
 | 
			
		||||
	void SAMBridge::HandleAccept(const boost::system::error_code& ecode, std::shared_ptr<SAMSocket> socket)
 | 
			
		||||
	{
 | 
			
		||||
		if (!ecode)
 | 
			
		||||
| 
						 | 
				
			
			@ -1024,6 +992,10 @@ namespace client
 | 
			
		|||
			if (!ec)
 | 
			
		||||
			{
 | 
			
		||||
				LogPrint (eLogDebug, "SAM: new connection from ", ep);
 | 
			
		||||
				{
 | 
			
		||||
					std::unique_lock<std::mutex> l(m_OpenSocketsMutex);
 | 
			
		||||
					m_OpenSockets.push_back(socket);
 | 
			
		||||
				}
 | 
			
		||||
				socket->ReceiveHandshake ();
 | 
			
		||||
			}
 | 
			
		||||
			else
 | 
			
		||||
| 
						 | 
				
			
			@ -1066,7 +1038,7 @@ namespace client
 | 
			
		|||
		if (localDestination)
 | 
			
		||||
		{
 | 
			
		||||
			localDestination->Acquire ();
 | 
			
		||||
			auto session = std::make_shared<SAMSession>(localDestination);
 | 
			
		||||
			auto session = std::make_shared<SAMSession>(*this, id, localDestination);
 | 
			
		||||
			std::unique_lock<std::mutex> l(m_SessionsMutex);
 | 
			
		||||
			auto ret = m_Sessions.insert (std::make_pair(id, session));
 | 
			
		||||
			if (!ret.second)
 | 
			
		||||
| 
						 | 
				
			
			@ -1105,6 +1077,18 @@ namespace client
 | 
			
		|||
		return nullptr;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	std::list<std::shared_ptr<SAMSocket> > SAMBridge::ListSockets(const std::string & id) const
 | 
			
		||||
	{
 | 
			
		||||
		std::list<std::shared_ptr<SAMSocket > > list;
 | 
			
		||||
		{
 | 
			
		||||
			std::unique_lock<std::mutex> l(m_OpenSocketsMutex);
 | 
			
		||||
			for (const auto & itr : m_OpenSockets)
 | 
			
		||||
				if (itr->IsSession(id))
 | 
			
		||||
					list.push_back(itr);
 | 
			
		||||
		}
 | 
			
		||||
		return list;
 | 
			
		||||
	}
 | 
			
		||||
	
 | 
			
		||||
	void SAMBridge::SendTo(const uint8_t * buf, size_t len, std::shared_ptr<boost::asio::ip::udp::endpoint> remote)
 | 
			
		||||
	{
 | 
			
		||||
		if(remote)
 | 
			
		||||
| 
						 | 
				
			
			@ -1127,33 +1111,38 @@ namespace client
 | 
			
		|||
		{
 | 
			
		||||
			m_DatagramReceiveBuffer[bytes_transferred] = 0;
 | 
			
		||||
			char * eol = strchr ((char *)m_DatagramReceiveBuffer, '\n');
 | 
			
		||||
			*eol = 0; eol++;
 | 
			
		||||
			size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer);
 | 
			
		||||
			LogPrint (eLogDebug, "SAM: datagram received ", m_DatagramReceiveBuffer," size=", payloadLen);
 | 
			
		||||
			char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' ');
 | 
			
		||||
			if (sessionID)
 | 
			
		||||
			if(eol)
 | 
			
		||||
			{
 | 
			
		||||
				sessionID++;
 | 
			
		||||
				char * destination = strchr (sessionID, ' ');
 | 
			
		||||
				if (destination)
 | 
			
		||||
				*eol = 0; eol++;
 | 
			
		||||
				size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer);
 | 
			
		||||
				LogPrint (eLogDebug, "SAM: datagram received ", m_DatagramReceiveBuffer," size=", payloadLen);
 | 
			
		||||
				char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' ');
 | 
			
		||||
				if (sessionID)
 | 
			
		||||
				{
 | 
			
		||||
					*destination = 0; destination++;
 | 
			
		||||
					auto session = FindSession (sessionID);
 | 
			
		||||
					if (session)
 | 
			
		||||
					sessionID++;
 | 
			
		||||
					char * destination = strchr (sessionID, ' ');
 | 
			
		||||
					if (destination)
 | 
			
		||||
					{
 | 
			
		||||
						i2p::data::IdentityEx dest;
 | 
			
		||||
						dest.FromBase64 (destination);
 | 
			
		||||
						session->localDestination->GetDatagramDestination ()->
 | 
			
		||||
							SendDatagramTo ((uint8_t *)eol, payloadLen, dest.GetIdentHash ());
 | 
			
		||||
						*destination = 0; destination++;
 | 
			
		||||
						auto session = FindSession (sessionID);
 | 
			
		||||
						if (session)
 | 
			
		||||
						{
 | 
			
		||||
							i2p::data::IdentityEx dest;
 | 
			
		||||
							dest.FromBase64 (destination);
 | 
			
		||||
							session->localDestination->GetDatagramDestination ()->
 | 
			
		||||
								SendDatagramTo ((uint8_t *)eol, payloadLen, dest.GetIdentHash ());
 | 
			
		||||
						}
 | 
			
		||||
						else
 | 
			
		||||
							LogPrint (eLogError, "SAM: Session ", sessionID, " not found");
 | 
			
		||||
					}
 | 
			
		||||
					else
 | 
			
		||||
						LogPrint (eLogError, "SAM: Session ", sessionID, " not found");
 | 
			
		||||
						LogPrint (eLogError, "SAM: Missing destination key");
 | 
			
		||||
				}
 | 
			
		||||
				else
 | 
			
		||||
					LogPrint (eLogError, "SAM: Missing destination key");
 | 
			
		||||
					LogPrint (eLogError, "SAM: Missing sessionID");
 | 
			
		||||
			}
 | 
			
		||||
			else
 | 
			
		||||
				LogPrint (eLogError, "SAM: Missing sessionID");
 | 
			
		||||
				LogPrint(eLogError, "SAM: invalid datagram");
 | 
			
		||||
			ReceiveDatagram ();
 | 
			
		||||
		}
 | 
			
		||||
		else
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -82,18 +82,21 @@ namespace client
 | 
			
		|||
		public:
 | 
			
		||||
 | 
			
		||||
			typedef boost::asio::ip::tcp::socket Socket_t;
 | 
			
		||||
			SAMSocket (SAMBridge& owner, std::shared_ptr<Socket_t> socket);
 | 
			
		||||
			SAMSocket (SAMBridge& owner);
 | 
			
		||||
			~SAMSocket ();			
 | 
			
		||||
 | 
			
		||||
			boost::asio::ip::tcp::socket& GetSocket () { return *m_Socket; };
 | 
			
		||||
			Socket_t& GetSocket () { return m_Socket; };
 | 
			
		||||
			void ReceiveHandshake ();
 | 
			
		||||
			void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; };
 | 
			
		||||
			SAMSocketType GetSocketType () const { return m_SocketType; };
 | 
			
		||||
 | 
			
		||||
			void Terminate (const char* reason);
 | 
			
		||||
 | 
			
		||||
			bool IsSession(const std::string & id) const;
 | 
			
		||||
		
 | 
			
		||||
		 private:
 | 
			
		||||
 | 
			
		||||
			void TerminateClose() { Terminate(nullptr); }
 | 
			
		||||
		
 | 
			
		||||
			void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred);
 | 
			
		||||
			void HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred);
 | 
			
		||||
			void HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred);
 | 
			
		||||
| 
						 | 
				
			
			@ -128,10 +131,12 @@ namespace client
 | 
			
		|||
			void WriteI2PDataImmediate(uint8_t * ptr, size_t sz);
 | 
			
		||||
 | 
			
		||||
			void HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff);
 | 
			
		||||
			void HandleStreamSend(const boost::system::error_code & ec);
 | 
			
		||||
		
 | 
			
		||||
		private:
 | 
			
		||||
 | 
			
		||||
			SAMBridge& m_Owner;
 | 
			
		||||
			std::shared_ptr<Socket_t> m_Socket;
 | 
			
		||||
			Socket_t m_Socket;
 | 
			
		||||
			boost::asio::deadline_timer m_Timer;
 | 
			
		||||
			char m_Buffer[SAM_SOCKET_BUFFER_SIZE + 1];
 | 
			
		||||
			size_t m_BufferOffset;
 | 
			
		||||
| 
						 | 
				
			
			@ -145,34 +150,12 @@ namespace client
 | 
			
		|||
 | 
			
		||||
	struct SAMSession
 | 
			
		||||
	{
 | 
			
		||||
		SAMBridge & m_Bridge;
 | 
			
		||||
		std::shared_ptr<ClientDestination> localDestination;
 | 
			
		||||
		std::list<std::shared_ptr<SAMSocket> > m_Sockets;
 | 
			
		||||
		std::shared_ptr<boost::asio::ip::udp::endpoint> UDPEndpoint;
 | 
			
		||||
		std::mutex m_SocketsMutex;
 | 
			
		||||
		std::string Name;
 | 
			
		||||
 | 
			
		||||
		/** safely add a socket to this session */
 | 
			
		||||
		void AddSocket(std::shared_ptr<SAMSocket> sock) {
 | 
			
		||||
			std::lock_guard<std::mutex> lock(m_SocketsMutex);
 | 
			
		||||
			m_Sockets.push_back(sock);
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		/** safely remove a socket from this session */
 | 
			
		||||
		void DelSocket(SAMSocket * sock) {
 | 
			
		||||
			std::lock_guard<std::mutex> lock(m_SocketsMutex);
 | 
			
		||||
			m_Sockets.remove_if([sock](const std::shared_ptr<SAMSocket> s) -> bool { return s.get() == sock; });
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		/** get a list holding a copy of all sam sockets from this session */
 | 
			
		||||
		std::list<std::shared_ptr<SAMSocket> > ListSockets() {
 | 
			
		||||
			std::list<std::shared_ptr<SAMSocket> > l;
 | 
			
		||||
			{
 | 
			
		||||
				std::lock_guard<std::mutex> lock(m_SocketsMutex);
 | 
			
		||||
				for(const auto& sock : m_Sockets ) l.push_back(sock);
 | 
			
		||||
			}
 | 
			
		||||
			return l;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		SAMSession (std::shared_ptr<ClientDestination> dest);
 | 
			
		||||
		SAMSession (SAMBridge & parent, const std::string & name, std::shared_ptr<ClientDestination> dest);
 | 
			
		||||
		~SAMSession ();
 | 
			
		||||
 | 
			
		||||
		void CloseStreams ();
 | 
			
		||||
| 
						 | 
				
			
			@ -189,14 +172,18 @@ namespace client
 | 
			
		|||
			void Stop ();
 | 
			
		||||
 | 
			
		||||
			boost::asio::io_service& GetService () { return m_Service; };
 | 
			
		||||
			std::shared_ptr<SAMSession> CreateSession (const std::string& id, const std::string& destination, // empty string  means transient
 | 
			
		||||
			std::shared_ptr<SAMSession> CreateSession (const std::string& id, const std::string& destination, // empty string	 means transient
 | 
			
		||||
				const std::map<std::string, std::string> * params);
 | 
			
		||||
			void CloseSession (const std::string& id);
 | 
			
		||||
			std::shared_ptr<SAMSession> FindSession (const std::string& id) const;
 | 
			
		||||
 | 
			
		||||
			std::list<std::shared_ptr<SAMSocket> > ListSockets(const std::string & id) const;
 | 
			
		||||
 | 
			
		||||
			/** send raw data to remote endpoint from our UDP Socket */
 | 
			
		||||
			void SendTo(const uint8_t * buf, size_t len, std::shared_ptr<boost::asio::ip::udp::endpoint> remote);
 | 
			
		||||
 | 
			
		||||
			void RemoveSocket(const std::shared_ptr<SAMSocket> & socket);
 | 
			
		||||
		
 | 
			
		||||
		private:
 | 
			
		||||
 | 
			
		||||
			void Run ();
 | 
			
		||||
| 
						 | 
				
			
			@ -217,6 +204,8 @@ namespace client
 | 
			
		|||
			boost::asio::ip::udp::socket m_DatagramSocket;
 | 
			
		||||
			mutable std::mutex m_SessionsMutex;
 | 
			
		||||
			std::map<std::string, std::shared_ptr<SAMSession> > m_Sessions;
 | 
			
		||||
			mutable std::mutex m_OpenSocketsMutex;
 | 
			
		||||
			std::list<std::shared_ptr<SAMSocket> > m_OpenSockets;
 | 
			
		||||
			uint8_t m_DatagramReceiveBuffer[i2p::datagram::MAX_DATAGRAM_SIZE+1];
 | 
			
		||||
 | 
			
		||||
		public:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue