Retrieve level 2 products from data provider

This commit is contained in:
Dan Paulat 2022-05-28 02:22:37 -05:00
parent 4b9d12a7ef
commit 000b6cbd86
2 changed files with 170 additions and 17 deletions

View file

@ -4,6 +4,7 @@
#include <scwx/util/logger.hpp> #include <scwx/util/logger.hpp>
#include <scwx/util/map.hpp> #include <scwx/util/map.hpp>
#include <scwx/util/threads.hpp> #include <scwx/util/threads.hpp>
#include <scwx/util/time.hpp>
#include <scwx/wsr88d/nexrad_file_factory.hpp> #include <scwx/wsr88d/nexrad_file_factory.hpp>
#include <deque> #include <deque>
@ -13,8 +14,10 @@
#include <boost/range/irange.hpp> #include <boost/range/irange.hpp>
#include <boost/timer/timer.hpp> #include <boost/timer/timer.hpp>
#include <fmt/chrono.h>
#include <GeographicLib/Geodesic.hpp> #include <GeographicLib/Geodesic.hpp>
#include <QMapbox> #include <QMapbox>
#include <QTimer>
namespace scwx namespace scwx
{ {
@ -42,34 +45,55 @@ static constexpr uint32_t NUM_COORIDNATES_0_5_DEGREE =
static constexpr uint32_t NUM_COORIDNATES_1_DEGREE = static constexpr uint32_t NUM_COORIDNATES_1_DEGREE =
NUM_RADIAL_GATES_1_DEGREE * 2; NUM_RADIAL_GATES_1_DEGREE * 2;
static constexpr std::chrono::seconds kRetryInterval_ {15};
// TODO: Find a way to garbage collect this // TODO: Find a way to garbage collect this
static std::unordered_map<std::string, std::shared_ptr<RadarProductManager>> static std::unordered_map<std::string, std::shared_ptr<RadarProductManager>>
instanceMap_; instanceMap_;
std::unordered_map<std::string, std::shared_ptr<types::RadarProductRecord>> static std::unordered_map<std::string,
fileIndex_; std::shared_ptr<types::RadarProductRecord>>
fileIndex_;
static std::shared_mutex fileLoadMutex_;
static std::shared_mutex fileIndexMutex_; static std::shared_mutex fileIndexMutex_;
static std::mutex fileLoadMutex_;
class RadarProductManagerImpl class RadarProductManagerImpl
{ {
public: public:
explicit RadarProductManagerImpl(const std::string& radarId) : explicit RadarProductManagerImpl(RadarProductManager* self,
const std::string& radarId) :
self_ {self},
radarId_ {radarId}, radarId_ {radarId},
initialized_ {false}, initialized_ {false},
radarSite_ {config::RadarSite::Get(radarId)}, radarSite_ {config::RadarSite::Get(radarId)},
coordinates0_5Degree_ {},
coordinates1Degree_ {},
level2ProductRecords_ {},
level3ProductRecords_ {},
level2ProductRecordMutex_ {},
level3ProductRecordMutex_ {},
level2DataRefreshEnabled_ {false},
level2DataRefreshTimer_ {std::make_shared<QTimer>()},
level2DataProvider_ { level2DataProvider_ {
provider::Level2DataProviderFactory::Create(radarId)} provider::Level2DataProviderFactory::Create(radarId)},
initializeMutex_ {},
loadLevel2DataMutex_ {}
{ {
if (radarSite_ == nullptr) if (radarSite_ == nullptr)
{ {
logger_->warn("Radar site not found: \"{}\"", radarId_); logger_->warn("Radar site not found: \"{}\"", radarId_);
radarSite_ = std::make_shared<config::RadarSite>(); radarSite_ = std::make_shared<config::RadarSite>();
} }
level2DataRefreshTimer_->setSingleShot(true);
} }
~RadarProductManagerImpl() = default; ~RadarProductManagerImpl() = default;
RadarProductManager* self_;
void RefreshLevel2Data();
std::shared_ptr<types::RadarProductRecord> std::shared_ptr<types::RadarProductRecord>
GetLevel2ProductRecord(std::chrono::system_clock::time_point time); GetLevel2ProductRecord(std::chrono::system_clock::time_point time);
std::shared_ptr<types::RadarProductRecord> std::shared_ptr<types::RadarProductRecord>
@ -80,7 +104,8 @@ public:
static void static void
LoadNexradFile(CreateNexradFileFunction load, LoadNexradFile(CreateNexradFileFunction load,
std::shared_ptr<request::NexradFileRequest> request); std::shared_ptr<request::NexradFileRequest> request,
std::mutex& mutex);
std::string radarId_; std::string radarId_;
bool initialized_; bool initialized_;
@ -93,11 +118,19 @@ public:
RadarProductRecordMap level2ProductRecords_; RadarProductRecordMap level2ProductRecords_;
std::unordered_map<std::string, RadarProductRecordMap> level3ProductRecords_; std::unordered_map<std::string, RadarProductRecordMap> level3ProductRecords_;
std::shared_mutex level2ProductRecordMutex_;
std::shared_mutex level3ProductRecordMutex_;
bool level2DataRefreshEnabled_;
std::shared_ptr<QTimer> level2DataRefreshTimer_;
std::shared_ptr<provider::Level2DataProvider> level2DataProvider_; std::shared_ptr<provider::Level2DataProvider> level2DataProvider_;
std::mutex initializeMutex_;
std::mutex loadLevel2DataMutex_;
}; };
RadarProductManager::RadarProductManager(const std::string& radarId) : RadarProductManager::RadarProductManager(const std::string& radarId) :
p(std::make_unique<RadarProductManagerImpl>(radarId)) p(std::make_unique<RadarProductManagerImpl>(this, radarId))
{ {
} }
RadarProductManager::~RadarProductManager() = default; RadarProductManager::~RadarProductManager() = default;
@ -128,6 +161,8 @@ std::shared_ptr<config::RadarSite> RadarProductManager::radar_site() const
void RadarProductManager::Initialize() void RadarProductManager::Initialize()
{ {
std::unique_lock lock {p->initializeMutex_};
if (p->initialized_) if (p->initialized_)
{ {
return; return;
@ -222,6 +257,103 @@ void RadarProductManager::Initialize()
p->initialized_ = true; p->initialized_ = true;
} }
void RadarProductManager::EnableLevel2Refresh(bool enabled)
{
if (p->level2DataRefreshEnabled_ != enabled)
{
p->level2DataRefreshEnabled_ = enabled;
if (enabled)
{
p->RefreshLevel2Data();
}
}
}
void RadarProductManagerImpl::RefreshLevel2Data()
{
logger_->debug("RefreshLevel2Data()");
level2DataRefreshTimer_->stop();
util::async(
[&]()
{
size_t newObjects = level2DataProvider_->Refresh();
std::chrono::milliseconds interval = kRetryInterval_;
if (newObjects > 0)
{
std::string key = level2DataProvider_->FindLatestKey();
auto latestTime = level2DataProvider_->GetTimePointByKey(key);
auto updatePeriod = level2DataProvider_->update_period();
auto lastModified = level2DataProvider_->last_modified();
interval = std::chrono::duration_cast<std::chrono::milliseconds>(
updatePeriod -
(std::chrono::system_clock::now() - lastModified));
if (interval < std::chrono::milliseconds {kRetryInterval_})
{
interval = kRetryInterval_;
}
emit self_->NewLevel2DataAvailable(latestTime);
}
if (level2DataRefreshEnabled_)
{
logger_->debug(
"Scheduled refresh in {:%M:%S}",
std::chrono::duration_cast<std::chrono::seconds>(interval));
// TODO: This doesn't work from an async thread
level2DataRefreshTimer_->start(interval);
}
});
}
void RadarProductManager::LoadLevel2Data(
std::chrono::system_clock::time_point time,
std::shared_ptr<request::NexradFileRequest> request)
{
logger_->debug("LoadLevel2Data: {}", util::TimeString(time));
RadarProductManagerImpl::LoadNexradFile(
[=]() -> std::shared_ptr<wsr88d::NexradFile>
{
std::shared_ptr<types::RadarProductRecord> existingRecord = nullptr;
std::shared_ptr<wsr88d::NexradFile> nexradFile = nullptr;
{
std::shared_lock sharedLock {p->level2ProductRecordMutex_};
auto it = p->level2ProductRecords_.find(time);
if (it != p->level2ProductRecords_.cend())
{
logger_->debug(
"Data previously loaded, loading from data cache");
existingRecord = it->second;
}
}
if (existingRecord == nullptr)
{
std::string key = p->level2DataProvider_->FindKey(time);
nexradFile = p->level2DataProvider_->LoadObjectByKey(key);
}
else
{
nexradFile = existingRecord->nexrad_file();
}
return nexradFile;
},
request,
p->loadLevel2DataMutex_);
}
void RadarProductManager::LoadData( void RadarProductManager::LoadData(
std::istream& is, std::shared_ptr<request::NexradFileRequest> request) std::istream& is, std::shared_ptr<request::NexradFileRequest> request)
{ {
@ -230,7 +362,8 @@ void RadarProductManager::LoadData(
RadarProductManagerImpl::LoadNexradFile( RadarProductManagerImpl::LoadNexradFile(
[=, &is]() -> std::shared_ptr<wsr88d::NexradFile> [=, &is]() -> std::shared_ptr<wsr88d::NexradFile>
{ return wsr88d::NexradFileFactory::Create(is); }, { return wsr88d::NexradFileFactory::Create(is); },
request); request,
fileLoadMutex_);
} }
void RadarProductManager::LoadFile( void RadarProductManager::LoadFile(
@ -270,7 +403,8 @@ void RadarProductManager::LoadFile(
RadarProductManagerImpl::LoadNexradFile( RadarProductManagerImpl::LoadNexradFile(
[=]() -> std::shared_ptr<wsr88d::NexradFile> [=]() -> std::shared_ptr<wsr88d::NexradFile>
{ return wsr88d::NexradFileFactory::Create(filename); }, { return wsr88d::NexradFileFactory::Create(filename); },
request); request,
fileLoadMutex_);
} }
else if (request != nullptr) else if (request != nullptr)
{ {
@ -281,12 +415,14 @@ void RadarProductManager::LoadFile(
void RadarProductManagerImpl::LoadNexradFile( void RadarProductManagerImpl::LoadNexradFile(
CreateNexradFileFunction load, CreateNexradFileFunction load,
std::shared_ptr<request::NexradFileRequest> request) std::shared_ptr<request::NexradFileRequest> request,
std::mutex& mutex)
{ {
scwx::util::async( scwx::util::async(
[=]() [=, &mutex]()
{ {
std::unique_lock lock(fileLoadMutex_); std::unique_lock lock {mutex};
std::shared_ptr<wsr88d::NexradFile> nexradFile = load(); std::shared_ptr<wsr88d::NexradFile> nexradFile = load();
std::shared_ptr<types::RadarProductRecord> record = nullptr; std::shared_ptr<types::RadarProductRecord> record = nullptr;
@ -356,9 +492,15 @@ RadarProductManagerImpl::StoreRadarProductRecord(
std::shared_ptr<types::RadarProductRecord> storedRecord = record; std::shared_ptr<types::RadarProductRecord> storedRecord = record;
auto timeInSeconds =
std::chrono::time_point_cast<std::chrono::seconds,
std::chrono::system_clock>(record->time());
if (record->radar_product_group() == common::RadarProductGroup::Level2) if (record->radar_product_group() == common::RadarProductGroup::Level2)
{ {
auto it = level2ProductRecords_.find(record->time()); std::unique_lock lock {level2ProductRecordMutex_};
auto it = level2ProductRecords_.find(timeInSeconds);
if (it != level2ProductRecords_.cend()) if (it != level2ProductRecords_.cend())
{ {
logger_->debug( logger_->debug(
@ -368,14 +510,16 @@ RadarProductManagerImpl::StoreRadarProductRecord(
} }
else else
{ {
level2ProductRecords_[record->time()] = record; level2ProductRecords_[timeInSeconds] = record;
} }
} }
else if (record->radar_product_group() == common::RadarProductGroup::Level3) else if (record->radar_product_group() == common::RadarProductGroup::Level3)
{ {
std::unique_lock lock {level3ProductRecordMutex_};
auto& productMap = level3ProductRecords_[record->radar_product()]; auto& productMap = level3ProductRecords_[record->radar_product()];
auto it = productMap.find(record->time()); auto it = productMap.find(timeInSeconds);
if (it != productMap.cend()) if (it != productMap.cend())
{ {
logger_->debug( logger_->debug(
@ -385,7 +529,7 @@ RadarProductManagerImpl::StoreRadarProductRecord(
} }
else else
{ {
productMap[record->time()] = record; productMap[timeInSeconds] = record;
} }
} }

View file

@ -34,6 +34,7 @@ public:
std::shared_ptr<config::RadarSite> radar_site() const; std::shared_ptr<config::RadarSite> radar_site() const;
void Initialize(); void Initialize();
void EnableLevel2Refresh(bool enabled);
std::tuple<std::shared_ptr<wsr88d::rda::ElevationScan>, std::tuple<std::shared_ptr<wsr88d::rda::ElevationScan>,
float, float,
@ -49,6 +50,10 @@ public:
static std::shared_ptr<RadarProductManager> static std::shared_ptr<RadarProductManager>
Instance(const std::string& radarSite); Instance(const std::string& radarSite);
void LoadLevel2Data(
std::chrono::system_clock::time_point time,
std::shared_ptr<request::NexradFileRequest> request = nullptr);
static void static void
LoadData(std::istream& is, LoadData(std::istream& is,
std::shared_ptr<request::NexradFileRequest> request = nullptr); std::shared_ptr<request::NexradFileRequest> request = nullptr);
@ -56,6 +61,10 @@ public:
LoadFile(const std::string& filename, LoadFile(const std::string& filename,
std::shared_ptr<request::NexradFileRequest> request = nullptr); std::shared_ptr<request::NexradFileRequest> request = nullptr);
signals:
void
NewLevel2DataAvailable(std::chrono::system_clock::time_point latestTime);
private: private:
std::unique_ptr<RadarProductManagerImpl> p; std::unique_ptr<RadarProductManagerImpl> p;