mirror of
https://github.com/PurpleI2P/i2pd.git
synced 2025-01-22 21:37:17 +01:00
ping/pong for streaming
This commit is contained in:
parent
ac5a4fe70f
commit
fd9229c467
|
@ -851,6 +851,7 @@ namespace client
|
|||
bool isPublic, const std::map<std::string, std::string> * params):
|
||||
LeaseSetDestination (service, isPublic, params),
|
||||
m_Keys (keys), m_StreamingAckDelay (DEFAULT_INITIAL_ACK_DELAY),
|
||||
m_IsStreamingAnswerPings (DEFAULT_ANSWER_PINGS),
|
||||
m_DatagramDestination (nullptr), m_RefCounter (0),
|
||||
m_ReadyChecker(service)
|
||||
{
|
||||
|
@ -918,6 +919,9 @@ namespace client
|
|||
auto it = params->find (I2CP_PARAM_STREAMING_INITIAL_ACK_DELAY);
|
||||
if (it != params->end ())
|
||||
m_StreamingAckDelay = std::stoi(it->second);
|
||||
it = params->find (I2CP_PARAM_STREAMING_ANSWER_PINGS);
|
||||
if (it != params->end ())
|
||||
m_IsStreamingAnswerPings = (it->second == "true");
|
||||
|
||||
if (GetLeaseSetType () == i2p::data::NETDB_STORE_TYPE_ENCRYPTED_LEASESET2)
|
||||
{
|
||||
|
|
|
@ -78,6 +78,8 @@ namespace client
|
|||
// streaming
|
||||
const char I2CP_PARAM_STREAMING_INITIAL_ACK_DELAY[] = "i2p.streaming.initialAckDelay";
|
||||
const int DEFAULT_INITIAL_ACK_DELAY = 200; // milliseconds
|
||||
const char I2CP_PARAM_STREAMING_ANSWER_PINGS[] = "i2p.streaming.answerPings";
|
||||
const int DEFAULT_ANSWER_PINGS = true;
|
||||
|
||||
typedef std::function<void (std::shared_ptr<i2p::stream::Stream> stream)> StreamRequestComplete;
|
||||
|
||||
|
@ -242,6 +244,7 @@ namespace client
|
|||
bool IsAcceptingStreams () const;
|
||||
void AcceptOnce (const i2p::stream::StreamingDestination::Acceptor& acceptor);
|
||||
int GetStreamingAckDelay () const { return m_StreamingAckDelay; }
|
||||
bool IsStreamingAnswerPings () const { return m_IsStreamingAnswerPings; }
|
||||
|
||||
// datagram
|
||||
i2p::datagram::DatagramDestination * GetDatagramDestination () const { return m_DatagramDestination; };
|
||||
|
@ -275,6 +278,7 @@ namespace client
|
|||
std::unique_ptr<EncryptionKey> m_ECIESx25519EncryptionKey;
|
||||
|
||||
int m_StreamingAckDelay;
|
||||
bool m_IsStreamingAnswerPings;
|
||||
std::shared_ptr<i2p::stream::StreamingDestination> m_StreamingDestination; // default
|
||||
std::map<uint16_t, std::shared_ptr<i2p::stream::StreamingDestination> > m_StreamingDestinationsByPorts;
|
||||
i2p::datagram::DatagramDestination * m_DatagramDestination;
|
||||
|
|
|
@ -351,6 +351,28 @@ namespace stream
|
|||
return true;
|
||||
}
|
||||
|
||||
void Stream::HandlePing (Packet * packet)
|
||||
{
|
||||
uint16_t flags = packet->GetFlags ();
|
||||
if (ProcessOptions (flags, packet) && m_RemoteIdentity)
|
||||
{
|
||||
// send pong
|
||||
Packet p;
|
||||
memset (p.buf, 0, 22); // minimal header all zeroes
|
||||
memcpy (p.buf + 4, packet->buf, 4); // but receiveStreamID is the sendStreamID from the ping
|
||||
htobe16buf (p.buf + 18, PACKET_FLAG_ECHO); // and echo flag
|
||||
ssize_t payloadLen = packet->len - (packet->GetPayload () - packet->buf);
|
||||
if (payloadLen > 0)
|
||||
memcpy (p.buf + 22, packet->GetPayload (), payloadLen);
|
||||
else
|
||||
payloadLen = 0;
|
||||
p.len = payloadLen + 22;
|
||||
SendPackets (std::vector<Packet *> { &p });
|
||||
LogPrint (eLogDebug, "Streaming: Pong of ", p.len, " bytes sent");
|
||||
}
|
||||
m_LocalDestination.DeletePacket (packet);
|
||||
}
|
||||
|
||||
void Stream::ProcessAck (Packet * packet)
|
||||
{
|
||||
bool acknowledged = false;
|
||||
|
@ -609,6 +631,7 @@ namespace stream
|
|||
packet[size] = 0;
|
||||
size++; // NACK count
|
||||
}
|
||||
packet[size] = 0;
|
||||
size++; // resend delay
|
||||
htobuf16 (packet + size, 0); // no flags set
|
||||
size += 2; // flags
|
||||
|
@ -666,6 +689,7 @@ namespace stream
|
|||
size += 4; // ack Through
|
||||
packet[size] = 0;
|
||||
size++; // NACK count
|
||||
packet[size] = 0;
|
||||
size++; // resend delay
|
||||
htobe16buf (packet + size, PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED);
|
||||
size += 2; // flags
|
||||
|
@ -1016,6 +1040,13 @@ namespace stream
|
|||
auto it = m_Streams.find (sendStreamID);
|
||||
if (it != m_Streams.end ())
|
||||
it->second->HandleNextPacket (packet);
|
||||
else if (packet->IsEcho () && m_Owner->IsStreamingAnswerPings ())
|
||||
{
|
||||
// ping
|
||||
LogPrint (eLogInfo, "Streaming: Ping received sSID=", sendStreamID);
|
||||
auto s = std::make_shared<Stream> (m_Owner->GetService (), *this);
|
||||
s->HandlePing (packet);
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint (eLogInfo, "Streaming: Unknown stream sSID=", sendStreamID);
|
||||
|
|
|
@ -87,6 +87,7 @@ namespace stream
|
|||
|
||||
bool IsSYN () const { return GetFlags () & PACKET_FLAG_SYNCHRONIZE; };
|
||||
bool IsNoAck () const { return GetFlags () & PACKET_FLAG_NO_ACK; };
|
||||
bool IsEcho () const { return GetFlags () & PACKET_FLAG_ECHO; };
|
||||
};
|
||||
|
||||
struct PacketCmp
|
||||
|
@ -168,6 +169,7 @@ namespace stream
|
|||
StreamingDestination& GetLocalDestination () { return m_LocalDestination; };
|
||||
|
||||
void HandleNextPacket (Packet * packet);
|
||||
void HandlePing (Packet * packet);
|
||||
size_t Send (const uint8_t * buf, size_t len);
|
||||
void AsyncSend (const uint8_t * buf, size_t len, SendHandler handler);
|
||||
|
||||
|
|
|
@ -453,6 +453,7 @@ namespace client
|
|||
options[I2CP_PARAM_MIN_TUNNEL_LATENCY] = GetI2CPOption(section, I2CP_PARAM_MIN_TUNNEL_LATENCY, DEFAULT_MIN_TUNNEL_LATENCY);
|
||||
options[I2CP_PARAM_MAX_TUNNEL_LATENCY] = GetI2CPOption(section, I2CP_PARAM_MAX_TUNNEL_LATENCY, DEFAULT_MAX_TUNNEL_LATENCY);
|
||||
options[I2CP_PARAM_STREAMING_INITIAL_ACK_DELAY] = GetI2CPOption(section, I2CP_PARAM_STREAMING_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY);
|
||||
options[I2CP_PARAM_STREAMING_ANSWER_PINGS] = GetI2CPOption(section, I2CP_PARAM_STREAMING_ANSWER_PINGS, DEFAULT_ANSWER_PINGS);
|
||||
options[I2CP_PARAM_LEASESET_TYPE] = GetI2CPOption(section, I2CP_PARAM_LEASESET_TYPE, DEFAULT_LEASESET_TYPE);
|
||||
std::string encType = GetI2CPStringOption(section, I2CP_PARAM_LEASESET_ENCRYPTION_TYPE, "");
|
||||
if (encType.length () > 0) options[I2CP_PARAM_LEASESET_ENCRYPTION_TYPE] = encType;
|
||||
|
|
Loading…
Reference in a new issue