Refactoring refresh capability to provider manager in preparation for level 3 refresh

This commit is contained in:
Dan Paulat 2022-05-30 17:09:56 -05:00
parent 5cfab59977
commit 780d13cefa
3 changed files with 102 additions and 54 deletions

View file

@ -62,6 +62,56 @@ static std::mutex fileLoadMutex_;
class RadarProductManagerImpl class RadarProductManagerImpl
{ {
public: public:
struct ProviderManager
{
explicit ProviderManager(common::RadarProductGroup group) :
ProviderManager(group, "???")
{
}
explicit ProviderManager(common::RadarProductGroup group,
const std::string& product) :
group_ {group},
product_ {product},
refreshEnabled_ {false},
refreshTimer_ {util::io_context()},
refreshTimerMutex_ {},
provider_ {nullptr}
{
}
~ProviderManager() = default;
void Disable()
{
std::unique_lock lock(refreshTimerMutex_);
refreshEnabled_ = false;
refreshTimer_.cancel();
}
std::string name() const
{
std::string name;
if (group_ == common::RadarProductGroup::Level3)
{
name = std::format(
"{}, {}", common::GetRadarProductGroupName(group_), product_);
}
else
{
name = common::GetRadarProductGroupName(group_);
}
return name;
}
const common::RadarProductGroup group_;
const std::string product_;
bool refreshEnabled_;
boost::asio::steady_timer refreshTimer_;
std::mutex refreshTimerMutex_;
std::shared_ptr<provider::NexradDataProvider> provider_;
};
explicit RadarProductManagerImpl(RadarProductManager* self, explicit RadarProductManagerImpl(RadarProductManager* self,
const std::string& radarId) : const std::string& radarId) :
self_ {self}, self_ {self},
@ -74,12 +124,8 @@ public:
level3ProductRecords_ {}, level3ProductRecords_ {},
level2ProductRecordMutex_ {}, level2ProductRecordMutex_ {},
level3ProductRecordMutex_ {}, level3ProductRecordMutex_ {},
level2DataRefreshEnabled_ {false}, level2Data_ {
level2DataRefreshTimer_ {util::io_context()}, std::make_shared<ProviderManager>(common::RadarProductGroup::Level2)},
level2DataRefreshTimerMutex_ {},
level2DataProvider_ {
provider::NexradDataProviderFactory::CreateLevel2DataProvider(
radarId)},
initializeMutex_ {}, initializeMutex_ {},
loadLevel2DataMutex_ {} loadLevel2DataMutex_ {}
{ {
@ -88,19 +134,15 @@ public:
logger_->warn("Radar site not found: \"{}\"", radarId_); logger_->warn("Radar site not found: \"{}\"", radarId_);
radarSite_ = std::make_shared<config::RadarSite>(); radarSite_ = std::make_shared<config::RadarSite>();
} }
level2Data_->provider_ =
provider::NexradDataProviderFactory::CreateLevel2DataProvider(radarId);
} }
~RadarProductManagerImpl() ~RadarProductManagerImpl() { level2Data_->Disable(); }
{
{
std::unique_lock lock(level2DataRefreshTimerMutex_);
level2DataRefreshEnabled_ = false;
level2DataRefreshTimer_.cancel();
}
}
RadarProductManager* self_; RadarProductManager* self_;
void RefreshLevel2Data(); void RefreshData(std::shared_ptr<ProviderManager> provider);
std::shared_ptr<types::RadarProductRecord> std::shared_ptr<types::RadarProductRecord>
GetLevel2ProductRecord(std::chrono::system_clock::time_point time); GetLevel2ProductRecord(std::chrono::system_clock::time_point time);
@ -129,10 +171,7 @@ public:
std::shared_mutex level2ProductRecordMutex_; std::shared_mutex level2ProductRecordMutex_;
std::shared_mutex level3ProductRecordMutex_; std::shared_mutex level3ProductRecordMutex_;
bool level2DataRefreshEnabled_; std::shared_ptr<ProviderManager> level2Data_;
boost::asio::steady_timer level2DataRefreshTimer_;
std::mutex level2DataRefreshTimerMutex_;
std::shared_ptr<provider::NexradDataProvider> level2DataProvider_;
std::mutex initializeMutex_; std::mutex initializeMutex_;
std::mutex loadLevel2DataMutex_; std::mutex loadLevel2DataMutex_;
@ -281,40 +320,41 @@ void RadarProductManager::Initialize()
void RadarProductManager::EnableLevel2Refresh(bool enabled) void RadarProductManager::EnableLevel2Refresh(bool enabled)
{ {
if (p->level2DataRefreshEnabled_ != enabled) if (p->level2Data_->refreshEnabled_ != enabled)
{ {
p->level2DataRefreshEnabled_ = enabled; p->level2Data_->refreshEnabled_ = enabled;
if (enabled) if (enabled)
{ {
p->RefreshLevel2Data(); p->RefreshData(p->level2Data_);
} }
} }
} }
void RadarProductManagerImpl::RefreshLevel2Data() void RadarProductManagerImpl::RefreshData(
std::shared_ptr<ProviderManager> provider)
{ {
logger_->debug("RefreshLevel2Data()"); logger_->debug("RefreshData: {}", provider->name());
{ {
std::unique_lock lock(level2DataRefreshTimerMutex_); std::unique_lock lock(provider->refreshTimerMutex_);
level2DataRefreshTimer_.cancel(); provider->refreshTimer_.cancel();
} }
util::async( util::async(
[&]() [=]()
{ {
auto [newObjects, totalObjects] = level2DataProvider_->Refresh(); auto [newObjects, totalObjects] = provider->provider_->Refresh();
std::chrono::milliseconds interval = kRetryInterval_; std::chrono::milliseconds interval = kRetryInterval_;
if (newObjects > 0) if (newObjects > 0)
{ {
std::string key = level2DataProvider_->FindLatestKey(); std::string key = provider->provider_->FindLatestKey();
auto latestTime = level2DataProvider_->GetTimePointByKey(key); auto latestTime = provider->provider_->GetTimePointByKey(key);
auto updatePeriod = level2DataProvider_->update_period(); auto updatePeriod = provider->provider_->update_period();
auto lastModified = level2DataProvider_->last_modified(); auto lastModified = provider->provider_->last_modified();
interval = std::chrono::duration_cast<std::chrono::milliseconds>( interval = std::chrono::duration_cast<std::chrono::milliseconds>(
updatePeriod - updatePeriod -
(std::chrono::system_clock::now() - lastModified)); (std::chrono::system_clock::now() - lastModified));
@ -323,39 +363,44 @@ void RadarProductManagerImpl::RefreshLevel2Data()
interval = kRetryInterval_; interval = kRetryInterval_;
} }
emit self_->NewLevel2DataAvailable(latestTime); emit self_->NewDataAvailable(
provider->group_, provider->product_, latestTime);
} }
else if (level2DataRefreshEnabled_ && totalObjects == 0) else if (provider->refreshEnabled_ && totalObjects == 0)
{ {
logger_->info("No level 2 data found, disabling refresh"); logger_->info("[{}] No data found, disabling refresh",
provider->name());
level2DataRefreshEnabled_ = false; provider->refreshEnabled_ = false;
} }
if (level2DataRefreshEnabled_) if (provider->refreshEnabled_)
{ {
std::unique_lock lock(level2DataRefreshTimerMutex_); std::unique_lock lock(provider->refreshTimerMutex_);
logger_->debug( logger_->debug(
"Scheduled refresh in {:%M:%S}", "[{}] Scheduled refresh in {:%M:%S}",
provider->name(),
std::chrono::duration_cast<std::chrono::seconds>(interval)); std::chrono::duration_cast<std::chrono::seconds>(interval));
{ {
level2DataRefreshTimer_.expires_after(interval); provider->refreshTimer_.expires_after(interval);
level2DataRefreshTimer_.async_wait( provider->refreshTimer_.async_wait(
[this](const boost::system::error_code& e) [=](const boost::system::error_code& e)
{ {
if (e == boost::system::errc::success) if (e == boost::system::errc::success)
{ {
RefreshLevel2Data(); RefreshData(provider);
} }
else if (e == boost::asio::error::operation_aborted) else if (e == boost::asio::error::operation_aborted)
{ {
logger_->debug("Level 2 data refresh timer cancelled"); logger_->debug("[{}] Data refresh timer cancelled",
provider->name());
} }
else else
{ {
logger_->warn("Level 2 data refresh timer error: {}", logger_->warn("[{}] Data refresh timer error: {}",
provider->name(),
e.message()); e.message());
} }
}); });
@ -391,8 +436,8 @@ void RadarProductManager::LoadLevel2Data(
if (existingRecord == nullptr) if (existingRecord == nullptr)
{ {
std::string key = p->level2DataProvider_->FindKey(time); std::string key = p->level2Data_->provider_->FindKey(time);
nexradFile = p->level2DataProvider_->LoadObjectByKey(key); nexradFile = p->level2Data_->provider_->LoadObjectByKey(key);
} }
else else
{ {

View file

@ -64,8 +64,9 @@ public:
std::shared_ptr<request::NexradFileRequest> request = nullptr); std::shared_ptr<request::NexradFileRequest> request = nullptr);
signals: signals:
void void NewDataAvailable(common::RadarProductGroup group,
NewLevel2DataAvailable(std::chrono::system_clock::time_point latestTime); const std::string& product,
std::chrono::system_clock::time_point latestTime);
private: private:
std::unique_ptr<RadarProductManagerImpl> p; std::unique_ptr<RadarProductManagerImpl> p;

View file

@ -603,13 +603,15 @@ void MapWidgetImpl::AutoRefreshConnect()
{ {
connect( connect(
radarProductManager_.get(), radarProductManager_.get(),
&manager::RadarProductManager::NewLevel2DataAvailable, &manager::RadarProductManager::NewDataAvailable,
this, this,
[&](std::chrono::system_clock::time_point latestTime) [&](common::RadarProductGroup group,
const std::string& product,
std::chrono::system_clock::time_point latestTime)
{ {
if (autoRefreshEnabled_ && context_->radarProductView_ != nullptr && if (autoRefreshEnabled_ && context_->radarProductView_ != nullptr &&
context_->radarProductView_->GetRadarProductGroup() == group == common::RadarProductGroup::Level2 &&
common::RadarProductGroup::Level2) context_->radarProductView_->GetRadarProductGroup() == group)
{ {
// Create file request // Create file request
std::shared_ptr<request::NexradFileRequest> request = std::shared_ptr<request::NexradFileRequest> request =
@ -646,7 +648,7 @@ void MapWidgetImpl::AutoRefreshDisconnect()
if (radarProductManager_ != nullptr) if (radarProductManager_ != nullptr)
{ {
disconnect(radarProductManager_.get(), disconnect(radarProductManager_.get(),
&manager::RadarProductManager::NewLevel2DataAvailable, &manager::RadarProductManager::NewDataAvailable,
this, this,
nullptr); nullptr);
} }