2016-05-11 21:12:38 +02:00
# include "Crypto.h"
2013-12-13 03:36:24 +01:00
# include "Log.h"
# include "RouterInfo.h"
2013-12-20 03:19:44 +01:00
# include "RouterContext.h"
2013-12-31 02:46:33 +01:00
# include "Tunnel.h"
# include "Timestamp.h"
2014-10-05 14:54:59 +02:00
# include "Destination.h"
2013-12-13 03:36:24 +01:00
# include "Streaming.h"
namespace i2p
{
namespace stream
{
2014-10-07 16:44:42 +02:00
Stream : : Stream ( boost : : asio : : io_service & service , StreamingDestination & local ,
2015-03-09 00:36:33 +01:00
std : : shared_ptr < const i2p : : data : : LeaseSet > remote , int port ) : m_Service ( service ) ,
m_SendStreamID ( 0 ) , m_SequenceNumber ( 0 ) , m_LastReceivedSequenceNumber ( - 1 ) ,
m_Status ( eStreamStatusNew ) , m_IsAckSendScheduled ( false ) , m_LocalDestination ( local ) ,
2015-02-03 19:46:44 +01:00
m_RemoteLeaseSet ( remote ) , m_ReceiveTimer ( m_Service ) , m_ResendTimer ( m_Service ) ,
m_AckSendTimer ( m_Service ) , m_NumSentBytes ( 0 ) , m_NumReceivedBytes ( 0 ) , m_Port ( port ) ,
2015-03-10 16:11:42 +01:00
m_WindowSize ( MIN_WINDOW_SIZE ) , m_RTT ( INITIAL_RTT ) , m_RTO ( INITIAL_RTO ) ,
2015-03-21 21:26:14 +01:00
m_LastWindowSizeIncreaseTime ( 0 ) , m_NumResendAttempts ( 0 )
2013-12-20 03:19:44 +01:00
{
2015-11-03 15:15:49 +01:00
RAND_bytes ( ( uint8_t * ) & m_RecvStreamID , 4 ) ;
2015-03-23 23:07:43 +01:00
m_RemoteIdentity = remote - > GetIdentity ( ) ;
2013-12-20 03:19:44 +01:00
}
2014-10-07 16:44:42 +02:00
Stream : : Stream ( boost : : asio : : io_service & service , StreamingDestination & local ) :
2014-08-07 01:19:59 +02:00
m_Service ( service ) , m_SendStreamID ( 0 ) , m_SequenceNumber ( 0 ) , m_LastReceivedSequenceNumber ( - 1 ) ,
2015-03-09 00:36:33 +01:00
m_Status ( eStreamStatusNew ) , m_IsAckSendScheduled ( false ) , m_LocalDestination ( local ) ,
2015-01-27 20:55:46 +01:00
m_ReceiveTimer ( m_Service ) , m_ResendTimer ( m_Service ) , m_AckSendTimer ( m_Service ) ,
2015-02-03 19:46:44 +01:00
m_NumSentBytes ( 0 ) , m_NumReceivedBytes ( 0 ) , m_Port ( 0 ) , m_WindowSize ( MIN_WINDOW_SIZE ) ,
2015-03-21 21:26:14 +01:00
m_RTT ( INITIAL_RTT ) , m_RTO ( INITIAL_RTO ) , m_LastWindowSizeIncreaseTime ( 0 ) , m_NumResendAttempts ( 0 )
2014-08-01 20:54:14 +02:00
{
2015-11-03 15:15:49 +01:00
RAND_bytes ( ( uint8_t * ) & m_RecvStreamID , 4 ) ;
2014-08-01 20:54:14 +02:00
}
2014-01-11 02:21:38 +01:00
Stream : : ~ Stream ( )
2014-10-04 22:27:21 +02:00
{
2014-04-12 03:13:52 +02:00
while ( ! m_ReceiveQueue . empty ( ) )
{
auto packet = m_ReceiveQueue . front ( ) ;
m_ReceiveQueue . pop ( ) ;
2014-01-11 02:21:38 +01:00
delete packet ;
2015-03-10 03:05:26 +01:00
}
2014-08-11 00:27:23 +02:00
for ( auto it : m_SentPackets )
delete it ;
2014-10-04 22:27:21 +02:00
m_SentPackets . clear ( ) ;
2015-03-10 03:05:26 +01:00
2014-10-04 22:27:21 +02:00
for ( auto it : m_SavedPackets )
delete it ;
m_SavedPackets . clear ( ) ;
2015-03-10 03:05:26 +01:00
2015-12-18 14:33:58 +01:00
LogPrint ( eLogDebug , " Streaming: Stream deleted " ) ;
2015-03-10 03:05:26 +01:00
}
void Stream : : Terminate ( )
{
m_AckSendTimer . cancel ( ) ;
m_ReceiveTimer . cancel ( ) ;
m_ResendTimer . cancel ( ) ;
2015-04-09 21:07:25 +02:00
if ( m_SendHandler )
{
2015-04-10 19:19:23 +02:00
auto handler = m_SendHandler ;
2015-04-09 21:07:25 +02:00
m_SendHandler = nullptr ;
2015-04-10 19:19:23 +02:00
handler ( boost : : asio : : error : : make_error_code ( boost : : asio : : error : : operation_aborted ) ) ;
2015-04-09 21:07:25 +02:00
}
2016-07-05 15:52:18 +02:00
m_LocalDestination . DeleteStream ( shared_from_this ( ) ) ;
2014-01-11 02:21:38 +01:00
}
void Stream : : HandleNextPacket ( Packet * packet )
{
2014-10-13 23:03:27 +02:00
m_NumReceivedBytes + = packet - > GetLength ( ) ;
2014-08-01 20:54:14 +02:00
if ( ! m_SendStreamID )
m_SendStreamID = packet - > GetReceiveStreamID ( ) ;
2014-08-11 00:27:23 +02:00
if ( ! packet - > IsNoAck ( ) ) // ack received
ProcessAck ( packet ) ;
2014-08-07 01:19:59 +02:00
int32_t receivedSeqn = packet - > GetSeqn ( ) ;
2014-08-07 04:08:57 +02:00
bool isSyn = packet - > IsSYN ( ) ;
if ( ! receivedSeqn & & ! isSyn )
2014-08-06 21:44:00 +02:00
{
// plain ack
2015-12-18 14:33:58 +01:00
LogPrint ( eLogDebug , " Streaming: Plain ACK received " ) ;
2014-08-06 21:44:00 +02:00
delete packet ;
return ;
}
2016-06-29 03:00:00 +02:00
LogPrint ( eLogDebug , " Streaming: Received seqn= " , receivedSeqn , " on sSID= " , m_SendStreamID ) ;
2016-02-15 21:16:53 +01:00
if ( receivedSeqn = = m_LastReceivedSequenceNumber + 1 )
2014-01-19 18:01:12 +01:00
{
2014-02-02 04:20:41 +01:00
// we have received next in sequence message
ProcessPacket ( packet ) ;
2014-08-07 04:08:57 +02:00
2014-01-27 00:22:30 +01:00
// we should also try stored messages if any
for ( auto it = m_SavedPackets . begin ( ) ; it ! = m_SavedPackets . end ( ) ; )
{
2014-08-07 01:19:59 +02:00
if ( ( * it ) - > GetSeqn ( ) = = ( uint32_t ) ( m_LastReceivedSequenceNumber + 1 ) )
2014-01-27 00:22:30 +01:00
{
2014-02-02 04:20:41 +01:00
Packet * savedPacket = * it ;
2014-01-27 00:22:30 +01:00
m_SavedPackets . erase ( it + + ) ;
2014-02-02 04:20:41 +01:00
ProcessPacket ( savedPacket ) ;
2014-01-27 00:22:30 +01:00
}
else
break ;
2014-03-25 22:43:36 +01:00
}
2014-10-10 17:53:27 +02:00
// schedule ack for last message
2015-03-09 00:36:33 +01:00
if ( m_Status = = eStreamStatusOpen )
2014-10-10 17:53:27 +02:00
{
if ( ! m_IsAckSendScheduled )
{
m_IsAckSendScheduled = true ;
2016-02-14 05:10:51 +01:00
auto ackTimeout = m_RTT / 10 ;
if ( ackTimeout > ACK_SEND_TIMEOUT ) ackTimeout = ACK_SEND_TIMEOUT ;
m_AckSendTimer . expires_from_now ( boost : : posix_time : : milliseconds ( ackTimeout ) ) ;
2014-11-23 23:00:45 +01:00
m_AckSendTimer . async_wait ( std : : bind ( & Stream : : HandleAckSendTimer ,
shared_from_this ( ) , std : : placeholders : : _1 ) ) ;
2014-10-10 17:53:27 +02:00
}
}
2014-08-07 04:08:57 +02:00
else if ( isSyn )
// we have to send SYN back to incoming connection
2015-03-09 00:36:33 +01:00
SendBuffer ( ) ; // also sets m_IsOpen
2014-01-19 18:01:12 +01:00
}
else
{
if ( receivedSeqn < = m_LastReceivedSequenceNumber )
{
2015-01-30 01:17:44 +01:00
// we have received duplicate
2016-06-29 03:00:00 +02:00
LogPrint ( eLogWarning , " Streaming: Duplicate message " , receivedSeqn , " on sSID= " , m_SendStreamID ) ;
2014-07-09 04:09:58 +02:00
SendQuickAck ( ) ; // resend ack for previous message again
2014-01-27 00:22:30 +01:00
delete packet ; // packet dropped
2014-01-19 18:01:12 +01:00
}
else
{
2016-06-29 03:00:00 +02:00
LogPrint ( eLogWarning , " Streaming: Missing messages on sSID= " , m_SendStreamID , " : from " , m_LastReceivedSequenceNumber + 1 , " to " , receivedSeqn - 1 ) ;
2014-01-27 00:22:30 +01:00
// save message and wait for missing message again
SavePacket ( packet ) ;
2015-03-23 23:07:43 +01:00
if ( m_LastReceivedSequenceNumber > = 0 )
{
// send NACKs for missing messages ASAP
if ( m_IsAckSendScheduled )
{
m_IsAckSendScheduled = false ;
m_AckSendTimer . cancel ( ) ;
}
SendQuickAck ( ) ;
}
else
2014-11-29 22:21:19 +01:00
{
2015-03-23 23:07:43 +01:00
// wait for SYN
m_IsAckSendScheduled = true ;
m_AckSendTimer . expires_from_now ( boost : : posix_time : : milliseconds ( ACK_SEND_TIMEOUT ) ) ;
m_AckSendTimer . async_wait ( std : : bind ( & Stream : : HandleAckSendTimer ,
shared_from_this ( ) , std : : placeholders : : _1 ) ) ;
}
2014-01-19 18:01:12 +01:00
}
}
2014-02-02 04:20:41 +01:00
}
void Stream : : SavePacket ( Packet * packet )
{
2016-03-27 18:06:00 +02:00
if ( ! m_SavedPackets . insert ( packet ) . second )
delete packet ;
2014-02-02 04:20:41 +01:00
}
void Stream : : ProcessPacket ( Packet * packet )
{
// process flags
uint32_t receivedSeqn = packet - > GetSeqn ( ) ;
uint16_t flags = packet - > GetFlags ( ) ;
2015-12-18 14:33:58 +01:00
LogPrint ( eLogDebug , " Streaming: Process seqn= " , receivedSeqn , " , flags= " , flags ) ;
2014-02-02 04:20:41 +01:00
const uint8_t * optionData = packet - > GetOptionData ( ) ;
2014-08-07 01:19:59 +02:00
if ( flags & PACKET_FLAG_DELAY_REQUESTED )
optionData + = 2 ;
2014-02-02 04:20:41 +01:00
if ( flags & PACKET_FLAG_FROM_INCLUDED )
{
2015-11-03 15:15:49 +01:00
m_RemoteIdentity = std : : make_shared < i2p : : data : : IdentityEx > ( optionData , packet - > GetOptionSize ( ) ) ;
optionData + = m_RemoteIdentity - > GetFullLen ( ) ;
2014-08-07 01:19:59 +02:00
if ( ! m_RemoteLeaseSet )
2016-06-29 03:00:00 +02:00
LogPrint ( eLogDebug , " Streaming: Incoming stream from " , m_RemoteIdentity - > GetIdentHash ( ) . ToBase64 ( ) , " , sSID= " , m_SendStreamID , " , rSID= " , m_RecvStreamID ) ;
2014-02-02 04:20:41 +01:00
}
2014-08-05 00:22:54 +02:00
if ( flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED )
{
2014-12-29 23:04:02 +01:00
uint16_t maxPacketSize = bufbe16toh ( optionData ) ;
2015-12-24 02:47:44 +01:00
LogPrint ( eLogDebug , " Streaming: Max packet size " , maxPacketSize ) ;
2014-08-05 00:22:54 +02:00
optionData + = 2 ;
}
if ( flags & PACKET_FLAG_SIGNATURE_INCLUDED )
{
2014-08-23 01:47:29 +02:00
uint8_t signature [ 256 ] ;
2015-11-03 15:15:49 +01:00
auto signatureLen = m_RemoteIdentity - > GetSignatureLen ( ) ;
2014-08-23 01:47:29 +02:00
memcpy ( signature , optionData , signatureLen ) ;
memset ( const_cast < uint8_t * > ( optionData ) , 0 , signatureLen ) ;
2015-11-03 15:15:49 +01:00
if ( ! m_RemoteIdentity - > Verify ( packet - > GetBuffer ( ) , packet - > GetLength ( ) , signature ) )
2014-08-23 01:47:29 +02:00
{
2016-07-12 02:00:00 +02:00
LogPrint ( eLogError , " Streaming: Signature verification failed, sSID= " , m_SendStreamID , " , rSID= " , m_RecvStreamID ) ;
2015-12-18 14:33:58 +01:00
Close ( ) ;
2014-08-23 01:47:29 +02:00
flags | = PACKET_FLAG_CLOSE ;
}
memcpy ( const_cast < uint8_t * > ( optionData ) , signature , signatureLen ) ;
optionData + = signatureLen ;
2014-08-05 00:22:54 +02:00
}
2014-02-02 04:20:41 +01:00
packet - > offset = packet - > GetPayload ( ) - packet - > buf ;
if ( packet - > GetLength ( ) > 0 )
2014-03-29 13:11:00 +01:00
{
2014-04-12 03:13:52 +02:00
m_ReceiveQueue . push ( packet ) ;
2014-03-29 13:11:00 +01:00
m_ReceiveTimer . cancel ( ) ;
}
2014-02-02 04:20:41 +01:00
else
delete packet ;
m_LastReceivedSequenceNumber = receivedSeqn ;
2016-07-05 15:52:18 +02:00
if ( flags & PACKET_FLAG_RESET )
2014-01-12 21:57:10 +01:00
{
2016-07-12 02:00:00 +02:00
LogPrint ( eLogDebug , " Streaming: closing stream sSID= " , m_SendStreamID , " , rSID= " , m_RecvStreamID , " : reset flag received in packet # " , receivedSeqn ) ;
2015-03-09 00:36:33 +01:00
m_Status = eStreamStatusReset ;
2015-03-04 04:46:52 +01:00
Close ( ) ;
2014-01-15 01:00:12 +01:00
}
2016-07-05 15:52:18 +02:00
else if ( flags & PACKET_FLAG_CLOSE )
{
if ( m_Status ! = eStreamStatusClosed )
SendClose ( ) ;
m_Status = eStreamStatusClosed ;
Terminate ( ) ;
}
2013-12-13 03:36:24 +01:00
}
2014-08-11 00:27:23 +02:00
void Stream : : ProcessAck ( Packet * packet )
{
2015-01-25 23:43:34 +01:00
bool acknowledged = false ;
2015-02-03 19:46:44 +01:00
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
2014-08-11 00:27:23 +02:00
uint32_t ackThrough = packet - > GetAckThrough ( ) ;
2016-06-09 20:34:38 +02:00
if ( ackThrough > m_SequenceNumber )
{
LogPrint ( eLogError , " Streaming: Unexpected ackThrough= " , ackThrough , " > seqn= " , m_SequenceNumber ) ;
return ;
}
2014-08-12 21:57:23 +02:00
int nackCount = packet - > GetNACKCount ( ) ;
2014-08-11 00:27:23 +02:00
for ( auto it = m_SentPackets . begin ( ) ; it ! = m_SentPackets . end ( ) ; )
2016-08-09 00:53:37 +02:00
{
2014-08-12 21:57:23 +02:00
auto seqn = ( * it ) - > GetSeqn ( ) ;
if ( seqn < = ackThrough )
2014-08-11 00:27:23 +02:00
{
2014-08-12 21:57:23 +02:00
if ( nackCount > 0 )
{
bool nacked = false ;
for ( int i = 0 ; i < nackCount ; i + + )
if ( seqn = = packet - > GetNACK ( i ) )
{
nacked = true ;
break ;
}
if ( nacked )
{
2015-12-18 14:33:58 +01:00
LogPrint ( eLogDebug , " Streaming: Packet " , seqn , " NACK " ) ;
2016-08-09 00:53:37 +02:00
+ + it ;
2014-08-12 21:57:23 +02:00
continue ;
}
}
2014-08-11 00:27:23 +02:00
auto sentPacket = * it ;
2016-09-03 17:46:47 +02:00
uint64_t rtt = ts - sentPacket - > sendTime ;
if ( ts < sentPacket - > sendTime )
{
LogPrint ( eLogError , " Streaming: Packet " , seqn , " sent from the future, sendTime= " , sentPacket - > sendTime ) ;
rtt = 1 ;
}
2015-02-03 19:46:44 +01:00
m_RTT = ( m_RTT * seqn + rtt ) / ( seqn + 1 ) ;
2015-03-10 16:11:42 +01:00
m_RTO = m_RTT * 1.5 ; // TODO: implement it better
2016-07-28 16:01:20 +02:00
LogPrint ( eLogDebug , " Streaming: Packet " , seqn , " acknowledged rtt= " , rtt , " sentTime= " , sentPacket - > sendTime ) ;
2014-08-11 00:27:23 +02:00
m_SentPackets . erase ( it + + ) ;
delete sentPacket ;
2015-01-25 23:43:34 +01:00
acknowledged = true ;
2015-01-29 21:34:43 +01:00
if ( m_WindowSize < WINDOW_SIZE )
m_WindowSize + + ; // slow start
else
{
// linear growth
2015-02-03 19:46:44 +01:00
if ( ts > m_LastWindowSizeIncreaseTime + m_RTT )
2015-01-29 21:34:43 +01:00
{
m_WindowSize + + ;
if ( m_WindowSize > MAX_WINDOW_SIZE ) m_WindowSize = MAX_WINDOW_SIZE ;
m_LastWindowSizeIncreaseTime = ts ;
}
}
2016-02-11 04:51:08 +01:00
if ( ! seqn & & m_RoutingSession ) // first message confirmed
m_RoutingSession - > SetSharedRoutingPath (
std : : make_shared < i2p : : garlic : : GarlicRoutingPath > (
2016-02-14 05:02:58 +01:00
i2p : : garlic : : GarlicRoutingPath { m_CurrentOutboundTunnel , m_CurrentRemoteLease , m_RTT , 0 , 0 } ) ) ;
2014-08-11 00:27:23 +02:00
}
else
break ;
}
if ( m_SentPackets . empty ( ) )
m_ResendTimer . cancel ( ) ;
2015-01-25 23:43:34 +01:00
if ( acknowledged )
2015-03-21 21:26:14 +01:00
{
m_NumResendAttempts = 0 ;
2015-01-25 23:43:34 +01:00
SendBuffer ( ) ;
2015-03-21 21:26:14 +01:00
}
2016-07-05 23:52:11 +02:00
if ( m_Status = = eStreamStatusClosed )
Terminate ( ) ;
else if ( m_Status = = eStreamStatusClosing )
2016-07-05 15:52:18 +02:00
Close ( ) ; // check is all outgoing messages have been sent and we can send close
2014-08-11 00:27:23 +02:00
}
2014-01-27 00:22:30 +01:00
2014-10-02 03:18:41 +02:00
size_t Stream : : Send ( const uint8_t * buf , size_t len )
2014-01-02 00:19:03 +01:00
{
2015-01-25 22:18:26 +01:00
if ( len > 0 & & buf )
{
std : : unique_lock < std : : mutex > l ( m_SendBufferMutex ) ;
m_SendBuffer . clear ( ) ;
m_SendBuffer . write ( ( const char * ) buf , len ) ;
}
m_Service . post ( std : : bind ( & Stream : : SendBuffer , shared_from_this ( ) ) ) ;
2014-04-19 01:27:39 +02:00
return len ;
2014-01-02 00:19:03 +01:00
}
2014-04-19 01:27:39 +02:00
2015-04-09 21:07:25 +02:00
void Stream : : AsyncSend ( const uint8_t * buf , size_t len , SendHandler handler )
{
if ( m_SendHandler )
handler ( boost : : asio : : error : : make_error_code ( boost : : asio : : error : : in_progress ) ) ;
else
m_SendHandler = handler ;
Send ( buf , len ) ;
}
2015-01-25 22:18:26 +01:00
void Stream : : SendBuffer ( )
2015-01-25 23:43:34 +01:00
{
2015-01-29 21:34:43 +01:00
int numMsgs = m_WindowSize - m_SentPackets . size ( ) ;
2015-01-25 23:43:34 +01:00
if ( numMsgs < = 0 ) return ; // window is full
2015-01-25 22:18:26 +01:00
bool isNoAck = m_LastReceivedSequenceNumber < 0 ; // first packet
std : : vector < Packet * > packets ;
{
std : : unique_lock < std : : mutex > l ( m_SendBufferMutex ) ;
2015-03-09 00:36:33 +01:00
while ( ( m_Status = = eStreamStatusNew ) | | ( IsEstablished ( ) & & ! m_SendBuffer . eof ( ) & & numMsgs > 0 ) )
2015-01-25 22:18:26 +01:00
{
Packet * p = new Packet ( ) ;
uint8_t * packet = p - > GetBuffer ( ) ;
// TODO: implement setters
size_t size = 0 ;
htobe32buf ( packet + size , m_SendStreamID ) ;
size + = 4 ; // sendStreamID
htobe32buf ( packet + size , m_RecvStreamID ) ;
size + = 4 ; // receiveStreamID
htobe32buf ( packet + size , m_SequenceNumber + + ) ;
size + = 4 ; // sequenceNum
if ( isNoAck )
htobuf32 ( packet + size , 0 ) ;
2016-06-09 20:56:12 +02:00
else
htobe32buf ( packet + size , m_LastReceivedSequenceNumber ) ;
2015-01-25 22:18:26 +01:00
size + = 4 ; // ack Through
packet [ size ] = 0 ;
size + + ; // NACK count
2015-03-10 16:11:42 +01:00
packet [ size ] = m_RTO / 1000 ;
2015-01-25 22:18:26 +01:00
size + + ; // resend delay
2015-03-09 00:36:33 +01:00
if ( m_Status = = eStreamStatusNew )
2015-01-25 22:18:26 +01:00
{
// initial packet
2015-03-09 00:36:33 +01:00
m_Status = eStreamStatusOpen ;
2015-01-25 22:18:26 +01:00
uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED |
PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED ;
if ( isNoAck ) flags | = PACKET_FLAG_NO_ACK ;
htobe16buf ( packet + size , flags ) ;
size + = 2 ; // flags
2015-11-03 15:15:49 +01:00
size_t identityLen = m_LocalDestination . GetOwner ( ) - > GetIdentity ( ) - > GetFullLen ( ) ;
size_t signatureLen = m_LocalDestination . GetOwner ( ) - > GetIdentity ( ) - > GetSignatureLen ( ) ;
2015-01-25 22:18:26 +01:00
htobe16buf ( packet + size , identityLen + signatureLen + 2 ) ; // identity + signature + packet size
size + = 2 ; // options size
2015-11-03 15:15:49 +01:00
m_LocalDestination . GetOwner ( ) - > GetIdentity ( ) - > ToBuffer ( packet + size , identityLen ) ;
2015-01-25 22:18:26 +01:00
size + = identityLen ; // from
htobe16buf ( packet + size , STREAMING_MTU ) ;
size + = 2 ; // max packet size
uint8_t * signature = packet + size ; // set it later
memset ( signature , 0 , signatureLen ) ; // zeroes for now
size + = signatureLen ; // signature
m_SendBuffer . read ( ( char * ) ( packet + size ) , STREAMING_MTU - size ) ;
size + = m_SendBuffer . gcount ( ) ; // payload
2015-11-03 15:15:49 +01:00
m_LocalDestination . GetOwner ( ) - > Sign ( packet , size , signature ) ;
2015-01-25 22:18:26 +01:00
}
else
{
// follow on packet
htobuf16 ( packet + size , 0 ) ;
size + = 2 ; // flags
htobuf16 ( packet + size , 0 ) ; // no options
size + = 2 ; // options size
m_SendBuffer . read ( ( char * ) ( packet + size ) , STREAMING_MTU - size ) ;
size + = m_SendBuffer . gcount ( ) ; // payload
}
p - > len = size ;
packets . push_back ( p ) ;
2015-01-25 23:43:34 +01:00
numMsgs - - ;
2015-01-25 22:18:26 +01:00
}
2015-04-09 21:07:25 +02:00
if ( m_SendBuffer . eof ( ) & & m_SendHandler )
{
m_SendHandler ( boost : : system : : error_code ( ) ) ;
m_SendHandler = nullptr ;
}
2015-01-25 22:18:26 +01:00
}
if ( packets . size ( ) > 0 )
{
2016-06-25 03:54:58 +02:00
if ( m_SavedPackets . empty ( ) ) // no NACKS
{
m_IsAckSendScheduled = false ;
m_AckSendTimer . cancel ( ) ;
}
2015-01-25 22:18:26 +01:00
bool isEmpty = m_SentPackets . empty ( ) ;
2015-02-03 19:46:44 +01:00
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
2016-08-09 00:53:37 +02:00
for ( auto & it : packets )
2015-02-03 19:46:44 +01:00
{
it - > sendTime = ts ;
2015-01-25 22:18:26 +01:00
m_SentPackets . insert ( it ) ;
2015-02-03 19:46:44 +01:00
}
2015-01-25 22:18:26 +01:00
SendPackets ( packets ) ;
2015-03-09 00:36:33 +01:00
if ( m_Status = = eStreamStatusClosing & & m_SendBuffer . eof ( ) )
2015-03-07 03:39:05 +01:00
SendClose ( ) ;
2015-01-25 22:18:26 +01:00
if ( isEmpty )
ScheduleResend ( ) ;
}
}
2014-01-12 21:57:10 +01:00
2014-08-08 04:03:25 +02:00
void Stream : : SendQuickAck ( )
2014-01-11 04:23:17 +01:00
{
2014-11-29 22:21:19 +01:00
int32_t lastReceivedSeqn = m_LastReceivedSequenceNumber ;
if ( ! m_SavedPackets . empty ( ) )
{
int32_t seqn = ( * m_SavedPackets . rbegin ( ) ) - > GetSeqn ( ) ;
if ( seqn > lastReceivedSeqn ) lastReceivedSeqn = seqn ;
}
if ( lastReceivedSeqn < 0 )
{
2015-12-18 14:33:58 +01:00
LogPrint ( eLogError , " Streaming: No packets have been received yet " ) ;
2014-11-29 22:21:19 +01:00
return ;
}
2014-08-12 22:52:04 +02:00
Packet p ;
uint8_t * packet = p . GetBuffer ( ) ;
2014-01-11 04:23:17 +01:00
size_t size = 0 ;
2014-12-30 15:37:24 +01:00
htobe32buf ( packet + size , m_SendStreamID ) ;
2014-01-11 04:23:17 +01:00
size + = 4 ; // sendStreamID
2014-12-30 15:37:24 +01:00
htobe32buf ( packet + size , m_RecvStreamID ) ;
2014-01-11 04:23:17 +01:00
size + = 4 ; // receiveStreamID
2014-12-30 15:37:24 +01:00
htobuf32 ( packet + size , 0 ) ; // this is plain Ack message
2014-01-11 04:23:17 +01:00
size + = 4 ; // sequenceNum
2014-12-30 15:37:24 +01:00
htobe32buf ( packet + size , lastReceivedSeqn ) ;
2014-01-11 04:23:17 +01:00
size + = 4 ; // ack Through
2015-01-01 15:53:30 +01:00
uint8_t numNacks = 0 ;
2014-11-29 22:21:19 +01:00
if ( lastReceivedSeqn > m_LastReceivedSequenceNumber )
{
// fill NACKs
uint8_t * nacks = packet + size + 1 ;
auto nextSeqn = m_LastReceivedSequenceNumber + 1 ;
for ( auto it : m_SavedPackets )
{
auto seqn = it - > GetSeqn ( ) ;
2015-01-26 04:01:09 +01:00
if ( numNacks + ( seqn - nextSeqn ) > = 256 )
{
2016-06-29 03:00:00 +02:00
LogPrint ( eLogError , " Streaming: Number of NACKs exceeds 256. seqn= " , seqn , " nextSeqn= " , nextSeqn ) ;
2015-01-26 04:01:09 +01:00
htobe32buf ( packet + 12 , nextSeqn ) ; // change ack Through
break ;
}
2014-11-29 22:21:19 +01:00
for ( uint32_t i = nextSeqn ; i < seqn ; i + + )
{
2014-12-30 15:37:24 +01:00
htobe32buf ( nacks , i ) ;
2014-11-29 22:21:19 +01:00
nacks + = 4 ;
numNacks + + ;
}
nextSeqn = seqn + 1 ;
}
packet [ size ] = numNacks ;
size + + ; // NACK count
2014-11-30 01:15:41 +01:00
size + = numNacks * 4 ; // NACKs
2014-11-29 22:21:19 +01:00
}
else
{
// No NACKs
packet [ size ] = 0 ;
size + + ; // NACK count
}
2014-01-11 04:23:17 +01:00
size + + ; // resend delay
2014-12-30 15:37:24 +01:00
htobuf16 ( packet + size , 0 ) ; // nof flags set
2014-01-11 04:23:17 +01:00
size + = 2 ; // flags
2014-12-30 15:37:24 +01:00
htobuf16 ( packet + size , 0 ) ; // no options
2014-01-11 04:23:17 +01:00
size + = 2 ; // options size
2014-08-12 22:52:04 +02:00
p . len = size ;
2014-08-13 00:42:53 +02:00
SendPackets ( std : : vector < Packet * > { & p } ) ;
2015-12-18 14:33:58 +01:00
LogPrint ( eLogDebug , " Streaming: Quick Ack sent. " , ( int ) numNacks , " NACKs " ) ;
2014-01-11 04:23:17 +01:00
}
2014-01-12 21:57:10 +01:00
void Stream : : Close ( )
{
2016-07-12 02:00:00 +02:00
LogPrint ( eLogDebug , " Streaming: closing stream with sSID= " , m_SendStreamID , " , rSID= " , m_RecvStreamID , " , status= " , m_Status ) ;
2015-03-09 17:06:35 +01:00
switch ( m_Status )
{
case eStreamStatusOpen :
m_Status = eStreamStatusClosing ;
Close ( ) ; // recursion
if ( m_Status = = eStreamStatusClosing ) //still closing
2016-06-29 03:00:00 +02:00
LogPrint ( eLogDebug , " Streaming: Trying to send stream data before closing, sSID= " , m_SendStreamID ) ;
2015-03-09 17:06:35 +01:00
break ;
case eStreamStatusReset :
2016-07-05 15:52:18 +02:00
// TODO: send reset
Terminate ( ) ;
2015-03-09 17:06:35 +01:00
break ;
case eStreamStatusClosing :
if ( m_SentPackets . empty ( ) & & m_SendBuffer . eof ( ) ) // nothing to send
{
2015-03-10 02:37:51 +01:00
m_Status = eStreamStatusClosed ;
2015-03-09 17:06:35 +01:00
SendClose ( ) ;
}
break ;
case eStreamStatusClosed :
// already closed
Terminate ( ) ;
break ;
default :
2016-06-29 03:00:00 +02:00
LogPrint ( eLogWarning , " Streaming: Unexpected stream status " , ( int ) m_Status , " sSID= " , m_SendStreamID ) ;
2015-03-09 17:06:35 +01:00
} ;
2014-01-12 21:57:10 +01:00
}
2014-03-26 20:06:27 +01:00
2015-04-13 17:38:23 +02:00
void Stream : : SendClose ( )
2015-03-07 03:39:05 +01:00
{
Packet * p = new Packet ( ) ;
uint8_t * packet = p - > GetBuffer ( ) ;
size_t size = 0 ;
htobe32buf ( packet + size , m_SendStreamID ) ;
size + = 4 ; // sendStreamID
htobe32buf ( packet + size , m_RecvStreamID ) ;
size + = 4 ; // receiveStreamID
htobe32buf ( packet + size , m_SequenceNumber + + ) ;
size + = 4 ; // sequenceNum
2016-06-09 20:34:38 +02:00
htobe32buf ( packet + size , m_LastReceivedSequenceNumber > = 0 ? m_LastReceivedSequenceNumber : 0 ) ;
2015-03-07 03:39:05 +01:00
size + = 4 ; // ack Through
packet [ size ] = 0 ;
size + + ; // NACK count
size + + ; // resend delay
2015-04-13 17:38:23 +02:00
htobe16buf ( packet + size , PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED ) ;
2015-03-07 03:39:05 +01:00
size + = 2 ; // flags
2015-11-03 15:15:49 +01:00
size_t signatureLen = m_LocalDestination . GetOwner ( ) - > GetIdentity ( ) - > GetSignatureLen ( ) ;
2015-03-07 03:39:05 +01:00
htobe16buf ( packet + size , signatureLen ) ; // signature only
size + = 2 ; // options size
uint8_t * signature = packet + size ;
memset ( packet + size , 0 , signatureLen ) ;
size + = signatureLen ; // signature
2015-11-03 15:15:49 +01:00
m_LocalDestination . GetOwner ( ) - > Sign ( packet , size , signature ) ;
2015-03-07 03:39:05 +01:00
p - > len = size ;
m_Service . post ( std : : bind ( & Stream : : SendPacket , shared_from_this ( ) , p ) ) ;
2016-06-29 03:00:00 +02:00
LogPrint ( eLogDebug , " Streaming: FIN sent, sSID= " , m_SendStreamID ) ;
2015-03-07 03:39:05 +01:00
}
2014-03-26 20:06:27 +01:00
size_t Stream : : ConcatenatePackets ( uint8_t * buf , size_t len )
{
2014-01-11 02:21:38 +01:00
size_t pos = 0 ;
2014-04-12 03:13:52 +02:00
while ( pos < len & & ! m_ReceiveQueue . empty ( ) )
2014-01-11 02:21:38 +01:00
{
2014-04-12 03:13:52 +02:00
Packet * packet = m_ReceiveQueue . front ( ) ;
size_t l = std : : min ( packet - > GetLength ( ) , len - pos ) ;
memcpy ( buf + pos , packet - > GetBuffer ( ) , l ) ;
pos + = l ;
packet - > offset + = l ;
if ( ! packet - > GetLength ( ) )
2014-01-11 02:21:38 +01:00
{
2014-04-12 03:13:52 +02:00
m_ReceiveQueue . pop ( ) ;
delete packet ;
}
2014-01-11 02:21:38 +01:00
}
return pos ;
2014-03-26 20:06:27 +01:00
}
2014-03-18 03:55:02 +01:00
2014-03-25 00:27:20 +01:00
bool Stream : : SendPacket ( Packet * packet )
{
if ( packet )
{
2014-10-10 21:58:17 +02:00
if ( m_IsAckSendScheduled )
{
m_IsAckSendScheduled = false ;
m_AckSendTimer . cancel ( ) ;
}
2014-08-12 22:52:04 +02:00
SendPackets ( std : : vector < Packet * > { packet } ) ;
2016-07-05 15:52:18 +02:00
bool isEmpty = m_SentPackets . empty ( ) ;
m_SentPackets . insert ( packet ) ;
if ( isEmpty )
ScheduleResend ( ) ;
2014-10-04 22:27:21 +02:00
return true ;
2014-03-25 00:27:20 +01:00
}
else
return false ;
}
2014-11-27 01:25:12 +01:00
2014-08-12 22:35:35 +02:00
void Stream : : SendPackets ( const std : : vector < Packet * > & packets )
{
if ( ! m_RemoteLeaseSet )
{
UpdateCurrentRemoteLease ( ) ;
if ( ! m_RemoteLeaseSet )
{
2016-06-29 03:00:00 +02:00
LogPrint ( eLogError , " Streaming: Can't send packets, missing remote LeaseSet, sSID= " , m_SendStreamID ) ;
2014-08-12 22:35:35 +02:00
return ;
}
}
2016-11-22 21:20:48 +01:00
if ( ! m_RoutingSession | | ! m_RoutingSession - > GetOwner ( ) ) // expired and detached
m_RoutingSession = m_LocalDestination . GetOwner ( ) - > GetRoutingSession ( m_RemoteLeaseSet , true ) ;
if ( ! m_CurrentOutboundTunnel & & m_RoutingSession ) // first message to send
2016-02-11 04:51:08 +01:00
{
// try to get shared path first
2016-11-22 21:20:48 +01:00
auto routingPath = m_RoutingSession - > GetSharedRoutingPath ( ) ;
if ( routingPath )
{
m_CurrentOutboundTunnel = routingPath - > outboundTunnel ;
m_CurrentRemoteLease = routingPath - > remoteLease ;
m_RTT = routingPath - > rtt ;
m_RTO = m_RTT * 1.5 ; // TODO: implement it better
}
2016-02-11 04:51:08 +01:00
}
2015-01-28 04:31:57 +01:00
if ( ! m_CurrentOutboundTunnel | | ! m_CurrentOutboundTunnel - > IsEstablished ( ) )
2015-11-03 15:15:49 +01:00
m_CurrentOutboundTunnel = m_LocalDestination . GetOwner ( ) - > GetTunnelPool ( ) - > GetNewOutboundTunnel ( m_CurrentOutboundTunnel ) ;
2014-11-26 19:20:35 +01:00
if ( ! m_CurrentOutboundTunnel )
{
2016-06-29 03:00:00 +02:00
LogPrint ( eLogError , " Streaming: No outbound tunnels in the pool, sSID= " , m_SendStreamID ) ;
2014-11-26 19:20:35 +01:00
return ;
}
2014-08-12 22:35:35 +02:00
2015-11-03 15:15:49 +01:00
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
2016-10-27 03:40:06 +02:00
if ( ! m_CurrentRemoteLease | | ! m_CurrentRemoteLease - > endDate | | // excluded from LeaseSet
ts > = m_CurrentRemoteLease - > endDate - i2p : : data : : LEASE_ENDDATE_THRESHOLD )
2015-04-16 17:38:36 +02:00
UpdateCurrentRemoteLease ( true ) ;
2016-02-13 02:56:29 +01:00
if ( m_CurrentRemoteLease & & ts < m_CurrentRemoteLease - > endDate + i2p : : data : : LEASE_ENDDATE_THRESHOLD )
2014-10-07 16:33:17 +02:00
{
std : : vector < i2p : : tunnel : : TunnelMessageBlock > msgs ;
for ( auto it : packets )
{
2016-02-29 20:44:15 +01:00
auto msg = m_RoutingSession - > WrapSingleMessage ( m_LocalDestination . CreateDataMessage ( it - > GetBuffer ( ) , it - > GetLength ( ) , m_Port ) ) ;
2014-10-07 16:33:17 +02:00
msgs . push_back ( i2p : : tunnel : : TunnelMessageBlock
2015-03-22 23:34:39 +01:00
{
i2p : : tunnel : : eDeliveryTypeTunnel ,
2016-02-09 16:46:27 +01:00
m_CurrentRemoteLease - > tunnelGateway , m_CurrentRemoteLease - > tunnelID ,
2015-06-22 04:29:50 +02:00
msg
2015-03-22 23:34:39 +01:00
} ) ;
2014-10-13 23:03:27 +02:00
m_NumSentBytes + = it - > GetLength ( ) ;
2014-10-07 16:33:17 +02:00
}
2014-11-26 19:20:35 +01:00
m_CurrentOutboundTunnel - > SendTunnelDataMsg ( msgs ) ;
2014-08-12 22:35:35 +02:00
}
2014-10-07 16:33:17 +02:00
else
2016-10-26 19:02:19 +02:00
{
LogPrint ( eLogWarning , " Streaming: Remote lease is not available, sSID= " , m_SendStreamID ) ;
if ( m_RoutingSession )
m_RoutingSession - > SetSharedRoutingPath ( nullptr ) ; // invalidate routing path
}
2014-08-12 22:35:35 +02:00
}
2016-10-23 02:08:15 +02:00
void Stream : : SendUpdatedLeaseSet ( )
{
2016-10-25 02:58:25 +02:00
if ( m_RoutingSession )
{
if ( m_RoutingSession - > IsLeaseSetNonConfirmed ( ) )
{
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
if ( ts > m_RoutingSession - > GetLeaseSetSubmissionTime ( ) + i2p : : garlic : : LEASET_CONFIRMATION_TIMEOUT )
{
// LeaseSet was not confirmed, should try other tunnels
LogPrint ( eLogWarning , " Streaming: LeaseSet was not confrimed in " , i2p : : garlic : : LEASET_CONFIRMATION_TIMEOUT , " milliseconds. Trying to resubmit " ) ;
m_RoutingSession - > SetSharedRoutingPath ( nullptr ) ;
m_CurrentOutboundTunnel = nullptr ;
m_CurrentRemoteLease = nullptr ;
SendQuickAck ( ) ;
2016-10-23 22:16:08 +02:00
}
2016-10-25 02:58:25 +02:00
}
else if ( m_RoutingSession - > IsLeaseSetUpdated ( ) )
2016-10-23 22:16:08 +02:00
{
2016-10-25 02:58:25 +02:00
LogPrint ( eLogDebug , " Streaming: sending updated LeaseSet " ) ;
SendQuickAck ( ) ;
2016-10-23 22:16:08 +02:00
}
2016-10-23 02:08:15 +02:00
}
}
2015-03-23 18:08:04 +01:00
2014-08-11 00:27:23 +02:00
void Stream : : ScheduleResend ( )
{
m_ResendTimer . cancel ( ) ;
2016-08-29 16:41:15 +02:00
// check for invalid value
2016-10-30 14:29:43 +01:00
if ( m_RTO < = 0 ) m_RTO = INITIAL_RTO ;
2015-03-10 16:11:42 +01:00
m_ResendTimer . expires_from_now ( boost : : posix_time : : milliseconds ( m_RTO ) ) ;
2014-11-23 23:00:45 +01:00
m_ResendTimer . async_wait ( std : : bind ( & Stream : : HandleResendTimer ,
shared_from_this ( ) , std : : placeholders : : _1 ) ) ;
2014-08-11 00:27:23 +02:00
}
void Stream : : HandleResendTimer ( const boost : : system : : error_code & ecode )
{
2015-03-21 21:26:14 +01:00
if ( ecode ! = boost : : asio : : error : : operation_aborted )
2014-08-11 00:27:23 +02:00
{
2015-03-21 21:26:14 +01:00
// check for resend attempts
if ( m_NumResendAttempts > = MAX_NUM_RESEND_ATTEMPTS )
{
2016-07-12 02:00:00 +02:00
LogPrint ( eLogWarning , " Streaming: packet was not ACKed after " , MAX_NUM_RESEND_ATTEMPTS , " attempts, terminate, rSID= " , m_RecvStreamID , " , sSID= " , m_SendStreamID ) ;
2015-03-21 21:26:14 +01:00
m_Status = eStreamStatusReset ;
Close ( ) ;
return ;
}
// collect packets to resend
2015-02-03 19:46:44 +01:00
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
2014-08-12 22:35:35 +02:00
std : : vector < Packet * > packets ;
2014-08-11 00:27:23 +02:00
for ( auto it : m_SentPackets )
2014-08-12 01:32:07 +02:00
{
2015-03-21 21:26:14 +01:00
if ( ts > = it - > sendTime + m_RTO )
2015-02-03 19:46:44 +01:00
{
it - > sendTime = ts ;
2014-08-12 22:35:35 +02:00
packets . push_back ( it ) ;
2015-03-21 21:26:14 +01:00
}
2014-08-12 01:32:07 +02:00
}
2015-03-21 21:26:14 +01:00
// select tunnels if necessary and send
2014-08-12 22:35:35 +02:00
if ( packets . size ( ) > 0 )
2014-08-15 03:38:46 +02:00
{
2015-03-21 21:26:14 +01:00
m_NumResendAttempts + + ;
2015-04-15 17:52:49 +02:00
m_RTO * = 2 ;
2015-03-21 21:26:14 +01:00
switch ( m_NumResendAttempts )
2015-01-30 01:17:44 +01:00
{
2015-03-21 21:26:14 +01:00
case 1 : // congesion avoidance
m_WindowSize / = 2 ;
if ( m_WindowSize < MIN_WINDOW_SIZE ) m_WindowSize = MIN_WINDOW_SIZE ;
break ;
case 2 :
2015-04-15 17:52:49 +02:00
m_RTO = INITIAL_RTO ; // drop RTO to initial upon tunnels pair change first time
// no break here
2016-02-11 04:51:08 +01:00
case 4 :
if ( m_RoutingSession ) m_RoutingSession - > SetSharedRoutingPath ( nullptr ) ;
2015-03-21 21:26:14 +01:00
UpdateCurrentRemoteLease ( ) ; // pick another lease
2016-07-12 02:00:00 +02:00
LogPrint ( eLogWarning , " Streaming: Another remote lease has been selected for stream with rSID= " , m_RecvStreamID , " , sSID= " , m_SendStreamID ) ;
2015-03-21 21:26:14 +01:00
break ;
case 3 :
// pick another outbound tunnel
2016-02-11 04:51:08 +01:00
if ( m_RoutingSession ) m_RoutingSession - > SetSharedRoutingPath ( nullptr ) ;
2015-11-03 15:15:49 +01:00
m_CurrentOutboundTunnel = m_LocalDestination . GetOwner ( ) - > GetTunnelPool ( ) - > GetNextOutboundTunnel ( m_CurrentOutboundTunnel ) ;
2016-06-29 03:00:00 +02:00
LogPrint ( eLogWarning , " Streaming: Another outbound tunnel has been selected for stream with sSID= " , m_SendStreamID ) ;
2015-03-21 21:26:14 +01:00
break ;
default : ;
2015-01-30 01:17:44 +01:00
}
2014-08-12 22:35:35 +02:00
SendPackets ( packets ) ;
2014-08-15 03:38:46 +02:00
}
2014-08-11 00:27:23 +02:00
ScheduleResend ( ) ;
}
}
2014-10-10 17:53:27 +02:00
void Stream : : HandleAckSendTimer ( const boost : : system : : error_code & ecode )
{
2014-10-10 21:58:17 +02:00
if ( m_IsAckSendScheduled )
2014-10-10 17:53:27 +02:00
{
2015-03-23 23:07:43 +01:00
if ( m_LastReceivedSequenceNumber < 0 )
{
2016-07-12 02:00:00 +02:00
LogPrint ( eLogWarning , " Streaming: SYN has not been received after " , ACK_SEND_TIMEOUT , " milliseconds after follow on, terminate rSID= " , m_RecvStreamID , " , sSID= " , m_SendStreamID ) ;
2015-03-23 23:07:43 +01:00
m_Status = eStreamStatusReset ;
Close ( ) ;
return ;
}
2015-03-09 00:36:33 +01:00
if ( m_Status = = eStreamStatusOpen )
2016-09-07 19:25:11 +02:00
{
if ( m_RoutingSession & & m_RoutingSession - > IsLeaseSetNonConfirmed ( ) )
{
// seems something went wrong and we should re-select tunnels
m_CurrentOutboundTunnel = nullptr ;
m_CurrentRemoteLease = nullptr ;
}
2014-10-10 17:53:27 +02:00
SendQuickAck ( ) ;
2016-09-07 19:25:11 +02:00
}
2014-10-10 21:58:17 +02:00
m_IsAckSendScheduled = false ;
}
2014-10-10 17:53:27 +02:00
}
2015-04-16 17:38:36 +02:00
void Stream : : UpdateCurrentRemoteLease ( bool expired )
2014-03-23 14:25:16 +01:00
{
2016-02-10 04:42:01 +01:00
if ( ! m_RemoteLeaseSet | | m_RemoteLeaseSet - > IsExpired ( ) )
2014-08-06 17:09:06 +02:00
{
2016-02-15 00:30:07 +01:00
m_RemoteLeaseSet = m_LocalDestination . GetOwner ( ) - > FindLeaseSet ( m_RemoteIdentity - > GetIdentHash ( ) ) ;
2016-02-10 04:42:01 +01:00
if ( ! m_RemoteLeaseSet )
2016-10-23 02:08:15 +02:00
{
2016-02-10 04:42:01 +01:00
LogPrint ( eLogWarning , " Streaming: LeaseSet " , m_RemoteIdentity - > GetIdentHash ( ) . ToBase64 ( ) , " not found " ) ;
2016-10-23 02:08:15 +02:00
m_LocalDestination . GetOwner ( ) - > RequestDestination ( m_RemoteIdentity - > GetIdentHash ( ) ) ; // try to request for a next attempt
}
2014-08-06 17:09:06 +02:00
}
2014-08-01 20:54:14 +02:00
if ( m_RemoteLeaseSet )
{
2014-08-30 04:10:00 +02:00
if ( ! m_RoutingSession )
2016-01-25 04:24:39 +01:00
m_RoutingSession = m_LocalDestination . GetOwner ( ) - > GetRoutingSession ( m_RemoteLeaseSet , true ) ;
2015-03-26 15:30:29 +01:00
auto leases = m_RemoteLeaseSet - > GetNonExpiredLeases ( false ) ; // try without threshold first
if ( leases . empty ( ) )
{
2015-04-16 17:38:36 +02:00
expired = false ;
2016-02-26 22:16:59 +01:00
m_LocalDestination . GetOwner ( ) - > RequestDestination ( m_RemoteIdentity - > GetIdentHash ( ) ) ; // time to request
leases = m_RemoteLeaseSet - > GetNonExpiredLeases ( true ) ; // then with threshold
2015-03-26 15:30:29 +01:00
}
2014-08-01 20:54:14 +02:00
if ( ! leases . empty ( ) )
{
2015-04-16 17:38:36 +02:00
bool updated = false ;
2016-02-09 16:46:27 +01:00
if ( expired & & m_CurrentRemoteLease )
2015-04-16 17:38:36 +02:00
{
2016-08-09 00:53:37 +02:00
for ( const auto & it : leases )
2016-02-09 16:46:27 +01:00
if ( ( it - > tunnelGateway = = m_CurrentRemoteLease - > tunnelGateway ) & & ( it - > tunnelID ! = m_CurrentRemoteLease - > tunnelID ) )
2015-04-16 17:38:36 +02:00
{
m_CurrentRemoteLease = it ;
updated = true ;
break ;
}
}
if ( ! updated )
{
2015-11-03 15:15:49 +01:00
uint32_t i = rand ( ) % leases . size ( ) ;
2016-02-09 16:46:27 +01:00
if ( m_CurrentRemoteLease & & leases [ i ] - > tunnelID = = m_CurrentRemoteLease - > tunnelID )
2015-04-16 17:38:36 +02:00
// make sure we don't select previous
i = ( i + 1 ) % leases . size ( ) ; // if so, pick next
m_CurrentRemoteLease = leases [ i ] ;
}
2014-08-01 20:54:14 +02:00
}
else
2014-10-17 03:09:59 +02:00
{
2016-10-26 19:02:19 +02:00
LogPrint ( eLogWarning , " Streaming: All remote leases are expired " ) ;
2015-03-26 15:30:29 +01:00
m_RemoteLeaseSet = nullptr ;
2016-02-09 16:46:27 +01:00
m_CurrentRemoteLease = nullptr ;
2016-02-26 22:16:59 +01:00
// we have requested expired before, no need to do it twice
2014-10-17 03:09:59 +02:00
}
2014-08-01 20:54:14 +02:00
}
2014-03-23 14:25:16 +01:00
else
2016-10-26 19:02:19 +02:00
{
LogPrint ( eLogWarning , " Streaming: Remote LeaseSet not found " ) ;
2016-02-09 16:46:27 +01:00
m_CurrentRemoteLease = nullptr ;
2016-10-26 19:02:19 +02:00
}
2014-10-22 17:46:54 +02:00
}
2016-05-25 22:18:02 +02:00
StreamingDestination : : StreamingDestination ( std : : shared_ptr < i2p : : client : : ClientDestination > owner , uint16_t localPort , bool gzip ) :
2016-02-29 20:44:15 +01:00
m_Owner ( owner ) , m_LocalPort ( localPort ) , m_Gzip ( gzip ) ,
2016-10-28 02:46:05 +02:00
m_LastIncomingReceiveStreamID ( 0 ) ,
2016-07-28 17:16:29 +02:00
m_PendingIncomingTimer ( m_Owner - > GetService ( ) ) ,
m_ConnTrackTimer ( m_Owner - > GetService ( ) ) ,
m_ConnsPerMinute ( DEFAULT_MAX_CONNS_PER_MIN ) ,
m_LastBanClear ( i2p : : util : : GetMillisecondsSinceEpoch ( ) )
2015-11-03 15:15:49 +01:00
{
}
2014-10-23 03:36:11 +02:00
2015-11-03 15:15:49 +01:00
StreamingDestination : : ~ StreamingDestination ( )
{
2016-08-09 00:53:37 +02:00
for ( auto & it : m_SavedPackets )
2016-02-08 20:42:20 +01:00
{
for ( auto it1 : it . second ) delete it1 ;
it . second . clear ( ) ;
}
m_SavedPackets . clear ( ) ;
2015-11-03 15:15:49 +01:00
}
2014-10-22 17:46:54 +02:00
void StreamingDestination : : Start ( )
2016-07-28 17:16:29 +02:00
{
ScheduleConnTrack ( ) ;
2014-10-22 17:46:54 +02:00
}
void StreamingDestination : : Stop ( )
{
ResetAcceptor ( ) ;
2015-12-15 04:23:28 +01:00
m_PendingIncomingTimer . cancel ( ) ;
2016-07-28 17:16:29 +02:00
m_ConnTrackTimer . cancel ( ) ;
2014-10-22 17:46:54 +02:00
{
std : : unique_lock < std : : mutex > l ( m_StreamsMutex ) ;
m_Streams . clear ( ) ;
2016-07-28 17:16:29 +02:00
}
{
std : : unique_lock < std : : mutex > l ( m_ConnsMutex ) ;
m_Conns . clear ( ) ;
}
2014-10-22 17:46:54 +02:00
}
2014-12-31 04:37:14 +01:00
2014-10-22 17:46:54 +02:00
void StreamingDestination : : HandleNextPacket ( Packet * packet )
{
uint32_t sendStreamID = packet - > GetSendStreamID ( ) ;
if ( sendStreamID )
{
auto it = m_Streams . find ( sendStreamID ) ;
if ( it ! = m_Streams . end ( ) )
it - > second - > HandleNextPacket ( packet ) ;
else
{
2016-06-29 03:00:00 +02:00
LogPrint ( eLogError , " Streaming: Unknown stream sSID= " , sendStreamID ) ;
2014-10-22 17:46:54 +02:00
delete packet ;
}
}
2015-01-02 16:04:57 +01:00
else
2014-10-22 17:46:54 +02:00
{
2015-01-02 16:04:57 +01:00
if ( packet - > IsSYN ( ) & & ! packet - > GetSeqn ( ) ) // new incoming stream
{
2016-07-28 17:42:31 +02:00
uint32_t receiveStreamID = packet - > GetReceiveStreamID ( ) ;
2016-11-22 21:20:48 +01:00
if ( receiveStreamID = = m_LastIncomingReceiveStreamID )
2016-10-28 02:46:05 +02:00
{
// already pending
LogPrint ( eLogWarning , " Streaming: Incoming streaming with rSID= " , receiveStreamID , " already exists " ) ;
delete packet ; // drop it, because previous should be connected
return ;
2016-11-22 21:20:48 +01:00
}
2016-10-28 02:46:05 +02:00
auto incomingStream = CreateNewIncomingStream ( ) ;
2016-07-28 17:42:31 +02:00
incomingStream - > HandleNextPacket ( packet ) ; // SYN
2016-07-28 17:16:29 +02:00
auto ident = incomingStream - > GetRemoteIdentity ( ) ;
if ( ident )
{
auto ih = ident - > GetIdentHash ( ) ;
if ( DropNewStream ( ih ) )
{
// drop
2016-07-28 18:48:32 +02:00
LogPrint ( eLogWarning , " Streaming: Dropping connection, too many inbound streams from " , ih . ToBase32 ( ) ) ;
2016-07-28 21:33:03 +02:00
incomingStream - > Terminate ( ) ;
2016-07-28 17:16:29 +02:00
return ;
}
2016-07-28 21:33:03 +02:00
}
2016-10-28 02:46:05 +02:00
m_LastIncomingReceiveStreamID = receiveStreamID ;
2016-07-28 17:37:33 +02:00
// handle saved packets if any
{
auto it = m_SavedPackets . find ( receiveStreamID ) ;
if ( it ! = m_SavedPackets . end ( ) )
{
LogPrint ( eLogDebug , " Streaming: Processing " , it - > second . size ( ) , " saved packets for rSID= " , receiveStreamID ) ;
for ( auto it1 : it - > second )
incomingStream - > HandleNextPacket ( it1 ) ;
m_SavedPackets . erase ( it ) ;
}
}
2016-02-08 20:42:20 +01:00
// accept
2015-01-02 16:04:57 +01:00
if ( m_Acceptor ! = nullptr )
m_Acceptor ( incomingStream ) ;
else
{
2015-12-18 14:33:58 +01:00
LogPrint ( eLogWarning , " Streaming: Acceptor for incoming stream is not set " ) ;
2015-12-15 04:23:28 +01:00
if ( m_PendingIncomingStreams . size ( ) < MAX_PENDING_INCOMING_BACKLOG )
{
m_PendingIncomingStreams . push_back ( incomingStream ) ;
m_PendingIncomingTimer . cancel ( ) ;
m_PendingIncomingTimer . expires_from_now ( boost : : posix_time : : seconds ( PENDING_INCOMING_TIMEOUT ) ) ;
m_PendingIncomingTimer . async_wait ( std : : bind ( & StreamingDestination : : HandlePendingIncomingTimer ,
2016-02-08 20:42:20 +01:00
shared_from_this ( ) , std : : placeholders : : _1 ) ) ;
2016-06-29 03:00:00 +02:00
LogPrint ( eLogDebug , " Streaming: Pending incoming stream added, rSID= " , receiveStreamID ) ;
2015-12-15 04:23:28 +01:00
}
else
{
2015-12-18 14:33:58 +01:00
LogPrint ( eLogWarning , " Streaming: Pending incoming streams backlog exceeds " , MAX_PENDING_INCOMING_BACKLOG ) ;
2015-12-15 04:23:28 +01:00
incomingStream - > Close ( ) ;
}
2015-01-02 16:04:57 +01:00
}
}
else // follow on packet without SYN
{
uint32_t receiveStreamID = packet - > GetReceiveStreamID ( ) ;
2016-08-09 00:53:37 +02:00
for ( auto & it : m_Streams )
2015-01-02 16:04:57 +01:00
if ( it . second - > GetSendStreamID ( ) = = receiveStreamID )
{
// found
it . second - > HandleNextPacket ( packet ) ;
return ;
}
2016-02-08 20:42:20 +01:00
// save follow on packet
auto it = m_SavedPackets . find ( receiveStreamID ) ;
if ( it ! = m_SavedPackets . end ( ) )
it - > second . push_back ( packet ) ;
else
{
2016-02-10 01:00:00 +01:00
m_SavedPackets [ receiveStreamID ] = std : : list < Packet * > { packet } ;
2016-02-08 20:42:20 +01:00
auto timer = std : : make_shared < boost : : asio : : deadline_timer > ( m_Owner - > GetService ( ) ) ;
timer - > expires_from_now ( boost : : posix_time : : seconds ( PENDING_INCOMING_TIMEOUT ) ) ;
auto s = shared_from_this ( ) ;
timer - > async_wait ( [ s , timer , receiveStreamID ] ( const boost : : system : : error_code & ecode )
{
2016-02-08 21:47:39 +01:00
if ( ecode ! = boost : : asio : : error : : operation_aborted )
2016-02-08 20:42:20 +01:00
{
auto it = s - > m_SavedPackets . find ( receiveStreamID ) ;
if ( it ! = s - > m_SavedPackets . end ( ) )
{
for ( auto it1 : it - > second ) delete it1 ;
it - > second . clear ( ) ;
s - > m_SavedPackets . erase ( it ) ;
}
}
} ) ;
}
2015-01-02 16:04:57 +01:00
}
2014-10-22 17:46:54 +02:00
}
}
2015-12-15 04:23:28 +01:00
2015-01-27 17:27:58 +01:00
std : : shared_ptr < Stream > StreamingDestination : : CreateNewOutgoingStream ( std : : shared_ptr < const i2p : : data : : LeaseSet > remote , int port )
2014-10-22 17:46:54 +02:00
{
2015-11-03 15:15:49 +01:00
auto s = std : : make_shared < Stream > ( m_Owner - > GetService ( ) , * this , remote , port ) ;
2014-10-22 17:46:54 +02:00
std : : unique_lock < std : : mutex > l ( m_StreamsMutex ) ;
m_Streams [ s - > GetRecvStreamID ( ) ] = s ;
return s ;
}
2014-11-23 17:33:58 +01:00
std : : shared_ptr < Stream > StreamingDestination : : CreateNewIncomingStream ( )
2014-10-22 17:46:54 +02:00
{
2015-11-03 15:15:49 +01:00
auto s = std : : make_shared < Stream > ( m_Owner - > GetService ( ) , * this ) ;
2014-10-22 17:46:54 +02:00
std : : unique_lock < std : : mutex > l ( m_StreamsMutex ) ;
m_Streams [ s - > GetRecvStreamID ( ) ] = s ;
return s ;
}
2014-11-23 17:33:58 +01:00
void StreamingDestination : : DeleteStream ( std : : shared_ptr < Stream > stream )
2014-10-22 17:46:54 +02:00
{
if ( stream )
{
std : : unique_lock < std : : mutex > l ( m_StreamsMutex ) ;
auto it = m_Streams . find ( stream - > GetRecvStreamID ( ) ) ;
if ( it ! = m_Streams . end ( ) )
m_Streams . erase ( it ) ;
}
2014-08-26 20:56:00 +02:00
}
2014-10-22 20:01:23 +02:00
2015-12-15 04:23:28 +01:00
void StreamingDestination : : SetAcceptor ( const Acceptor & acceptor )
{
m_Owner - > GetService ( ) . post ( [ acceptor , this ] ( void )
{
m_Acceptor = acceptor ;
2016-08-09 00:53:37 +02:00
for ( auto & it : m_PendingIncomingStreams )
2015-12-15 04:23:28 +01:00
if ( it - > GetStatus ( ) = = eStreamStatusOpen ) // still open?
m_Acceptor ( it ) ;
m_PendingIncomingStreams . clear ( ) ;
m_PendingIncomingTimer . cancel ( ) ;
} ) ;
}
void StreamingDestination : : ResetAcceptor ( )
2015-12-15 04:36:23 +01:00
{
if ( m_Acceptor ) m_Acceptor ( nullptr ) ;
m_Acceptor = nullptr ;
2015-12-15 04:23:28 +01:00
}
2016-12-23 16:09:40 +01:00
void StreamingDestination : : AcceptOnce ( const Acceptor & acceptor )
{
m_Owner - > GetService ( ) . post ( [ acceptor , this ] ( void )
{
if ( ! m_PendingIncomingStreams . empty ( ) )
{
acceptor ( m_PendingIncomingStreams . front ( ) ) ;
m_PendingIncomingStreams . pop_front ( ) ;
if ( m_PendingIncomingStreams . empty ( ) )
m_PendingIncomingTimer . cancel ( ) ;
}
else // we must save old acceptor and set it back
{
auto oldAcceptor = m_Acceptor ;
m_Acceptor = [ acceptor , oldAcceptor , this ] ( std : : shared_ptr < Stream > stream )
{
acceptor ( stream ) ;
m_Acceptor = oldAcceptor ;
} ;
}
} ) ;
}
2015-12-15 04:23:28 +01:00
void StreamingDestination : : HandlePendingIncomingTimer ( const boost : : system : : error_code & ecode )
{
if ( ecode ! = boost : : asio : : error : : operation_aborted )
{
2015-12-18 14:33:58 +01:00
LogPrint ( eLogWarning , " Streaming: Pending incoming timeout expired " ) ;
2016-08-09 00:53:37 +02:00
for ( auto & it : m_PendingIncomingStreams )
2015-12-15 04:23:28 +01:00
it - > Close ( ) ;
m_PendingIncomingStreams . clear ( ) ;
}
}
2014-10-22 21:01:30 +02:00
void StreamingDestination : : HandleDataMessagePayload ( const uint8_t * buf , size_t len )
{
// unzip it
Packet * uncompressed = new Packet ;
uncompressed - > offset = 0 ;
2015-11-03 15:15:49 +01:00
uncompressed - > len = m_Inflator . Inflate ( buf , len , uncompressed - > buf , MAX_PACKET_SIZE ) ;
if ( uncompressed - > len )
2014-10-22 21:01:30 +02:00
HandleNextPacket ( uncompressed ) ;
else
delete uncompressed ;
}
2016-02-29 20:44:15 +01:00
std : : shared_ptr < I2NPMessage > StreamingDestination : : CreateDataMessage ( const uint8_t * payload , size_t len , uint16_t toPort )
{
auto msg = NewI2NPShortMessage ( ) ;
if ( ! m_Gzip | | len < = i2p : : stream : : COMPRESSION_THRESHOLD_SIZE )
m_Deflator . SetCompressionLevel ( Z_NO_COMPRESSION ) ;
else
m_Deflator . SetCompressionLevel ( Z_DEFAULT_COMPRESSION ) ;
uint8_t * buf = msg - > GetPayload ( ) ;
buf + = 4 ; // reserve for lengthlength
msg - > len + = 4 ;
size_t size = m_Deflator . Deflate ( payload , len , buf , msg - > maxLen - msg - > len ) ;
if ( size )
{
htobe32buf ( msg - > GetPayload ( ) , size ) ; // length
htobe16buf ( buf + 4 , m_LocalPort ) ; // source port
htobe16buf ( buf + 6 , toPort ) ; // destination port
buf [ 9 ] = i2p : : client : : PROTOCOL_TYPE_STREAMING ; // streaming protocol
msg - > len + = size ;
msg - > FillI2NPMessageHeader ( eI2NPData ) ;
}
else
msg = nullptr ;
return msg ;
2016-07-28 17:16:29 +02:00
}
void StreamingDestination : : SetMaxConnsPerMinute ( const uint32_t conns )
{
m_ConnsPerMinute = conns ;
2016-07-28 18:17:24 +02:00
LogPrint ( eLogDebug , " Streaming: Set max conns per minute per destination to " , conns ) ;
2016-07-28 17:16:29 +02:00
}
bool StreamingDestination : : DropNewStream ( const i2p : : data : : IdentHash & ih )
{
std : : lock_guard < std : : mutex > lock ( m_ConnsMutex ) ;
if ( m_Banned . size ( ) > MAX_BANNED_CONNS ) return true ; // overload
2016-07-28 18:17:24 +02:00
auto end = std : : end ( m_Banned ) ;
if ( std : : find ( std : : begin ( m_Banned ) , end , ih ) ! = end ) return true ; // already banned
2016-07-28 17:16:29 +02:00
auto itr = m_Conns . find ( ih ) ;
if ( itr = = m_Conns . end ( ) )
m_Conns [ ih ] = 0 ;
2016-07-28 18:17:24 +02:00
m_Conns [ ih ] + = 1 ;
2016-07-28 17:16:29 +02:00
2016-07-28 17:20:24 +02:00
bool ban = m_Conns [ ih ] > = m_ConnsPerMinute ;
2016-07-28 17:16:29 +02:00
if ( ban )
{
m_Banned . push_back ( ih ) ;
m_Conns . erase ( ih ) ;
2016-07-28 18:17:24 +02:00
LogPrint ( eLogWarning , " Streaming: ban " , ih . ToBase32 ( ) ) ;
2016-07-28 17:16:29 +02:00
}
return ban ;
}
void StreamingDestination : : HandleConnTrack ( const boost : : system : : error_code & ecode )
{
if ( ecode ! = boost : : asio : : error : : operation_aborted )
{
{ // acquire lock
std : : lock_guard < std : : mutex > lock ( m_ConnsMutex ) ;
// clear conn tracking
m_Conns . clear ( ) ;
// check for ban clear
auto ts = i2p : : util : : GetMillisecondsSinceEpoch ( ) ;
if ( ts - m_LastBanClear > = DEFAULT_BAN_INTERVAL )
{
// clear bans
m_Banned . clear ( ) ;
m_LastBanClear = ts ;
}
}
// reschedule timer
ScheduleConnTrack ( ) ;
}
}
void StreamingDestination : : ScheduleConnTrack ( )
{
m_ConnTrackTimer . expires_from_now ( boost : : posix_time : : seconds ( 60 ) ) ;
m_ConnTrackTimer . async_wait (
std : : bind ( & StreamingDestination : : HandleConnTrack ,
shared_from_this ( ) , std : : placeholders : : _1 ) ) ;
}
2013-12-13 03:36:24 +01:00
}
}