diff --git a/ClientContext.cpp b/ClientContext.cpp index b7300ad2..215768c2 100644 --- a/ClientContext.cpp +++ b/ClientContext.cpp @@ -290,7 +290,7 @@ namespace client } return infos; } - + std::shared_ptr ClientContext::CreateNewLocalDestination (bool isPublic, i2p::data::SigningKeyType sigType, const std::map * params) { diff --git a/ClientContext.h b/ClientContext.h index d82058c6..356f28c4 100644 --- a/ClientContext.h +++ b/ClientContext.h @@ -68,7 +68,7 @@ namespace client const SAMBridge * GetSAMBridge () const { return m_SamBridge; }; std::vector > GetForwardInfosFor(const i2p::data::IdentHash & destination); - + private: void ReadTunnels (); diff --git a/Config.cpp b/Config.cpp index 4750a9e3..e72749bc 100644 --- a/Config.cpp +++ b/Config.cpp @@ -179,6 +179,12 @@ namespace config { ("trust.routers", value()->default_value(""), "Only Connect to these routers") ("trust.hidden", value()->default_value(false), "Should we hide our router from other routers?"); + options_description websocket("Websocket Options"); + websocket.add_options() + ("websockets.enabled", value()->default_value(false), "enable websocket server") + ("websockets.address", value()->default_value("127.0.0.1"), "address to bind websocket server on") + ("websockets.port", value()->default_value(7666), "port to bind websocket server on"); + m_OptionsDesc .add(general) .add(limits) @@ -193,7 +199,8 @@ namespace config { .add(precomputation) .add(reseed) .add(addressbook) - .add(trust) + .add(trust) + .add(websocket) ; } diff --git a/Daemon.cpp b/Daemon.cpp index ce2f4173..9c0d15f8 100644 --- a/Daemon.cpp +++ b/Daemon.cpp @@ -25,6 +25,9 @@ #include "UPnP.h" #include "util.h" +#include "Event.h" +#include "Websocket.h" + namespace i2p { namespace util @@ -38,6 +41,9 @@ namespace i2p std::unique_ptr httpServer; std::unique_ptr m_I2PControlService; std::unique_ptr UPnP; +#ifdef WITH_EVENTS + std::unique_ptr m_WebsocketServer; +#endif }; Daemon_Singleton::Daemon_Singleton() : isDaemon(false), running(true), d(*new Daemon_Singleton_Private()) {} @@ -115,6 +121,9 @@ namespace i2p bool precomputation; i2p::config::GetOption("precomputation.elgamal", precomputation); i2p::crypto::InitCrypto (precomputation); + + int netID; i2p::config::GetOption("netid", netID); + i2p::context.SetNetID (netID); i2p::context.Init (); bool ipv6; i2p::config::GetOption("ipv6", ipv6); @@ -138,9 +147,6 @@ namespace i2p uint16_t transitTunnels; i2p::config::GetOption("limits.transittunnels", transitTunnels); SetMaxNumTransitTunnels (transitTunnels); - int netID; i2p::config::GetOption("netid", netID); - i2p::context.SetNetID (netID); - bool isFloodfill; i2p::config::GetOption("floodfill", isFloodfill); if (isFloodfill) { LogPrint(eLogInfo, "Daemon: router will be floodfill"); @@ -290,12 +296,27 @@ namespace i2p d.m_I2PControlService = std::unique_ptr(new i2p::client::I2PControlService (i2pcpAddr, i2pcpPort)); d.m_I2PControlService->Start (); } +#ifdef WITH_EVENTS + bool websocket; i2p::config::GetOption("websockets.enabled", websocket); + if(websocket) { + std::string websocketAddr; i2p::config::GetOption("websockets.address", websocketAddr); + uint16_t websocketPort; i2p::config::GetOption("websockets.port", websocketPort); + LogPrint(eLogInfo, "Daemon: starting Websocket server at ", websocketAddr, ":", websocketPort); + d.m_WebsocketServer = std::unique_ptr(new i2p::event::WebsocketServer (websocketAddr, websocketPort)); + d.m_WebsocketServer->Start(); + i2p::event::core.SetListener(d.m_WebsocketServer->ToListener()); + } + +#endif return true; } bool Daemon_Singleton::stop() { +#ifdef WITH_EVENTS + i2p::event::core.SetListener(nullptr); +#endif LogPrint(eLogInfo, "Daemon: shutting down"); LogPrint(eLogInfo, "Daemon: stopping Client"); i2p::client::context.Stop(); @@ -321,10 +342,17 @@ namespace i2p LogPrint(eLogInfo, "Daemon: stopping I2PControl"); d.m_I2PControlService->Stop (); d.m_I2PControlService = nullptr; - } + } +#ifdef WITH_EVENTS + if (d.m_WebsocketServer) { + LogPrint(eLogInfo, "Daemon: stopping Websocket server"); + d.m_WebsocketServer->Stop(); + d.m_WebsocketServer = nullptr; + } +#endif i2p::crypto::TerminateCrypto (); return true; } - } +} } diff --git a/Event.cpp b/Event.cpp new file mode 100644 index 00000000..e148538e --- /dev/null +++ b/Event.cpp @@ -0,0 +1,32 @@ +#include "Event.h" +#include "Log.h" + +namespace i2p +{ + namespace event + { +#ifdef WITH_EVENTS + EventCore core; +#endif + + void EventCore::SetListener(EventListener * l) + { + m_listener = l; + LogPrint(eLogInfo, "Event: listener set"); + } + + void EventCore::QueueEvent(const EventType & ev) + { + if(m_listener) + m_listener->HandleEvent(ev); + } + } +} + +void EmitEvent(const EventType & e) +{ +#ifdef WITH_EVENTS + i2p::event::core.QueueEvent(e); +#endif +} + diff --git a/Event.h b/Event.h new file mode 100644 index 00000000..1ab37847 --- /dev/null +++ b/Event.h @@ -0,0 +1,37 @@ +#ifndef EVENT_H__ +#define EVENT_H__ +#include +#include +#include + +#include + +typedef std::map EventType; + +namespace i2p +{ + namespace event + { + class EventListener { + public: + virtual ~EventListener() {}; + virtual void HandleEvent(const EventType & ev) = 0; + }; + + class EventCore + { + public: + void QueueEvent(const EventType & ev); + void SetListener(EventListener * l); + + private: + EventListener * m_listener = nullptr; + }; +#ifdef WITH_EVENTS + extern EventCore core; +#endif + } +} +void EmitEvent(const EventType & ev); + +#endif diff --git a/I2NPProtocol.cpp b/I2NPProtocol.cpp index cdc4fb9b..96b30cb6 100644 --- a/I2NPProtocol.cpp +++ b/I2NPProtocol.cpp @@ -547,7 +547,6 @@ namespace i2p uint8_t typeID = msg[I2NP_HEADER_TYPEID_OFFSET]; uint32_t msgID = bufbe32toh (msg + I2NP_HEADER_MSGID_OFFSET); LogPrint (eLogDebug, "I2NP: msg received len=", len,", type=", (int)typeID, ", msgID=", (unsigned int)msgID); - uint8_t * buf = msg + I2NP_HEADER_SIZE; int size = bufbe16toh (msg + I2NP_HEADER_SIZE_OFFSET); switch (typeID) diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index 8756da8b..93bcff0c 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -238,8 +238,8 @@ namespace client if (line == "\r") endOfHeader = true; else { - if (line.find ("Host:") != std::string::npos) - m_OutHeader << "Host: " << m_Host << "\r\n"; + if (m_Host.length () > 0 && line.find ("Host:") != std::string::npos) + m_OutHeader << "Host: " << m_Host << "\r\n"; // override host else m_OutHeader << line << "\n"; } @@ -501,7 +501,7 @@ namespace client int port, std::shared_ptr localDestination, const std::string& host, int inport, bool gzip): I2PServerTunnel (name, address, port, localDestination, inport, gzip), - m_Host (host.length () > 0 ? host : address) + m_Host (host) { } diff --git a/Log.h b/Log.h index 79bbeb3f..b4eb70cd 100644 --- a/Log.h +++ b/Log.h @@ -152,13 +152,13 @@ namespace log { std::string text; /**< message text as single string */ LogLevel level; /**< message level */ std::thread::id tid; /**< id of thread that generated message */ - + LogMsg (LogLevel lvl, std::time_t ts, const std::string & txt): timestamp(ts), text(txt), level(lvl) {}; }; Log & Logger(); } // log -} // i2p +} /** internal usage only -- folding args array to single string */ template diff --git a/Makefile b/Makefile index 70869e42..d0e675c7 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,12 @@ USE_STATIC := no USE_MESHNET := no USE_UPNP := no + +ifeq ($(WEBSOCKETS),1) + NEEDED_CXXFLAGS += -DWITH_EVENTS + DAEMON_SRC += Websocket.cpp +endif + ifeq ($(UNAME),Darwin) DAEMON_SRC += DaemonLinux.cpp ifeq ($(HOMEBREW),1) diff --git a/NTCPSession.cpp b/NTCPSession.cpp index 20dc9ab0..f9116e3d 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -11,6 +11,9 @@ #include "Transports.h" #include "NetDb.h" #include "NTCPSession.h" +#ifdef WITH_EVENTS +#include "Event.h" +#endif using namespace i2p::crypto; @@ -604,7 +607,12 @@ namespace transport if (!memcmp (m_NextMessage->buf + m_NextMessageOffset - 4, checksum, 4)) { if (!m_NextMessage->IsExpired ()) + { +#ifdef WITH_EVENTS + EmitEvent({{"type", "transport.recvmsg"} , {"ident", GetIdentHashBase64()}, {"number", "1"}}); +#endif m_Handler.PutNextMessage (m_NextMessage); + } else LogPrint (eLogInfo, "NTCP: message expired"); } diff --git a/SSUData.cpp b/SSUData.cpp index e5abfd54..ad38cf25 100644 --- a/SSUData.cpp +++ b/SSUData.cpp @@ -5,6 +5,9 @@ #include "NetDb.h" #include "SSU.h" #include "SSUData.h" +#ifdef WITH_EVENTS +#include "Event.h" +#endif namespace i2p { @@ -234,8 +237,13 @@ namespace transport { m_ReceivedMessages.insert (msgID); m_LastMessageReceivedTime = i2p::util::GetSecondsSinceEpoch (); - if (!msg->IsExpired ()) + if (!msg->IsExpired ()) + { +#ifdef WITH_EVENTS + EmitEvent({{"type", "transport.recvmsg"} , {"ident", m_Session.GetIdentHashBase64()}, {"number", "1"}}); +#endif m_Handler.PutNextMessage (msg); + } else LogPrint (eLogDebug, "SSU: message expired"); } diff --git a/Timestamp.cpp b/Timestamp.cpp index fbe51ea1..6079c881 100644 --- a/Timestamp.cpp +++ b/Timestamp.cpp @@ -1,6 +1,7 @@ #include #include #include +#include "Log.h" #include "I2PEndian.h" #include "Timestamp.h" @@ -8,7 +9,7 @@ namespace i2p { namespace util { - std::chrono::system_clock::duration g_TimeOffset = std::chrono::system_clock::duration::zero (); + static int64_t g_TimeOffset = 0; // in seconds void SyncTimeWithNTP (const std::string& address) { @@ -23,25 +24,34 @@ namespace util socket.open (boost::asio::ip::udp::v4 (), ec); if (!ec) { - uint8_t request[48];// 48 bytes NTP request - memset (request, 0, 48); - request[0] = 0x80; // client mode, version 0 - uint8_t * response = new uint8_t[1500]; // MTU + uint8_t buf[48];// 48 bytes NTP request/response + memset (buf, 0, 48); + htobe32buf (buf, (3 << 27) | (3 << 24)); // RFC 4330 size_t len = 0; try { - socket.send_to (boost::asio::buffer (request, 48), ep); - len = socket.receive_from (boost::asio::buffer (response, 1500), ep); + socket.send_to (boost::asio::buffer (buf, 48), ep); + int i = 0; + while (!socket.available() && i < 10) // 10 seconds max + { + std::this_thread::sleep_for (std::chrono::seconds(1)); + i++; + } + if (socket.available ()) + len = socket.receive_from (boost::asio::buffer (buf, 48), ep); } catch (std::exception& e) { + LogPrint (eLogError, "NTP error: ", e.what ()); } if (len >= 8) { - uint32_t ts = bufbe32toh (response + 4); + auto ourTs = GetSecondsSinceEpoch (); + uint32_t ts = bufbe32toh (buf + 32); if (ts > 2208988800U) ts -= 2208988800U; // 1/1/1970 from 1/1/1900 + g_TimeOffset = ts - ourTs; + LogPrint (eLogInfo, address, " time offset from system time is ", g_TimeOffset, " seconds"); } - delete[] response; } } } diff --git a/Timestamp.h b/Timestamp.h index 2e61d856..d48cb164 100644 --- a/Timestamp.h +++ b/Timestamp.h @@ -8,8 +8,6 @@ namespace i2p { namespace util { - extern std::chrono::system_clock::duration g_TimeOffset; - inline uint64_t GetMillisecondsSinceEpoch () { return std::chrono::duration_cast( diff --git a/TransportSession.h b/TransportSession.h index 9c97d02e..5950fb06 100644 --- a/TransportSession.h +++ b/TransportSession.h @@ -64,6 +64,8 @@ namespace transport virtual ~TransportSession () {}; virtual void Done () = 0; + + std::string GetIdentHashBase64() const { return m_RemoteIdentity ? m_RemoteIdentity->GetIdentHash().ToBase64() : ""; } std::shared_ptr GetRemoteIdentity () { return m_RemoteIdentity; }; void SetRemoteIdentity (std::shared_ptr ident) { m_RemoteIdentity = ident; }; diff --git a/Transports.cpp b/Transports.cpp index 2e5af1f8..913106ae 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -5,6 +5,10 @@ #include "NetDb.h" #include "Transports.h" #include "Config.h" +#ifdef WITH_EVENTS +#include "Event.h" +#include "util.h" +#endif using namespace i2p::data; @@ -230,6 +234,9 @@ namespace transport void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector >& msgs) { +#ifdef WITH_EVENTS + EmitEvent({{"type" , "transport.sendmsg"}, {"ident", ident.ToBase64()}, {"number", std::to_string(msgs.size())}}); +#endif m_Service.post (std::bind (&Transports::PostMessages, this, ident, msgs)); } @@ -504,10 +511,9 @@ namespace transport } if (m_SSUServer) { - bool nat; i2p::config::GetOption("nat", nat); - if (nat) - i2p::context.SetStatus (eRouterStatusTesting); - + bool nat; i2p::config::GetOption("nat", nat); + if (nat) + i2p::context.SetStatus (eRouterStatusTesting); for (int i = 0; i < 5; i++) { auto router = i2p::data::netdb.GetRandomPeerTestRouter (); @@ -569,6 +575,9 @@ namespace transport auto it = m_Peers.find (ident); if (it != m_Peers.end ()) { +#ifdef WITH_EVENTS + EmitEvent({{"type" , "transport.connected"}, {"ident", ident.ToBase64()}, {"inbound", "false"}}); +#endif bool sendDatabaseStore = true; if (it->second.delayedMessages.size () > 0) { @@ -594,20 +603,26 @@ namespace transport session->Done(); return; } +#ifdef WITH_EVENTS + EmitEvent({{"type" , "transport.connected"}, {"ident", ident.ToBase64()}, {"inbound", "true"}}); +#endif session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore std::unique_lock l(m_PeersMutex); m_Peers.insert (std::make_pair (ident, Peer{ 0, nullptr, { session }, i2p::util::GetSecondsSinceEpoch (), {} })); } - }); + }); } void Transports::PeerDisconnected (std::shared_ptr session) { m_Service.post([session, this]() - { + { auto remoteIdentity = session->GetRemoteIdentity (); if (!remoteIdentity) return; auto ident = remoteIdentity->GetIdentHash (); +#ifdef WITH_EVENTS + EmitEvent({{"type" , "transport.disconnected"}, {"ident", ident.ToBase64()}}); +#endif auto it = m_Peers.find (ident); if (it != m_Peers.end ()) { @@ -671,13 +686,13 @@ namespace transport std::advance (it, rand () % m_Peers.size ()); return it != m_Peers.end () ? it->second.router : nullptr; } - void Transports::RestrictRoutesToFamilies(std::set families) - { - std::lock_guard lock(m_FamilyMutex); - m_TrustedFamilies.clear(); - for ( const auto& fam : families ) - m_TrustedFamilies.push_back(fam); - } + void Transports::RestrictRoutesToFamilies(std::set families) + { + std::lock_guard lock(m_FamilyMutex); + m_TrustedFamilies.clear(); + for ( const auto& fam : families ) + m_TrustedFamilies.push_back(fam); + } void Transports::RestrictRoutesToRouters(std::set routers) { @@ -687,14 +702,14 @@ namespace transport m_TrustedRouters.push_back(ri); } - bool Transports::RoutesRestricted() const { - std::unique_lock famlock(m_FamilyMutex); + bool Transports::RoutesRestricted() const { + std::unique_lock famlock(m_FamilyMutex); std::unique_lock routerslock(m_TrustedRoutersMutex); - return m_TrustedFamilies.size() > 0 || m_TrustedRouters.size() > 0; - } + return m_TrustedFamilies.size() > 0 || m_TrustedRouters.size() > 0; + } - /** XXX: if routes are not restricted this dies */ - std::shared_ptr Transports::GetRestrictedPeer() const + /** XXX: if routes are not restricted this dies */ + std::shared_ptr Transports::GetRestrictedPeer() const { { std::lock_guard l(m_FamilyMutex); @@ -727,7 +742,7 @@ namespace transport } } return nullptr; - } + } bool Transports::IsRestrictedPeer(const i2p::data::IdentHash & ih) const { diff --git a/Tunnel.cpp b/Tunnel.cpp index 0c1b278a..59dfc519 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -11,6 +11,7 @@ #include "Transports.h" #include "NetDb.h" #include "Tunnel.h" +#include "TunnelPool.h" namespace i2p { @@ -29,12 +30,14 @@ namespace tunnel void Tunnel::Build (uint32_t replyMsgID, std::shared_ptr outboundTunnel) { +#ifdef WITH_EVENTS + std::string peers = i2p::context.GetIdentity()->GetIdentHash().ToBase64(); +#endif auto numHops = m_Config->GetNumHops (); int numRecords = numHops <= STANDARD_NUM_RECORDS ? STANDARD_NUM_RECORDS : numHops; auto msg = NewI2NPShortMessage (); *msg->GetPayload () = numRecords; msg->len += numRecords*TUNNEL_BUILD_RECORD_SIZE + 1; - // shuffle records std::vector recordIndicies; for (int i = 0; i < numRecords; i++) recordIndicies.push_back(i); @@ -55,8 +58,14 @@ namespace tunnel hop->CreateBuildRequestRecord (records + idx*TUNNEL_BUILD_RECORD_SIZE, msgID); hop->recordIndex = idx; i++; +#ifdef WITH_EVENTS + peers += ":" + hop->ident->GetIdentHash().ToBase64(); +#endif hop = hop->next; } +#ifdef WITH_EVENTS + EmitTunnelEvent("tunnel.build", this, peers); +#endif // fill up fake records with random data for (int i = numHops; i < numRecords; i++) { @@ -182,6 +191,13 @@ namespace tunnel return ret; } + void Tunnel::SetState(TunnelState state) + { + m_State = state; + EmitTunnelEvent("tunnel.state", this, state); + } + + void Tunnel::PrintHops (std::stringstream& s) const { // hops are in inverted order, we must print in direct order @@ -582,6 +598,7 @@ namespace tunnel hop = hop->next; } } + EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed); // delete it = pendingTunnels.erase (it); m_NumFailedTunnelCreations++; @@ -591,6 +608,9 @@ namespace tunnel break; case eTunnelStateBuildFailed: LogPrint (eLogDebug, "Tunnel: pending build request ", it->first, " failed, deleted"); + + EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed); + it = pendingTunnels.erase (it); m_NumFailedTunnelCreations++; break; @@ -776,7 +796,7 @@ namespace tunnel std::shared_ptr Tunnels::CreateInboundTunnel (std::shared_ptr config, std::shared_ptr outboundTunnel) { - if (config) + if (config) return CreateTunnel(config, outboundTunnel); else return CreateZeroHopsInboundTunnel (); diff --git a/Tunnel.h b/Tunnel.h index 5bc8b195..54917d49 100644 --- a/Tunnel.h +++ b/Tunnel.h @@ -19,11 +19,49 @@ #include "TunnelGateway.h" #include "TunnelBase.h" #include "I2NPProtocol.h" +#include "Event.h" namespace i2p { namespace tunnel -{ +{ + + template + static void EmitTunnelEvent(const std::string & ev, const TunnelT & t) + { +#ifdef WITH_EVENTS + EmitEvent({{"type", ev}, {"tid", std::to_string(t->GetTunnelID())}}); +#else + (void) ev; + (void) t; +#endif + } + + template + static void EmitTunnelEvent(const std::string & ev, TunnelT * t, const T & val) + { +#ifdef WITH_EVENTS + EmitEvent({{"type", ev}, {"tid", std::to_string(t->GetTunnelID())}, {"value", std::to_string(val)}, {"inbound", std::to_string(t->IsInbound())}}); +#else + (void) ev; + (void) t; + (void) val; +#endif + } + + template + static void EmitTunnelEvent(const std::string & ev, TunnelT * t, const std::string & val) + { +#ifdef WITH_EVENTS + EmitEvent({{"type", ev}, {"tid", std::to_string(t->GetTunnelID())}, {"value", val}, {"inbound", std::to_string(t->IsInbound())}}); +#else + (void) ev; + (void) t; + (void) val; +#endif + } + + const int TUNNEL_EXPIRATION_TIMEOUT = 660; // 11 minutes const int TUNNEL_EXPIRATION_THRESHOLD = 60; // 1 minute const int TUNNEL_RECREATION_THRESHOLD = 90; // 1.5 minutes @@ -40,7 +78,7 @@ namespace tunnel eTunnelStateFailed, eTunnelStateExpiring }; - + class OutboundTunnel; class InboundTunnel; class Tunnel: public TunnelBase @@ -62,12 +100,13 @@ namespace tunnel std::vector > GetPeers () const; std::vector > GetInvertedPeers () const; TunnelState GetState () const { return m_State; }; - void SetState (TunnelState state) { m_State = state; }; + void SetState (TunnelState state); bool IsEstablished () const { return m_State == eTunnelStateEstablished; }; bool IsFailed () const { return m_State == eTunnelStateFailed; }; bool IsRecreated () const { return m_IsRecreated; }; void SetIsRecreated () { m_IsRecreated = true; }; - + virtual bool IsInbound() const = 0; + std::shared_ptr GetTunnelPool () const { return m_Pool; }; void SetTunnelPool (std::shared_ptr pool) { m_Pool = pool; }; @@ -107,6 +146,8 @@ namespace tunnel // implements TunnelBase void HandleTunnelDataMsg (std::shared_ptr tunnelMsg); + + bool IsInbound() const { return false; } private: @@ -123,7 +164,7 @@ namespace tunnel void HandleTunnelDataMsg (std::shared_ptr msg); virtual size_t GetNumReceivedBytes () const { return m_Endpoint.GetNumReceivedBytes (); }; void Print (std::stringstream& s) const; - + bool IsInbound() const { return true; } private: TunnelEndpoint m_Endpoint; diff --git a/TunnelBase.h b/TunnelBase.h index bec06400..1b4e1e1f 100644 --- a/TunnelBase.h +++ b/TunnelBase.h @@ -48,7 +48,7 @@ namespace tunnel uint32_t GetCreationTime () const { return m_CreationTime; }; void SetCreationTime (uint32_t t) { m_CreationTime = t; }; - + private: uint32_t m_TunnelID, m_NextTunnelID; diff --git a/TunnelPool.cpp b/TunnelPool.cpp index 35272f2c..56d72bfb 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -7,12 +7,18 @@ #include "Garlic.h" #include "Transports.h" #include "Log.h" +#include "Tunnel.h" #include "TunnelPool.h" +#include "Destination.h" +#ifdef WITH_EVENTS +#include "Event.h" +#endif namespace i2p { namespace tunnel { + TunnelPool::TunnelPool (int numInboundHops, int numOutboundHops, int numInboundTunnels, int numOutboundTunnels): m_NumInboundHops (numInboundHops), m_NumOutboundHops (numOutboundHops), m_NumInboundTunnels (numInboundTunnels), m_NumOutboundTunnels (numOutboundTunnels), m_IsActive (true), @@ -67,6 +73,9 @@ namespace tunnel { if (!m_IsActive) return; { +#ifdef WITH_EVENTS + EmitTunnelEvent("tunnels.created", createdTunnel); +#endif std::unique_lock l(m_InboundTunnelsMutex); m_InboundTunnels.insert (createdTunnel); } @@ -77,7 +86,10 @@ namespace tunnel void TunnelPool::TunnelExpired (std::shared_ptr expiredTunnel) { if (expiredTunnel) - { + { +#ifdef WITH_EVENTS + EmitTunnelEvent("tunnels.expired", expiredTunnel); +#endif expiredTunnel->SetTunnelPool (nullptr); for (auto& it: m_Tests) if (it.second.second == expiredTunnel) it.second.second = nullptr; @@ -91,6 +103,9 @@ namespace tunnel { if (!m_IsActive) return; { +#ifdef WITH_EVENTS + EmitTunnelEvent("tunnels.created", createdTunnel); +#endif std::unique_lock l(m_OutboundTunnelsMutex); m_OutboundTunnels.insert (createdTunnel); } @@ -101,6 +116,9 @@ namespace tunnel { if (expiredTunnel) { +#ifdef WITH_EVENTS + EmitTunnelEvent("tunnels.expired", expiredTunnel); +#endif expiredTunnel->SetTunnelPool (nullptr); for (auto& it: m_Tests) if (it.second.first == expiredTunnel) it.second.first = nullptr; diff --git a/Websocket.cpp b/Websocket.cpp new file mode 100644 index 00000000..0de44efe --- /dev/null +++ b/Websocket.cpp @@ -0,0 +1,137 @@ +#include "Websocket.h" +#include "Log.h" + +#include + +#include +#include +#include +#define GCC47_BOOST149 ((BOOST_VERSION == 104900) && (__GNUC__ == 4) && (__GNUC_MINOR__ >= 7)) +#if !GCC47_BOOST149 +#include +#endif + +#include + +namespace i2p +{ + namespace event + { + + typedef websocketpp::server ServerImpl; + typedef websocketpp::connection_hdl ServerConn; + + class WebsocketServerImpl : public EventListener + { + private: + typedef ServerImpl::message_ptr MessagePtr; + public: + + WebsocketServerImpl(const std::string & addr, int port) : m_run(false), m_thread(nullptr) + { + m_server.init_asio(); + m_server.set_open_handler(std::bind(&WebsocketServerImpl::ConnOpened, this, std::placeholders::_1)); + m_server.set_close_handler(std::bind(&WebsocketServerImpl::ConnClosed, this, std::placeholders::_1)); + m_server.set_message_handler(std::bind(&WebsocketServerImpl::OnConnMessage, this, std::placeholders::_1, std::placeholders::_2)); + + m_server.listen(boost::asio::ip::address::from_string(addr), port); + } + + ~WebsocketServerImpl() + { + } + + void Start() { + m_run = true; + m_server.start_accept(); + m_thread = new std::thread([&] () { + while(m_run) { + try { + m_server.run(); + } catch (std::exception & e ) { + LogPrint(eLogError, "Websocket server: ", e.what()); + } + } + }); + } + + void Stop() { + m_run = false; + m_server.stop(); + if(m_thread) { + m_thread->join(); + delete m_thread; + } + m_thread = nullptr; + } + + void ConnOpened(ServerConn c) + { + std::lock_guard lock(m_connsMutex); + m_conns.insert(c); + } + + void ConnClosed(ServerConn c) + { + std::lock_guard lock(m_connsMutex); + m_conns.erase(c); + } + + void OnConnMessage(ServerConn conn, ServerImpl::message_ptr msg) + { + (void) conn; + (void) msg; + } + + void HandleEvent(const EventType & ev) + { + std::lock_guard lock(m_connsMutex); + LogPrint(eLogDebug, "websocket event"); + boost::property_tree::ptree event; + for (const auto & item : ev) { + event.put(item.first, item.second); + } + std::ostringstream ss; + write_json(ss, event); + std::string s = ss.str(); + + ConnList::iterator it; + for (it = m_conns.begin(); it != m_conns.end(); ++it) { + ServerImpl::connection_ptr con = m_server.get_con_from_hdl(*it); + con->send(s); + } + } + + private: + typedef std::set > ConnList; + bool m_run; + std::thread * m_thread; + std::mutex m_connsMutex; + ConnList m_conns; + ServerImpl m_server; + }; + + + WebsocketServer::WebsocketServer(const std::string & addr, int port) : m_impl(new WebsocketServerImpl(addr, port)) {} + WebsocketServer::~WebsocketServer() + { + delete m_impl; + } + + + void WebsocketServer::Start() + { + m_impl->Start(); + } + + void WebsocketServer::Stop() + { + m_impl->Stop(); + } + + EventListener * WebsocketServer::ToListener() + { + return m_impl; + } + } +} diff --git a/Websocket.h b/Websocket.h new file mode 100644 index 00000000..2ddca38f --- /dev/null +++ b/Websocket.h @@ -0,0 +1,28 @@ +#ifndef WEBSOCKET_H__ +#define WEBSOCKET_H__ +#include "Event.h" +namespace i2p +{ + namespace event + { + + class WebsocketServerImpl; + + class WebsocketServer + { + public: + WebsocketServer(const std::string & addr, int port); + ~WebsocketServer(); + + void Start(); + void Stop(); + + EventListener * ToListener(); + + private: + WebsocketServerImpl * m_impl; + }; + + } +} +#endif diff --git a/android/jni/Android.mk b/android/jni/Android.mk index a31fcfb2..a3d58ce6 100755 --- a/android/jni/Android.mk +++ b/android/jni/Android.mk @@ -58,6 +58,7 @@ LOCAL_SRC_FILES := DaemonAndroid.cpp i2pd_android.cpp \ ../../TunnelGateway.cpp \ ../../TunnelPool.cpp \ ../../Timestamp.cpp \ + ../../Event.cpp \ ../../util.cpp \ ../../i2pd.cpp ../../UPnP.cpp diff --git a/build/CMakeLists.txt b/build/CMakeLists.txt index f6697612..5291a82f 100644 --- a/build/CMakeLists.txt +++ b/build/CMakeLists.txt @@ -19,6 +19,7 @@ option(WITH_MESHNET "Build for cjdns test network" OFF) option(WITH_ADDRSANITIZER "Build with address sanitizer unix only" OFF) option(WITH_THREADSANITIZER "Build with thread sanitizer unix only" OFF) option(WITH_I2LUA "Build for i2lua" OFF) +option(WITH_WEBSOCKETS "Build with websocket ui" OFF) # paths set ( CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake_modules" ) @@ -59,8 +60,14 @@ set (LIBI2PD_SRC "${CMAKE_SOURCE_DIR}/Signature.cpp" "${CMAKE_SOURCE_DIR}/Timestamp.cpp" "${CMAKE_SOURCE_DIR}/api.cpp" + "${CMAKE_SOURCE_DIR}/Event.cpp" ) +if (WITH_WEBSOCKETS) + add_definitions(-DWITH_EVENTS) + find_package(websocketpp REQUIRED) +endif () + if (CMAKE_SYSTEM_NAME STREQUAL "Windows" OR MSYS) list (APPEND LIBI2PD_SRC "${CMAKE_SOURCE_DIR}/I2PEndian.cpp") endif () @@ -92,6 +99,9 @@ set (CLIENT_SRC "${CMAKE_SOURCE_DIR}/I2CP.cpp" ) +if(WITH_WEBSOCKETS) + list (APPEND CLIENT_SRC "${CMAKE_SOURCE_DIR}/Websocket.cpp") +endif () add_library(i2pdclient ${CLIENT_SRC}) set (DAEMON_SRC @@ -367,6 +377,7 @@ message(STATUS " MESHNET : ${WITH_MESHNET}") message(STATUS " ADDRSANITIZER : ${WITH_ADDRSANITIZER}") message(STATUS " THEADSANITIZER : ${WITH_THREADSANITIZER}") message(STATUS " I2LUA : ${WITH_I2LUA}") +message(STATUS " WEBSOCKETS : ${WITH_WEBSOCKETS}") message(STATUS "---------------------------------------") #Handle paths nicely diff --git a/docs/building/requirements.md b/docs/building/requirements.md index 8712a03d..1bb163e1 100644 --- a/docs/building/requirements.md +++ b/docs/building/requirements.md @@ -12,3 +12,4 @@ Optional tools: * cmake >= 2.8 (or 3.3+ if you want to use precompiled headers on windows) * miniupnp library (for upnp support) +* [websocketpp](https://github.com/zaphoyd/websocketpp/) (for websocket ui) diff --git a/docs/building/unix.md b/docs/building/unix.md index b7a638f5..17623159 100644 --- a/docs/building/unix.md +++ b/docs/building/unix.md @@ -46,6 +46,9 @@ Available CMake options(each option has a form of `=`, for more info * `WITH_AESNI` build with AES-NI support (ON/OFF) * `WITH_HARDENING` enable hardening features (ON/OFF) (gcc only) * `WITH_PCH` use pre-compiled header (experimental, speeds up build) +* `WITH_I2LUA` used when building i2lua +* `WITH_WEBSOCKETS` enable websocket server + Also there is `-L` flag for CMake that could be used to list current cached options: diff --git a/filelist.mk b/filelist.mk index cb1263e3..94ce2f22 100644 --- a/filelist.mk +++ b/filelist.mk @@ -5,7 +5,7 @@ LIB_SRC = \ SSUSession.cpp SSUData.cpp Streaming.cpp Identity.cpp TransitTunnel.cpp \ Transports.cpp Tunnel.cpp TunnelEndpoint.cpp TunnelPool.cpp TunnelGateway.cpp \ Destination.cpp Base.cpp I2PEndian.cpp FS.cpp Config.cpp Family.cpp \ - Config.cpp HTTP.cpp Timestamp.cpp util.cpp api.cpp + Config.cpp HTTP.cpp Timestamp.cpp util.cpp api.cpp Event.cpp LIB_CLIENT_SRC = \ AddressBook.cpp BOB.cpp ClientContext.cpp I2PTunnel.cpp I2PService.cpp \ diff --git a/qt/i2pd_qt/i2pd_qt.pro b/qt/i2pd_qt/i2pd_qt.pro index 0972f65c..143c7b8d 100644 --- a/qt/i2pd_qt/i2pd_qt.pro +++ b/qt/i2pd_qt/i2pd_qt.pro @@ -36,7 +36,7 @@ SOURCES += DaemonQT.cpp mainwindow.cpp \ ../../SSUData.cpp ../../SSUSession.cpp ../../Streaming.cpp ../../TransitTunnel.cpp \ ../../Transports.cpp ../../Tunnel.cpp ../../TunnelEndpoint.cpp ../../TunnelGateway.cpp \ ../../TunnelPool.cpp ../../UPnP.cpp ../../Gzip.cpp ../../Timestamp.cpp ../../util.cpp \ - ../../i2pd.cpp + ../../Event.cpp ../../i2pd.cpp HEADERS += DaemonQT.h mainwindow.h \ ../../HTTPServer.h ../../I2PControl.h ../../UPnP.h ../../Daemon.h ../../Config.h \ @@ -50,7 +50,7 @@ HEADERS += DaemonQT.h mainwindow.h \ ../../Streaming.h ../../Timestamp.h ../../TransitTunnel.h ../../Transports.h \ ../../TransportSession.h ../../Tunnel.h ../../TunnelBase.h ../../TunnelConfig.h \ ../../TunnelEndpoint.h ../../TunnelGateway.h ../../TunnelPool.h ../../UPnP.h \ - ../../util.h ../../version.h ../../Gzip.h ../../Tag.h + ../../util.h ../../version.h ../../Gzip.h ../../Tag.h ../../Event.h FORMS += mainwindow.ui