wait for confirmantion of publishing

This commit is contained in:
orignal 2014-11-28 13:01:35 -05:00
parent 3e3cfa3d68
commit e1c25fedb0
4 changed files with 94 additions and 33 deletions

View file

@ -12,7 +12,8 @@ namespace client
{ {
ClientDestination::ClientDestination (bool isPublic, i2p::data::SigningKeyType sigType): ClientDestination::ClientDestination (bool isPublic, i2p::data::SigningKeyType sigType):
m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr), m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr),
m_LeaseSet (nullptr), m_IsPublic (isPublic), m_DatagramDestination (nullptr) m_LeaseSet (nullptr), m_IsPublic (isPublic), m_PublishReplyToken (0),
m_DatagramDestination (nullptr), m_PublishConfirmationTimer (nullptr)
{ {
m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (sigType); m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (sigType);
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
@ -25,7 +26,8 @@ namespace client
ClientDestination::ClientDestination (const std::string& fullPath, bool isPublic): ClientDestination::ClientDestination (const std::string& fullPath, bool isPublic):
m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr), m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr),
m_LeaseSet (nullptr), m_IsPublic (isPublic), m_DatagramDestination (nullptr) m_LeaseSet (nullptr), m_IsPublic (isPublic), m_PublishReplyToken (0),
m_DatagramDestination (nullptr), m_PublishConfirmationTimer (nullptr)
{ {
std::ifstream s(fullPath.c_str (), std::ifstream::binary); std::ifstream s(fullPath.c_str (), std::ifstream::binary);
if (s.is_open ()) if (s.is_open ())
@ -61,7 +63,8 @@ namespace client
ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic): ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic):
m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr), m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr),
m_Keys (keys), m_LeaseSet (nullptr), m_IsPublic (isPublic), m_DatagramDestination (nullptr) m_Keys (keys), m_LeaseSet (nullptr), m_IsPublic (isPublic), m_PublishReplyToken (0),
m_DatagramDestination (nullptr), m_PublishConfirmationTimer (nullptr)
{ {
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey); dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey);
@ -80,6 +83,7 @@ namespace client
i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool);
delete m_LeaseSet; delete m_LeaseSet;
delete m_Work; delete m_Work;
delete m_PublishConfirmationTimer;
delete m_Service; delete m_Service;
delete m_StreamingDestination; delete m_StreamingDestination;
delete m_DatagramDestination; delete m_DatagramDestination;
@ -94,6 +98,7 @@ namespace client
void ClientDestination::Start () void ClientDestination::Start ()
{ {
m_Service = new boost::asio::io_service; m_Service = new boost::asio::io_service;
m_PublishConfirmationTimer = new boost::asio::deadline_timer (*m_Service);
m_Work = new boost::asio::io_service::work (*m_Service); m_Work = new boost::asio::io_service::work (*m_Service);
m_Pool->SetActive (true); m_Pool->SetActive (true);
m_IsRunning = true; m_IsRunning = true;
@ -113,6 +118,8 @@ namespace client
if (m_Pool) if (m_Pool)
i2p::tunnel::tunnels.StopTunnelPool (m_Pool); i2p::tunnel::tunnels.StopTunnelPool (m_Pool);
m_IsRunning = false; m_IsRunning = false;
delete m_PublishConfirmationTimer;
m_PublishConfirmationTimer = nullptr;
if (m_Service) if (m_Service)
m_Service->stop (); m_Service->stop ();
if (m_Thread) if (m_Thread)
@ -209,21 +216,35 @@ namespace client
offset += 36; offset += 36;
if (msg->type == 1) // LeaseSet if (msg->type == 1) // LeaseSet
{ {
LogPrint ("Remote LeaseSet"); LogPrint (eLogDebug, "Remote LeaseSet");
auto it = m_RemoteLeaseSets.find (msg->key); auto it = m_RemoteLeaseSets.find (msg->key);
if (it != m_RemoteLeaseSets.end ()) if (it != m_RemoteLeaseSets.end ())
{ {
it->second->Update (buf + offset, len - offset); it->second->Update (buf + offset, len - offset);
LogPrint ("Remote LeaseSet updated"); LogPrint (eLogDebug, "Remote LeaseSet updated");
} }
else else
{ {
LogPrint ("New remote LeaseSet added"); LogPrint (eLogDebug, "New remote LeaseSet added");
m_RemoteLeaseSets[msg->key] = new i2p::data::LeaseSet (buf + offset, len - offset); m_RemoteLeaseSets[msg->key] = new i2p::data::LeaseSet (buf + offset, len - offset);
} }
} }
else else
LogPrint ("Unexpected client's DatabaseStore type ", msg->type, ". Dropped"); LogPrint (eLogError, "Unexpected client's DatabaseStore type ", msg->type, ". Dropped");
}
void ClientDestination::HandleDeliveryStatusMessage (I2NPMessage * msg)
{
I2NPDeliveryStatusMsg * deliveryStatus = (I2NPDeliveryStatusMsg *)msg->GetPayload ();
uint32_t msgID = be32toh (deliveryStatus->msgID);
if (msgID == m_PublishReplyToken)
{
LogPrint (eLogDebug, "Publishing confirmed");
m_PublishReplyToken = 0;
i2p::DeleteI2NPMessage (msg);
}
else
i2p::garlic::GarlicDestination::HandleDeliveryStatusMessage (msg);
} }
void ClientDestination::SetLeaseSetUpdated () void ClientDestination::SetLeaseSetUpdated ()
@ -231,7 +252,62 @@ namespace client
i2p::garlic::GarlicDestination::SetLeaseSetUpdated (); i2p::garlic::GarlicDestination::SetLeaseSetUpdated ();
UpdateLeaseSet (); UpdateLeaseSet ();
if (m_IsPublic) if (m_IsPublic)
i2p::data::netdb.PublishLeaseSet (m_LeaseSet, m_Pool); Publish ();
}
void ClientDestination::Publish ()
{
if (!m_LeaseSet || !m_Pool)
{
LogPrint (eLogError, "Can't publish non-existing LeaseSet");
return;
}
if (m_PublishReplyToken)
{
LogPrint (eLogInfo, "Publishing is pending");
return;
}
auto outbound = m_Pool->GetNextOutboundTunnel ();
if (!outbound)
{
LogPrint ("Can't publish LeaseSet. No outbound tunnels");
return;
}
std::set<i2p::data::IdentHash> excluded;
auto floodfill = i2p::data::netdb.GetClosestFloodfill (m_LeaseSet->GetIdentHash (), excluded);
if (!floodfill)
{
LogPrint ("Can't publish LeaseSet. No floodfills found");
return;
}
LogPrint (eLogDebug, "Publish LeaseSet of ", GetIdentHash ().ToBase32 ());
m_PublishReplyToken = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
auto msg = WrapMessage (*floodfill, i2p::CreateDatabaseStoreMsg (m_LeaseSet, m_PublishReplyToken));
if (m_PublishConfirmationTimer)
{
m_PublishConfirmationTimer->expires_from_now (boost::posix_time::seconds(PUBLISH_CONFIRMATION_TIMEOUT));
m_PublishConfirmationTimer->async_wait (std::bind (&ClientDestination::HandlePublishConfirmationTimer,
this, std::placeholders::_1));
}
else
{
LogPrint (eLogWarning, "Destination's thread is not running");
m_PublishReplyToken = 0;
}
outbound->SendTunnelDataMsg (floodfill->GetIdentHash (), 0, msg);
}
void ClientDestination::HandlePublishConfirmationTimer (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
if (m_PublishReplyToken)
{
LogPrint (eLogWarning, "Publish confirmation was not received in ", PUBLISH_CONFIRMATION_TIMEOUT, "seconds. Try again");
m_PublishReplyToken = 0;
Publish ();
}
}
} }
void ClientDestination::HandleDataMessage (const uint8_t * buf, size_t len) void ClientDestination::HandleDataMessage (const uint8_t * buf, size_t len)

View file

@ -19,6 +19,7 @@ namespace client
const uint8_t PROTOCOL_TYPE_STREAMING = 6; const uint8_t PROTOCOL_TYPE_STREAMING = 6;
const uint8_t PROTOCOL_TYPE_DATAGRAM = 17; const uint8_t PROTOCOL_TYPE_DATAGRAM = 17;
const uint8_t PROTOCOL_TYPE_RAW = 18; const uint8_t PROTOCOL_TYPE_RAW = 18;
const int PUBLISH_CONFIRMATION_TIMEOUT = 5; // in seconds
class ClientDestination: public i2p::garlic::GarlicDestination class ClientDestination: public i2p::garlic::GarlicDestination
{ {
@ -69,7 +70,10 @@ namespace client
void Run (); void Run ();
void UpdateLeaseSet (); void UpdateLeaseSet ();
void HandleDatabaseStoreMessage (const uint8_t * buf, size_t len); void Publish ();
void HandlePublishConfirmationTimer (const boost::system::error_code& ecode);
void HandleDatabaseStoreMessage (const uint8_t * buf, size_t len);
void HandleDeliveryStatusMessage (I2NPMessage * msg);
private: private:
@ -84,10 +88,13 @@ namespace client
i2p::tunnel::TunnelPool * m_Pool; i2p::tunnel::TunnelPool * m_Pool;
i2p::data::LeaseSet * m_LeaseSet; i2p::data::LeaseSet * m_LeaseSet;
bool m_IsPublic; bool m_IsPublic;
uint32_t m_PublishReplyToken;
i2p::stream::StreamingDestination * m_StreamingDestination; i2p::stream::StreamingDestination * m_StreamingDestination;
i2p::datagram::DatagramDestination * m_DatagramDestination; i2p::datagram::DatagramDestination * m_DatagramDestination;
boost::asio::deadline_timer * m_PublishConfirmationTimer;
public: public:
// for HTTP only // for HTTP only

View file

@ -896,26 +896,5 @@ namespace data
it++; it++;
} }
} }
void NetDb::PublishLeaseSet (const LeaseSet * leaseSet, i2p::tunnel::TunnelPool * pool)
{
if (!leaseSet || !pool) return;
auto outbound = pool->GetNextOutboundTunnel ();
if (!outbound)
{
LogPrint ("Can't publish LeaseSet. No outbound tunnels");
return;
}
std::set<IdentHash> excluded;
auto floodfill = GetClosestFloodfill (leaseSet->GetIdentHash (), excluded);
if (!floodfill)
{
LogPrint ("Can't publish LeaseSet. No floodfills found");
return;
}
uint32_t replyToken = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
auto msg = pool->GetGarlicDestination ().WrapMessage (*floodfill, i2p::CreateDatabaseStoreMsg (leaseSet, replyToken));
outbound->SendTunnelDataMsg (floodfill->GetIdentHash (), 0, msg);
}
} }
} }

View file

@ -67,7 +67,6 @@ namespace data
std::shared_ptr<RouterInfo> FindRouter (const IdentHash& ident) const; std::shared_ptr<RouterInfo> FindRouter (const IdentHash& ident) const;
LeaseSet * FindLeaseSet (const IdentHash& destination) const; LeaseSet * FindLeaseSet (const IdentHash& destination) const;
void PublishLeaseSet (const LeaseSet * leaseSet, i2p::tunnel::TunnelPool * pool);
void RequestDestination (const IdentHash& destination, bool isLeaseSet = false, void RequestDestination (const IdentHash& destination, bool isLeaseSet = false,
i2p::tunnel::TunnelPool * pool = nullptr); i2p::tunnel::TunnelPool * pool = nullptr);
@ -78,6 +77,7 @@ namespace data
std::shared_ptr<const RouterInfo> GetRandomRouter () const; std::shared_ptr<const RouterInfo> GetRandomRouter () const;
std::shared_ptr<const RouterInfo> GetRandomRouter (std::shared_ptr<const RouterInfo> compatibleWith) const; std::shared_ptr<const RouterInfo> GetRandomRouter (std::shared_ptr<const RouterInfo> compatibleWith) const;
std::shared_ptr<const RouterInfo> GetHighBandwidthRandomRouter (std::shared_ptr<const RouterInfo> compatibleWith) const; std::shared_ptr<const RouterInfo> GetHighBandwidthRandomRouter (std::shared_ptr<const RouterInfo> compatibleWith) const;
std::shared_ptr<const RouterInfo> GetClosestFloodfill (const IdentHash& destination, const std::set<IdentHash>& excluded) const;
void SetUnreachable (const IdentHash& ident, bool unreachable); void SetUnreachable (const IdentHash& ident, bool unreachable);
void PostI2NPMsg (I2NPMessage * msg); void PostI2NPMsg (I2NPMessage * msg);
@ -95,7 +95,6 @@ namespace data
void Run (); // exploratory thread void Run (); // exploratory thread
void Explore (int numDestinations); void Explore (int numDestinations);
void Publish (); void Publish ();
std::shared_ptr<const RouterInfo> GetClosestFloodfill (const IdentHash& destination, const std::set<IdentHash>& excluded) const;
void ManageLeaseSets (); void ManageLeaseSets ();
RequestedDestination * CreateRequestedDestination (const IdentHash& dest, RequestedDestination * CreateRequestedDestination (const IdentHash& dest,