Load level 3 data from AWS provider

This commit is contained in:
Dan Paulat 2022-06-03 22:47:40 -05:00
parent 780d13cefa
commit 655e9d0634
6 changed files with 266 additions and 128 deletions

View file

@ -80,29 +80,9 @@ public:
}
~ProviderManager() = default;
void Disable()
{
std::unique_lock lock(refreshTimerMutex_);
refreshEnabled_ = false;
refreshTimer_.cancel();
}
std::string name() const;
std::string name() const
{
std::string name;
if (group_ == common::RadarProductGroup::Level3)
{
name = std::format(
"{}, {}", common::GetRadarProductGroupName(group_), product_);
}
else
{
name = common::GetRadarProductGroupName(group_);
}
return name;
}
void Disable();
const common::RadarProductGroup group_;
const std::string product_;
@ -121,13 +101,15 @@ public:
coordinates0_5Degree_ {},
coordinates1Degree_ {},
level2ProductRecords_ {},
level3ProductRecords_ {},
level3ProductRecordsMap_ {},
level2ProductRecordMutex_ {},
level3ProductRecordMutex_ {},
level2Data_ {
std::make_shared<ProviderManager>(common::RadarProductGroup::Level2)},
level2ProviderManager_ {common::RadarProductGroup::Level2},
level3ProviderManagerMap_ {},
level3ProviderManagerMutex_ {},
initializeMutex_ {},
loadLevel2DataMutex_ {}
loadLevel2DataMutex_ {},
loadLevel3DataMutex_ {}
{
if (radarSite_ == nullptr)
{
@ -135,14 +117,29 @@ public:
radarSite_ = std::make_shared<config::RadarSite>();
}
level2Data_->provider_ =
level2ProviderManager_.provider_ =
provider::NexradDataProviderFactory::CreateLevel2DataProvider(radarId);
}
~RadarProductManagerImpl() { level2Data_->Disable(); }
~RadarProductManagerImpl()
{
level2ProviderManager_.Disable();
std::shared_lock lock(level3ProviderManagerMutex_);
std::for_each(std::execution::par_unseq,
level3ProviderManagerMap_.begin(),
level3ProviderManagerMap_.end(),
[](auto& p)
{
auto& [key, providerManager] = p;
providerManager.Disable();
});
}
RadarProductManager* self_;
void RefreshData(std::shared_ptr<ProviderManager> provider);
void EnableRefresh(RadarProductManagerImpl::ProviderManager& providerManager,
bool enabled);
void RefreshData(ProviderManager& providerManager);
std::shared_ptr<types::RadarProductRecord>
GetLevel2ProductRecord(std::chrono::system_clock::time_point time);
@ -152,6 +149,14 @@ public:
std::shared_ptr<types::RadarProductRecord>
StoreRadarProductRecord(std::shared_ptr<types::RadarProductRecord> record);
void
LoadProviderData(std::chrono::system_clock::time_point time,
RadarProductManagerImpl::ProviderManager& providerManager,
RadarProductRecordMap& recordMap,
std::shared_mutex& recordMutex,
std::mutex& loadDataMutex,
std::shared_ptr<request::NexradFileRequest> request);
static void
LoadNexradFile(CreateNexradFileFunction load,
std::shared_ptr<request::NexradFileRequest> request,
@ -165,16 +170,20 @@ public:
std::vector<float> coordinates0_5Degree_;
std::vector<float> coordinates1Degree_;
RadarProductRecordMap level2ProductRecords_;
std::unordered_map<std::string, RadarProductRecordMap> level3ProductRecords_;
RadarProductRecordMap level2ProductRecords_;
std::unordered_map<std::string, RadarProductRecordMap>
level3ProductRecordsMap_;
std::shared_mutex level2ProductRecordMutex_;
std::shared_mutex level3ProductRecordMutex_;
std::shared_ptr<ProviderManager> level2Data_;
ProviderManager level2ProviderManager_;
std::unordered_map<std::string, ProviderManager> level3ProviderManagerMap_;
std::shared_mutex level3ProviderManagerMutex_;
std::mutex initializeMutex_;
std::mutex loadLevel2DataMutex_;
std::mutex loadLevel3DataMutex_;
};
RadarProductManager::RadarProductManager(const std::string& radarId) :
@ -183,6 +192,30 @@ RadarProductManager::RadarProductManager(const std::string& radarId) :
}
RadarProductManager::~RadarProductManager() = default;
std::string RadarProductManagerImpl::ProviderManager::name() const
{
std::string name;
if (group_ == common::RadarProductGroup::Level3)
{
name = std::format(
"{}, {}", common::GetRadarProductGroupName(group_), product_);
}
else
{
name = common::GetRadarProductGroupName(group_);
}
return name;
}
void RadarProductManagerImpl::ProviderManager::Disable()
{
std::unique_lock lock(refreshTimerMutex_);
refreshEnabled_ = false;
refreshTimer_.cancel();
}
void RadarProductManager::Cleanup()
{
{
@ -318,43 +351,75 @@ void RadarProductManager::Initialize()
p->initialized_ = true;
}
void RadarProductManager::EnableLevel2Refresh(bool enabled)
void RadarProductManager::EnableRefresh(common::RadarProductGroup group,
const std::string& product,
bool enabled)
{
if (p->level2Data_->refreshEnabled_ != enabled)
if (group == common::RadarProductGroup::Level2)
{
p->level2Data_->refreshEnabled_ = enabled;
p->EnableRefresh(p->level2ProviderManager_, enabled);
}
else
{
std::unique_lock lock(p->level3ProviderManagerMutex_);
if (!p->level3ProviderManagerMap_.contains(product))
{
p->level3ProviderManagerMap_.emplace(
std::piecewise_construct,
std::forward_as_tuple(product),
std::forward_as_tuple(common::RadarProductGroup::Level3, product));
p->level3ProviderManagerMap_.at(product).provider_ =
provider::NexradDataProviderFactory::CreateLevel3DataProvider(
p->radarId_, product);
}
RadarProductManagerImpl::ProviderManager& providerManager =
p->level3ProviderManagerMap_.at(product);
lock.unlock();
p->EnableRefresh(providerManager, enabled);
}
}
void RadarProductManagerImpl::EnableRefresh(ProviderManager& providerManager,
bool enabled)
{
if (providerManager.refreshEnabled_ != enabled)
{
providerManager.refreshEnabled_ = enabled;
if (enabled)
{
p->RefreshData(p->level2Data_);
RefreshData(providerManager);
}
}
}
void RadarProductManagerImpl::RefreshData(
std::shared_ptr<ProviderManager> provider)
void RadarProductManagerImpl::RefreshData(ProviderManager& providerManager)
{
logger_->debug("RefreshData: {}", provider->name());
logger_->debug("RefreshData: {}", providerManager.name());
{
std::unique_lock lock(provider->refreshTimerMutex_);
provider->refreshTimer_.cancel();
std::unique_lock lock(providerManager.refreshTimerMutex_);
providerManager.refreshTimer_.cancel();
}
util::async(
[=]()
[&]()
{
auto [newObjects, totalObjects] = provider->provider_->Refresh();
auto [newObjects, totalObjects] = providerManager.provider_->Refresh();
std::chrono::milliseconds interval = kRetryInterval_;
if (newObjects > 0)
{
std::string key = provider->provider_->FindLatestKey();
auto latestTime = provider->provider_->GetTimePointByKey(key);
std::string key = providerManager.provider_->FindLatestKey();
auto latestTime = providerManager.provider_->GetTimePointByKey(key);
auto updatePeriod = provider->provider_->update_period();
auto lastModified = provider->provider_->last_modified();
auto updatePeriod = providerManager.provider_->update_period();
auto lastModified = providerManager.provider_->last_modified();
interval = std::chrono::duration_cast<std::chrono::milliseconds>(
updatePeriod -
(std::chrono::system_clock::now() - lastModified));
@ -364,43 +429,43 @@ void RadarProductManagerImpl::RefreshData(
}
emit self_->NewDataAvailable(
provider->group_, provider->product_, latestTime);
providerManager.group_, providerManager.product_, latestTime);
}
else if (provider->refreshEnabled_ && totalObjects == 0)
else if (providerManager.refreshEnabled_ && totalObjects == 0)
{
logger_->info("[{}] No data found, disabling refresh",
provider->name());
providerManager.name());
provider->refreshEnabled_ = false;
providerManager.refreshEnabled_ = false;
}
if (provider->refreshEnabled_)
if (providerManager.refreshEnabled_)
{
std::unique_lock lock(provider->refreshTimerMutex_);
std::unique_lock lock(providerManager.refreshTimerMutex_);
logger_->debug(
"[{}] Scheduled refresh in {:%M:%S}",
provider->name(),
providerManager.name(),
std::chrono::duration_cast<std::chrono::seconds>(interval));
{
provider->refreshTimer_.expires_after(interval);
provider->refreshTimer_.async_wait(
[=](const boost::system::error_code& e)
providerManager.refreshTimer_.expires_after(interval);
providerManager.refreshTimer_.async_wait(
[&](const boost::system::error_code& e)
{
if (e == boost::system::errc::success)
{
RefreshData(provider);
RefreshData(providerManager);
}
else if (e == boost::asio::error::operation_aborted)
{
logger_->debug("[{}] Data refresh timer cancelled",
provider->name());
providerManager.name());
}
else
{
logger_->warn("[{}] Data refresh timer error: {}",
provider->name(),
providerManager.name(),
e.message());
}
});
@ -409,23 +474,30 @@ void RadarProductManagerImpl::RefreshData(
});
}
void RadarProductManager::LoadLevel2Data(
void RadarProductManagerImpl::LoadProviderData(
std::chrono::system_clock::time_point time,
RadarProductManagerImpl::ProviderManager& providerManager,
RadarProductRecordMap& recordMap,
std::shared_mutex& recordMutex,
std::mutex& loadDataMutex,
std::shared_ptr<request::NexradFileRequest> request)
{
logger_->debug("LoadLevel2Data: {}", util::TimeString(time));
logger_->debug("LoadProviderData: {}, {}",
providerManager.name(),
util::TimeString(time));
RadarProductManagerImpl::LoadNexradFile(
[=]() -> std::shared_ptr<wsr88d::NexradFile>
[=, &providerManager, &recordMap, &recordMutex, &loadDataMutex]()
-> std::shared_ptr<wsr88d::NexradFile>
{
std::shared_ptr<types::RadarProductRecord> existingRecord = nullptr;
std::shared_ptr<wsr88d::NexradFile> nexradFile = nullptr;
{
std::shared_lock sharedLock {p->level2ProductRecordMutex_};
std::shared_lock sharedLock {recordMutex};
auto it = p->level2ProductRecords_.find(time);
if (it != p->level2ProductRecords_.cend())
auto it = recordMap.find(time);
if (it != recordMap.cend())
{
logger_->debug(
"Data previously loaded, loading from data cache");
@ -436,8 +508,8 @@ void RadarProductManager::LoadLevel2Data(
if (existingRecord == nullptr)
{
std::string key = p->level2Data_->provider_->FindKey(time);
nexradFile = p->level2Data_->provider_->LoadObjectByKey(key);
std::string key = providerManager.provider_->FindKey(time);
nexradFile = providerManager.provider_->LoadObjectByKey(key);
}
else
{
@ -447,7 +519,53 @@ void RadarProductManager::LoadLevel2Data(
return nexradFile;
},
request,
p->loadLevel2DataMutex_);
loadDataMutex);
}
void RadarProductManager::LoadLevel2Data(
std::chrono::system_clock::time_point time,
std::shared_ptr<request::NexradFileRequest> request)
{
logger_->debug("LoadLevel2Data: {}", util::TimeString(time));
p->LoadProviderData(time,
p->level2ProviderManager_,
p->level2ProductRecords_,
p->level2ProductRecordMutex_,
p->loadLevel2DataMutex_,
request);
}
void RadarProductManager::LoadLevel3Data(
const std::string& product,
std::chrono::system_clock::time_point time,
std::shared_ptr<request::NexradFileRequest> request)
{
logger_->debug("LoadLevel3Data: {}", util::TimeString(time));
// Look up provider manager
std::shared_lock providerManagerLock(p->level3ProviderManagerMutex_);
auto level3ProviderManager = p->level3ProviderManagerMap_.find(product);
if (level3ProviderManager == p->level3ProviderManagerMap_.cend())
{
logger_->debug("No level 3 provider manager for product: {}", product);
return;
}
providerManagerLock.unlock();
// Look up product record
std::unique_lock productRecordLock(p->level3ProductRecordMutex_);
RadarProductRecordMap& level3ProductRecords =
p->level3ProductRecordsMap_[product];
productRecordLock.unlock();
// Load provider data
p->LoadProviderData(time,
level3ProviderManager->second,
level3ProductRecords,
p->level3ProductRecordMutex_,
p->loadLevel3DataMutex_,
request);
}
void RadarProductManager::LoadData(
@ -564,8 +682,7 @@ RadarProductManagerImpl::GetLevel2ProductRecord(
record = util::GetBoundedElementValue(level2ProductRecords_, time);
// Does the record contain the time we are looking for?
if (record != nullptr && (time < record->level2_file()->start_time() ||
record->level2_file()->end_time() < time))
if (record != nullptr && (time < record->level2_file()->start_time()))
{
record = nullptr;
}
@ -580,9 +697,11 @@ RadarProductManagerImpl::GetLevel3ProductRecord(
{
std::shared_ptr<types::RadarProductRecord> record = nullptr;
auto it = level3ProductRecords_.find(product);
std::unique_lock lock {level3ProductRecordMutex_};
if (it != level3ProductRecords_.cend())
auto it = level3ProductRecordsMap_.find(product);
if (it != level3ProductRecordsMap_.cend())
{
record = util::GetBoundedElementValue(it->second, time);
}
@ -623,7 +742,7 @@ RadarProductManagerImpl::StoreRadarProductRecord(
{
std::unique_lock lock {level3ProductRecordMutex_};
auto& productMap = level3ProductRecords_[record->radar_product()];
auto& productMap = level3ProductRecordsMap_[record->radar_product()];
auto it = productMap.find(timeInSeconds);
if (it != productMap.cend())

View file

@ -36,7 +36,9 @@ public:
std::shared_ptr<config::RadarSite> radar_site() const;
void Initialize();
void EnableLevel2Refresh(bool enabled);
void EnableRefresh(common::RadarProductGroup group,
const std::string& product,
bool enabled);
std::tuple<std::shared_ptr<wsr88d::rda::ElevationScan>,
float,
@ -55,6 +57,10 @@ public:
void LoadLevel2Data(
std::chrono::system_clock::time_point time,
std::shared_ptr<request::NexradFileRequest> request = nullptr);
void LoadLevel3Data(
const std::string& product,
std::chrono::system_clock::time_point time,
std::shared_ptr<request::NexradFileRequest> request = nullptr);
static void
LoadData(std::istream& is,