diff --git a/scwx-qt/source/scwx/qt/manager/radar_product_manager.cpp b/scwx-qt/source/scwx/qt/manager/radar_product_manager.cpp index 5ef36853..4d61b912 100644 --- a/scwx-qt/source/scwx/qt/manager/radar_product_manager.cpp +++ b/scwx-qt/source/scwx/qt/manager/radar_product_manager.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -13,8 +14,10 @@ #include #include +#include #include #include +#include namespace scwx { @@ -42,34 +45,55 @@ static constexpr uint32_t NUM_COORIDNATES_0_5_DEGREE = static constexpr uint32_t NUM_COORIDNATES_1_DEGREE = NUM_RADIAL_GATES_1_DEGREE * 2; +static constexpr std::chrono::seconds kRetryInterval_ {15}; + // TODO: Find a way to garbage collect this static std::unordered_map> instanceMap_; -std::unordered_map> - fileIndex_; - -static std::shared_mutex fileLoadMutex_; +static std::unordered_map> + fileIndex_; static std::shared_mutex fileIndexMutex_; +static std::mutex fileLoadMutex_; + class RadarProductManagerImpl { public: - explicit RadarProductManagerImpl(const std::string& radarId) : + explicit RadarProductManagerImpl(RadarProductManager* self, + const std::string& radarId) : + self_ {self}, radarId_ {radarId}, initialized_ {false}, radarSite_ {config::RadarSite::Get(radarId)}, + coordinates0_5Degree_ {}, + coordinates1Degree_ {}, + level2ProductRecords_ {}, + level3ProductRecords_ {}, + level2ProductRecordMutex_ {}, + level3ProductRecordMutex_ {}, + level2DataRefreshEnabled_ {false}, + level2DataRefreshTimer_ {std::make_shared()}, level2DataProvider_ { - provider::Level2DataProviderFactory::Create(radarId)} + provider::Level2DataProviderFactory::Create(radarId)}, + initializeMutex_ {}, + loadLevel2DataMutex_ {} { if (radarSite_ == nullptr) { logger_->warn("Radar site not found: \"{}\"", radarId_); radarSite_ = std::make_shared(); } + + level2DataRefreshTimer_->setSingleShot(true); } ~RadarProductManagerImpl() = default; + RadarProductManager* self_; + + void RefreshLevel2Data(); + std::shared_ptr GetLevel2ProductRecord(std::chrono::system_clock::time_point time); std::shared_ptr @@ -80,7 +104,8 @@ public: static void LoadNexradFile(CreateNexradFileFunction load, - std::shared_ptr request); + std::shared_ptr request, + std::mutex& mutex); std::string radarId_; bool initialized_; @@ -93,11 +118,19 @@ public: RadarProductRecordMap level2ProductRecords_; std::unordered_map level3ProductRecords_; + std::shared_mutex level2ProductRecordMutex_; + std::shared_mutex level3ProductRecordMutex_; + + bool level2DataRefreshEnabled_; + std::shared_ptr level2DataRefreshTimer_; std::shared_ptr level2DataProvider_; + + std::mutex initializeMutex_; + std::mutex loadLevel2DataMutex_; }; RadarProductManager::RadarProductManager(const std::string& radarId) : - p(std::make_unique(radarId)) + p(std::make_unique(this, radarId)) { } RadarProductManager::~RadarProductManager() = default; @@ -128,6 +161,8 @@ std::shared_ptr RadarProductManager::radar_site() const void RadarProductManager::Initialize() { + std::unique_lock lock {p->initializeMutex_}; + if (p->initialized_) { return; @@ -222,6 +257,103 @@ void RadarProductManager::Initialize() p->initialized_ = true; } +void RadarProductManager::EnableLevel2Refresh(bool enabled) +{ + if (p->level2DataRefreshEnabled_ != enabled) + { + p->level2DataRefreshEnabled_ = enabled; + + if (enabled) + { + p->RefreshLevel2Data(); + } + } +} + +void RadarProductManagerImpl::RefreshLevel2Data() +{ + logger_->debug("RefreshLevel2Data()"); + + level2DataRefreshTimer_->stop(); + + util::async( + [&]() + { + size_t newObjects = level2DataProvider_->Refresh(); + + std::chrono::milliseconds interval = kRetryInterval_; + + if (newObjects > 0) + { + std::string key = level2DataProvider_->FindLatestKey(); + auto latestTime = level2DataProvider_->GetTimePointByKey(key); + + auto updatePeriod = level2DataProvider_->update_period(); + auto lastModified = level2DataProvider_->last_modified(); + interval = std::chrono::duration_cast( + updatePeriod - + (std::chrono::system_clock::now() - lastModified)); + if (interval < std::chrono::milliseconds {kRetryInterval_}) + { + interval = kRetryInterval_; + } + + emit self_->NewLevel2DataAvailable(latestTime); + } + + if (level2DataRefreshEnabled_) + { + logger_->debug( + "Scheduled refresh in {:%M:%S}", + std::chrono::duration_cast(interval)); + + // TODO: This doesn't work from an async thread + level2DataRefreshTimer_->start(interval); + } + }); +} + +void RadarProductManager::LoadLevel2Data( + std::chrono::system_clock::time_point time, + std::shared_ptr request) +{ + logger_->debug("LoadLevel2Data: {}", util::TimeString(time)); + + RadarProductManagerImpl::LoadNexradFile( + [=]() -> std::shared_ptr + { + std::shared_ptr existingRecord = nullptr; + std::shared_ptr nexradFile = nullptr; + + { + std::shared_lock sharedLock {p->level2ProductRecordMutex_}; + + auto it = p->level2ProductRecords_.find(time); + if (it != p->level2ProductRecords_.cend()) + { + logger_->debug( + "Data previously loaded, loading from data cache"); + + existingRecord = it->second; + } + } + + if (existingRecord == nullptr) + { + std::string key = p->level2DataProvider_->FindKey(time); + nexradFile = p->level2DataProvider_->LoadObjectByKey(key); + } + else + { + nexradFile = existingRecord->nexrad_file(); + } + + return nexradFile; + }, + request, + p->loadLevel2DataMutex_); +} + void RadarProductManager::LoadData( std::istream& is, std::shared_ptr request) { @@ -230,7 +362,8 @@ void RadarProductManager::LoadData( RadarProductManagerImpl::LoadNexradFile( [=, &is]() -> std::shared_ptr { return wsr88d::NexradFileFactory::Create(is); }, - request); + request, + fileLoadMutex_); } void RadarProductManager::LoadFile( @@ -270,7 +403,8 @@ void RadarProductManager::LoadFile( RadarProductManagerImpl::LoadNexradFile( [=]() -> std::shared_ptr { return wsr88d::NexradFileFactory::Create(filename); }, - request); + request, + fileLoadMutex_); } else if (request != nullptr) { @@ -281,12 +415,14 @@ void RadarProductManager::LoadFile( void RadarProductManagerImpl::LoadNexradFile( CreateNexradFileFunction load, - std::shared_ptr request) + std::shared_ptr request, + std::mutex& mutex) { scwx::util::async( - [=]() + [=, &mutex]() { - std::unique_lock lock(fileLoadMutex_); + std::unique_lock lock {mutex}; + std::shared_ptr nexradFile = load(); std::shared_ptr record = nullptr; @@ -356,9 +492,15 @@ RadarProductManagerImpl::StoreRadarProductRecord( std::shared_ptr storedRecord = record; + auto timeInSeconds = + std::chrono::time_point_cast(record->time()); + if (record->radar_product_group() == common::RadarProductGroup::Level2) { - auto it = level2ProductRecords_.find(record->time()); + std::unique_lock lock {level2ProductRecordMutex_}; + + auto it = level2ProductRecords_.find(timeInSeconds); if (it != level2ProductRecords_.cend()) { logger_->debug( @@ -368,14 +510,16 @@ RadarProductManagerImpl::StoreRadarProductRecord( } else { - level2ProductRecords_[record->time()] = record; + level2ProductRecords_[timeInSeconds] = record; } } else if (record->radar_product_group() == common::RadarProductGroup::Level3) { + std::unique_lock lock {level3ProductRecordMutex_}; + auto& productMap = level3ProductRecords_[record->radar_product()]; - auto it = productMap.find(record->time()); + auto it = productMap.find(timeInSeconds); if (it != productMap.cend()) { logger_->debug( @@ -385,7 +529,7 @@ RadarProductManagerImpl::StoreRadarProductRecord( } else { - productMap[record->time()] = record; + productMap[timeInSeconds] = record; } } diff --git a/scwx-qt/source/scwx/qt/manager/radar_product_manager.hpp b/scwx-qt/source/scwx/qt/manager/radar_product_manager.hpp index c58bb705..cb80a950 100644 --- a/scwx-qt/source/scwx/qt/manager/radar_product_manager.hpp +++ b/scwx-qt/source/scwx/qt/manager/radar_product_manager.hpp @@ -34,6 +34,7 @@ public: std::shared_ptr radar_site() const; void Initialize(); + void EnableLevel2Refresh(bool enabled); std::tuple, float, @@ -49,6 +50,10 @@ public: static std::shared_ptr Instance(const std::string& radarSite); + void LoadLevel2Data( + std::chrono::system_clock::time_point time, + std::shared_ptr request = nullptr); + static void LoadData(std::istream& is, std::shared_ptr request = nullptr); @@ -56,6 +61,10 @@ public: LoadFile(const std::string& filename, std::shared_ptr request = nullptr); +signals: + void + NewLevel2DataAvailable(std::chrono::system_clock::time_point latestTime); + private: std::unique_ptr p;