From 82f9585b7ab3c1ab7120ef0949413d72c7f9ff74 Mon Sep 17 00:00:00 2001
From: orignal <i2porignal@yandex.ru>
Date: Sat, 2 Apr 2022 13:05:11 -0400
Subject: [PATCH] handle fragments

---
 libi2pd/SSU2.cpp | 120 ++++++++++++++++++++++++++++++++++++++++++++++-
 libi2pd/SSU2.h   |  22 ++++++++-
 2 files changed, 140 insertions(+), 2 deletions(-)

diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp
index 98ab2f1b..ebc6d675 100644
--- a/libi2pd/SSU2.cpp
+++ b/libi2pd/SSU2.cpp
@@ -729,9 +729,13 @@ namespace transport
 					break;	
 				}	
 				case eSSU2BlkFirstFragment:
+					LogPrint (eLogDebug, "SSU2: First fragment");
+					HandleFirstFragment (buf + offset, size);
 					isData = true;
 				break;	
 				case eSSU2BlkFollowOnFragment:
+					LogPrint (eLogDebug, "SSU2: Follow-on fragment");
+					HandleFollowOnFragment (buf + offset, size);
 					isData = true;
 				break;	
 				case eSSU2BlkTermination:
@@ -824,6 +828,103 @@ namespace transport
 		it1--;
 		m_SentPackets.erase (it, it1);
 	}	
+
+	void SSU2Session::HandleFirstFragment (const uint8_t * buf, size_t len)
+	{
+		uint32_t msgID; memcpy (&msgID, buf + 1, 4);
+		auto msg = NewI2NPMessage ();
+		// same format as I2NP message block
+		msg->len = msg->offset + len + 7; 
+		memcpy (msg->GetNTCP2Header (), buf, len);
+		msg->FromNTCP2 ();
+		std::shared_ptr<SSU2IncompleteMessage> m;
+		bool found = false;
+		auto it = m_IncompleteMessages.find (msgID);
+		if (it != m_IncompleteMessages.end ())
+		{
+			found = true;	
+			m = it->second;
+		}	
+		else
+		{	
+			m = std::make_shared<SSU2IncompleteMessage>();
+			m_IncompleteMessages.emplace (msgID, m);
+		}	
+		m->msg = msg;
+		m->nextFragmentNum = 1;
+		m->lastFragmentInsertTime = i2p::util::GetSecondsSinceEpoch ();
+		if (found && ConcatOutOfSequenceFragments (m))
+		{
+			// we have all follow-on fragments already
+			m_Handler.PutNextMessage (std::move (m->msg));
+			m_IncompleteMessages.erase (it);
+		}	
+	}	
+
+	void SSU2Session::HandleFollowOnFragment (const uint8_t * buf, size_t len)
+	{
+		if (len < 5) return;
+		uint8_t fragmentNum = buf[0] >> 1;
+		bool isLast = buf[0] & 0x01;
+		uint32_t msgID; memcpy (&msgID, buf + 1, 4);
+		auto it = m_IncompleteMessages.find (msgID);
+		if (it != m_IncompleteMessages.end ())
+		{
+			if (it->second->nextFragmentNum == fragmentNum && it->second->msg)
+			{
+				// in sequence
+				it->second->msg->Concat (buf + 5, len - 5);
+				if (isLast)
+				{
+					m_Handler.PutNextMessage (std::move (it->second->msg));
+					m_IncompleteMessages.erase (it);
+				}
+				else
+				{
+					it->second->nextFragmentNum++;
+					if (ConcatOutOfSequenceFragments (it->second))
+					{
+						m_Handler.PutNextMessage (std::move (it->second->msg));
+						m_IncompleteMessages.erase (it);
+					}	
+					else	
+						it->second->lastFragmentInsertTime = i2p::util::GetSecondsSinceEpoch ();
+				}	
+				return;
+			}
+		}	
+		else
+		{
+			// follow-on fragment before first fragment
+			auto msg = std::make_shared<SSU2IncompleteMessage> ();
+			msg->nextFragmentNum = 0;
+			it = m_IncompleteMessages.emplace (msgID, msg).first;
+		}	
+		// insert out of sequence fragment
+		auto fragment = std::make_shared<SSU2IncompleteMessage::Fragment> ();
+		memcpy (fragment->buf, buf + 5, len -5);
+		fragment->len = len - 5;
+		fragment->isLast = isLast;
+		it->second->outOfSequenceFragments.emplace (fragmentNum, fragment);
+		it->second->lastFragmentInsertTime = i2p::util::GetSecondsSinceEpoch ();
+	}	
+
+	bool SSU2Session::ConcatOutOfSequenceFragments (std::shared_ptr<SSU2IncompleteMessage> m)
+	{
+		if (!m) return false;
+		bool isLast = false;
+		for (auto it = m->outOfSequenceFragments.begin (); it != m->outOfSequenceFragments.end ();)
+			if (it->first == m->nextFragmentNum)
+			{
+				m->msg->Concat (it->second->buf, it->second->len);
+				isLast = it->second->isLast;
+				it = m->outOfSequenceFragments.erase (it);
+				m->nextFragmentNum++;
+			}
+			else
+				break;
+		return isLast;
+	}	
 		
 	bool SSU2Session::ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep)
 	{
@@ -884,7 +985,7 @@ namespace transport
 		htobe32buf (buf + 3, ackThrough); // Ack Through
 		uint8_t acnt = 0;
 		if (ackThrough)
-			acnt = std::min ((int)ackThrough - 1, 255);
+			acnt = std::min ((int)ackThrough, 255);
 		buf[7] = acnt; // acnt
 		// TODO: ranges
 		return 8;
@@ -985,6 +1086,20 @@ namespace transport
 		payloadSize += CreatePaddingBlock (payload + payloadSize, 32 - payloadSize);
 		SendData (payload, payloadSize);
 	}	
+
+	void SSU2Session::CleanUp (uint64_t ts)
+	{
+		for (auto it = m_IncompleteMessages.begin (); it != m_IncompleteMessages.end ();)
+		{
+			if (ts > it->second->lastFragmentInsertTime + SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT)
+			{
+				LogPrint (eLogWarning, "SSU2: message ", it->first, " was not completed in ", SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT, " seconds, deleted");
+				it = m_IncompleteMessages.erase (it);
+			}
+			else
+				++it;
+		}
+	}
 		
 	SSU2Server::SSU2Server ():
 		RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()),
@@ -1221,7 +1336,10 @@ namespace transport
 					it = m_Sessions.erase (it); 
 				}
 				else
+				{
+					it->second->CleanUp (ts);
 					it++;
+				}	
 			}
 
 			for (auto it = m_IncomingTokens.begin (); it != m_IncomingTokens.end (); )
diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h
index f7f73ec2..abf6a18c 100644
--- a/libi2pd/SSU2.h
+++ b/libi2pd/SSU2.h
@@ -32,6 +32,7 @@ namespace transport
 	const size_t SSU2_MAX_PAYLOAD_SIZE = SSU2_MTU - 32;
 	const int SSU2_RESEND_INTERVAL = 3; // in seconds
 	const int SSU2_MAX_NUM_RESENDS = 5;
+	const int SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds	
 	
 	enum SSU2MessageType
 	{
@@ -77,6 +78,20 @@ namespace transport
 		eSSU2SessionStateFailed
 	};
 
+	struct SSU2IncompleteMessage
+	{
+		struct Fragment
+		{
+			uint8_t buf[SSU2_MTU];
+			size_t len;
+			bool isLast;
+		};
+		
+		std::shared_ptr<I2NPMessage> msg;
+		int nextFragmentNum;
+		uint32_t lastFragmentInsertTime; // in seconds
+		std::map<int, std::shared_ptr<Fragment> > outOfSequenceFragments; 
+	};	
 	
 	// RouterInfo flags
 	const uint8_t SSU2_ROUTER_INFO_FLAG_REQUEST_FLOOD = 0x01;
@@ -120,6 +135,7 @@ namespace transport
 			void Connect ();
 			void Terminate ();
 			void TerminateByTimeout ();
+			void CleanUp (uint64_t ts);
 			void Done () override;
 			void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override;
 			void Resend (uint64_t ts);
@@ -158,7 +174,10 @@ namespace transport
 			std::shared_ptr<const i2p::data::RouterInfo> ExtractRouterInfo (const uint8_t * buf, size_t size);
 			void CreateNonce (uint64_t seqn, uint8_t * nonce);
 			bool UpdateReceivePacketNum (uint32_t packetNum); // for Ack, returns false if duplicate
-
+			void HandleFirstFragment (const uint8_t * buf, size_t len);
+			void HandleFollowOnFragment (const uint8_t * buf, size_t len);
+			bool ConcatOutOfSequenceFragments (std::shared_ptr<SSU2IncompleteMessage> m); // true if message complete
+			
 			size_t CreateAddressBlock (const boost::asio::ip::udp::endpoint& ep, uint8_t * buf, size_t len);
 			size_t CreateAckBlock (uint8_t * buf, size_t len);
 			size_t CreatePaddingBlock (uint8_t * buf, size_t len, size_t minSize = 0);
@@ -177,6 +196,7 @@ namespace transport
 			uint32_t m_SendPacketNum, m_ReceivePacketNum;
 			std::set<uint32_t> m_OutOfSequencePackets; // packet nums > receive packet num
 			std::map<uint32_t, std::shared_ptr<SentPacket> > m_SentPackets; // packetNum -> packet
+			std::map<uint32_t, std::shared_ptr<SSU2IncompleteMessage> > m_IncompleteMessages; // I2NP
 			std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
 			i2p::I2NPMessagesHandler m_Handler;
 	};