sync StreamCreate

This commit is contained in:
orignal 2022-11-08 19:52:43 -05:00
parent 851be41d0d
commit 9fd60b52f1
3 changed files with 49 additions and 34 deletions

View file

@ -1129,6 +1129,35 @@ namespace client
}); });
} }
template<typename Dest>
std::shared_ptr<i2p::stream::Stream> ClientDestination::CreateStreamSync (const Dest& dest, int port)
{
std::shared_ptr<i2p::stream::Stream> stream;
std::condition_variable streamRequestComplete;
std::mutex streamRequestCompleteMutex;
std::unique_lock<std::mutex> l(streamRequestCompleteMutex);
CreateStream (
[&streamRequestComplete, &streamRequestCompleteMutex, &stream](std::shared_ptr<i2p::stream::Stream> s)
{
stream = s;
std::unique_lock<std::mutex> l(streamRequestCompleteMutex);
streamRequestComplete.notify_all ();
},
dest, port);
streamRequestComplete.wait (l);
return stream;
}
std::shared_ptr<i2p::stream::Stream> ClientDestination::CreateStream (const i2p::data::IdentHash& dest, int port)
{
return CreateStreamSync (dest, port);
}
std::shared_ptr<i2p::stream::Stream> ClientDestination::CreateStream (std::shared_ptr<const i2p::data::BlindedPublicKey> dest, int port)
{
return CreateStreamSync (dest, port);
}
std::shared_ptr<i2p::stream::Stream> ClientDestination::CreateStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port) std::shared_ptr<i2p::stream::Stream> ClientDestination::CreateStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port)
{ {
if (m_StreamingDestination) if (m_StreamingDestination)

View file

@ -247,6 +247,8 @@ namespace client
// following methods operate with default streaming destination // following methods operate with default streaming destination
void CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port = 0); void CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port = 0);
void CreateStream (StreamRequestComplete streamRequestComplete, std::shared_ptr<const i2p::data::BlindedPublicKey> dest, int port = 0); void CreateStream (StreamRequestComplete streamRequestComplete, std::shared_ptr<const i2p::data::BlindedPublicKey> dest, int port = 0);
std::shared_ptr<i2p::stream::Stream> CreateStream (const i2p::data::IdentHash& dest, int port = 0); // sync
std::shared_ptr<i2p::stream::Stream> CreateStream (std::shared_ptr<const i2p::data::BlindedPublicKey> dest, int port = 0); // sync
std::shared_ptr<i2p::stream::Stream> CreateStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port = 0); std::shared_ptr<i2p::stream::Stream> CreateStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port = 0);
void SendPing (const i2p::data::IdentHash& to); void SendPing (const i2p::data::IdentHash& to);
void SendPing (std::shared_ptr<const i2p::data::BlindedPublicKey> to); void SendPing (std::shared_ptr<const i2p::data::BlindedPublicKey> to);
@ -282,6 +284,9 @@ namespace client
void PersistTemporaryKeys (EncryptionKey * keys, bool isSingleKey); void PersistTemporaryKeys (EncryptionKey * keys, bool isSingleKey);
void ReadAuthKey (const std::string& group, const std::map<std::string, std::string> * params); void ReadAuthKey (const std::string& group, const std::map<std::string, std::string> * params);
template<typename Dest>
std::shared_ptr<i2p::stream::Stream> CreateStreamSync (const Dest& dest, int port);
private: private:
i2p::data::PrivateKeys m_Keys; i2p::data::PrivateKeys m_Keys;

View file

@ -833,40 +833,22 @@ namespace client
} }
else else
m_Ident = addr->identHash; m_Ident = addr->identHash;
/* this code block still needs some love */ // save url parts for later use
std::condition_variable newDataReceived; std::string dest_host = url.host;
std::mutex newDataReceivedMutex; int dest_port = url.port ? url.port : 80;
auto leaseSet = i2p::client::context.GetSharedLocalDestination ()->FindLeaseSet (m_Ident); // try to create stream to addressbook site
if (!leaseSet) auto stream = i2p::client::context.GetSharedLocalDestination ()->CreateStream (m_Ident, dest_port);
if (!stream)
{ {
std::unique_lock<std::mutex> l(newDataReceivedMutex);
i2p::client::context.GetSharedLocalDestination ()->RequestDestination (m_Ident,
[&newDataReceived, &leaseSet, &newDataReceivedMutex](std::shared_ptr<i2p::data::LeaseSet> ls)
{
leaseSet = ls;
std::unique_lock<std::mutex> l1(newDataReceivedMutex);
newDataReceived.notify_all ();
});
if (newDataReceived.wait_for (l, std::chrono::seconds (SUBSCRIPTION_REQUEST_TIMEOUT)) == std::cv_status::timeout)
{
LogPrint (eLogError, "Addressbook: Subscription LeaseSet request timeout expired");
i2p::client::context.GetSharedLocalDestination ()->CancelDestinationRequest (m_Ident, false); // don't notify, because we know it already
return false;
}
}
if (!leaseSet) {
/* still no leaseset found */
LogPrint (eLogError, "Addressbook: LeaseSet for address ", url.host, " not found"); LogPrint (eLogError, "Addressbook: LeaseSet for address ", url.host, " not found");
return false; return false;
} }
if (m_Etag.empty() && m_LastModified.empty()) { if (m_Etag.empty() && m_LastModified.empty())
{
m_Book.GetEtag (m_Ident, m_Etag, m_LastModified); m_Book.GetEtag (m_Ident, m_Etag, m_LastModified);
LogPrint (eLogDebug, "Addressbook: Loaded for ", url.host, ": ETag: ", m_Etag, ", Last-Modified: ", m_LastModified); LogPrint (eLogDebug, "Addressbook: Loaded for ", url.host, ": ETag: ", m_Etag, ", Last-Modified: ", m_LastModified);
} }
/* save url parts for later use */ // create http request & send it
std::string dest_host = url.host;
int dest_port = url.port ? url.port : 80;
/* create http request & send it */
i2p::http::HTTPReq req; i2p::http::HTTPReq req;
req.AddHeader("Host", dest_host); req.AddHeader("Host", dest_host);
req.AddHeader("User-Agent", "Wget/1.11.4"); req.AddHeader("User-Agent", "Wget/1.11.4");
@ -877,15 +859,14 @@ namespace client
req.AddHeader("If-None-Match", m_Etag); req.AddHeader("If-None-Match", m_Etag);
if (!m_LastModified.empty()) if (!m_LastModified.empty())
req.AddHeader("If-Modified-Since", m_LastModified); req.AddHeader("If-Modified-Since", m_LastModified);
/* convert url to relative */ // convert url to relative
url.schema = ""; url.schema = "";
url.host = ""; url.host = "";
req.uri = url.to_string(); req.uri = url.to_string();
req.version = "HTTP/1.1"; req.version = "HTTP/1.1";
auto stream = i2p::client::context.GetSharedLocalDestination ()->CreateStream (leaseSet, dest_port);
std::string request = req.to_string(); std::string request = req.to_string();
stream->Send ((const uint8_t *) request.data(), request.length()); stream->Send ((const uint8_t *) request.data(), request.length());
/* read response */ // read response
std::string response; std::string response;
uint8_t recv_buf[4096]; uint8_t recv_buf[4096];
bool end = false; bool end = false;
@ -910,7 +891,7 @@ namespace client
// process remaining buffer // process remaining buffer
while (size_t len = stream->ReadSome (recv_buf, sizeof(recv_buf))) while (size_t len = stream->ReadSome (recv_buf, sizeof(recv_buf)))
response.append ((char *)recv_buf, len); response.append ((char *)recv_buf, len);
/* parse response */ // parse response
i2p::http::HTTPRes res; i2p::http::HTTPRes res;
int res_head_len = res.parse(response); int res_head_len = res.parse(response);
if (res_head_len < 0) if (res_head_len < 0)
@ -923,7 +904,7 @@ namespace client
LogPrint(eLogError, "Addressbook: Incomplete http response from ", dest_host, ", interrupted by timeout"); LogPrint(eLogError, "Addressbook: Incomplete http response from ", dest_host, ", interrupted by timeout");
return false; return false;
} }
/* assert: res_head_len > 0 */ // assert: res_head_len > 0
response.erase(0, res_head_len); response.erase(0, res_head_len);
if (res.code == 304) if (res.code == 304)
{ {
@ -946,7 +927,7 @@ namespace client
LogPrint(eLogError, "Addressbook: Response size mismatch, expected: ", len, ", got: ", response.length(), "bytes"); LogPrint(eLogError, "Addressbook: Response size mismatch, expected: ", len, ", got: ", response.length(), "bytes");
return false; return false;
} }
/* assert: res.code == 200 */ // assert: res.code == 200
auto it = res.headers.find("ETag"); auto it = res.headers.find("ETag");
if (it != res.headers.end()) m_Etag = it->second; if (it != res.headers.end()) m_Etag = it->second;
it = res.headers.find("Last-Modified"); it = res.headers.find("Last-Modified");