Using shared_ptr for ProviderManager to prevent object lifetime issues

This commit is contained in:
Dan Paulat 2022-06-04 08:16:47 -05:00
parent 655e9d0634
commit bb3b9094b8

View file

@ -104,7 +104,8 @@ public:
level3ProductRecordsMap_ {}, level3ProductRecordsMap_ {},
level2ProductRecordMutex_ {}, level2ProductRecordMutex_ {},
level3ProductRecordMutex_ {}, level3ProductRecordMutex_ {},
level2ProviderManager_ {common::RadarProductGroup::Level2}, level2ProviderManager_ {
std::make_shared<ProviderManager>(common::RadarProductGroup::Level2)},
level3ProviderManagerMap_ {}, level3ProviderManagerMap_ {},
level3ProviderManagerMutex_ {}, level3ProviderManagerMutex_ {},
initializeMutex_ {}, initializeMutex_ {},
@ -117,12 +118,12 @@ public:
radarSite_ = std::make_shared<config::RadarSite>(); radarSite_ = std::make_shared<config::RadarSite>();
} }
level2ProviderManager_.provider_ = level2ProviderManager_->provider_ =
provider::NexradDataProviderFactory::CreateLevel2DataProvider(radarId); provider::NexradDataProviderFactory::CreateLevel2DataProvider(radarId);
} }
~RadarProductManagerImpl() ~RadarProductManagerImpl()
{ {
level2ProviderManager_.Disable(); level2ProviderManager_->Disable();
std::shared_lock lock(level3ProviderManagerMutex_); std::shared_lock lock(level3ProviderManagerMutex_);
std::for_each(std::execution::par_unseq, std::for_each(std::execution::par_unseq,
@ -131,15 +132,16 @@ public:
[](auto& p) [](auto& p)
{ {
auto& [key, providerManager] = p; auto& [key, providerManager] = p;
providerManager.Disable(); providerManager->Disable();
}); });
} }
RadarProductManager* self_; RadarProductManager* self_;
void EnableRefresh(RadarProductManagerImpl::ProviderManager& providerManager, void EnableRefresh(
bool enabled); std::shared_ptr<RadarProductManagerImpl::ProviderManager> providerManager,
void RefreshData(ProviderManager& providerManager); bool enabled);
void RefreshData(std::shared_ptr<ProviderManager> providerManager);
std::shared_ptr<types::RadarProductRecord> std::shared_ptr<types::RadarProductRecord>
GetLevel2ProductRecord(std::chrono::system_clock::time_point time); GetLevel2ProductRecord(std::chrono::system_clock::time_point time);
@ -149,13 +151,12 @@ public:
std::shared_ptr<types::RadarProductRecord> std::shared_ptr<types::RadarProductRecord>
StoreRadarProductRecord(std::shared_ptr<types::RadarProductRecord> record); StoreRadarProductRecord(std::shared_ptr<types::RadarProductRecord> record);
void void LoadProviderData(std::chrono::system_clock::time_point time,
LoadProviderData(std::chrono::system_clock::time_point time, std::shared_ptr<ProviderManager> providerManager,
RadarProductManagerImpl::ProviderManager& providerManager, RadarProductRecordMap& recordMap,
RadarProductRecordMap& recordMap, std::shared_mutex& recordMutex,
std::shared_mutex& recordMutex, std::mutex& loadDataMutex,
std::mutex& loadDataMutex, std::shared_ptr<request::NexradFileRequest> request);
std::shared_ptr<request::NexradFileRequest> request);
static void static void
LoadNexradFile(CreateNexradFileFunction load, LoadNexradFile(CreateNexradFileFunction load,
@ -177,9 +178,10 @@ public:
std::shared_mutex level2ProductRecordMutex_; std::shared_mutex level2ProductRecordMutex_;
std::shared_mutex level3ProductRecordMutex_; std::shared_mutex level3ProductRecordMutex_;
ProviderManager level2ProviderManager_; std::shared_ptr<ProviderManager> level2ProviderManager_;
std::unordered_map<std::string, ProviderManager> level3ProviderManagerMap_; std::unordered_map<std::string, std::shared_ptr<ProviderManager>>
std::shared_mutex level3ProviderManagerMutex_; level3ProviderManagerMap_;
std::shared_mutex level3ProviderManagerMutex_;
std::mutex initializeMutex_; std::mutex initializeMutex_;
std::mutex loadLevel2DataMutex_; std::mutex loadLevel2DataMutex_;
@ -368,14 +370,16 @@ void RadarProductManager::EnableRefresh(common::RadarProductGroup group,
p->level3ProviderManagerMap_.emplace( p->level3ProviderManagerMap_.emplace(
std::piecewise_construct, std::piecewise_construct,
std::forward_as_tuple(product), std::forward_as_tuple(product),
std::forward_as_tuple(common::RadarProductGroup::Level3, product)); std::forward_as_tuple(
p->level3ProviderManagerMap_.at(product).provider_ = std::make_shared<RadarProductManagerImpl::ProviderManager>(
common::RadarProductGroup::Level3, product)));
p->level3ProviderManagerMap_.at(product)->provider_ =
provider::NexradDataProviderFactory::CreateLevel3DataProvider( provider::NexradDataProviderFactory::CreateLevel3DataProvider(
p->radarId_, product); p->radarId_, product);
} }
RadarProductManagerImpl::ProviderManager& providerManager = std::shared_ptr<RadarProductManagerImpl::ProviderManager>
p->level3ProviderManagerMap_.at(product); providerManager = p->level3ProviderManagerMap_.at(product);
lock.unlock(); lock.unlock();
@ -383,12 +387,12 @@ void RadarProductManager::EnableRefresh(common::RadarProductGroup group,
} }
} }
void RadarProductManagerImpl::EnableRefresh(ProviderManager& providerManager, void RadarProductManagerImpl::EnableRefresh(
bool enabled) std::shared_ptr<ProviderManager> providerManager, bool enabled)
{ {
if (providerManager.refreshEnabled_ != enabled) if (providerManager->refreshEnabled_ != enabled)
{ {
providerManager.refreshEnabled_ = enabled; providerManager->refreshEnabled_ = enabled;
if (enabled) if (enabled)
{ {
@ -397,29 +401,32 @@ void RadarProductManagerImpl::EnableRefresh(ProviderManager& providerManager,
} }
} }
void RadarProductManagerImpl::RefreshData(ProviderManager& providerManager) void RadarProductManagerImpl::RefreshData(
std::shared_ptr<ProviderManager> providerManager)
{ {
logger_->debug("RefreshData: {}", providerManager.name()); logger_->debug("RefreshData: {}", providerManager->name());
{ {
std::unique_lock lock(providerManager.refreshTimerMutex_); std::unique_lock lock(providerManager->refreshTimerMutex_);
providerManager.refreshTimer_.cancel(); providerManager->refreshTimer_.cancel();
} }
util::async( util::async(
[&]() [=]()
{ {
auto [newObjects, totalObjects] = providerManager.provider_->Refresh(); auto [newObjects, totalObjects] =
providerManager->provider_->Refresh();
std::chrono::milliseconds interval = kRetryInterval_; std::chrono::milliseconds interval = kRetryInterval_;
if (newObjects > 0) if (newObjects > 0)
{ {
std::string key = providerManager.provider_->FindLatestKey(); std::string key = providerManager->provider_->FindLatestKey();
auto latestTime = providerManager.provider_->GetTimePointByKey(key); auto latestTime =
providerManager->provider_->GetTimePointByKey(key);
auto updatePeriod = providerManager.provider_->update_period(); auto updatePeriod = providerManager->provider_->update_period();
auto lastModified = providerManager.provider_->last_modified(); auto lastModified = providerManager->provider_->last_modified();
interval = std::chrono::duration_cast<std::chrono::milliseconds>( interval = std::chrono::duration_cast<std::chrono::milliseconds>(
updatePeriod - updatePeriod -
(std::chrono::system_clock::now() - lastModified)); (std::chrono::system_clock::now() - lastModified));
@ -429,29 +436,29 @@ void RadarProductManagerImpl::RefreshData(ProviderManager& providerManager)
} }
emit self_->NewDataAvailable( emit self_->NewDataAvailable(
providerManager.group_, providerManager.product_, latestTime); providerManager->group_, providerManager->product_, latestTime);
} }
else if (providerManager.refreshEnabled_ && totalObjects == 0) else if (providerManager->refreshEnabled_ && totalObjects == 0)
{ {
logger_->info("[{}] No data found, disabling refresh", logger_->info("[{}] No data found, disabling refresh",
providerManager.name()); providerManager->name());
providerManager.refreshEnabled_ = false; providerManager->refreshEnabled_ = false;
} }
if (providerManager.refreshEnabled_) if (providerManager->refreshEnabled_)
{ {
std::unique_lock lock(providerManager.refreshTimerMutex_); std::unique_lock lock(providerManager->refreshTimerMutex_);
logger_->debug( logger_->debug(
"[{}] Scheduled refresh in {:%M:%S}", "[{}] Scheduled refresh in {:%M:%S}",
providerManager.name(), providerManager->name(),
std::chrono::duration_cast<std::chrono::seconds>(interval)); std::chrono::duration_cast<std::chrono::seconds>(interval));
{ {
providerManager.refreshTimer_.expires_after(interval); providerManager->refreshTimer_.expires_after(interval);
providerManager.refreshTimer_.async_wait( providerManager->refreshTimer_.async_wait(
[&](const boost::system::error_code& e) [=](const boost::system::error_code& e)
{ {
if (e == boost::system::errc::success) if (e == boost::system::errc::success)
{ {
@ -460,12 +467,12 @@ void RadarProductManagerImpl::RefreshData(ProviderManager& providerManager)
else if (e == boost::asio::error::operation_aborted) else if (e == boost::asio::error::operation_aborted)
{ {
logger_->debug("[{}] Data refresh timer cancelled", logger_->debug("[{}] Data refresh timer cancelled",
providerManager.name()); providerManager->name());
} }
else else
{ {
logger_->warn("[{}] Data refresh timer error: {}", logger_->warn("[{}] Data refresh timer error: {}",
providerManager.name(), providerManager->name(),
e.message()); e.message());
} }
}); });
@ -476,18 +483,18 @@ void RadarProductManagerImpl::RefreshData(ProviderManager& providerManager)
void RadarProductManagerImpl::LoadProviderData( void RadarProductManagerImpl::LoadProviderData(
std::chrono::system_clock::time_point time, std::chrono::system_clock::time_point time,
RadarProductManagerImpl::ProviderManager& providerManager, std::shared_ptr<ProviderManager> providerManager,
RadarProductRecordMap& recordMap, RadarProductRecordMap& recordMap,
std::shared_mutex& recordMutex, std::shared_mutex& recordMutex,
std::mutex& loadDataMutex, std::mutex& loadDataMutex,
std::shared_ptr<request::NexradFileRequest> request) std::shared_ptr<request::NexradFileRequest> request)
{ {
logger_->debug("LoadProviderData: {}, {}", logger_->debug("LoadProviderData: {}, {}",
providerManager.name(), providerManager->name(),
util::TimeString(time)); util::TimeString(time));
RadarProductManagerImpl::LoadNexradFile( RadarProductManagerImpl::LoadNexradFile(
[=, &providerManager, &recordMap, &recordMutex, &loadDataMutex]() [=, &recordMap, &recordMutex, &loadDataMutex]()
-> std::shared_ptr<wsr88d::NexradFile> -> std::shared_ptr<wsr88d::NexradFile>
{ {
std::shared_ptr<types::RadarProductRecord> existingRecord = nullptr; std::shared_ptr<types::RadarProductRecord> existingRecord = nullptr;
@ -508,8 +515,8 @@ void RadarProductManagerImpl::LoadProviderData(
if (existingRecord == nullptr) if (existingRecord == nullptr)
{ {
std::string key = providerManager.provider_->FindKey(time); std::string key = providerManager->provider_->FindKey(time);
nexradFile = providerManager.provider_->LoadObjectByKey(key); nexradFile = providerManager->provider_->LoadObjectByKey(key);
} }
else else
{ {