From 41ef22cf09d093bb23634b0f89fe9467a95710e1 Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 10 Jan 2014 20:21:38 -0500 Subject: [PATCH 01/23] stream recieve queue --- Queue.h | 23 +++++++++++++++++-- Streaming.cpp | 63 +++++++++++++++++++++++++++++++++++++++++++-------- Streaming.h | 20 +++++++++++++--- 3 files changed, 92 insertions(+), 14 deletions(-) diff --git a/Queue.h b/Queue.h index 31e8e0a4..ae60f099 100644 --- a/Queue.h +++ b/Queue.h @@ -45,6 +45,18 @@ namespace util } return el; } + + bool Wait (int sec, int usec) + { + std::unique_lock l(m_QueueMutex); + return m_NonEmpty.wait_for (l, std::chrono::seconds (sec) + std::chrono::milliseconds (usec)) != std::cv_status::timeout; + } + + bool IsEmpty () + { + std::unique_lock l(m_QueueMutex); + return m_Queue.empty (); + } void WakeUp () { m_NonEmpty.notify_one (); }; @@ -54,14 +66,21 @@ namespace util return GetNonThreadSafe (); } + Element * Peek () + { + std::unique_lock l(m_QueueMutex); + return GetNonThreadSafe (true); + } + private: - Element * GetNonThreadSafe () + Element * GetNonThreadSafe (bool peek = false) { if (!m_Queue.empty ()) { Element * el = m_Queue.front (); - m_Queue.pop (); + if (!peek) + m_Queue.pop (); return el; } return nullptr; diff --git a/Streaming.cpp b/Streaming.cpp index fc746ebc..d0df63de 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -1,5 +1,6 @@ #include "I2PEndian.h" #include +#include #include #include "Log.h" #include "RouterInfo.h" @@ -20,9 +21,15 @@ namespace stream m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } - void Stream::HandleNextPacket (const uint8_t * buf, size_t len) + Stream::~Stream () { - const uint8_t * end = buf + len; + while (auto packet = m_ReceiveQueue.Get ()) + delete packet; + } + + void Stream::HandleNextPacket (Packet * packet) + { + const uint8_t * end = packet->buf + packet->len, * buf = packet->buf; buf += 4; // sendStreamID if (!m_SendStreamID) m_SendStreamID = be32toh (*(uint32_t *)buf); @@ -61,6 +68,9 @@ namespace stream // we have reached payload section std::string str((const char *)buf, end-buf); LogPrint ("Payload: ", str); + + packet->offset = buf - packet->buf; + m_ReceiveQueue.Put (packet); } size_t Stream::Send (uint8_t * buf, size_t len, int timeout) @@ -105,6 +115,37 @@ namespace stream DeleteI2NPMessage (msg); return len; } + + size_t Stream::Receive (uint8_t * buf, size_t len, int timeout) + { + if (m_ReceiveQueue.IsEmpty ()) + { + if (!m_ReceiveQueue.Wait (timeout, 0)) + return 0; + } + + // either non-empty or we have received empty + size_t pos = 0; + while (pos < len) + { + Packet * packet = m_ReceiveQueue.Peek (); + if (packet) + { + size_t l = std::min (packet->GetLength (), len - pos); + memcpy (buf + pos, packet->GetBuffer (), l); + pos += l; + packet->offset += l; + if (!packet->GetLength ()) + { + m_ReceiveQueue.Get (); + delete packet; + } + } + else // no more data available + break; + } + return pos; + } StreamingDestination * sharedLocalDestination = nullptr; @@ -124,14 +165,17 @@ namespace stream DeleteI2NPMessage (m_LeaseSet); } - void StreamingDestination::HandleNextPacket (const uint8_t * buf, size_t len) + void StreamingDestination::HandleNextPacket (Packet * packet) { - uint32_t sendStreamID = be32toh (*(uint32_t *)(buf)); + uint32_t sendStreamID = be32toh (*(uint32_t *)(packet->buf)); auto it = m_Streams.find (sendStreamID); if (it != m_Streams.end ()) - it->second->HandleNextPacket (buf, len); + it->second->HandleNextPacket (packet); else + { LogPrint ("Unknown stream ", sendStreamID); + delete packet; + } } Stream * StreamingDestination::CreateNewStream (const i2p::data::LeaseSet * remote) @@ -232,13 +276,14 @@ namespace stream CryptoPP::Gunzip decompressor; decompressor.Put (buf, length); decompressor.MessageEnd(); - uint8_t uncompressed[2048]; - int uncompressedSize = decompressor.MaxRetrievable (); - decompressor.Get (uncompressed, uncompressedSize); + Packet * uncompressed = new Packet; + uncompressed->offset = 0; + uncompressed->len = decompressor.MaxRetrievable (); + decompressor.Get (uncompressed->buf, uncompressed->len); // then forward to streaming engine // TODO: we have onle one destination, might be more if (sharedLocalDestination) - sharedLocalDestination->HandleNextPacket (uncompressed, uncompressedSize); + sharedLocalDestination->HandleNextPacket (uncompressed); } else LogPrint ("Data: protocol ", buf[9], " is not supported"); diff --git a/Streaming.h b/Streaming.h index 26e84e3a..5f0ab5f4 100644 --- a/Streaming.h +++ b/Streaming.h @@ -4,6 +4,7 @@ #include #include #include +#include "Queue.h" #include "Identity.h" #include "LeaseSet.h" #include "I2NPProtocol.h" @@ -25,26 +26,39 @@ namespace stream const uint16_t PACKET_FLAG_NO_ACK = 0x0400; const size_t STREAMING_MTU = 1730; - + + struct Packet + { + uint8_t buf[STREAMING_MTU]; + size_t len, offset; + + Packet (): len (0), offset (0) {}; + uint8_t * GetBuffer () { return buf + offset; }; + size_t GetLength () const { return len - offset; }; + }; + class StreamingDestination; class Stream { public: Stream (StreamingDestination * local, const i2p::data::LeaseSet * remote); + ~Stream (); uint32_t GetSendStreamID () const { return m_SendStreamID; }; uint32_t GetRecvStreamID () const { return m_RecvStreamID; }; const i2p::data::LeaseSet * GetRemoteLeaseSet () const { return m_RemoteLeaseSet; }; bool IsEstablished () const { return m_SendStreamID; }; - void HandleNextPacket (const uint8_t * buf, size_t len); + void HandleNextPacket (Packet * packet); size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds + size_t Receive (uint8_t * buf, size_t len, int timeout); // returns 0 if timeout expired private: uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; StreamingDestination * m_LocalDestination; const i2p::data::LeaseSet * m_RemoteLeaseSet; + i2p::util::Queue m_ReceiveQueue; }; class StreamingDestination @@ -61,7 +75,7 @@ namespace stream Stream * CreateNewStream (const i2p::data::LeaseSet * remote); void DeleteStream (Stream * stream); - void HandleNextPacket (const uint8_t * buf, size_t len); + void HandleNextPacket (Packet * packet); private: From 9363547205844a7613af14e162c6a09f6c3bfcb4 Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 10 Jan 2014 22:23:17 -0500 Subject: [PATCH 02/23] send quick ack --- Streaming.cpp | 49 +++++++++++++++++++++++++++++++++++++++++++------ Streaming.h | 8 +++++++- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index d0df63de..e1a155fd 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -16,7 +16,8 @@ namespace i2p namespace stream { Stream::Stream (StreamingDestination * local, const i2p::data::LeaseSet * remote): - m_SendStreamID (0), m_SequenceNumber (0), m_LocalDestination (local), m_RemoteLeaseSet (remote) + m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), + m_LocalDestination (local), m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } @@ -33,7 +34,8 @@ namespace stream buf += 4; // sendStreamID if (!m_SendStreamID) m_SendStreamID = be32toh (*(uint32_t *)buf); - buf += 4; // receiveStreamID + buf += 4; // receiveStreamID + m_LastReceivedSequenceNumber = be32toh (*(uint32_t *)buf); buf += 4; // sequenceNum buf += 4; // ackThrough int nackCount = buf[0]; @@ -64,13 +66,15 @@ namespace stream LogPrint ("From identity"); optionalData += sizeof (i2p::data::Identity); } - + // we have reached payload section std::string str((const char *)buf, end-buf); LogPrint ("Payload: ", str); packet->offset = buf - packet->buf; m_ReceiveQueue.Put (packet); + + SendQuickAck (); } size_t Stream::Send (uint8_t * buf, size_t len, int timeout) @@ -105,17 +109,50 @@ namespace stream I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet, CreateDataMessage (this, packet, size), m_LocalDestination->GetLeaseSet ()); - auto outbound = i2p::tunnel::tunnels.GetNextOutboundTunnel (); - if (outbound) + if (!m_OutboundTunnel) + m_OutboundTunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); + if (m_OutboundTunnel) { auto& lease = m_RemoteLeaseSet->GetLeases ()[0]; // TODO: - outbound->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); + m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); } else DeleteI2NPMessage (msg); return len; } + void Stream::SendQuickAck () + { + uint8_t packet[STREAMING_MTU]; + size_t size = 0; + *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); + size += 4; // sendStreamID + *(uint32_t *)(packet + size) = htobe32 (m_RecvStreamID); + size += 4; // receiveStreamID + *(uint32_t *)(packet + size) = 0; // this is plain Ack message + size += 4; // sequenceNum + *(uint32_t *)(packet + size) = htobe32 (m_LastReceivedSequenceNumber); + size += 4; // ack Through + packet[size] = 0; + size++; // NACK count + size++; // resend delay + *(uint16_t *)(packet + size) = 0; // nof flags set + size += 2; // flags + *(uint16_t *)(packet + size) = 0; // nof flags set + size += 2; // options size + + I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet, + CreateDataMessage (this, packet, size)); + if (m_OutboundTunnel) + { + auto& lease = m_RemoteLeaseSet->GetLeases ()[0]; // TODO: + m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); + LogPrint ("Quick Ack sent"); + } + else + DeleteI2NPMessage (msg); + } + size_t Stream::Receive (uint8_t * buf, size_t len, int timeout) { if (m_ReceiveQueue.IsEmpty ()) diff --git a/Streaming.h b/Streaming.h index 5f0ab5f4..7851589c 100644 --- a/Streaming.h +++ b/Streaming.h @@ -8,6 +8,7 @@ #include "Identity.h" #include "LeaseSet.h" #include "I2NPProtocol.h" +#include "Tunnel.h" namespace i2p { @@ -52,13 +53,18 @@ namespace stream void HandleNextPacket (Packet * packet); size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds size_t Receive (uint8_t * buf, size_t len, int timeout); // returns 0 if timeout expired + + private: + + void SendQuickAck (); private: - uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; + uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber; StreamingDestination * m_LocalDestination; const i2p::data::LeaseSet * m_RemoteLeaseSet; i2p::util::Queue m_ReceiveQueue; + i2p::tunnel::OutboundTunnel * m_OutboundTunnel; }; class StreamingDestination From 460d4e754e8e50855d0dec1d758df0720fde05e2 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 12 Jan 2014 14:35:51 -0500 Subject: [PATCH 03/23] fixed compilation error --- i2p.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/i2p.cpp b/i2p.cpp index be007d15..e2b44dfe 100644 --- a/i2p.cpp +++ b/i2p.cpp @@ -1,6 +1,6 @@ #include +#include #include -#include #include "Log.h" #include "base64.h" #include "Transports.h" @@ -20,7 +20,7 @@ int main( int, char** ) i2p::transports.Start (); i2p::tunnel::tunnels.Start (); - boost::this_thread::sleep(boost::posix_time::seconds(1000)); + std::this_thread::sleep_for (std::chrono::seconds(10000)); i2p::tunnel::tunnels.Stop (); i2p::transports.Stop (); i2p::data::netdb.Stop (); From 07eb5baac05fb013ae931f2c87604859ae49d807 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 12 Jan 2014 15:57:10 -0500 Subject: [PATCH 04/23] implemented connect and close --- Streaming.cpp | 83 ++++++++++++++++++++++++++++++++++++++++++++++----- Streaming.h | 11 +++++-- 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index e1a155fd..5f3b3228 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -16,7 +16,7 @@ namespace i2p namespace stream { Stream::Stream (StreamingDestination * local, const i2p::data::LeaseSet * remote): - m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), + m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); @@ -54,7 +54,7 @@ namespace stream { LogPrint ("Synchronize"); } - + if (flags & PACKET_FLAG_SIGNATURE_INCLUDED) { LogPrint ("Signature"); @@ -68,17 +68,36 @@ namespace stream } // we have reached payload section + LogPrint ("seqn=",m_LastReceivedSequenceNumber,", flags=", flags); std::string str((const char *)buf, end-buf); LogPrint ("Payload: ", str); packet->offset = buf - packet->buf; m_ReceiveQueue.Put (packet); - - SendQuickAck (); + + if (flags & PACKET_FLAG_CLOSE) + { + LogPrint ("Closed"); + m_IsOpen = false; + } + else + SendQuickAck (); } size_t Stream::Send (uint8_t * buf, size_t len, int timeout) { + if (!m_IsOpen) + ConnectAndSend (buf, len); + else + { + // TODO: implement + } + return len; + } + + void Stream::ConnectAndSend (uint8_t * buf, size_t len) + { + m_IsOpen = true; uint8_t packet[STREAMING_MTU]; size_t size = 0; *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); @@ -94,15 +113,19 @@ namespace stream size++; // resend delay // TODO: for initial packet only, following packets have different falgs *(uint16_t *)(packet + size) = htobe16 (PACKET_FLAG_SYNCHRONIZE | - PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_NO_ACK); + PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_SIGNATURE_INCLUDED | + PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED | PACKET_FLAG_NO_ACK); size += 2; // flags - *(uint16_t *)(packet + size) = htobe16 (sizeof (i2p::data::Identity) + 40); // identity + signature + *(uint16_t *)(packet + size) = htobe16 (sizeof (i2p::data::Identity) + 40 + 2); // identity + signature + packet size size += 2; // options size memcpy (packet + size, &m_LocalDestination->GetIdentity (), sizeof (i2p::data::Identity)); size += sizeof (i2p::data::Identity); // from + *(uint16_t *)(packet + size) = htobe16 (STREAMING_MTU); + size += 2; // max packet size uint8_t * signature = packet + size; // set it later memset (signature, 0, 40); // zeroes for now size += 40; // signature + memcpy (packet + size, buf, len); size += len; // payload m_LocalDestination->Sign (packet, size, signature); @@ -118,9 +141,8 @@ namespace stream } else DeleteI2NPMessage (msg); - return len; } - + void Stream::SendQuickAck () { uint8_t packet[STREAMING_MTU]; @@ -152,6 +174,46 @@ namespace stream else DeleteI2NPMessage (msg); } + + void Stream::Close () + { + if (m_IsOpen) + { + m_IsOpen = false; + uint8_t packet[STREAMING_MTU]; + size_t size = 0; + *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); + size += 4; // sendStreamID + *(uint32_t *)(packet + size) = htobe32 (m_RecvStreamID); + size += 4; // receiveStreamID + *(uint32_t *)(packet + size) = htobe32 (m_SequenceNumber); + size += 4; // sequenceNum + *(uint32_t *)(packet + size) = htobe32 (m_LastReceivedSequenceNumber); + size += 4; // ack Through + packet[size] = 0; + size++; // NACK count + size++; // resend delay + *(uint16_t *)(packet + size) = PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED; + size += 2; // flags + *(uint16_t *)(packet + size) = 40; // 40 bytes signature + size += 2; // options size + uint8_t * signature = packet + size; + memset (packet + size, 0, 40); + size += 40; // signature + m_LocalDestination->Sign (packet, size, signature); + + I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet, + CreateDataMessage (this, packet, size)); + if (m_OutboundTunnel) + { + auto& lease = m_RemoteLeaseSet->GetLeases ()[0]; // TODO: + m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); + LogPrint ("FIN sent"); + } + else + DeleteI2NPMessage (msg); + } + } size_t Stream::Receive (uint8_t * buf, size_t len, int timeout) { @@ -316,6 +378,11 @@ namespace stream Packet * uncompressed = new Packet; uncompressed->offset = 0; uncompressed->len = decompressor.MaxRetrievable (); + if (uncompressed->len > MAX_PACKET_SIZE) + { + LogPrint ("Recieved packet size exceeds mac packer size"); + uncompressed->len = MAX_PACKET_SIZE; + } decompressor.Get (uncompressed->buf, uncompressed->len); // then forward to streaming engine // TODO: we have onle one destination, might be more diff --git a/Streaming.h b/Streaming.h index 7851589c..06dc1f44 100644 --- a/Streaming.h +++ b/Streaming.h @@ -26,11 +26,12 @@ namespace stream const uint16_t PACKET_FLAG_ECHO = 0x0200; const uint16_t PACKET_FLAG_NO_ACK = 0x0400; - const size_t STREAMING_MTU = 1730; + const size_t STREAMING_MTU = 1730; + const size_t MAX_PACKET_SIZE = 1754; struct Packet { - uint8_t buf[STREAMING_MTU]; + uint8_t buf[1754]; size_t len, offset; Packet (): len (0), offset (0) {}; @@ -48,19 +49,23 @@ namespace stream uint32_t GetSendStreamID () const { return m_SendStreamID; }; uint32_t GetRecvStreamID () const { return m_RecvStreamID; }; const i2p::data::LeaseSet * GetRemoteLeaseSet () const { return m_RemoteLeaseSet; }; + bool IsOpen () const { return m_IsOpen; }; bool IsEstablished () const { return m_SendStreamID; }; void HandleNextPacket (Packet * packet); size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds size_t Receive (uint8_t * buf, size_t len, int timeout); // returns 0 if timeout expired - + void Close (); + private: + void ConnectAndSend (uint8_t * buf, size_t len); void SendQuickAck (); private: uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber; + bool m_IsOpen; StreamingDestination * m_LocalDestination; const i2p::data::LeaseSet * m_RemoteLeaseSet; i2p::util::Queue m_ReceiveQueue; From 9b92641112ec432878ab172b67638d098dfa8ed2 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 12 Jan 2014 21:41:25 -0500 Subject: [PATCH 05/23] request b32-coded I2P address from HTTP server --- HTTPServer.cpp | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++ HTTPServer.h | 1 + Streaming.cpp | 13 +++++++++--- Streaming.h | 4 ++-- 4 files changed, 67 insertions(+), 5 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 577ef748..a30f589f 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -1,8 +1,11 @@ #include #include +#include "base64.h" #include "Tunnel.h" #include "TransitTunnel.h" #include "Transports.h" +#include "NetDb.h" +#include "Streaming.h" #include "HTTPServer.h" namespace i2p @@ -123,12 +126,63 @@ namespace util } } + void HTTPConnection::HandleDestinationRequest (std::string b32) + { + uint8_t destination[32]; + i2p::data::Base32ToByteStream (b32.c_str (), b32.length (), destination, 32); + auto leaseSet = i2p::data::netdb.FindLeaseSet (destination); + if (!leaseSet) + { + i2p::data::netdb.RequestDestination (i2p::data::IdentHash (destination), true); + std::this_thread::sleep_for (std::chrono::seconds(10)); // wait for 10 seconds + leaseSet = i2p::data::netdb.FindLeaseSet (destination); + if (!leaseSet) // still no LeaseSet + { + m_Reply.content = "LeaseSet not found"; + m_Reply.headers.resize(2); + m_Reply.headers[0].name = "Content-Length"; + m_Reply.headers[0].value = boost::lexical_cast(m_Reply.content.size()); + m_Reply.headers[1].name = "Content-Type"; + m_Reply.headers[1].value = "text/html"; + return; + } + } + // we found LeaseSet + auto s = i2p::stream::CreateStream (leaseSet); + if (s) + { + std::string request = "GET / HTTP/1.1\n Host:" + b32 + ".b32.i2p\n"; + s->Send ((uint8_t *)request.c_str (), request.length (), 30); + std::stringstream ss; + uint8_t buf[8192]; + size_t r = s->Receive (buf, 8192, 30); // 30 seconds + if (r) // we recieved data + { + ss << std::string ((char *)buf, r); + while (s->IsOpen () && (r = s->Receive (buf, 8192, 30)) > 0) + ss << std::string ((char *)buf,r); + } + else // nothing received + ss << "Not responding"; + s->Close (); + //DeleteStream (s); + + m_Reply.content = ss.str (); + m_Reply.headers.resize(2); + m_Reply.headers[0].name = "Content-Length"; + m_Reply.headers[0].value = boost::lexical_cast(m_Reply.content.size()); + m_Reply.headers[1].name = "Content-Type"; + m_Reply.headers[1].value = "text/html"; + } + } + HTTPServer::HTTPServer (int port): m_Thread (nullptr), m_Work (m_Service), m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port)), m_NewSocket (nullptr) { + } HTTPServer::~HTTPServer () diff --git a/HTTPServer.h b/HTTPServer.h index 84439c2a..76393f27 100644 --- a/HTTPServer.h +++ b/HTTPServer.h @@ -48,6 +48,7 @@ namespace util void HandleWrite(const boost::system::error_code& ecode); void HandleRequest (); + void HandleDestinationRequest (std::string b32); void FillContent (std::stringstream& s); private: diff --git a/Streaming.cpp b/Streaming.cpp index 5f3b3228..a737e622 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -73,12 +73,16 @@ namespace stream LogPrint ("Payload: ", str); packet->offset = buf - packet->buf; - m_ReceiveQueue.Put (packet); - + if (packet->GetLength () > 0) + m_ReceiveQueue.Put (packet); + else + delete packet; + if (flags & PACKET_FLAG_CLOSE) { LogPrint ("Closed"); m_IsOpen = false; + m_ReceiveQueue.WakeUp (); } else SendQuickAck (); @@ -212,13 +216,16 @@ namespace stream } else DeleteI2NPMessage (msg); + m_ReceiveQueue.WakeUp (); } } size_t Stream::Receive (uint8_t * buf, size_t len, int timeout) { + if (!m_IsOpen) return 0; if (m_ReceiveQueue.IsEmpty ()) { + if (!timeout) return 0; if (!m_ReceiveQueue.Wait (timeout, 0)) return 0; } @@ -358,7 +365,7 @@ namespace stream return sharedLocalDestination->CreateNewStream (remote); } - void CloseStream (Stream * stream) + void DeleteStream (Stream * stream) { if (sharedLocalDestination) sharedLocalDestination->DeleteStream (stream); diff --git a/Streaming.h b/Streaming.h index 06dc1f44..cc772190 100644 --- a/Streaming.h +++ b/Streaming.h @@ -54,7 +54,7 @@ namespace stream void HandleNextPacket (Packet * packet); size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds - size_t Receive (uint8_t * buf, size_t len, int timeout); // returns 0 if timeout expired + size_t Receive (uint8_t * buf, size_t len, int timeout = 0); // returns 0 if timeout expired void Close (); private: @@ -105,7 +105,7 @@ namespace stream }; Stream * CreateStream (const i2p::data::LeaseSet * remote); - void CloseStream (Stream * stream); + void DeleteStream (Stream * stream); // assuming data is I2CP message void HandleDataMessage (i2p::data::IdentHash * destination, const uint8_t * buf, size_t len); From 34d2ae4500725d2e1cf21c20605c33bfadb27cbf Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 12 Jan 2014 22:31:26 -0500 Subject: [PATCH 06/23] fixed memory leak --- LeaseSet.h | 4 +++- NetDb.cpp | 20 +++++++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/LeaseSet.h b/LeaseSet.h index 5bb75741..fababcf0 100644 --- a/LeaseSet.h +++ b/LeaseSet.h @@ -27,7 +27,9 @@ namespace data public: LeaseSet (const uint8_t * buf, int len); - + LeaseSet (const LeaseSet& ) = default; + LeaseSet& operator=(const LeaseSet& ) = default; + // implements RoutingDestination const Identity& GetIdentity () const { return m_Identity; }; const IdentHash& GetIdentHash () const { return m_IdentHash; }; diff --git a/NetDb.cpp b/NetDb.cpp index 870027e7..b0c95482 100644 --- a/NetDb.cpp +++ b/NetDb.cpp @@ -119,10 +119,9 @@ namespace data if (r->GetTimestamp () > it->second->GetTimestamp ()) { LogPrint ("RouterInfo updated"); - *m_RouterInfos[r->GetIdentHash ()] = *r; // we can't replace point because it's used by tunnels - } - else - delete r; + *(it->second) = *r; // we can't replace pointer because it's used by tunnels + } + delete r; } else { @@ -135,7 +134,18 @@ namespace data { LeaseSet * l = new LeaseSet (buf, len); DeleteRequestedDestination (l->GetIdentHash ()); - m_LeaseSets[l->GetIdentHash ()] = l; + auto it = m_LeaseSets.find(l->GetIdentHash ()); + if (it != m_LeaseSets.end ()) + { + LogPrint ("LeaseSet updated"); + *(it->second) = *l; // we can't replace pointer because it's used by streams + delete l; + } + else + { + LogPrint ("New LeaseSet added"); + m_LeaseSets[l->GetIdentHash ()] = l; + } } RouterInfo * NetDb::FindRouter (const IdentHash& ident) const From dd679c261c3e8445de1a762f9156a331a963b42b Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 13 Jan 2014 21:18:32 -0500 Subject: [PATCH 07/23] set minimum comression level --- Streaming.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Streaming.cpp b/Streaming.cpp index a737e622..b702e56a 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -404,6 +404,7 @@ namespace stream { I2NPMessage * msg = NewI2NPMessage (); CryptoPP::Gzip compressor; + compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL); compressor.Put (payload, len); compressor.MessageEnd(); int size = compressor.MaxRetrievable (); @@ -411,6 +412,7 @@ namespace stream *(uint32_t *)buf = htobe32 (size); // length buf += 4; compressor.Get (buf, size); + memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later buf[9] = 6; // streaming protocol msg->len += size + 4; FillI2NPMessageHeader (msg, eI2NPData); From 5856310cd6c8464a256fea2420442559f20c82a1 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 14 Jan 2014 19:00:12 -0500 Subject: [PATCH 08/23] multi-tunnels LeaseSet --- Streaming.cpp | 21 ++++++++------------- Tunnel.cpp | 17 +++++++++++++++++ Tunnel.h | 1 + 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index b702e56a..a22d1cd0 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -77,15 +77,14 @@ namespace stream m_ReceiveQueue.Put (packet); else delete packet; - + + SendQuickAck (); if (flags & PACKET_FLAG_CLOSE) { LogPrint ("Closed"); m_IsOpen = false; m_ReceiveQueue.WakeUp (); - } - else - SendQuickAck (); + } } size_t Stream::Send (uint8_t * buf, size_t len, int timeout) @@ -325,11 +324,12 @@ namespace stream size += 256; // encryption key memset (buf + size, 0, 128); size += 128; // signing key - auto tunnel = i2p::tunnel::tunnels.GetNextInboundTunnel (); - if (tunnel) + auto tunnels = i2p::tunnel::tunnels.GetInboundTunnels (5); // 5 tunnels maximum + buf[size] = tunnels.size (); // num leases + size++; // num + for (auto it: tunnels) { - buf[size] = 1; // 1 lease - size++; // num + auto tunnel = it; memcpy (buf + size, (const uint8_t *)tunnel->GetNextIdentHash (), 32); size += 32; // tunnel_gw *(uint32_t *)(buf + size) = htobe32 (tunnel->GetNextTunnelID ()); @@ -339,11 +339,6 @@ namespace stream *(uint64_t *)(buf + size) = htobe64 (ts); size += 8; // end_date } - else - { - buf[size] = 0; // zero leases - size++; // num - } Sign (buf, size, buf+ size); size += 40; // signature diff --git a/Tunnel.cpp b/Tunnel.cpp index 2a87a939..c5f9730f 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -211,6 +211,23 @@ namespace tunnel } return tunnel; } + + std::vector Tunnels::GetInboundTunnels (int num) const + { + std::vector v; + int i = 0; + for (auto it : m_InboundTunnels) + { + if (i >= num) break; + if (it.second->GetNextIdentHash () != i2p::context.GetRouterInfo ().GetIdentHash ()) + { + // exclude one hop tunnels + v.push_back (it.second); + i++; + } + } + return v; + } OutboundTunnel * Tunnels::GetNextOutboundTunnel () { diff --git a/Tunnel.h b/Tunnel.h index cec82958..3f94d787 100644 --- a/Tunnel.h +++ b/Tunnel.h @@ -107,6 +107,7 @@ namespace tunnel InboundTunnel * GetInboundTunnel (uint32_t tunnelID); Tunnel * GetPendingTunnel (uint32_t replyMsgID); InboundTunnel * GetNextInboundTunnel (); + std::vector GetInboundTunnels (int num) const; OutboundTunnel * GetNextOutboundTunnel (); TransitTunnel * GetTransitTunnel (uint32_t tunnelID); void AddTransitTunnel (TransitTunnel * tunnel); From 68a06433104fd7acf246b1910be24f63af5d4e2f Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 14 Jan 2014 19:56:34 -0500 Subject: [PATCH 09/23] extract address --- HTTPServer.cpp | 22 ++++++++++++++++++++-- HTTPServer.h | 5 +++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index a30f589f..58c5391d 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -1,6 +1,7 @@ #include #include #include "base64.h" +#include "Log.h" #include "Tunnel.h" #include "TransitTunnel.h" #include "Transports.h" @@ -45,7 +46,7 @@ namespace util void HTTPConnection::Receive () { - m_Socket->async_read_some (boost::asio::buffer (m_Buffer), + m_Socket->async_read_some (boost::asio::buffer (m_Buffer, 8192), boost::bind(&HTTPConnection::HandleReceive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } @@ -54,7 +55,12 @@ namespace util { if (!ecode) { - HandleRequest (); + m_Buffer[bytes_transferred] = 0; + auto address = ExtractAddress (); + if (address.find ('?') != std::string::npos) + HandleDestinationRequest ("zmw2cyw2vj7f6obx3msmdvdepdhnw2ctc4okza2zjxlukkdfckhq"); + else + HandleRequest (); boost::asio::async_write (*m_Socket, m_Reply.to_buffers(), boost::bind (&HTTPConnection::HandleWrite, this, boost::asio::placeholders::error)); @@ -64,6 +70,18 @@ namespace util Terminate (); } + std::string HTTPConnection::ExtractAddress () + { + char * get = strstr (m_Buffer, "GET"); + if (get) + { + char * http = strstr (get, "HTTP"); + if (http) + return std::string (get + 3, http - get - 3); + } + return ""; + } + void HTTPConnection::HandleWrite (const boost::system::error_code& ecode) { Terminate (); diff --git a/HTTPServer.h b/HTTPServer.h index 76393f27..90aacfc0 100644 --- a/HTTPServer.h +++ b/HTTPServer.h @@ -50,11 +50,12 @@ namespace util void HandleRequest (); void HandleDestinationRequest (std::string b32); void FillContent (std::stringstream& s); - + std::string ExtractAddress (); + private: boost::asio::ip::tcp::socket * m_Socket; - boost::array m_Buffer; + char m_Buffer[8192]; request m_Request; reply m_Reply; }; From 659177e2fd9e128670c60235ef3621df3223c104 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 14 Jan 2014 20:57:33 -0500 Subject: [PATCH 10/23] check expiration of lease --- HTTPServer.cpp | 12 +++++++++--- LeaseSet.cpp | 29 ++++++++++++++++++++++++++++- LeaseSet.h | 3 +++ Streaming.cpp | 15 ++++++++++++--- 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 58c5391d..80ddd456 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -149,14 +149,14 @@ namespace util uint8_t destination[32]; i2p::data::Base32ToByteStream (b32.c_str (), b32.length (), destination, 32); auto leaseSet = i2p::data::netdb.FindLeaseSet (destination); - if (!leaseSet) + if (!leaseSet || !leaseSet->HasNonExpiredLeases ()) { i2p::data::netdb.RequestDestination (i2p::data::IdentHash (destination), true); std::this_thread::sleep_for (std::chrono::seconds(10)); // wait for 10 seconds leaseSet = i2p::data::netdb.FindLeaseSet (destination); - if (!leaseSet) // still no LeaseSet + if (!leaseSet || !leaseSet->HasNonExpiredLeases ()) // still no LeaseSet { - m_Reply.content = "LeaseSet not found"; + m_Reply.content = leaseSet ? "Leases expired" : "LeaseSet not found"; m_Reply.headers.resize(2); m_Reply.headers[0].name = "Content-Length"; m_Reply.headers[0].value = boost::lexical_cast(m_Reply.content.size()); @@ -166,6 +166,12 @@ namespace util } } // we found LeaseSet + if (leaseSet->HasExpiredLeases ()) + { + // we should re-request LeaseSet + LogPrint ("LeaseSet re-requested"); + i2p::data::netdb.RequestDestination (i2p::data::IdentHash (destination), true); + } auto s = i2p::stream::CreateStream (leaseSet); if (s) { diff --git a/LeaseSet.cpp b/LeaseSet.cpp index c948c343..9c7585a2 100644 --- a/LeaseSet.cpp +++ b/LeaseSet.cpp @@ -1,8 +1,9 @@ +#include "I2PEndian.h" #include #include "CryptoConst.h" #include "Log.h" +#include "Timestamp.h" #include "LeaseSet.h" -#include "I2PEndian.h" namespace i2p { @@ -44,5 +45,31 @@ namespace data if (!verifier.VerifyMessage (buf, leases - buf, leases, 40)) LogPrint ("LeaseSet verification failed"); } + + std::vector LeaseSet::GetNonExpiredLeases () const + { + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + std::vector leases; + for (auto& it: m_Leases) + if (ts < it.endDate) + leases.push_back (it); + return leases; + } + + bool LeaseSet::HasExpiredLeases () const + { + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + for (auto& it: m_Leases) + if (ts >= it.endDate) return true; + return false; + } + + bool LeaseSet::HasNonExpiredLeases () const + { + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + for (auto& it: m_Leases) + if (ts < it.endDate) return true; + return false; + } } } diff --git a/LeaseSet.h b/LeaseSet.h index fababcf0..294e32d0 100644 --- a/LeaseSet.h +++ b/LeaseSet.h @@ -34,6 +34,9 @@ namespace data const Identity& GetIdentity () const { return m_Identity; }; const IdentHash& GetIdentHash () const { return m_IdentHash; }; const std::vector& GetLeases () const { return m_Leases; }; + std::vector GetNonExpiredLeases () const; + bool HasExpiredLeases () const; + bool HasNonExpiredLeases () const; const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionKey; }; bool IsDestination () const { return true; }; diff --git a/Streaming.cpp b/Streaming.cpp index a22d1cd0..1acbad79 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -170,9 +170,18 @@ namespace stream CreateDataMessage (this, packet, size)); if (m_OutboundTunnel) { - auto& lease = m_RemoteLeaseSet->GetLeases ()[0]; // TODO: - m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); - LogPrint ("Quick Ack sent"); + auto leases = m_RemoteLeaseSet->GetNonExpiredLeases (); + if (!leases.empty ()) + { + auto& lease = leases[0]; // TODO: + m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); + LogPrint ("Quick Ack sent"); + } + else + { + LogPrint ("All leases are expired"); + DeleteI2NPMessage (msg); + } } else DeleteI2NPMessage (msg); From 075650b6d33438973dc6a31fc7a8d95c49faa5f2 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 14 Jan 2014 22:35:00 -0500 Subject: [PATCH 11/23] invert endDate --- LeaseSet.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/LeaseSet.cpp b/LeaseSet.cpp index 9c7585a2..efa6409f 100644 --- a/LeaseSet.cpp +++ b/LeaseSet.cpp @@ -33,6 +33,7 @@ namespace data { Lease lease = *(Lease *)leases; lease.tunnelID = be32toh (lease.tunnelID); + lease.endDate = be64toh (lease.endDate); m_Leases.push_back (lease); leases += sizeof (Lease); } From bc9b355d5de504bd1a23ca07c4301585db14db29 Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 16 Jan 2014 08:08:28 -0500 Subject: [PATCH 12/23] check garlic message payload hash --- Garlic.cpp | 31 ++++++++++++++++++++++--------- Streaming.cpp | 4 ++-- TunnelEndpoint.cpp | 2 +- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/Garlic.cpp b/Garlic.cpp index 1db02493..d66e4eb3 100644 --- a/Garlic.cpp +++ b/Garlic.cpp @@ -17,13 +17,7 @@ namespace garlic GarlicRoutingSession::GarlicRoutingSession (const i2p::data::RoutingDestination * destination, int numTags): m_Destination (destination), m_NumTags (numTags), m_NextTag (-1), m_SessionTags (0) { - m_Rnd.GenerateBlock (m_SessionKey, 32); - if (m_NumTags > 0) - { - m_SessionTags = new uint8_t[m_NumTags*32]; - for (int i = 0; i < m_NumTags; i++) - m_Rnd.GenerateBlock (m_SessionTags + i*32, 32); - } + m_SessionTags = new uint8_t[m_NumTags*32]; } GarlicRoutingSession::~GarlicRoutingSession () @@ -36,8 +30,13 @@ namespace garlic I2NPMessage * m = NewI2NPMessage (); size_t len = 0; uint8_t * buf = m->GetPayload () + 4; // 4 bytes for length - if (m_NextTag < 0) // new session + if (m_NextTag < 0 || m_NextTag >= m_NumTags) // new session { + // create new session tags and session key + m_Rnd.GenerateBlock (m_SessionKey, 32); + for (int i = 0; i < m_NumTags; i++) + m_Rnd.GenerateBlock (m_SessionTags + i*32, 32); + // create ElGamal block ElGamalBlock elGamal; memcpy (elGamal.sessionKey, m_SessionKey, 32); @@ -268,12 +267,26 @@ namespace garlic m_SessionTags[std::string ((const char *)(buf + i*32), 32)] = std::string ((const char *)sessionKey, 32); buf += tagCount*32; uint32_t payloadSize = be32toh (*(uint32_t *)buf); + if (payloadSize > len) + { + LogPrint ("Unxpected payload size ", payloadSize); + return; + } buf += 4; - buf += 32;// payload hash. TODO: verify it + uint8_t * payloadHash = buf; + buf += 32;// payload hash. if (*buf) // session key? buf += 32; // new session key buf++; // flag + // payload + uint8_t hash[32]; + CryptoPP::SHA256().CalculateDigest(hash, buf, payloadSize); + if (memcmp (hash, payloadHash, 32)) // payload hash doesn't match + { + LogPrint ("Wrong payload hash"); + return; + } HandleGarlicPayload (buf, payloadSize); } diff --git a/Streaming.cpp b/Streaming.cpp index 1acbad79..c910a190 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -163,7 +163,7 @@ namespace stream size++; // resend delay *(uint16_t *)(packet + size) = 0; // nof flags set size += 2; // flags - *(uint16_t *)(packet + size) = 0; // nof flags set + *(uint16_t *)(packet + size) = 0; // no options size += 2; // options size I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet, @@ -207,7 +207,7 @@ namespace stream size++; // resend delay *(uint16_t *)(packet + size) = PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED; size += 2; // flags - *(uint16_t *)(packet + size) = 40; // 40 bytes signature + *(uint16_t *)(packet + size) = htobe16 (40); // 40 bytes signature size += 2; // options size uint8_t * signature = packet + size; memset (packet + size, 0, 40); diff --git a/TunnelEndpoint.cpp b/TunnelEndpoint.cpp index c5ec73b4..45aa4373 100644 --- a/TunnelEndpoint.cpp +++ b/TunnelEndpoint.cpp @@ -143,7 +143,7 @@ namespace tunnel void TunnelEndpoint::HandleNextMessage (const TunnelMessageBlock& msg) { - LogPrint ("TunnelMessage: handle fragment of ", msg.data->GetLength ()," bytes"); + LogPrint ("TunnelMessage: handle fragment of ", msg.data->GetLength ()," bytes. Msg type ", (int)msg.data->GetHeader()->typeID); switch (msg.deliveryType) { case eDeliveryTypeLocal: From 0d9cc12521caaa353088d40abd802d45d2fcace0 Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 16 Jan 2014 18:21:44 -0500 Subject: [PATCH 13/23] reverted sequence of hash and tunnelID --- Garlic.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Garlic.cpp b/Garlic.cpp index d66e4eb3..6a4230eb 100644 --- a/Garlic.cpp +++ b/Garlic.cpp @@ -163,10 +163,11 @@ namespace garlic { buf[size] = eGarlicDeliveryTypeTunnel << 5; // delivery instructions flag tunnel size++; - *(uint32_t *)(buf + size) = htobe32 (tunnel->GetNextTunnelID ()); // tunnelID - size += 4; + // hash and tunnelID sequence is reversed for Garlic memcpy (buf + size, tunnel->GetNextIdentHash (), 32); // To Hash size += 32; + *(uint32_t *)(buf + size) = htobe32 (tunnel->GetNextTunnelID ()); // tunnelID + size += 4; } else { @@ -330,10 +331,11 @@ namespace garlic case eGarlicDeliveryTypeTunnel: { LogPrint ("Garlic type tunnel"); - uint32_t gwTunnel = be32toh (*(uint32_t *)buf); - buf += 4; + // gwHash and gwTunnel sequence is reverted uint8_t * gwHash = buf; buf += 32; + uint32_t gwTunnel = be32toh (*(uint32_t *)buf); + buf += 4; auto tunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); if (tunnel) // we have send it through an outbound tunnel { From 869479e566f046fc3cfce6cc6d803421f567de5a Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 17 Jan 2014 08:12:57 -0500 Subject: [PATCH 14/23] handle DeliveryStatus message --- Garlic.cpp | 91 +++++++++++++++++++++++++++++++++--------------- Garlic.h | 12 +++++-- I2NPProtocol.cpp | 15 +++----- I2NPProtocol.h | 5 +++ 4 files changed, 83 insertions(+), 40 deletions(-) diff --git a/Garlic.cpp b/Garlic.cpp index 6a4230eb..6cdedd4b 100644 --- a/Garlic.cpp +++ b/Garlic.cpp @@ -15,9 +15,19 @@ namespace i2p namespace garlic { GarlicRoutingSession::GarlicRoutingSession (const i2p::data::RoutingDestination * destination, int numTags): - m_Destination (destination), m_NumTags (numTags), m_NextTag (-1), m_SessionTags (0) + m_Destination (destination), m_FirstMsgID (0), m_IsAcknowledged (false), + m_NumTags (numTags), m_NextTag (-1), m_SessionTags (0) { - m_SessionTags = new uint8_t[m_NumTags*32]; + // create new session tags and session key + m_Rnd.GenerateBlock (m_SessionKey, 32); + if (m_NumTags > 0) + { + m_SessionTags = new uint8_t[m_NumTags*32]; + for (int i = 0; i < m_NumTags; i++) + m_Rnd.GenerateBlock (m_SessionTags + i*32, 32); + } + else + m_SessionTags = nullptr; } GarlicRoutingSession::~GarlicRoutingSession () @@ -27,16 +37,16 @@ namespace garlic I2NPMessage * GarlicRoutingSession::WrapSingleMessage (I2NPMessage * msg, I2NPMessage * leaseSet) { + if (GetNumRemainingSessionTags () < 1) + { + LogPrint ("No more session tags"); + return nullptr; + } I2NPMessage * m = NewI2NPMessage (); size_t len = 0; uint8_t * buf = m->GetPayload () + 4; // 4 bytes for length - if (m_NextTag < 0 || m_NextTag >= m_NumTags) // new session + if (m_NextTag < 0) // new session { - // create new session tags and session key - m_Rnd.GenerateBlock (m_SessionKey, 32); - for (int i = 0; i < m_NumTags; i++) - m_Rnd.GenerateBlock (m_SessionTags + i*32, 32); - // create ElGamal block ElGamalBlock elGamal; memcpy (elGamal.sessionKey, m_SessionKey, 32); @@ -47,7 +57,7 @@ namespace garlic buf += 514; // AES block m_Encryption.SetKeyWithIV (m_SessionKey, 32, iv); - len += 514 + CreateAESBlock (buf, msg, leaseSet); + len += 514 + CreateAESBlock (buf, msg, leaseSet, true); } else // existing session { @@ -58,7 +68,7 @@ namespace garlic CryptoPP::SHA256().CalculateDigest(iv, m_SessionTags + m_NextTag*32, 32); m_Encryption.SetKeyWithIV (m_SessionKey, 32, iv); // AES block - len += 32 + CreateAESBlock (buf, msg, leaseSet); + len += 32 + CreateAESBlock (buf, msg, leaseSet, false); } m_NextTag++; *(uint32_t *)(m->GetPayload ()) = htobe32 (len); @@ -69,20 +79,23 @@ namespace garlic return m; } - size_t GarlicRoutingSession::CreateAESBlock (uint8_t * buf, I2NPMessage * msg, I2NPMessage * leaseSet) + size_t GarlicRoutingSession::CreateAESBlock (uint8_t * buf, I2NPMessage * msg, I2NPMessage * leaseSet, bool isNewSession) { size_t blockSize = 0; - *(uint16_t *)buf = htobe16 (m_NumTags); // tag count + *(uint16_t *)buf = isNewSession ? htobe16 (m_NumTags) : 0; // tag count blockSize += 2; - memcpy (buf + blockSize, m_SessionTags, m_NumTags*32); // tags - blockSize += m_NumTags*32; + if (isNewSession) + { + memcpy (buf + blockSize, m_SessionTags, m_NumTags*32); // tags + blockSize += m_NumTags*32; + } uint32_t * payloadSize = (uint32_t *)(buf + blockSize); blockSize += 4; uint8_t * payloadHash = buf + blockSize; blockSize += 32; buf[blockSize] = 0; // flag blockSize++; - size_t len = CreateGarlicPayload (buf + blockSize, msg, leaseSet); + size_t len = CreateGarlicPayload (buf + blockSize, msg, leaseSet, isNewSession); *payloadSize = htobe32 (len); CryptoPP::SHA256().CalculateDigest(payloadHash, buf + blockSize, len); blockSize += len; @@ -93,21 +106,24 @@ namespace garlic return blockSize; } - size_t GarlicRoutingSession::CreateGarlicPayload (uint8_t * payload, I2NPMessage * msg, I2NPMessage * leaseSet) + size_t GarlicRoutingSession::CreateGarlicPayload (uint8_t * payload, I2NPMessage * msg, I2NPMessage * leaseSet, bool isNewSession) { uint64_t ts = i2p::util::GetMillisecondsSinceEpoch () + 5000; // 5 sec - uint32_t msgID = m_Rnd.GenerateWord32 (); + uint32_t msgID = m_Rnd.GenerateWord32 (); size_t size = 0; uint8_t * numCloves = payload + size; *numCloves = 0; size++; - - if (leaseSet) + + if (isNewSession) { - // clove is DeliveryStatus is LeaseSet is presented + // clove is DeliveryStatus size += CreateDeliveryStatusClove (payload + size, msgID); (*numCloves)++; - + m_FirstMsgID = msgID; + } + if (leaseSet) + { // clove is our leaseSet if presented size += CreateGarlicClove (payload + size, leaseSet, false); (*numCloves)++; @@ -212,18 +228,25 @@ namespace garlic GarlicRoutingSession * session = nullptr; if (it != m_Sessions.end ()) session = it->second; + if (session && (/*!session->IsAcknowledged () ||*/ session->GetNumRemainingSessionTags () < 1)) + { + // we have to create new session + m_Sessions.erase (it); + m_CreatedSessions.erase (session->GetFirstMsgID ()); + delete session; + session = nullptr; + } + bool isNewSession = false; if (!session) { - session = new GarlicRoutingSession (destination, 4); // TODO: change it later + session = new GarlicRoutingSession (destination, 16); // TODO: change it later m_Sessions[destination->GetIdentHash ()] = session; + isNewSession = true; } I2NPMessage * ret = session->WrapSingleMessage (msg, leaseSet); - if (session->GetNumRemainingSessionTags () <= 0) - { - m_Sessions.erase (destination->GetIdentHash ()); - delete session; - } + if (isNewSession) + m_CreatedSessions[session->GetFirstMsgID ()] = session; return ret; } @@ -270,7 +293,7 @@ namespace garlic uint32_t payloadSize = be32toh (*(uint32_t *)buf); if (payloadSize > len) { - LogPrint ("Unxpected payload size ", payloadSize); + LogPrint ("Unexpected payload size ", payloadSize); return; } buf += 4; @@ -359,5 +382,17 @@ namespace garlic buf += 3; // Certificate } } + + void GarlicRouting::HandleDeliveryStatusMessage (uint8_t * buf, size_t len) + { + I2NPDeliveryStatusMsg * msg = (I2NPDeliveryStatusMsg *)buf; + auto it = m_CreatedSessions.find (be32toh (msg->msgID)); + if (it != m_CreatedSessions.end ()) + { + it->second->SetAcknowledged (true); + m_CreatedSessions.erase (it); + LogPrint ("Garlic message ", be32toh (msg->msgID), " acknowledged"); + } + } } } diff --git a/Garlic.h b/Garlic.h index 89f6bd90..a8fcd507 100644 --- a/Garlic.h +++ b/Garlic.h @@ -41,11 +41,15 @@ namespace garlic ~GarlicRoutingSession (); I2NPMessage * WrapSingleMessage (I2NPMessage * msg, I2NPMessage * leaseSet); int GetNumRemainingSessionTags () const { return m_NumTags - m_NextTag; }; + uint32_t GetFirstMsgID () const { return m_FirstMsgID; }; + bool IsAcknowledged () const { return m_IsAcknowledged; }; + void SetAcknowledged (bool acknowledged) { m_IsAcknowledged = acknowledged; }; + private: - size_t CreateAESBlock (uint8_t * buf, I2NPMessage * msg, I2NPMessage * leaseSet); - size_t CreateGarlicPayload (uint8_t * payload, I2NPMessage * msg, I2NPMessage * leaseSet); + size_t CreateAESBlock (uint8_t * buf, I2NPMessage * msg, I2NPMessage * leaseSet, bool isNewSession); + size_t CreateGarlicPayload (uint8_t * payload, I2NPMessage * msg, I2NPMessage * leaseSet, bool isNewSession); size_t CreateGarlicClove (uint8_t * buf, I2NPMessage * msg, bool isDestination); size_t CreateDeliveryStatusClove (uint8_t * buf, uint32_t msgID); @@ -53,6 +57,8 @@ namespace garlic const i2p::data::RoutingDestination * m_Destination; uint8_t m_SessionKey[32]; + uint32_t m_FirstMsgID; // first message ID + bool m_IsAcknowledged; int m_NumTags, m_NextTag; uint8_t * m_SessionTags; // m_NumTags*32 bytes @@ -68,6 +74,7 @@ namespace garlic ~GarlicRouting (); void HandleGarlicMessage (uint8_t * buf, size_t len, bool isFromTunnel); + void HandleDeliveryStatusMessage (uint8_t * buf, size_t len); I2NPMessage * WrapSingleMessage (const i2p::data::RoutingDestination * destination, I2NPMessage * msg, I2NPMessage * leaseSet = nullptr); @@ -81,6 +88,7 @@ namespace garlic // outgoing sessions std::map m_Sessions; + std::map m_CreatedSessions; // msgID -> session // incoming session std::map m_SessionTags; // tag -> key CryptoPP::CBC_Mode::Decryption m_Decryption; diff --git a/I2NPProtocol.cpp b/I2NPProtocol.cpp index 9395c40c..33bee329 100644 --- a/I2NPProtocol.cpp +++ b/I2NPProtocol.cpp @@ -1,5 +1,5 @@ #include -#include "I2PEndian.h" +#include #include #include #include @@ -63,20 +63,13 @@ namespace i2p { I2NPMessage * msg = NewI2NPMessage (); memcpy (msg->GetBuffer (), buf, len); - msg->len += msg->offset + len; + msg->len = msg->offset + len; return msg; } I2NPMessage * CreateDeliveryStatusMsg (uint32_t msgID) { -#pragma pack(1) - struct - { - uint32_t msgID; - uint64_t timestamp; - } msg; -#pragma pack () - + I2NPDeliveryStatusMsg msg; msg.msgID = htobe32 (msgID); msg.timestamp = htobe64 (i2p::util::GetMillisecondsSinceEpoch ()); return CreateI2NPMessage (eI2NPDeliveryStatus, (uint8_t *)&msg, sizeof (msg)); @@ -431,6 +424,8 @@ namespace i2p break; case eI2NPDeliveryStatus: LogPrint ("DeliveryStatus"); + // we assume DeliveryStatusMessage is sent with garlic only + i2p::garlic::routing.HandleDeliveryStatusMessage (buf, size); break; case eI2NPVariableTunnelBuild: LogPrint ("VariableTunnelBuild"); diff --git a/I2NPProtocol.h b/I2NPProtocol.h index ebba0c5e..7f47601e 100644 --- a/I2NPProtocol.h +++ b/I2NPProtocol.h @@ -26,6 +26,11 @@ namespace i2p uint32_t replyToken; }; + struct I2NPDeliveryStatusMsg + { + uint32_t msgID; + uint64_t timestamp; + }; struct I2NPBuildRequestRecordClearText { From b437bd8cf481ab316d2d87c1b94533f0e11916fa Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 18 Jan 2014 10:34:57 -0500 Subject: [PATCH 15/23] recreate session tags --- Garlic.cpp | 107 +++++++++++++++++++++++++++++--------------------- Garlic.h | 13 +++--- Streaming.cpp | 4 +- 3 files changed, 73 insertions(+), 51 deletions(-) diff --git a/Garlic.cpp b/Garlic.cpp index 6cdedd4b..ad824fd7 100644 --- a/Garlic.cpp +++ b/Garlic.cpp @@ -23,8 +23,7 @@ namespace garlic if (m_NumTags > 0) { m_SessionTags = new uint8_t[m_NumTags*32]; - for (int i = 0; i < m_NumTags; i++) - m_Rnd.GenerateBlock (m_SessionTags + i*32, 32); + GenerateSessionTags (); } else m_SessionTags = nullptr; @@ -34,18 +33,22 @@ namespace garlic { delete[] m_SessionTags; } - + + void GarlicRoutingSession::GenerateSessionTags () + { + if (m_SessionTags) + { + for (int i = 0; i < m_NumTags; i++) + m_Rnd.GenerateBlock (m_SessionTags + i*32, 32); + } + } + I2NPMessage * GarlicRoutingSession::WrapSingleMessage (I2NPMessage * msg, I2NPMessage * leaseSet) { - if (GetNumRemainingSessionTags () < 1) - { - LogPrint ("No more session tags"); - return nullptr; - } I2NPMessage * m = NewI2NPMessage (); size_t len = 0; uint8_t * buf = m->GetPayload () + 4; // 4 bytes for length - if (m_NextTag < 0) // new session + if (m_NextTag < 0 || !m_NumTags) // new session { // create ElGamal block ElGamalBlock elGamal; @@ -53,23 +56,30 @@ namespace garlic m_Rnd.GenerateBlock (elGamal.preIV, 32); // Pre-IV uint8_t iv[32]; // IV is first 16 bytes CryptoPP::SHA256().CalculateDigest(iv, elGamal.preIV, 32); - i2p::crypto::ElGamalEncrypt (m_Destination->GetEncryptionPublicKey (), (uint8_t *)&elGamal, sizeof(elGamal), buf, true); - buf += 514; - // AES block + i2p::crypto::ElGamalEncrypt (m_Destination->GetEncryptionPublicKey (), (uint8_t *)&elGamal, sizeof(elGamal), buf, true); m_Encryption.SetKeyWithIV (m_SessionKey, 32, iv); - len += 514 + CreateAESBlock (buf, msg, leaseSet, true); + buf += 514; + len += 514; } else // existing session { // session tag - memcpy (buf, m_SessionTags + m_NextTag*32, 32); - buf += 32; + memcpy (buf, m_SessionTags + m_NextTag*32, 32); uint8_t iv[32]; // IV is first 16 bytes CryptoPP::SHA256().CalculateDigest(iv, m_SessionTags + m_NextTag*32, 32); m_Encryption.SetKeyWithIV (m_SessionKey, 32, iv); - // AES block - len += 32 + CreateAESBlock (buf, msg, leaseSet, false); + buf += 32; + len += 32; + + // re-create session tags if necessary + if (m_NextTag >= m_NumTags - 1) // we have used last tag + { + GenerateSessionTags (); + m_NextTag = -1; + } } + // AES block + len += CreateAESBlock (buf, msg, leaseSet); m_NextTag++; *(uint32_t *)(m->GetPayload ()) = htobe32 (len); m->len += len + 4; @@ -79,12 +89,12 @@ namespace garlic return m; } - size_t GarlicRoutingSession::CreateAESBlock (uint8_t * buf, I2NPMessage * msg, I2NPMessage * leaseSet, bool isNewSession) + size_t GarlicRoutingSession::CreateAESBlock (uint8_t * buf, I2NPMessage * msg, I2NPMessage * leaseSet) { size_t blockSize = 0; - *(uint16_t *)buf = isNewSession ? htobe16 (m_NumTags) : 0; // tag count + *(uint16_t *)buf = m_NextTag < 0 ? htobe16 (m_NumTags) : 0; // tag count blockSize += 2; - if (isNewSession) + if (m_NextTag < 0) // session tags recreated { memcpy (buf + blockSize, m_SessionTags, m_NumTags*32); // tags blockSize += m_NumTags*32; @@ -95,7 +105,7 @@ namespace garlic blockSize += 32; buf[blockSize] = 0; // flag blockSize++; - size_t len = CreateGarlicPayload (buf + blockSize, msg, leaseSet, isNewSession); + size_t len = CreateGarlicPayload (buf + blockSize, msg, leaseSet); *payloadSize = htobe32 (len); CryptoPP::SHA256().CalculateDigest(payloadHash, buf + blockSize, len); blockSize += len; @@ -106,7 +116,7 @@ namespace garlic return blockSize; } - size_t GarlicRoutingSession::CreateGarlicPayload (uint8_t * payload, I2NPMessage * msg, I2NPMessage * leaseSet, bool isNewSession) + size_t GarlicRoutingSession::CreateGarlicPayload (uint8_t * payload, I2NPMessage * msg, I2NPMessage * leaseSet) { uint64_t ts = i2p::util::GetMillisecondsSinceEpoch () + 5000; // 5 sec uint32_t msgID = m_Rnd.GenerateWord32 (); @@ -115,7 +125,7 @@ namespace garlic *numCloves = 0; size++; - if (isNewSession) + if (m_NextTag < 0) // new session { // clove is DeliveryStatus size += CreateDeliveryStatusClove (payload + size, msgID); @@ -220,7 +230,22 @@ namespace garlic m_Sessions.clear (); } - I2NPMessage * GarlicRouting::WrapSingleMessage (const i2p::data::RoutingDestination * destination, + I2NPMessage * GarlicRouting::WrapSingleMessage (const i2p::data::RoutingDestination * destination, I2NPMessage * msg) + { + if (!destination) return nullptr; + auto it = m_Sessions.find (destination->GetIdentHash ()); + if (it != m_Sessions.end ()) + { + m_Sessions.erase (it); + delete it->second; + } + GarlicRoutingSession * session = new GarlicRoutingSession (destination, 0); // not follow-on messages expected + m_Sessions[destination->GetIdentHash ()] = session; + + return session->WrapSingleMessage (msg, nullptr); + } + + I2NPMessage * GarlicRouting::WrapMessage (const i2p::data::RoutingDestination * destination, I2NPMessage * msg, I2NPMessage * leaseSet) { if (!destination) return nullptr; @@ -228,27 +253,17 @@ namespace garlic GarlicRoutingSession * session = nullptr; if (it != m_Sessions.end ()) session = it->second; - if (session && (/*!session->IsAcknowledged () ||*/ session->GetNumRemainingSessionTags () < 1)) - { - // we have to create new session - m_Sessions.erase (it); - m_CreatedSessions.erase (session->GetFirstMsgID ()); - delete session; - session = nullptr; - } - bool isNewSession = false; if (!session) { - session = new GarlicRoutingSession (destination, 16); // TODO: change it later + session = new GarlicRoutingSession (destination, 4); // TODO: change it later m_Sessions[destination->GetIdentHash ()] = session; - isNewSession = true; } I2NPMessage * ret = session->WrapSingleMessage (msg, leaseSet); - if (isNewSession) + if (!session->GetNextTag ()) // tags have beed recreated m_CreatedSessions[session->GetFirstMsgID ()] = session; return ret; - } + } void GarlicRouting::HandleGarlicMessage (uint8_t * buf, size_t len, bool isFromTunnel) { @@ -271,14 +286,18 @@ namespace garlic { // new session ElGamalBlock elGamal; - i2p::crypto::ElGamalDecrypt ( + if (i2p::crypto::ElGamalDecrypt ( isFromTunnel ? i2p::context.GetLeaseSetPrivateKey () : i2p::context.GetPrivateKey (), - buf, (uint8_t *)&elGamal, true); - uint8_t iv[32]; // IV is first 16 bytes - CryptoPP::SHA256().CalculateDigest(iv, elGamal.preIV, 32); - m_Decryption.SetKeyWithIV (elGamal.sessionKey, 32, iv); - m_Decryption.ProcessData(buf + 514, buf + 514, length - 514); - HandleAESBlock (buf + 514, length - 514, elGamal.sessionKey); + buf, (uint8_t *)&elGamal, true)) + { + uint8_t iv[32]; // IV is first 16 bytes + CryptoPP::SHA256().CalculateDigest(iv, elGamal.preIV, 32); + m_Decryption.SetKeyWithIV (elGamal.sessionKey, 32, iv); + m_Decryption.ProcessData(buf + 514, buf + 514, length - 514); + HandleAESBlock (buf + 514, length - 514, elGamal.sessionKey); + } + else + LogPrint ("Failed to decrypt garlic"); } } diff --git a/Garlic.h b/Garlic.h index a8fcd507..c640eee3 100644 --- a/Garlic.h +++ b/Garlic.h @@ -40,7 +40,7 @@ namespace garlic GarlicRoutingSession (const i2p::data::RoutingDestination * destination, int numTags); ~GarlicRoutingSession (); I2NPMessage * WrapSingleMessage (I2NPMessage * msg, I2NPMessage * leaseSet); - int GetNumRemainingSessionTags () const { return m_NumTags - m_NextTag; }; + int GetNextTag () const { return m_NextTag; }; uint32_t GetFirstMsgID () const { return m_FirstMsgID; }; bool IsAcknowledged () const { return m_IsAcknowledged; }; @@ -48,11 +48,13 @@ namespace garlic private: - size_t CreateAESBlock (uint8_t * buf, I2NPMessage * msg, I2NPMessage * leaseSet, bool isNewSession); - size_t CreateGarlicPayload (uint8_t * payload, I2NPMessage * msg, I2NPMessage * leaseSet, bool isNewSession); + size_t CreateAESBlock (uint8_t * buf, I2NPMessage * msg, I2NPMessage * leaseSet); + size_t CreateGarlicPayload (uint8_t * payload, I2NPMessage * msg, I2NPMessage * leaseSet); size_t CreateGarlicClove (uint8_t * buf, I2NPMessage * msg, bool isDestination); size_t CreateDeliveryStatusClove (uint8_t * buf, uint32_t msgID); + void GenerateSessionTags (); + private: const i2p::data::RoutingDestination * m_Destination; @@ -76,8 +78,9 @@ namespace garlic void HandleGarlicMessage (uint8_t * buf, size_t len, bool isFromTunnel); void HandleDeliveryStatusMessage (uint8_t * buf, size_t len); - I2NPMessage * WrapSingleMessage (const i2p::data::RoutingDestination * destination, - I2NPMessage * msg, I2NPMessage * leaseSet = nullptr); + I2NPMessage * WrapSingleMessage (const i2p::data::RoutingDestination * destination, I2NPMessage * msg); + I2NPMessage * WrapMessage (const i2p::data::RoutingDestination * destination, + I2NPMessage * msg, I2NPMessage * leaseSet = nullptr); private: diff --git a/Streaming.cpp b/Streaming.cpp index c910a190..30feb6f6 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -132,7 +132,7 @@ namespace stream memcpy (packet + size, buf, len); size += len; // payload m_LocalDestination->Sign (packet, size, signature); - I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet, + I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, CreateDataMessage (this, packet, size), m_LocalDestination->GetLeaseSet ()); if (!m_OutboundTunnel) @@ -166,7 +166,7 @@ namespace stream *(uint16_t *)(packet + size) = 0; // no options size += 2; // options size - I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet, + I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, CreateDataMessage (this, packet, size)); if (m_OutboundTunnel) { From d4d1c2f427dc997aef7ca93cbd1b4eff0ae1cebd Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 18 Jan 2014 17:43:04 -0500 Subject: [PATCH 16/23] show recived page 'as is' --- HTTPServer.cpp | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 80ddd456..99bd90f4 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -24,16 +24,19 @@ namespace util std::vector HTTPConnection::reply::to_buffers() { std::vector buffers; - buffers.push_back (boost::asio::buffer ("HTTP/1.0 200 OK\r\n")); // always OK - for (std::size_t i = 0; i < headers.size(); ++i) - { - header& h = headers[i]; - buffers.push_back(boost::asio::buffer(h.name)); - buffers.push_back(boost::asio::buffer(misc_strings::name_value_separator)); - buffers.push_back(boost::asio::buffer(h.value)); + if (headers.size () > 0) + { + buffers.push_back (boost::asio::buffer ("HTTP/1.0 200 OK\r\n")); // always OK + for (std::size_t i = 0; i < headers.size(); ++i) + { + header& h = headers[i]; + buffers.push_back(boost::asio::buffer(h.name)); + buffers.push_back(boost::asio::buffer(misc_strings::name_value_separator)); + buffers.push_back(boost::asio::buffer(h.value)); + buffers.push_back(boost::asio::buffer(misc_strings::crlf)); + } buffers.push_back(boost::asio::buffer(misc_strings::crlf)); - } - buffers.push_back(boost::asio::buffer(misc_strings::crlf)); + } buffers.push_back(boost::asio::buffer(content)); return buffers; } @@ -185,6 +188,10 @@ namespace util ss << std::string ((char *)buf, r); while (s->IsOpen () && (r = s->Receive (buf, 8192, 30)) > 0) ss << std::string ((char *)buf,r); + + m_Reply.content = ss.str (); // send "as is" + m_Reply.headers.resize(0); // no headers + return; } else // nothing received ss << "Not responding"; From 5b025909b6d7f2ff3b5ee0dcb9acabced7c7c137 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 19 Jan 2014 10:05:54 -0500 Subject: [PATCH 17/23] request router if not found --- NetDb.cpp | 2 +- Transports.cpp | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/NetDb.cpp b/NetDb.cpp index b0c95482..cf1d6382 100644 --- a/NetDb.cpp +++ b/NetDb.cpp @@ -347,7 +347,7 @@ namespace data // do we have that floodfill router in our database? if (r) { - if (!dest->IsExcluded (r->GetIdentHash ()) && dest->GetNumExcludedPeers () < 10) // TODO: fix TunnelGateway first + if (!dest->IsExcluded (r->GetIdentHash ()) && dest->GetNumExcludedPeers () < 30) // TODO: fix TunnelGateway first { // request destination auto msg = dest->CreateRequestMessage (r, dest->GetLastReplyTunnel ()); diff --git a/Transports.cpp b/Transports.cpp index e9953eca..371ea2e6 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -142,7 +142,14 @@ namespace i2p session = new i2p::ntcp::NTCPClient (m_Service, address->host.c_str (), address->port, *r); AddNTCPSession (session); } + else + LogPrint ("No NTCP addresses available"); } + else + { + LogPrint ("Router not found. Requested"); + i2p::data::netdb.RequestDestination (ident); + } } if (session) session->SendI2NPMessage (msg); From c762e41b05382f7e85406c4ee3e08e511388d440 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 19 Jan 2014 12:01:12 -0500 Subject: [PATCH 18/23] check received sequence number for gaps and duplicates --- HTTPServer.cpp | 2 ++ Streaming.cpp | 46 +++++++++++++++++++++++++++++++++------------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 99bd90f4..a4d3f1e5 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -183,6 +183,8 @@ namespace util std::stringstream ss; uint8_t buf[8192]; size_t r = s->Receive (buf, 8192, 30); // 30 seconds + if (!r && s->IsEstablished ()) // nothing received but connection is established + r = s->Receive (buf, 8192, 30); // wait for another 30 secondd if (r) // we recieved data { ss << std::string ((char *)buf, r); diff --git a/Streaming.cpp b/Streaming.cpp index 30feb6f6..99d51618 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -30,12 +30,12 @@ namespace stream void Stream::HandleNextPacket (Packet * packet) { - const uint8_t * end = packet->buf + packet->len, * buf = packet->buf; + const uint8_t * buf = packet->buf; buf += 4; // sendStreamID if (!m_SendStreamID) m_SendStreamID = be32toh (*(uint32_t *)buf); buf += 4; // receiveStreamID - m_LastReceivedSequenceNumber = be32toh (*(uint32_t *)buf); + uint32_t receivedSeqn = be32toh (*(uint32_t *)buf); buf += 4; // sequenceNum buf += 4; // ackThrough int nackCount = buf[0]; @@ -68,17 +68,37 @@ namespace stream } // we have reached payload section - LogPrint ("seqn=",m_LastReceivedSequenceNumber,", flags=", flags); - std::string str((const char *)buf, end-buf); - LogPrint ("Payload: ", str); - - packet->offset = buf - packet->buf; - if (packet->GetLength () > 0) - m_ReceiveQueue.Put (packet); - else - delete packet; - - SendQuickAck (); + LogPrint ("seqn=", receivedSeqn, ", flags=", flags); + if (!receivedSeqn || receivedSeqn == m_LastReceivedSequenceNumber + 1) + { + // we have received next message + packet->offset = buf - packet->buf; + if (packet->GetLength () > 0) + m_ReceiveQueue.Put (packet); + else + delete packet; + + m_LastReceivedSequenceNumber = receivedSeqn; + SendQuickAck (); + } + else + { + if (receivedSeqn <= m_LastReceivedSequenceNumber) + { + // we have received duplicate. Most likely our outbound tunnel is dead + LogPrint ("Duplicate message ", receivedSeqn, " received"); + m_OutboundTunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); // pick another tunnel + if (m_OutboundTunnel) + SendQuickAck (); // resend ack for previous message again + } + else + { + LogPrint ("Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); + // actually do nothing. just wait for missing message again + } + delete packet; // packet dropped + } + if (flags & PACKET_FLAG_CLOSE) { LogPrint ("Closed"); From 55436dbe7aa98e724a2a7e75717ea3a8cfef1b64 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 19 Jan 2014 19:19:09 -0500 Subject: [PATCH 19/23] different I2P sites --- HTTPServer.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index a4d3f1e5..5aeac9ed 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -60,8 +60,9 @@ namespace util { m_Buffer[bytes_transferred] = 0; auto address = ExtractAddress (); - if (address.find ('?') != std::string::npos) - HandleDestinationRequest ("zmw2cyw2vj7f6obx3msmdvdepdhnw2ctc4okza2zjxlukkdfckhq"); + LogPrint (address); + if (address.length () > 1) // not just '/' + HandleDestinationRequest (address.substr (1)); // exclude '/' else HandleRequest (); boost::asio::async_write (*m_Socket, m_Reply.to_buffers(), @@ -80,7 +81,7 @@ namespace util { char * http = strstr (get, "HTTP"); if (http) - return std::string (get + 3, http - get - 3); + return std::string (get + 4, http - get - 5); } return ""; } @@ -145,6 +146,7 @@ namespace util s << "
"; } } + s << "

Flibusta

"; } void HTTPConnection::HandleDestinationRequest (std::string b32) @@ -179,7 +181,7 @@ namespace util if (s) { std::string request = "GET / HTTP/1.1\n Host:" + b32 + ".b32.i2p\n"; - s->Send ((uint8_t *)request.c_str (), request.length (), 30); + s->Send ((uint8_t *)request.c_str (), request.length (), 10); std::stringstream ss; uint8_t buf[8192]; size_t r = s->Receive (buf, 8192, 30); // 30 seconds From a9ac6b77f4d75e0ecf1ad9bca98a13a3d09609cb Mon Sep 17 00:00:00 2001 From: mikhail4021 Date: Mon, 20 Jan 2014 17:27:46 +0400 Subject: [PATCH 20/23] a bug has been fixed. some warnings have been found --- I2NPProtocol.cpp | 3 ++- TunnelBase.h | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/I2NPProtocol.cpp b/I2NPProtocol.cpp index 33bee329..48e31986 100644 --- a/I2NPProtocol.cpp +++ b/I2NPProtocol.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include #include #include @@ -171,6 +171,7 @@ namespace i2p CryptoPP::Gzip compressor; compressor.Put ((uint8_t *)context.GetRouterInfo ().GetBuffer (), context.GetRouterInfo ().GetBufferLen ()); compressor.MessageEnd(); + // WARNING!!! MaxRetrievable() return uint64_t. Есть подозрение, что что-то не так int size = compressor.MaxRetrievable (); uint8_t * buf = m->GetPayload () + sizeof (I2NPDatabaseStoreMsg); *(uint16_t *)buf = htobe16 (size); // size diff --git a/TunnelBase.h b/TunnelBase.h index bd760989..49be29de 100644 --- a/TunnelBase.h +++ b/TunnelBase.h @@ -31,6 +31,7 @@ namespace tunnel { public: + //WARNING!!! GetSecondsSinceEpoch() return uint64_t TunnelBase (): m_CreationTime (i2p::util::GetSecondsSinceEpoch ()) {}; virtual ~TunnelBase () {}; From 91199a905358b62de147d5ddffca6e637d9e319f Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 20 Jan 2014 09:36:20 -0500 Subject: [PATCH 21/23] fixed compilation error --- I2NPProtocol.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/I2NPProtocol.cpp b/I2NPProtocol.cpp index 48e31986..502a2e40 100644 --- a/I2NPProtocol.cpp +++ b/I2NPProtocol.cpp @@ -1,5 +1,5 @@ #include -#include +#include "I2PEndian.h" #include #include #include From 30ecf1ef8ce2022604e010a5400496c8f637a039 Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 20 Jan 2014 18:37:51 -0500 Subject: [PATCH 22/23] send multiple messages --- Tunnel.cpp | 26 +++++++++++++++++++++----- Tunnel.h | 5 +++-- TunnelBase.h | 3 ++- TunnelEndpoint.cpp | 4 ++-- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/Tunnel.cpp b/Tunnel.cpp index c5f9730f..f92337f7 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -138,13 +138,29 @@ namespace tunnel { m_Gateway.SendTunnelDataMsg (gwHash, gwTunnel, msg); } - - void OutboundTunnel::SendTunnelDataMsg (i2p::I2NPMessage * msg) + + void OutboundTunnel::SendTunnelDataMsg (std::vector msgs) { - SendTunnelDataMsg (nullptr, 0, msg); + for (auto& it : msgs) + { + switch (it.deliveryType) + { + case eDeliveryTypeLocal: + m_Gateway.SendTunnelDataMsg (nullptr, 0, it.data); + break; + case eDeliveryTypeTunnel: + m_Gateway.SendTunnelDataMsg (it.hash, it.tunnelID, it.data); + break; + case eDeliveryTypeRouter: + m_Gateway.SendTunnelDataMsg (it.hash, 0, it.data); + break; + default: + LogPrint ("Unexpected delivery type ", (int)it.deliveryType); + } + } + m_Gateway.SendBuffer (); } - - + Tunnels tunnels; Tunnels::Tunnels (): m_IsRunning (false), m_IsTunnelCreated (false), diff --git a/Tunnel.h b/Tunnel.h index 3f94d787..6c4b7d30 100644 --- a/Tunnel.h +++ b/Tunnel.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -64,9 +65,9 @@ namespace tunnel OutboundTunnel (TunnelConfig * config): Tunnel (config), m_Gateway (this) {}; - void SendTunnelDataMsg (i2p::I2NPMessage * msg); //local void SendTunnelDataMsg (const uint8_t * gwHash, uint32_t gwTunnel, i2p::I2NPMessage * msg); - + void SendTunnelDataMsg (std::vector msgs); // multiple messages + TunnelGateway& GetTunnelGateway () { return m_Gateway; }; size_t GetNumSentBytes () const { return m_Gateway.GetNumSentBytes (); }; diff --git a/TunnelBase.h b/TunnelBase.h index 49be29de..f59d995f 100644 --- a/TunnelBase.h +++ b/TunnelBase.h @@ -4,6 +4,7 @@ #include #include "Timestamp.h" #include "I2NPProtocol.h" +#include "Identity.h" namespace i2p { @@ -23,7 +24,7 @@ namespace tunnel { TunnelDeliveryType deliveryType; uint32_t tunnelID; - uint8_t hash[32]; + i2p::data::IdentHash hash; I2NPMessage * data; }; diff --git a/TunnelEndpoint.cpp b/TunnelEndpoint.cpp index 45aa4373..0e23a837 100644 --- a/TunnelEndpoint.cpp +++ b/TunnelEndpoint.cpp @@ -41,12 +41,12 @@ namespace tunnel LogPrint ("Delivery type tunnel"); m.tunnelID = be32toh (*(uint32_t *)fragment); fragment += 4; // tunnelID - memcpy (m.hash, fragment, 32); + m.hash = i2p::data::IdentHash (fragment); fragment += 32; // hash break; case eDeliveryTypeRouter: // 2 LogPrint ("Delivery type router"); - memcpy (m.hash, fragment, 32); + m.hash = i2p::data::IdentHash (fragment); fragment += 32; // to hash break; default: From 0b079f472365f3c7ddd8bbd5e928d18a1759cdff Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 20 Jan 2014 19:12:59 -0500 Subject: [PATCH 23/23] send multiple messages --- NetDb.cpp | 48 +++++++++++++++++++++++++++++++++++------------- Tunnel.h | 1 - TunnelBase.h | 2 +- 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/NetDb.cpp b/NetDb.cpp index cf1d6382..9b454598 100644 --- a/NetDb.cpp +++ b/NetDb.cpp @@ -1,5 +1,6 @@ #include "I2PEndian.h" #include +#include #include #include #include "base64.h" @@ -313,7 +314,8 @@ namespace data { i2p::tunnel::OutboundTunnel * outbound = dest->GetLastOutboundTunnel (); const i2p::tunnel::InboundTunnel * inbound = dest->GetLastReplyTunnel (); - + std::vector msgs; + for (int i = 0; i < num; i++) { uint8_t * router = buf + 33 + i*32; @@ -332,7 +334,11 @@ namespace data RequestedDestination * d1 = CreateRequestedDestination (router, false, false); d1->SetLastOutboundTunnel (outbound); auto msg = d1->CreateRequestMessage (dest->GetLastRouter (), dest->GetLastReplyTunnel ()); - outbound->GetTunnelGateway ().PutTunnelDataMsg (dest->GetLastRouter ()->GetIdentHash (), 0, msg); + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { + i2p::tunnel::eDeliveryTypeRouter, + dest->GetLastRouter ()->GetIdentHash (), 0, msg + }); } } else @@ -351,7 +357,11 @@ namespace data { // request destination auto msg = dest->CreateRequestMessage (r, dest->GetLastReplyTunnel ()); - outbound->GetTunnelGateway ().PutTunnelDataMsg (r->GetIdentHash (), 0, msg); + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { + i2p::tunnel::eDeliveryTypeRouter, + r->GetIdentHash (), 0, msg + }); } } else @@ -361,15 +371,18 @@ namespace data RequestedDestination * d2 = CreateRequestedDestination (router, false, false); d2->SetLastOutboundTunnel (outbound); I2NPMessage * msg = d2->CreateRequestMessage (dest->GetLastRouter (), inbound); - outbound->GetTunnelGateway ().PutTunnelDataMsg ( - dest->GetLastRouter ()->GetIdentHash (), 0, msg); + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { + i2p::tunnel::eDeliveryTypeRouter, + dest->GetLastRouter ()->GetIdentHash (), 0, msg + }); } } } } - if (outbound) - outbound->GetTunnelGateway ().SendBuffer (); + if (msgs.size () > 0) + outbound->SendTunnelDataMsg (msgs); } else { @@ -398,12 +411,21 @@ namespace data rnd.GenerateBlock (randomHash, 32); RequestedDestination * dest = CreateRequestedDestination (IdentHash (randomHash), false, true); dest->SetLastOutboundTunnel (outbound); - - outbound->GetTunnelGateway ().PutTunnelDataMsg (floodfill->GetIdentHash (), 0, - CreateDatabaseStoreMsg ()); // tell floodfill about us - outbound->GetTunnelGateway ().PutTunnelDataMsg (floodfill->GetIdentHash (), 0, - dest->CreateRequestMessage (floodfill, inbound)); // explore - outbound->GetTunnelGateway ().SendBuffer (); + + std::vector msgs; + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { + i2p::tunnel::eDeliveryTypeRouter, + floodfill->GetIdentHash (), 0, + CreateDatabaseStoreMsg () // tell floodfill about us + }); + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { + i2p::tunnel::eDeliveryTypeRouter, + floodfill->GetIdentHash (), 0, + dest->CreateRequestMessage (floodfill, inbound) // explore + }); + outbound->SendTunnelDataMsg (msgs); } } } diff --git a/Tunnel.h b/Tunnel.h index 6c4b7d30..a189da0d 100644 --- a/Tunnel.h +++ b/Tunnel.h @@ -68,7 +68,6 @@ namespace tunnel void SendTunnelDataMsg (const uint8_t * gwHash, uint32_t gwTunnel, i2p::I2NPMessage * msg); void SendTunnelDataMsg (std::vector msgs); // multiple messages - TunnelGateway& GetTunnelGateway () { return m_Gateway; }; size_t GetNumSentBytes () const { return m_Gateway.GetNumSentBytes (); }; // implements TunnelBase diff --git a/TunnelBase.h b/TunnelBase.h index f59d995f..6be902ca 100644 --- a/TunnelBase.h +++ b/TunnelBase.h @@ -23,8 +23,8 @@ namespace tunnel struct TunnelMessageBlock { TunnelDeliveryType deliveryType; - uint32_t tunnelID; i2p::data::IdentHash hash; + uint32_t tunnelID; I2NPMessage * data; };