shared pointers for streams

This commit is contained in:
orignal 2014-11-23 11:33:58 -05:00
parent 4dc33a6f45
commit 4bd8b44ab2
14 changed files with 46 additions and 48 deletions

View file

@ -274,7 +274,7 @@ namespace client
} }
} }
i2p::stream::Stream * ClientDestination::CreateStream (const i2p::data::LeaseSet& remote, int port) std::shared_ptr<i2p::stream::Stream> ClientDestination::CreateStream (const i2p::data::LeaseSet& remote, int port)
{ {
if (m_StreamingDestination) if (m_StreamingDestination)
return m_StreamingDestination->CreateNewOutgoingStream (remote, port); return m_StreamingDestination->CreateNewOutgoingStream (remote, port);

View file

@ -3,6 +3,7 @@
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <memory>
#include "Identity.h" #include "Identity.h"
#include "TunnelPool.h" #include "TunnelPool.h"
#include "CryptoConst.h" #include "CryptoConst.h"
@ -41,7 +42,7 @@ namespace client
// streaming // streaming
i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; }; i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; };
i2p::stream::Stream * CreateStream (const i2p::data::LeaseSet& remote, int port = 0); std::shared_ptr<i2p::stream::Stream> CreateStream (const i2p::data::LeaseSet& remote, int port = 0);
void AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor); void AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor);
void StopAcceptingStreams (); void StopAcceptingStreams ();
bool IsAcceptingStreams () const; bool IsAcceptingStreams () const;

View file

@ -518,7 +518,7 @@ namespace util
{ {
m_Stream->Close (); m_Stream->Close ();
i2p::stream::DeleteStream (m_Stream); i2p::stream::DeleteStream (m_Stream);
m_Stream = nullptr; m_Stream.reset ();
} }
m_Socket->close (); m_Socket->close ();
//delete this; //delete this;

View file

@ -3,6 +3,7 @@
#include <sstream> #include <sstream>
#include <thread> #include <thread>
#include <memory>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/array.hpp> #include <boost/array.hpp>
#include "LeaseSet.h" #include "LeaseSet.h"
@ -79,7 +80,7 @@ namespace util
boost::asio::ip::tcp::socket * m_Socket; boost::asio::ip::tcp::socket * m_Socket;
boost::asio::deadline_timer m_Timer; boost::asio::deadline_timer m_Timer;
i2p::stream::Stream * m_Stream; std::shared_ptr<i2p::stream::Stream> m_Stream;
char m_Buffer[HTTP_CONNECTION_BUFFER_SIZE + 1], m_StreamBuffer[HTTP_CONNECTION_BUFFER_SIZE + 1]; char m_Buffer[HTTP_CONNECTION_BUFFER_SIZE + 1], m_StreamBuffer[HTTP_CONNECTION_BUFFER_SIZE + 1];
size_t m_BufferLen; size_t m_BufferLen;
request m_Request; request m_Request;

View file

@ -20,7 +20,7 @@ namespace client
Receive (); Receive ();
} }
I2PTunnelConnection::I2PTunnelConnection (I2PTunnel * owner, i2p::stream::Stream * stream, I2PTunnelConnection::I2PTunnelConnection (I2PTunnel * owner, std::shared_ptr<i2p::stream::Stream> stream,
boost::asio::ip::tcp::socket * socket, const boost::asio::ip::tcp::endpoint& target): boost::asio::ip::tcp::socket * socket, const boost::asio::ip::tcp::endpoint& target):
m_Socket (socket), m_Stream (stream), m_Owner (owner) m_Socket (socket), m_Stream (stream), m_Owner (owner)
{ {
@ -40,7 +40,7 @@ namespace client
{ {
m_Stream->Close (); m_Stream->Close ();
i2p::stream::DeleteStream (m_Stream); i2p::stream::DeleteStream (m_Stream);
m_Stream = nullptr; m_Stream.reset ();
} }
m_Socket->close (); m_Socket->close ();
if (m_Owner) if (m_Owner)
@ -275,7 +275,7 @@ namespace client
LogPrint ("Local destination not set for server tunnel"); LogPrint ("Local destination not set for server tunnel");
} }
void I2PServerTunnel::HandleAccept (i2p::stream::Stream * stream) void I2PServerTunnel::HandleAccept (std::shared_ptr<i2p::stream::Stream> stream)
{ {
if (stream) if (stream)
new I2PTunnelConnection (this, stream, new boost::asio::ip::tcp::socket (GetService ()), m_Endpoint); new I2PTunnelConnection (this, stream, new boost::asio::ip::tcp::socket (GetService ()), m_Endpoint);

View file

@ -4,6 +4,7 @@
#include <inttypes.h> #include <inttypes.h>
#include <string> #include <string>
#include <set> #include <set>
#include <memory>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include "Identity.h" #include "Identity.h"
#include "Destination.h" #include "Destination.h"
@ -24,7 +25,7 @@ namespace client
I2PTunnelConnection (I2PTunnel * owner, boost::asio::ip::tcp::socket * socket, I2PTunnelConnection (I2PTunnel * owner, boost::asio::ip::tcp::socket * socket,
const i2p::data::LeaseSet * leaseSet); const i2p::data::LeaseSet * leaseSet);
I2PTunnelConnection (I2PTunnel * owner, i2p::stream::Stream * stream, boost::asio::ip::tcp::socket * socket, I2PTunnelConnection (I2PTunnel * owner, std::shared_ptr<i2p::stream::Stream> stream, boost::asio::ip::tcp::socket * socket,
const boost::asio::ip::tcp::endpoint& target); const boost::asio::ip::tcp::endpoint& target);
~I2PTunnelConnection (); ~I2PTunnelConnection ();
@ -44,7 +45,7 @@ namespace client
uint8_t m_Buffer[I2P_TUNNEL_CONNECTION_BUFFER_SIZE], m_StreamBuffer[I2P_TUNNEL_CONNECTION_BUFFER_SIZE]; uint8_t m_Buffer[I2P_TUNNEL_CONNECTION_BUFFER_SIZE], m_StreamBuffer[I2P_TUNNEL_CONNECTION_BUFFER_SIZE];
boost::asio::ip::tcp::socket * m_Socket; boost::asio::ip::tcp::socket * m_Socket;
i2p::stream::Stream * m_Stream; std::shared_ptr<i2p::stream::Stream> m_Stream;
I2PTunnel * m_Owner; I2PTunnel * m_Owner;
}; };
@ -111,7 +112,7 @@ namespace client
private: private:
void Accept (); void Accept ();
void HandleAccept (i2p::stream::Stream * stream); void HandleAccept (std::shared_ptr<i2p::stream::Stream> stream);
private: private:

View file

@ -34,7 +34,7 @@ namespace client
{ {
m_Stream->Close (); m_Stream->Close ();
i2p::stream::DeleteStream (m_Stream); i2p::stream::DeleteStream (m_Stream);
m_Stream = nullptr; m_Stream.reset ();
} }
} }
@ -531,7 +531,7 @@ namespace client
I2PReceive (); I2PReceive ();
} }
void SAMSocket::HandleI2PAccept (i2p::stream::Stream * stream) void SAMSocket::HandleI2PAccept (std::shared_ptr<i2p::stream::Stream> stream)
{ {
if (stream) if (stream)
{ {

4
SAM.h
View file

@ -90,7 +90,7 @@ namespace client
void I2PReceive (); void I2PReceive ();
void HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleI2PAccept (i2p::stream::Stream * stream); void HandleI2PAccept (std::shared_ptr<i2p::stream::Stream> stream);
void HandleWriteI2PData (const boost::system::error_code& ecode); void HandleWriteI2PData (const boost::system::error_code& ecode);
void HandleI2PDatagramReceive (const i2p::data::IdentityEx& ident, const uint8_t * buf, size_t len); void HandleI2PDatagramReceive (const i2p::data::IdentityEx& ident, const uint8_t * buf, size_t len);
@ -118,7 +118,7 @@ namespace client
SAMSocketType m_SocketType; SAMSocketType m_SocketType;
std::string m_ID; // nickname std::string m_ID; // nickname
bool m_IsSilent; bool m_IsSilent;
i2p::stream::Stream * m_Stream; std::shared_ptr<i2p::stream::Stream> m_Stream;
SAMSession * m_Session; SAMSession * m_Session;
}; };

View file

@ -77,8 +77,7 @@ namespace proxy
{ {
if (m_stream) { if (m_stream) {
LogPrint("--- socks4a close stream"); LogPrint("--- socks4a close stream");
delete m_stream; m_stream.reset ();
m_stream = nullptr;
} }
} }

View file

@ -5,7 +5,7 @@
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <vector> #include <vector>
#include <mutex> #include <mutex>
#include <memory>
#include "Identity.h" #include "Identity.h"
#include "Streaming.h" #include "Streaming.h"
@ -47,7 +47,7 @@ namespace proxy
boost::asio::io_service * m_ios; boost::asio::io_service * m_ios;
boost::asio::ip::tcp::socket * m_sock; boost::asio::ip::tcp::socket * m_sock;
boost::asio::deadline_timer m_ls_timer; boost::asio::deadline_timer m_ls_timer;
i2p::stream::Stream * m_stream; std::shared_ptr<i2p::stream::Stream> m_stream;
i2p::data::LeaseSet * m_ls; i2p::data::LeaseSet * m_ls;
i2p::data::IdentHash m_dest; i2p::data::IdentHash m_dest;
state m_state; state m_state;

View file

@ -103,7 +103,7 @@ namespace stream
m_IsAckSendScheduled = true; m_IsAckSendScheduled = true;
m_AckSendTimer.expires_from_now (boost::posix_time::milliseconds(ACK_SEND_TIMEOUT)); m_AckSendTimer.expires_from_now (boost::posix_time::milliseconds(ACK_SEND_TIMEOUT));
m_AckSendTimer.async_wait (boost::bind (&Stream::HandleAckSendTimer, m_AckSendTimer.async_wait (boost::bind (&Stream::HandleAckSendTimer,
this, boost::asio::placeholders::error)); shared_from_this (), boost::asio::placeholders::error));
} }
} }
else if (isSyn) else if (isSyn)
@ -461,7 +461,7 @@ namespace stream
m_ResendTimer.cancel (); m_ResendTimer.cancel ();
m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_TIMEOUT)); m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_TIMEOUT));
m_ResendTimer.async_wait (boost::bind (&Stream::HandleResendTimer, m_ResendTimer.async_wait (boost::bind (&Stream::HandleResendTimer,
this, boost::asio::placeholders::error)); shared_from_this (), boost::asio::placeholders::error));
} }
void Stream::HandleResendTimer (const boost::system::error_code& ecode) void Stream::HandleResendTimer (const boost::system::error_code& ecode)
@ -563,8 +563,6 @@ namespace stream
ResetAcceptor (); ResetAcceptor ();
{ {
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
for (auto it: m_Streams)
delete it.second;
m_Streams.clear (); m_Streams.clear ();
} }
} }
@ -597,36 +595,30 @@ namespace stream
} }
} }
Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote, int port) std::shared_ptr<Stream> StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote, int port)
{ {
Stream * s = new Stream (*m_Owner.GetService (), *this, remote, port); auto s = std::make_shared<Stream> (*m_Owner.GetService (), *this, remote, port);
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s; m_Streams[s->GetRecvStreamID ()] = s;
return s; return s;
} }
Stream * StreamingDestination::CreateNewIncomingStream () std::shared_ptr<Stream> StreamingDestination::CreateNewIncomingStream ()
{ {
Stream * s = new Stream (*m_Owner.GetService (), *this); auto s = std::make_shared<Stream> (*m_Owner.GetService (), *this);
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s; m_Streams[s->GetRecvStreamID ()] = s;
return s; return s;
} }
void StreamingDestination::DeleteStream (Stream * stream) void StreamingDestination::DeleteStream (std::shared_ptr<Stream> stream)
{ {
if (stream) if (stream)
{ {
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
auto it = m_Streams.find (stream->GetRecvStreamID ()); auto it = m_Streams.find (stream->GetRecvStreamID ());
if (it != m_Streams.end ()) if (it != m_Streams.end ())
{
m_Streams.erase (it); m_Streams.erase (it);
if (m_Owner.GetService ())
m_Owner.GetService ()->post ([stream](void) { delete stream; });
else
delete stream;
}
} }
} }
@ -651,7 +643,7 @@ namespace stream
} }
} }
void DeleteStream (Stream * stream) void DeleteStream (std::shared_ptr<Stream> stream)
{ {
if (stream) if (stream)
stream->GetLocalDestination ().DeleteStream (stream); stream->GetLocalDestination ().DeleteStream (stream);

View file

@ -7,6 +7,7 @@
#include <set> #include <set>
#include <queue> #include <queue>
#include <functional> #include <functional>
#include <memory>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include "I2PEndian.h" #include "I2PEndian.h"
@ -78,7 +79,7 @@ namespace stream
}; };
class StreamingDestination; class StreamingDestination;
class Stream class Stream: public std::enable_shared_from_this<Stream>
{ {
public: public:
@ -153,7 +154,7 @@ namespace stream
{ {
public: public:
typedef std::function<void (Stream *)> Acceptor; typedef std::function<void (std::shared_ptr<Stream>)> Acceptor;
StreamingDestination (i2p::client::ClientDestination& owner): m_Owner (owner) {}; StreamingDestination (i2p::client::ClientDestination& owner): m_Owner (owner) {};
~StreamingDestination () {}; ~StreamingDestination () {};
@ -161,8 +162,8 @@ namespace stream
void Start (); void Start ();
void Stop (); void Stop ();
Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote, int port = 0); std::shared_ptr<Stream> CreateNewOutgoingStream (const i2p::data::LeaseSet& remote, int port = 0);
void DeleteStream (Stream * stream); void DeleteStream (std::shared_ptr<Stream> stream);
void SetAcceptor (const Acceptor& acceptor) { m_Acceptor = acceptor; }; void SetAcceptor (const Acceptor& acceptor) { m_Acceptor = acceptor; };
void ResetAcceptor () { m_Acceptor = nullptr; }; void ResetAcceptor () { m_Acceptor = nullptr; };
bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
@ -173,13 +174,13 @@ namespace stream
private: private:
void HandleNextPacket (Packet * packet); void HandleNextPacket (Packet * packet);
Stream * CreateNewIncomingStream (); std::shared_ptr<Stream> CreateNewIncomingStream ();
private: private:
i2p::client::ClientDestination& m_Owner; i2p::client::ClientDestination& m_Owner;
std::mutex m_StreamsMutex; std::mutex m_StreamsMutex;
std::map<uint32_t, Stream *> m_Streams; std::map<uint32_t, std::shared_ptr<Stream> > m_Streams;
Acceptor m_Acceptor; Acceptor m_Acceptor;
public: public:
@ -188,7 +189,7 @@ namespace stream
const decltype(m_Streams)& GetStreams () const { return m_Streams; }; const decltype(m_Streams)& GetStreams () const { return m_Streams; };
}; };
void DeleteStream (Stream * stream); void DeleteStream (std::shared_ptr<Stream> stream);
//------------------------------------------------- //-------------------------------------------------
@ -197,15 +198,17 @@ namespace stream
{ {
if (!m_ReceiveQueue.empty ()) if (!m_ReceiveQueue.empty ())
{ {
m_Service.post ([=](void) { this->HandleReceiveTimer ( auto s = shared_from_this();
m_Service.post ([=](void) { s->HandleReceiveTimer (
boost::asio::error::make_error_code (boost::asio::error::operation_aborted), boost::asio::error::make_error_code (boost::asio::error::operation_aborted),
buffer, handler); }); buffer, handler); });
} }
else else
{ {
m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(timeout)); m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(timeout));
auto s = shared_from_this();
m_ReceiveTimer.async_wait ([=](const boost::system::error_code& ecode) m_ReceiveTimer.async_wait ([=](const boost::system::error_code& ecode)
{ this->HandleReceiveTimer (ecode, buffer, handler); }); { s->HandleReceiveTimer (ecode, buffer, handler); });
} }
} }

View file

@ -73,7 +73,7 @@ namespace api
i2p::data::netdb.RequestDestination (remote, true, dest->GetTunnelPool ()); i2p::data::netdb.RequestDestination (remote, true, dest->GetTunnelPool ());
} }
i2p::stream::Stream * CreateStream (i2p::client::ClientDestination * dest, const i2p::data::IdentHash& remote) std::shared_ptr<i2p::stream::Stream> CreateStream (i2p::client::ClientDestination * dest, const i2p::data::IdentHash& remote)
{ {
auto leaseSet = i2p::data::netdb.FindLeaseSet (remote); auto leaseSet = i2p::data::netdb.FindLeaseSet (remote);
if (leaseSet) if (leaseSet)
@ -95,7 +95,7 @@ namespace api
dest->AcceptStreams (acceptor); dest->AcceptStreams (acceptor);
} }
void DestroyStream (i2p::stream::Stream * stream) void DestroyStream (std::shared_ptr<i2p::stream::Stream> stream)
{ {
if (stream) if (stream)
{ {

View file

@ -1,6 +1,7 @@
#ifndef API_H__ #ifndef API_H__
#define API_H__ #define API_H__
#include <memory>
#include "Identity.h" #include "Identity.h"
#include "Destination.h" #include "Destination.h"
#include "Streaming.h" #include "Streaming.h"
@ -16,14 +17,14 @@ namespace api
// destinations // destinations
i2p::client::ClientDestination * CreateLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true); i2p::client::ClientDestination * CreateLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true);
i2p::client::ClientDestination * CreateLocalDestination (bool isPublic = false, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_DSA_SHA1); // transient destinations usually not published i2p::client::ClientDestination * CreateLocalDestination (bool isPublic = false, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256); // transient destinations usually not published
void DestoroyLocalDestination (i2p::client::ClientDestination * dest); void DestoroyLocalDestination (i2p::client::ClientDestination * dest);
// streams // streams
void RequestLeaseSet (i2p::client::ClientDestination * dest, const i2p::data::IdentHash& remote); void RequestLeaseSet (i2p::client::ClientDestination * dest, const i2p::data::IdentHash& remote);
i2p::stream::Stream * CreateStream (i2p::client::ClientDestination * dest, const i2p::data::IdentHash& remote); std::shared_ptr<i2p::stream::Stream> CreateStream (i2p::client::ClientDestination * dest, const i2p::data::IdentHash& remote);
void AcceptStream (i2p::client::ClientDestination * dest, const i2p::stream::StreamingDestination::Acceptor& acceptor); void AcceptStream (i2p::client::ClientDestination * dest, const i2p::stream::StreamingDestination::Acceptor& acceptor);
void DestroyStream (i2p::stream::Stream * stream); void DestroyStream (std::shared_ptr<i2p::stream::Stream> stream);
} }
} }