delete stream in shared destination's thread. steady_timer instead deadline_timer

This commit is contained in:
orignal
2026-03-01 18:41:16 -05:00
parent 4387e74a34
commit 39ad36d901
2 changed files with 52 additions and 50 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2025, The PurpleI2P Project
* Copyright (c) 2013-2026, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
@@ -63,7 +63,7 @@ namespace client
void SaveEtag (const i2p::data::IdentHash& subscription, const std::string& etag, const std::string& lastModified) override;
bool GetEtag (const i2p::data::IdentHash& subscription, std::string& etag, std::string& lastModified) override;
void ResetEtags () override;
private:
int LoadFromFile (const std::string& filename, Addresses& addresses); // returns -1 if can't open file, otherwise number of records
@@ -74,7 +74,7 @@ namespace client
std::string etagsPath, indexPath, localPath;
bool m_IsPersist;
std::string m_HostsFile; // file to dump hosts.txt, empty if not used
std::unordered_map<i2p::data::IdentHash, std::pair<std::vector<uint8_t>, uint64_t> > m_FullAddressCache; // ident hash -> (full ident buffer, last access timestamp)
std::unordered_map<i2p::data::IdentHash, std::pair<std::vector<uint8_t>, uint64_t> > m_FullAddressCache; // ident hash -> (full ident buffer, last access timestamp)
std::mutex m_FullAddressCacheMutex;
};
@@ -106,9 +106,9 @@ namespace client
{
it->second.second = ts;
return std::make_shared<i2p::data::IdentityEx>(it->second.first.data (), it->second.first.size ());
}
}
}
if (!m_IsPersist)
{
LogPrint(eLogDebug, "Addressbook: Persistence is disabled");
@@ -116,7 +116,7 @@ namespace client
}
std::string filename = storage.Path(ident.ToBase32());
std::ifstream f(filename, std::ifstream::binary);
if (!f.is_open ())
if (!f.is_open ())
{
LogPrint(eLogDebug, "Addressbook: Requested, but not found: ", filename);
return nullptr;
@@ -124,7 +124,7 @@ namespace client
f.seekg (0,std::ios::end);
size_t len = f.tellg ();
if (len < i2p::data::DEFAULT_IDENTITY_SIZE)
if (len < i2p::data::DEFAULT_IDENTITY_SIZE)
{
LogPrint (eLogError, "Addressbook: File ", filename, " is too short: ", len);
return nullptr;
@@ -136,11 +136,11 @@ namespace client
{
LogPrint (eLogError, "Addressbook: Couldn't read ", filename);
return nullptr;
}
}
{
std::lock_guard<std::mutex> l(m_FullAddressCacheMutex);
m_FullAddressCache.try_emplace (ident, buf, ts);
}
}
return std::make_shared<i2p::data::IdentityEx>(buf.data (), len);
}
@@ -162,13 +162,13 @@ namespace client
{
std::string path = storage.Path(address->GetIdentHash().ToBase32());
std::ofstream f (path, std::ofstream::binary | std::ofstream::out);
if (!f.is_open ())
if (!f.is_open ())
{
LogPrint (eLogError, "Addressbook: Can't open file ", path);
return;
}
f.write ((const char *)buf.data (), len);
}
}
}
void AddressBookFilesystemStorage::RemoveAddress (const i2p::data::IdentHash& ident)
@@ -330,9 +330,9 @@ namespace client
it = m_FullAddressCache.erase (it);
else
it++;
}
}
}
}
//---------------------------------------------------------------------
Address::Address (std::string_view b32):
@@ -401,14 +401,14 @@ namespace client
{
m_AddressCacheUpdateTimer->cancel ();
m_AddressCacheUpdateTimer = nullptr;
}
}
bool isDownloading = m_Downloading.valid ();
if (isDownloading)
{
if (m_Downloading.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
isDownloading = false;
else
{
else
{
LogPrint (eLogInfo, "Addressbook: Subscriptions are downloading, abort");
for (int i = 0; i < 30; i++)
{
@@ -419,7 +419,7 @@ namespace client
break;
}
}
}
}
if (!isDownloading)
m_Downloading.get ();
else
@@ -443,12 +443,12 @@ namespace client
auto addr = std::make_shared<const Address>(address.substr (0, pos));
return addr->IsValid () ? addr : nullptr;
}
else
else
#if __cplusplus >= 202002L // C++20
if (address.ends_with (".i2p"))
#else
if (address.find (".i2p") != std::string::npos)
#endif
#endif
{
if (!m_IsEnabled) return nullptr;
auto addr = FindAddress (address);
@@ -483,14 +483,14 @@ namespace client
i2p::data::IdentHash identHash;
if (identHash.FromBase32(jump.substr (0, pos)) && identHash == addr->identHash)
return true;
}
}
else
{
{
i2p::data::IdentityEx ident;
if (ident.FromBase64 (jump) && ident.GetIdentHash () == addr->identHash)
return true;
}
return false;
}
@@ -575,9 +575,9 @@ namespace client
addr = addr.substr(0, pos); // remove comments
#if __cplusplus >= 202002L // C++20
if (name.ends_with (".b32.i2p"))
#else
#else
if (name.find(".b32.i2p") != name.npos)
#endif
#endif
{
LogPrint (eLogError, "Addressbook: Skipped adding of b32 address: ", name);
continue;
@@ -585,16 +585,16 @@ namespace client
#if __cplusplus >= 202002L // C++20
if (!name.ends_with (".i2p"))
#else
#else
if (name.find(".i2p") == name.npos)
#endif
#endif
{
LogPrint (eLogError, "Addressbook: Malformed domain: ", name);
continue;
}
auto ident = std::make_shared<i2p::data::IdentityEx> ();
if (!ident->FromBase64(addr))
if (!ident->FromBase64(addr))
{
LogPrint (eLogError, "Addressbook: Malformed address ", addr, " for ", name);
incomplete = f.eof ();
@@ -609,10 +609,10 @@ namespace client
{
it->second->identHash = ident->GetIdentHash ();
if (m_Storage)
{
{
m_Storage->AddAddress (ident);
m_Storage->RemoveAddress (it->second->identHash);
}
}
LogPrint (eLogInfo, "Addressbook: Updated host: ", name);
}
}
@@ -733,7 +733,7 @@ namespace client
}
if (m_SubscriptionsUpdateTimer)
{
m_SubscriptionsUpdateTimer->expires_from_now (boost::posix_time::minutes(nextUpdateTimeout));
m_SubscriptionsUpdateTimer->expires_after (std::chrono::minutes(nextUpdateTimeout));
m_SubscriptionsUpdateTimer->async_wait (std::bind (&AddressBook::HandleSubscriptionsUpdateTimer,
this, std::placeholders::_1));
}
@@ -747,8 +747,8 @@ namespace client
auto dest = i2p::client::context.GetSharedLocalDestination ();
if (dest)
{
m_SubscriptionsUpdateTimer = std::make_unique<boost::asio::deadline_timer>(dest->GetService ());
m_SubscriptionsUpdateTimer->expires_from_now (boost::posix_time::minutes(INITIAL_SUBSCRIPTION_UPDATE_TIMEOUT));
m_SubscriptionsUpdateTimer = std::make_unique<boost::asio::steady_timer>(dest->GetService ());
m_SubscriptionsUpdateTimer->expires_after (std::chrono::minutes(INITIAL_SUBSCRIPTION_UPDATE_TIMEOUT));
m_SubscriptionsUpdateTimer->async_wait (std::bind (&AddressBook::HandleSubscriptionsUpdateTimer,
this, std::placeholders::_1));
}
@@ -776,7 +776,7 @@ namespace client
{
m_Downloading.get ();
isDownloading = false;
}
}
if (!isDownloading && dest->IsReady ())
{
if (!m_IsLoaded)
@@ -800,7 +800,7 @@ namespace client
else
{
// try it again later
m_SubscriptionsUpdateTimer->expires_from_now (boost::posix_time::minutes(INITIAL_SUBSCRIPTION_RETRY_TIMEOUT));
m_SubscriptionsUpdateTimer->expires_after (std::chrono::minutes(INITIAL_SUBSCRIPTION_RETRY_TIMEOUT));
m_SubscriptionsUpdateTimer->async_wait (std::bind (&AddressBook::HandleSubscriptionsUpdateTimer,
this, std::placeholders::_1));
}
@@ -904,23 +904,23 @@ namespace client
{
auto dest = i2p::client::context.GetSharedLocalDestination ();
if(dest)
m_AddressCacheUpdateTimer = std::make_unique<boost::asio::deadline_timer>(dest->GetService ());
}
m_AddressCacheUpdateTimer = std::make_unique<boost::asio::steady_timer>(dest->GetService ());
}
if (m_AddressCacheUpdateTimer)
{
m_AddressCacheUpdateTimer->expires_from_now (boost::posix_time::seconds(ADDRESS_CACHE_UPDATE_INTERVAL ));
{
m_AddressCacheUpdateTimer->expires_after (std::chrono::seconds(ADDRESS_CACHE_UPDATE_INTERVAL ));
m_AddressCacheUpdateTimer->async_wait (
[this](const boost::system::error_code& ecode)
[this](const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
if (m_Storage) m_Storage->CleanUpCache ();
ScheduleCacheUpdate ();
}
}
});
}
}
}
}
AddressBookSubscription::AddressBookSubscription (AddressBook& book, std::string_view link):
m_Book (book), m_Link (link)
{
@@ -1010,6 +1010,8 @@ namespace client
// process remaining buffer
while (size_t len = stream->ReadSome (recv_buf, sizeof(recv_buf)))
response.append ((char *)recv_buf, len);
// destroy stream in destination's thread
boost::asio::post (i2p::client::context.GetSharedLocalDestination ()->GetService (), [s = std::move (stream)](){});
// parse response
i2p::http::HTTPRes res;
int res_head_len = res.parse(response);

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2025, The PurpleI2P Project
* Copyright (c) 2013-2026, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
@@ -62,7 +62,7 @@ namespace client
public:
typedef std::map<std::string, std::shared_ptr<Address>, std::less<> > Addresses;
virtual ~AddressBookStorage () {};
virtual std::shared_ptr<const i2p::data::IdentityEx> GetAddress (const i2p::data::IdentHash& ident) = 0;
virtual void AddAddress (std::shared_ptr<const i2p::data::IdentityEx> address) = 0;
@@ -84,7 +84,7 @@ namespace client
class AddressBook
{
public:
AddressBook ();
~AddressBook ();
void Start ();
@@ -107,7 +107,7 @@ namespace client
bool GetEtag (const i2p::data::IdentHash& subscription, std::string& etag, std::string& lastModified);
bool IsEnabled () const { return m_IsEnabled; }
private:
void StartSubscriptions ();
@@ -124,7 +124,7 @@ namespace client
void HandleLookupResponse (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len);
void ScheduleCacheUpdate ();
private:
std::mutex m_AddressBookMutex;
@@ -138,7 +138,7 @@ namespace client
int m_NumRetries;
std::vector<std::shared_ptr<AddressBookSubscription> > m_Subscriptions;
std::shared_ptr<AddressBookSubscription> m_DefaultSubscription; // in case if we don't know any addresses yet
std::unique_ptr<boost::asio::deadline_timer> m_SubscriptionsUpdateTimer, m_AddressCacheUpdateTimer;
std::unique_ptr<boost::asio::steady_timer> m_SubscriptionsUpdateTimer, m_AddressCacheUpdateTimer;
bool m_IsEnabled;
};