This commit is contained in:
Jeff Becker 2016-11-01 10:26:40 -04:00
parent cd9cd84c5b
commit d4a0076aba
No known key found for this signature in database
GPG key ID: AB950234D6EA286B
13 changed files with 232 additions and 234 deletions

View file

@ -42,7 +42,7 @@ namespace i2p
std::unique_ptr<i2p::client::I2PControlService> m_I2PControlService; std::unique_ptr<i2p::client::I2PControlService> m_I2PControlService;
std::unique_ptr<i2p::transport::UPnP> UPnP; std::unique_ptr<i2p::transport::UPnP> UPnP;
#ifdef WITH_EVENTS #ifdef WITH_EVENTS
std::unique_ptr<i2p::event::WebsocketServer> m_WebsocketServer; std::unique_ptr<i2p::event::WebsocketServer> m_WebsocketServer;
#endif #endif
}; };
@ -298,23 +298,25 @@ namespace i2p
} }
#ifdef WITH_EVENTS #ifdef WITH_EVENTS
bool websocket; i2p::config::GetOption("websockets.enabled", websocket); bool websocket; i2p::config::GetOption("websockets.enabled", websocket);
if(websocket) { if(websocket) {
std::string websocketAddr; i2p::config::GetOption("websockets.address", websocketAddr); std::string websocketAddr; i2p::config::GetOption("websockets.address", websocketAddr);
uint16_t websocketPort; i2p::config::GetOption("websockets.port", websocketPort); uint16_t websocketPort; i2p::config::GetOption("websockets.port", websocketPort);
LogPrint(eLogInfo, "Daemon: starting Websocket server at ", websocketAddr, ":", websocketPort); LogPrint(eLogInfo, "Daemon: starting Websocket server at ", websocketAddr, ":", websocketPort);
d.m_WebsocketServer = std::unique_ptr<i2p::event::WebsocketServer>(new i2p::event::WebsocketServer (websocketAddr, websocketPort)); d.m_WebsocketServer = std::unique_ptr<i2p::event::WebsocketServer>(new i2p::event::WebsocketServer (websocketAddr, websocketPort));
d.m_WebsocketServer->Start(); d.m_WebsocketServer->Start();
i2p::event::core.SetListener(d.m_WebsocketServer->ToListener()); i2p::event::core.SetListener(d.m_WebsocketServer->ToListener());
} }
#endif #endif
return true; return true;
} }
bool Daemon_Singleton::stop() bool Daemon_Singleton::stop()
{ {
i2p::event::core.SetListener(nullptr); #ifdef WITH_EVENTS
i2p::event::core.SetListener(nullptr);
#endif
LogPrint(eLogInfo, "Daemon: shutting down"); LogPrint(eLogInfo, "Daemon: shutting down");
LogPrint(eLogInfo, "Daemon: stopping Client"); LogPrint(eLogInfo, "Daemon: stopping Client");
i2p::client::context.Stop(); i2p::client::context.Stop();
@ -342,15 +344,15 @@ namespace i2p
d.m_I2PControlService = nullptr; d.m_I2PControlService = nullptr;
} }
#ifdef WITH_EVENTS #ifdef WITH_EVENTS
if (d.m_WebsocketServer) { if (d.m_WebsocketServer) {
LogPrint(eLogInfo, "Daemon: stopping Websocket server"); LogPrint(eLogInfo, "Daemon: stopping Websocket server");
d.m_WebsocketServer->Stop(); d.m_WebsocketServer->Stop();
d.m_WebsocketServer = nullptr; d.m_WebsocketServer = nullptr;
} }
#endif #endif
i2p::crypto::TerminateCrypto (); i2p::crypto::TerminateCrypto ();
return true; return true;
} }
} }
} }

View file

@ -3,26 +3,28 @@
namespace i2p namespace i2p
{ {
namespace event namespace event
{ {
EventCore core; EventCore core;
void EventCore::SetListener(EventListener * l) void EventCore::SetListener(EventListener * l)
{ {
m_listener = l; m_listener = l;
LogPrint(eLogInfo, "Event: listener set"); LogPrint(eLogInfo, "Event: listener set");
} }
void EventCore::QueueEvent(const EventType & ev) void EventCore::QueueEvent(const EventType & ev)
{ {
if(m_listener) if(m_listener)
m_listener->HandleEvent(ev); m_listener->HandleEvent(ev);
} }
} }
} }
void EmitEvent(const EventType & e) void EmitEvent(const EventType & e)
{ {
i2p::event::core.QueueEvent(e); #ifdef WITH_EVENTS
i2p::event::core.QueueEvent(e);
#endif
} }

38
Event.h
View file

@ -10,26 +10,26 @@ typedef std::map<std::string, std::string> EventType;
namespace i2p namespace i2p
{ {
namespace event namespace event
{ {
class EventListener { class EventListener {
public: public:
virtual ~EventListener() {}; virtual ~EventListener() {};
virtual void HandleEvent(const EventType & ev) = 0; virtual void HandleEvent(const EventType & ev) = 0;
}; };
class EventCore class EventCore
{ {
public: public:
void QueueEvent(const EventType & ev); void QueueEvent(const EventType & ev);
void SetListener(EventListener * l); void SetListener(EventListener * l);
private: private:
EventListener * m_listener = nullptr; EventListener * m_listener = nullptr;
}; };
extern EventCore core; extern EventCore core;
} }
} }
void EmitEvent(const EventType & ev); void EmitEvent(const EventType & ev);

View file

@ -15,7 +15,7 @@ USE_MESHNET := no
USE_UPNP := no USE_UPNP := no
ifeq ($(WEBSOCKET),1) ifeq ($(WEBSOCKETS),1)
NEEDED_CXXFLAGS += -DWITH_EVENTS NEEDED_CXXFLAGS += -DWITH_EVENTS
DAEMON_SRC += Websocket.cpp DAEMON_SRC += Websocket.cpp
endif endif

View file

@ -605,10 +605,10 @@ namespace transport
if (!memcmp (m_NextMessage->buf + m_NextMessageOffset - 4, checksum, 4)) if (!memcmp (m_NextMessage->buf + m_NextMessageOffset - 4, checksum, 4))
{ {
if (!m_NextMessage->IsExpired ()) if (!m_NextMessage->IsExpired ())
{ {
EmitEvent({{"type", "transport.recvmsg"} , {"ident", GetIdentHashBase64()}, {"number", "1"}}); EmitEvent({{"type", "transport.recvmsg"} , {"ident", GetIdentHashBase64()}, {"number", "1"}});
m_Handler.PutNextMessage (m_NextMessage); m_Handler.PutNextMessage (m_NextMessage);
} }
else else
LogPrint (eLogInfo, "NTCP: message expired"); LogPrint (eLogInfo, "NTCP: message expired");
} }

View file

@ -234,11 +234,11 @@ namespace transport
if (!m_ReceivedMessages.count (msgID)) if (!m_ReceivedMessages.count (msgID))
{ {
m_ReceivedMessages.insert (msgID); m_ReceivedMessages.insert (msgID);
m_LastMessageReceivedTime = i2p::util::GetSinceEpoch<Time> (); m_LastMessageReceivedTime = i2p::util::GetSecondsSinceEpoch ();
if (!msg->IsExpired ()) { if (!msg->IsExpired ()) {
EmitEvent({{"type", "transport.recvmsg"} , {"ident", m_Session.GetIdentHashBase64()}, {"number", "1"}}); EmitEvent({{"type", "transport.recvmsg"} , {"ident", m_Session.GetIdentHashBase64()}, {"number", "1"}});
m_Handler.PutNextMessage (msg); m_Handler.PutNextMessage (msg);
} }
else else
LogPrint (eLogDebug, "SSU: message expired"); LogPrint (eLogDebug, "SSU: message expired");
} }

View file

@ -65,8 +65,8 @@ namespace transport
virtual ~TransportSession () {}; virtual ~TransportSession () {};
virtual void Done () = 0; virtual void Done () = 0;
std::string GetIdentHashBase64() const { return m_RemoteIdentity->GetIdentHash().ToBase64(); } std::string GetIdentHashBase64() const { return m_RemoteIdentity->GetIdentHash().ToBase64(); }
std::shared_ptr<const i2p::data::IdentityEx> GetRemoteIdentity () { return m_RemoteIdentity; }; std::shared_ptr<const i2p::data::IdentityEx> GetRemoteIdentity () { return m_RemoteIdentity; };
void SetRemoteIdentity (std::shared_ptr<const i2p::data::IdentityEx> ident) { m_RemoteIdentity = ident; }; void SetRemoteIdentity (std::shared_ptr<const i2p::data::IdentityEx> ident) { m_RemoteIdentity = ident; };

View file

@ -231,8 +231,7 @@ namespace transport
void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs) void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs)
{ {
EmitEvent({{"type" , "transport.sendmsg"}, {"ident", ident.ToBase64()}, {"number", std::to_string(msgs.size())}});
EmitEvent({{"type" , "transport.sendmsg"}, {"ident", ident.ToBase64()}, {"number", std::to_string(msgs.size())}});
m_Service.post (std::bind (&Transports::PostMessages, this, ident, msgs)); m_Service.post (std::bind (&Transports::PostMessages, this, ident, msgs));
} }
@ -507,10 +506,9 @@ namespace transport
} }
if (m_SSUServer) if (m_SSUServer)
{ {
bool nat; i2p::config::GetOption("nat", nat); bool nat; i2p::config::GetOption("nat", nat);
if (nat) if (nat)
i2p::context.SetStatus (eRouterStatusTesting); i2p::context.SetStatus (eRouterStatusTesting);
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
{ {
auto router = i2p::data::netdb.GetRandomPeerTestRouter (); auto router = i2p::data::netdb.GetRandomPeerTestRouter ();
@ -569,11 +567,10 @@ namespace transport
auto remoteIdentity = session->GetRemoteIdentity (); auto remoteIdentity = session->GetRemoteIdentity ();
if (!remoteIdentity) return; if (!remoteIdentity) return;
auto ident = remoteIdentity->GetIdentHash (); auto ident = remoteIdentity->GetIdentHash ();
auto it = m_Peers.find (ident); auto it = m_Peers.find (ident);
if (it != m_Peers.end ()) if (it != m_Peers.end ())
{ {
EmitEvent({{"type" , "transport.connected"}, {"ident", ident.ToBase64()}, {"inbound", "false"}}); EmitEvent({{"type" , "transport.connected"}, {"ident", ident.ToBase64()}, {"inbound", "false"}});
bool sendDatabaseStore = true; bool sendDatabaseStore = true;
if (it->second.delayedMessages.size () > 0) if (it->second.delayedMessages.size () > 0)
{ {
@ -599,22 +596,22 @@ namespace transport
session->Done(); session->Done();
return; return;
} }
EmitEvent({{"type" , "transport.connected"}, {"ident", ident.ToBase64()}, {"inbound", "true"}}); EmitEvent({{"type" , "transport.connected"}, {"ident", ident.ToBase64()}, {"inbound", "true"}});
session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore
std::unique_lock<std::mutex> l(m_PeersMutex); std::unique_lock<std::mutex> l(m_PeersMutex);
m_Peers.insert (std::make_pair (ident, Peer{ 0, nullptr, { session }, i2p::util::GetSecondsSinceEpoch (), {} })); m_Peers.insert (std::make_pair (ident, Peer{ 0, nullptr, { session }, i2p::util::GetSecondsSinceEpoch (), {} }));
} }
}); });
} }
void Transports::PeerDisconnected (std::shared_ptr<TransportSession> session) void Transports::PeerDisconnected (std::shared_ptr<TransportSession> session)
{ {
m_Service.post([session, this]() m_Service.post([session, this]()
{ {
auto remoteIdentity = session->GetRemoteIdentity (); auto remoteIdentity = session->GetRemoteIdentity ();
if (!remoteIdentity) return; if (!remoteIdentity) return;
auto ident = remoteIdentity->GetIdentHash (); auto ident = remoteIdentity->GetIdentHash ();
EmitEvent({{"type" , "transport.disconnected"}, {"ident", ident.ToBase64()}}); EmitEvent({{"type" , "transport.disconnected"}, {"ident", ident.ToBase64()}});
auto it = m_Peers.find (ident); auto it = m_Peers.find (ident);
if (it != m_Peers.end ()) if (it != m_Peers.end ())
{ {
@ -678,13 +675,13 @@ namespace transport
std::advance (it, rand () % m_Peers.size ()); std::advance (it, rand () % m_Peers.size ());
return it != m_Peers.end () ? it->second.router : nullptr; return it != m_Peers.end () ? it->second.router : nullptr;
} }
void Transports::RestrictRoutesToFamilies(std::set<std::string> families) void Transports::RestrictRoutesToFamilies(std::set<std::string> families)
{ {
std::lock_guard<std::mutex> lock(m_FamilyMutex); std::lock_guard<std::mutex> lock(m_FamilyMutex);
m_TrustedFamilies.clear(); m_TrustedFamilies.clear();
for ( const auto& fam : families ) for ( const auto& fam : families )
m_TrustedFamilies.push_back(fam); m_TrustedFamilies.push_back(fam);
} }
void Transports::RestrictRoutesToRouters(std::set<i2p::data::IdentHash> routers) void Transports::RestrictRoutesToRouters(std::set<i2p::data::IdentHash> routers)
{ {
@ -694,14 +691,14 @@ namespace transport
m_TrustedRouters.push_back(ri); m_TrustedRouters.push_back(ri);
} }
bool Transports::RoutesRestricted() const { bool Transports::RoutesRestricted() const {
std::unique_lock<std::mutex> famlock(m_FamilyMutex); std::unique_lock<std::mutex> famlock(m_FamilyMutex);
std::unique_lock<std::mutex> routerslock(m_TrustedRoutersMutex); std::unique_lock<std::mutex> routerslock(m_TrustedRoutersMutex);
return m_TrustedFamilies.size() > 0 || m_TrustedRouters.size() > 0; return m_TrustedFamilies.size() > 0 || m_TrustedRouters.size() > 0;
} }
/** XXX: if routes are not restricted this dies */ /** XXX: if routes are not restricted this dies */
std::shared_ptr<const i2p::data::RouterInfo> Transports::GetRestrictedPeer() const std::shared_ptr<const i2p::data::RouterInfo> Transports::GetRestrictedPeer() const
{ {
{ {
std::lock_guard<std::mutex> l(m_FamilyMutex); std::lock_guard<std::mutex> l(m_FamilyMutex);
@ -734,7 +731,7 @@ namespace transport
} }
} }
return nullptr; return nullptr;
} }
bool Transports::IsRestrictedPeer(const i2p::data::IdentHash & ih) const bool Transports::IsRestrictedPeer(const i2p::data::IdentHash & ih) const
{ {

View file

@ -11,6 +11,7 @@
#include "Transports.h" #include "Transports.h"
#include "NetDb.h" #include "NetDb.h"
#include "Tunnel.h" #include "Tunnel.h"
#include "TunnelPool.h"
namespace i2p namespace i2p
{ {
@ -30,7 +31,7 @@ namespace tunnel
void Tunnel::Build (uint32_t replyMsgID, std::shared_ptr<OutboundTunnel> outboundTunnel) void Tunnel::Build (uint32_t replyMsgID, std::shared_ptr<OutboundTunnel> outboundTunnel)
{ {
#ifdef WITH_EVENTS #ifdef WITH_EVENTS
std::string peers = i2p::context.GetIdentity()->GetIdentHash().ToBase64(); std::string peers = i2p::context.GetIdentity()->GetIdentHash().ToBase64();
#endif #endif
auto numHops = m_Config->GetNumHops (); auto numHops = m_Config->GetNumHops ();
int numRecords = numHops <= STANDARD_NUM_RECORDS ? STANDARD_NUM_RECORDS : numHops; int numRecords = numHops <= STANDARD_NUM_RECORDS ? STANDARD_NUM_RECORDS : numHops;
@ -58,14 +59,13 @@ namespace tunnel
hop->recordIndex = idx; hop->recordIndex = idx;
i++; i++;
#ifdef WITH_EVENTS #ifdef WITH_EVENTS
peers += ":" + hop->ident->GetIdentHash().ToBase64(); peers += ":" + hop->ident->GetIdentHash().ToBase64();
#endif #endif
hop = hop->next; hop = hop->next;
} }
#ifdef WITH_EVENTS #ifdef WITH_EVENTS
EmitTunnelEvent("tunnel.build", this, peers); EmitTunnelEvent("tunnel.build", this, peers);
#endif #endif
// fill up fake records with random data // fill up fake records with random data
for (int i = numHops; i < numRecords; i++) for (int i = numHops; i < numRecords; i++)
{ {
@ -191,12 +191,12 @@ namespace tunnel
return ret; return ret;
} }
void Tunnel::SetState(TunnelState state) void Tunnel::SetState(TunnelState state)
{ {
m_State = state; m_State = state;
EmitTunnelEvent("tunnel.state", this, state); EmitTunnelEvent("tunnel.state", this, state);
} }
void Tunnel::PrintHops (std::stringstream& s) const void Tunnel::PrintHops (std::stringstream& s) const
{ {
@ -599,7 +599,7 @@ namespace tunnel
} }
} }
if(pool) pool->OnTunnelBuildResult(tunnel, eBuildResultTimeout); if(pool) pool->OnTunnelBuildResult(tunnel, eBuildResultTimeout);
EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed); EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed);
// delete // delete
it = pendingTunnels.erase (it); it = pendingTunnels.erase (it);
m_NumFailedTunnelCreations++; m_NumFailedTunnelCreations++;
@ -611,7 +611,7 @@ namespace tunnel
LogPrint (eLogDebug, "Tunnel: pending build request ", it->first, " failed, deleted"); LogPrint (eLogDebug, "Tunnel: pending build request ", it->first, " failed, deleted");
if(pool) pool->OnTunnelBuildResult(tunnel, eBuildResultRejected); if(pool) pool->OnTunnelBuildResult(tunnel, eBuildResultRejected);
EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed); EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed);
it = pendingTunnels.erase (it); it = pendingTunnels.erase (it);
m_NumFailedTunnelCreations++; m_NumFailedTunnelCreations++;
@ -800,7 +800,7 @@ namespace tunnel
{ {
if (config) if (config)
return CreateTunnel<InboundTunnel>(config, outboundTunnel); return CreateTunnel<InboundTunnel>(config, outboundTunnel);
else else
return CreateZeroHopsInboundTunnel (); return CreateZeroHopsInboundTunnel ();
} }

View file

@ -89,7 +89,7 @@ namespace tunnel
std::vector<std::shared_ptr<const i2p::data::IdentityEx> > GetPeers () const; std::vector<std::shared_ptr<const i2p::data::IdentityEx> > GetPeers () const;
std::vector<std::shared_ptr<const i2p::data::IdentityEx> > GetInvertedPeers () const; std::vector<std::shared_ptr<const i2p::data::IdentityEx> > GetInvertedPeers () const;
TunnelState GetState () const { return m_State; }; TunnelState GetState () const { return m_State; };
void SetState (TunnelState state); void SetState (TunnelState state);
bool IsEstablished () const { return m_State == eTunnelStateEstablished; }; bool IsEstablished () const { return m_State == eTunnelStateEstablished; };
bool IsFailed () const { return m_State == eTunnelStateFailed; }; bool IsFailed () const { return m_State == eTunnelStateFailed; };
bool IsRecreated () const { return m_IsRecreated; }; bool IsRecreated () const { return m_IsRecreated; };

View file

@ -7,6 +7,7 @@
#include "Garlic.h" #include "Garlic.h"
#include "Transports.h" #include "Transports.h"
#include "Log.h" #include "Log.h"
#include "Tunnel.h"
#include "TunnelPool.h" #include "TunnelPool.h"
#include "Destination.h" #include "Destination.h"
#include "Event.h" #include "Event.h"
@ -70,7 +71,7 @@ namespace tunnel
{ {
if (!m_IsActive) return; if (!m_IsActive) return;
{ {
EmitTunnelEvent("tunnels.created", createdTunnel); EmitTunnelEvent("tunnels.created", createdTunnel);
std::unique_lock<std::mutex> l(m_InboundTunnelsMutex); std::unique_lock<std::mutex> l(m_InboundTunnelsMutex);
m_InboundTunnels.insert (createdTunnel); m_InboundTunnels.insert (createdTunnel);
} }
@ -82,8 +83,7 @@ namespace tunnel
{ {
if (expiredTunnel) if (expiredTunnel)
{ {
EmitTunnelEvent("tunnels.expired", expiredTunnel); EmitTunnelEvent("tunnels.expired", expiredTunnel);
expiredTunnel->SetTunnelPool (nullptr); expiredTunnel->SetTunnelPool (nullptr);
for (auto& it: m_Tests) for (auto& it: m_Tests)
if (it.second.second == expiredTunnel) it.second.second = nullptr; if (it.second.second == expiredTunnel) it.second.second = nullptr;
@ -97,7 +97,7 @@ namespace tunnel
{ {
if (!m_IsActive) return; if (!m_IsActive) return;
{ {
EmitTunnelEvent("tunnels.created", createdTunnel); EmitTunnelEvent("tunnels.created", createdTunnel);
std::unique_lock<std::mutex> l(m_OutboundTunnelsMutex); std::unique_lock<std::mutex> l(m_OutboundTunnelsMutex);
m_OutboundTunnels.insert (createdTunnel); m_OutboundTunnels.insert (createdTunnel);
} }
@ -108,8 +108,7 @@ namespace tunnel
{ {
if (expiredTunnel) if (expiredTunnel)
{ {
EmitTunnelEvent("tunnels.expired", expiredTunnel); EmitTunnelEvent("tunnels.expired", expiredTunnel);
expiredTunnel->SetTunnelPool (nullptr); expiredTunnel->SetTunnelPool (nullptr);
for (auto& it: m_Tests) for (auto& it: m_Tests)
if (it.second.first == expiredTunnel) it.second.first = nullptr; if (it.second.first == expiredTunnel) it.second.first = nullptr;

View file

@ -15,123 +15,123 @@
namespace i2p namespace i2p
{ {
namespace event namespace event
{ {
typedef websocketpp::server<websocketpp::config::asio> ServerImpl; typedef websocketpp::server<websocketpp::config::asio> ServerImpl;
typedef websocketpp::connection_hdl ServerConn; typedef websocketpp::connection_hdl ServerConn;
class WebsocketServerImpl : public EventListener class WebsocketServerImpl : public EventListener
{ {
private: private:
typedef ServerImpl::message_ptr MessagePtr; typedef ServerImpl::message_ptr MessagePtr;
public: public:
WebsocketServerImpl(const std::string & addr, int port) : m_run(false), m_thread(nullptr) WebsocketServerImpl(const std::string & addr, int port) : m_run(false), m_thread(nullptr)
{ {
m_server.init_asio(); m_server.init_asio();
m_server.set_open_handler(std::bind(&WebsocketServerImpl::ConnOpened, this, std::placeholders::_1)); m_server.set_open_handler(std::bind(&WebsocketServerImpl::ConnOpened, this, std::placeholders::_1));
m_server.set_close_handler(std::bind(&WebsocketServerImpl::ConnClosed, this, std::placeholders::_1)); m_server.set_close_handler(std::bind(&WebsocketServerImpl::ConnClosed, this, std::placeholders::_1));
m_server.set_message_handler(std::bind(&WebsocketServerImpl::OnConnMessage, this, std::placeholders::_1, std::placeholders::_2)); m_server.set_message_handler(std::bind(&WebsocketServerImpl::OnConnMessage, this, std::placeholders::_1, std::placeholders::_2));
m_server.listen(boost::asio::ip::address::from_string(addr), port); m_server.listen(boost::asio::ip::address::from_string(addr), port);
} }
~WebsocketServerImpl() ~WebsocketServerImpl()
{ {
} }
void Start() { void Start() {
m_run = true; m_run = true;
m_server.start_accept(); m_server.start_accept();
m_thread = new std::thread([&] () { m_thread = new std::thread([&] () {
while(m_run) { while(m_run) {
try { try {
m_server.run(); m_server.run();
} catch (std::exception & e ) { } catch (std::exception & e ) {
LogPrint(eLogError, "Websocket server: ", e.what()); LogPrint(eLogError, "Websocket server: ", e.what());
} }
} }
}); });
} }
void Stop() { void Stop() {
m_run = false; m_run = false;
m_server.stop(); m_server.stop();
if(m_thread) { if(m_thread) {
m_thread->join(); m_thread->join();
delete m_thread; delete m_thread;
} }
m_thread = nullptr; m_thread = nullptr;
} }
void ConnOpened(ServerConn c) void ConnOpened(ServerConn c)
{ {
std::lock_guard<std::mutex> lock(m_connsMutex); std::lock_guard<std::mutex> lock(m_connsMutex);
m_conns.insert(c); m_conns.insert(c);
} }
void ConnClosed(ServerConn c) void ConnClosed(ServerConn c)
{ {
std::lock_guard<std::mutex> lock(m_connsMutex); std::lock_guard<std::mutex> lock(m_connsMutex);
m_conns.erase(c); m_conns.erase(c);
} }
void OnConnMessage(ServerConn conn, ServerImpl::message_ptr msg) void OnConnMessage(ServerConn conn, ServerImpl::message_ptr msg)
{ {
(void) conn; (void) conn;
(void) msg; (void) msg;
} }
void HandleEvent(const EventType & ev) void HandleEvent(const EventType & ev)
{ {
std::lock_guard<std::mutex> lock(m_connsMutex); std::lock_guard<std::mutex> lock(m_connsMutex);
LogPrint(eLogDebug, "websocket event"); LogPrint(eLogDebug, "websocket event");
boost::property_tree::ptree event; boost::property_tree::ptree event;
for (const auto & item : ev) { for (const auto & item : ev) {
event.put(item.first, item.second); event.put(item.first, item.second);
} }
std::ostringstream ss; std::ostringstream ss;
write_json(ss, event); write_json(ss, event);
std::string s = ss.str(); std::string s = ss.str();
ConnList::iterator it; ConnList::iterator it;
for (it = m_conns.begin(); it != m_conns.end(); ++it) { for (it = m_conns.begin(); it != m_conns.end(); ++it) {
ServerImpl::connection_ptr con = m_server.get_con_from_hdl(*it); ServerImpl::connection_ptr con = m_server.get_con_from_hdl(*it);
con->send(s); con->send(s);
} }
} }
private: private:
typedef std::set<ServerConn, std::owner_less<ServerConn> > ConnList; typedef std::set<ServerConn, std::owner_less<ServerConn> > ConnList;
bool m_run; bool m_run;
std::thread * m_thread; std::thread * m_thread;
std::mutex m_connsMutex; std::mutex m_connsMutex;
ConnList m_conns; ConnList m_conns;
ServerImpl m_server; ServerImpl m_server;
}; };
WebsocketServer::WebsocketServer(const std::string & addr, int port) : m_impl(new WebsocketServerImpl(addr, port)) {} WebsocketServer::WebsocketServer(const std::string & addr, int port) : m_impl(new WebsocketServerImpl(addr, port)) {}
WebsocketServer::~WebsocketServer() WebsocketServer::~WebsocketServer()
{ {
delete m_impl; delete m_impl;
} }
void WebsocketServer::Start() void WebsocketServer::Start()
{ {
m_impl->Start(); m_impl->Start();
} }
void WebsocketServer::Stop() void WebsocketServer::Stop()
{ {
m_impl->Stop(); m_impl->Stop();
} }
EventListener * WebsocketServer::ToListener() EventListener * WebsocketServer::ToListener()
{ {
return m_impl; return m_impl;
} }
} }
} }

View file

@ -1,30 +1,28 @@
#ifndef WEBSOCKET_H__ #ifndef WEBSOCKET_H__
#define WEBSOCKET_H__ #define WEBSOCKET_H__
#include "Event.h" #include "Event.h"
namespace i2p namespace i2p
{ {
namespace event namespace event
{ {
class WebsocketServerImpl; class WebsocketServerImpl;
class WebsocketServer class WebsocketServer
{ {
public: public:
WebsocketServer(const std::string & addr, int port); WebsocketServer(const std::string & addr, int port);
~WebsocketServer(); ~WebsocketServer();
void Start(); void Start();
void Stop(); void Stop();
EventListener * ToListener(); EventListener * ToListener();
private: private:
WebsocketServerImpl * m_impl; WebsocketServerImpl * m_impl;
}; };
} }
} }
#endif #endif