mirror of
https://github.com/ciphervance/supercell-wx.git
synced 2025-10-30 20:40:04 +00:00
Merge pull request #404 from AdenKoperczak/level_2_chunks
Level 2 chunks
This commit is contained in:
commit
58f2609fe7
22 changed files with 1509 additions and 202 deletions
|
|
@ -963,6 +963,13 @@ void MainWindowImpl::ConnectMapSignals()
|
|||
}
|
||||
},
|
||||
Qt::QueuedConnection);
|
||||
connect(
|
||||
mapWidget,
|
||||
&map::MapWidget::IncomingLevel2ElevationChanged,
|
||||
this,
|
||||
[this](std::optional<float>)
|
||||
{ level2SettingsWidget_->UpdateSettings(activeMap_); },
|
||||
Qt::QueuedConnection);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
#include <scwx/qt/types/time_types.hpp>
|
||||
#include <scwx/qt/util/geographic_lib.hpp>
|
||||
#include <scwx/common/constants.hpp>
|
||||
#include <scwx/provider/aws_level2_chunks_data_provider.hpp>
|
||||
#include <scwx/provider/nexrad_data_provider_factory.hpp>
|
||||
#include <scwx/util/logger.hpp>
|
||||
#include <scwx/util/map.hpp>
|
||||
|
|
@ -14,6 +15,7 @@
|
|||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <unordered_set>
|
||||
#include <utility>
|
||||
|
||||
#if defined(_MSC_VER)
|
||||
# pragma warning(push, 0)
|
||||
|
|
@ -66,7 +68,9 @@ static const std::string kDefaultLevel3Product_ {"N0B"};
|
|||
static constexpr std::size_t kTimerPlaces_ {6u};
|
||||
|
||||
static constexpr std::chrono::seconds kFastRetryInterval_ {15};
|
||||
static constexpr std::chrono::seconds kFastRetryIntervalChunks_ {3};
|
||||
static constexpr std::chrono::seconds kSlowRetryInterval_ {120};
|
||||
static constexpr std::chrono::seconds kSlowRetryIntervalChunks_ {20};
|
||||
|
||||
static std::unordered_map<std::string, std::weak_ptr<RadarProductManager>>
|
||||
instanceMap_;
|
||||
|
|
@ -84,21 +88,25 @@ class ProviderManager : public QObject
|
|||
Q_OBJECT
|
||||
public:
|
||||
explicit ProviderManager(RadarProductManager* self,
|
||||
const std::string& radarId,
|
||||
common::RadarProductGroup group) :
|
||||
ProviderManager(self, radarId, group, "???")
|
||||
{
|
||||
}
|
||||
explicit ProviderManager(RadarProductManager* self,
|
||||
const std::string& radarId,
|
||||
std::string radarId,
|
||||
common::RadarProductGroup group,
|
||||
const std::string& product) :
|
||||
radarId_ {radarId}, group_ {group}, product_ {product}
|
||||
std::string product = "???",
|
||||
bool isChunks = false) :
|
||||
radarId_ {std::move(radarId)},
|
||||
group_ {group},
|
||||
product_ {std::move(product)},
|
||||
isChunks_ {isChunks}
|
||||
{
|
||||
connect(this,
|
||||
&ProviderManager::NewDataAvailable,
|
||||
self,
|
||||
&RadarProductManager::NewDataAvailable);
|
||||
[this, self](common::RadarProductGroup group,
|
||||
const std::string& product,
|
||||
std::chrono::system_clock::time_point latestTime)
|
||||
{
|
||||
Q_EMIT self->NewDataAvailable(
|
||||
group, product, isChunks_, latestTime);
|
||||
});
|
||||
}
|
||||
~ProviderManager() { threadPool_.join(); };
|
||||
|
||||
|
|
@ -111,10 +119,12 @@ public:
|
|||
const std::string radarId_;
|
||||
const common::RadarProductGroup group_;
|
||||
const std::string product_;
|
||||
const bool isChunks_;
|
||||
bool refreshEnabled_ {false};
|
||||
boost::asio::steady_timer refreshTimer_ {threadPool_};
|
||||
std::mutex refreshTimerMutex_ {};
|
||||
std::shared_ptr<provider::NexradDataProvider> provider_ {nullptr};
|
||||
size_t refreshCount_ {0};
|
||||
|
||||
signals:
|
||||
void NewDataAvailable(common::RadarProductGroup group,
|
||||
|
|
@ -133,7 +143,9 @@ public:
|
|||
level3ProductsInitialized_ {false},
|
||||
radarSite_ {config::RadarSite::Get(radarId)},
|
||||
level2ProviderManager_ {std::make_shared<ProviderManager>(
|
||||
self_, radarId_, common::RadarProductGroup::Level2)}
|
||||
self_, radarId_, common::RadarProductGroup::Level2)},
|
||||
level2ChunksProviderManager_ {std::make_shared<ProviderManager>(
|
||||
self_, radarId_, common::RadarProductGroup::Level2, "???", true)}
|
||||
{
|
||||
if (radarSite_ == nullptr)
|
||||
{
|
||||
|
|
@ -143,10 +155,24 @@ public:
|
|||
|
||||
level2ProviderManager_->provider_ =
|
||||
provider::NexradDataProviderFactory::CreateLevel2DataProvider(radarId);
|
||||
level2ChunksProviderManager_->provider_ =
|
||||
provider::NexradDataProviderFactory::CreateLevel2ChunksDataProvider(
|
||||
radarId);
|
||||
|
||||
auto level2ChunksProvider =
|
||||
std::dynamic_pointer_cast<provider::AwsLevel2ChunksDataProvider>(
|
||||
level2ChunksProviderManager_->provider_);
|
||||
if (level2ChunksProvider != nullptr)
|
||||
{
|
||||
level2ChunksProvider->SetLevel2DataProvider(
|
||||
std::dynamic_pointer_cast<provider::AwsLevel2DataProvider>(
|
||||
level2ProviderManager_->provider_));
|
||||
}
|
||||
}
|
||||
~RadarProductManagerImpl()
|
||||
{
|
||||
level2ProviderManager_->Disable();
|
||||
level2ChunksProviderManager_->Disable();
|
||||
|
||||
std::shared_lock lock(level3ProviderManagerMutex_);
|
||||
std::for_each(std::execution::par_unseq,
|
||||
|
|
@ -172,8 +198,9 @@ public:
|
|||
std::shared_ptr<ProviderManager>
|
||||
GetLevel3ProviderManager(const std::string& product);
|
||||
|
||||
void EnableRefresh(boost::uuids::uuid uuid,
|
||||
std::shared_ptr<ProviderManager> providerManager,
|
||||
void EnableRefresh(
|
||||
boost::uuids::uuid uuid,
|
||||
const std::set<std::shared_ptr<ProviderManager>>& providerManagers,
|
||||
bool enabled);
|
||||
void RefreshData(std::shared_ptr<ProviderManager> providerManager);
|
||||
void RefreshDataSync(std::shared_ptr<ProviderManager> providerManager);
|
||||
|
|
@ -250,6 +277,7 @@ public:
|
|||
std::shared_mutex level3ProductRecordMutex_ {};
|
||||
|
||||
std::shared_ptr<ProviderManager> level2ProviderManager_;
|
||||
std::shared_ptr<ProviderManager> level2ChunksProviderManager_;
|
||||
std::unordered_map<std::string, std::shared_ptr<ProviderManager>>
|
||||
level3ProviderManagerMap_ {};
|
||||
std::shared_mutex level3ProviderManagerMutex_ {};
|
||||
|
|
@ -262,8 +290,10 @@ public:
|
|||
common::Level3ProductCategoryMap availableCategoryMap_ {};
|
||||
std::shared_mutex availableCategoryMutex_ {};
|
||||
|
||||
std::optional<float> incomingLevel2Elevation_ {};
|
||||
|
||||
std::unordered_map<boost::uuids::uuid,
|
||||
std::shared_ptr<ProviderManager>,
|
||||
std::set<std::shared_ptr<ProviderManager>>,
|
||||
boost::hash<boost::uuids::uuid>>
|
||||
refreshMap_ {};
|
||||
std::shared_mutex refreshMapMutex_ {};
|
||||
|
|
@ -441,6 +471,11 @@ float RadarProductManager::gate_size() const
|
|||
return (is_tdwr()) ? 150.0f : 250.0f;
|
||||
}
|
||||
|
||||
std::optional<float> RadarProductManager::incoming_level_2_elevation() const
|
||||
{
|
||||
return p->incomingLevel2Elevation_;
|
||||
}
|
||||
|
||||
std::string RadarProductManager::radar_id() const
|
||||
{
|
||||
return p->radarId_;
|
||||
|
|
@ -637,7 +672,10 @@ void RadarProductManager::EnableRefresh(common::RadarProductGroup group,
|
|||
{
|
||||
if (group == common::RadarProductGroup::Level2)
|
||||
{
|
||||
p->EnableRefresh(uuid, p->level2ProviderManager_, enabled);
|
||||
p->EnableRefresh(
|
||||
uuid,
|
||||
{p->level2ProviderManager_, p->level2ChunksProviderManager_},
|
||||
enabled);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
@ -660,7 +698,7 @@ void RadarProductManager::EnableRefresh(common::RadarProductGroup group,
|
|||
availableProducts.cend(),
|
||||
product) != availableProducts.cend())
|
||||
{
|
||||
p->EnableRefresh(uuid, providerManager, enabled);
|
||||
p->EnableRefresh(uuid, {providerManager}, enabled);
|
||||
}
|
||||
}
|
||||
catch (const std::exception& ex)
|
||||
|
|
@ -673,49 +711,44 @@ void RadarProductManager::EnableRefresh(common::RadarProductGroup group,
|
|||
|
||||
void RadarProductManagerImpl::EnableRefresh(
|
||||
boost::uuids::uuid uuid,
|
||||
std::shared_ptr<ProviderManager> providerManager,
|
||||
const std::set<std::shared_ptr<ProviderManager>>& providerManagers,
|
||||
bool enabled)
|
||||
{
|
||||
// Lock the refresh map
|
||||
std::unique_lock lock {refreshMapMutex_};
|
||||
|
||||
auto currentProviderManager = refreshMap_.find(uuid);
|
||||
if (currentProviderManager != refreshMap_.cend())
|
||||
auto currentProviderManagers = refreshMap_.find(uuid);
|
||||
if (currentProviderManagers != refreshMap_.cend())
|
||||
{
|
||||
// If the enabling refresh for a different product, or disabling refresh
|
||||
if (currentProviderManager->second != providerManager || !enabled)
|
||||
for (const auto& currentProviderManager : currentProviderManagers->second)
|
||||
{
|
||||
currentProviderManager->refreshCount_ -= 1;
|
||||
// If the enabling refresh for a different product, or disabling
|
||||
// refresh
|
||||
if (!providerManagers.contains(currentProviderManager) || !enabled)
|
||||
{
|
||||
// Determine number of entries in the map for the current provider
|
||||
// manager
|
||||
auto currentProviderManagerCount = std::count_if(
|
||||
refreshMap_.cbegin(),
|
||||
refreshMap_.cend(),
|
||||
[&](const auto& provider)
|
||||
{ return provider.second == currentProviderManager->second; });
|
||||
|
||||
// If this is the last reference to the provider in the refresh map
|
||||
if (currentProviderManagerCount == 1)
|
||||
if (currentProviderManager->refreshCount_ == 0)
|
||||
{
|
||||
// Disable current provider
|
||||
currentProviderManager->second->Disable();
|
||||
currentProviderManager->Disable();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Dissociate uuid from current provider manager
|
||||
refreshMap_.erase(currentProviderManager);
|
||||
// Dissociate uuid from current provider managers
|
||||
refreshMap_.erase(currentProviderManagers);
|
||||
}
|
||||
|
||||
// If we are enabling a new provider manager
|
||||
if (enabled)
|
||||
{
|
||||
// Associate uuid to providerManager
|
||||
refreshMap_.emplace(uuid, providerManager);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (enabled)
|
||||
{
|
||||
// We are enabling a new provider manager
|
||||
// We are enabling provider managers
|
||||
// Associate uuid to provider manager
|
||||
refreshMap_.emplace(uuid, providerManager);
|
||||
refreshMap_.emplace(uuid, providerManagers);
|
||||
for (const auto& providerManager : providerManagers)
|
||||
{
|
||||
providerManager->refreshCount_ += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Release the refresh map mutex
|
||||
|
|
@ -723,21 +756,23 @@ void RadarProductManagerImpl::EnableRefresh(
|
|||
|
||||
// We have already handled a disable request by this point. If enabling, and
|
||||
// the provider manager refresh isn't already enabled, enable it.
|
||||
if (enabled && providerManager->refreshEnabled_ != enabled)
|
||||
{
|
||||
providerManager->refreshEnabled_ = enabled;
|
||||
|
||||
if (enabled)
|
||||
{
|
||||
for (const auto& providerManager : providerManagers)
|
||||
{
|
||||
if (providerManager->refreshEnabled_ != enabled)
|
||||
{
|
||||
providerManager->refreshEnabled_ = enabled;
|
||||
RefreshData(providerManager);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void RadarProductManagerImpl::RefreshData(
|
||||
std::shared_ptr<ProviderManager> providerManager)
|
||||
{
|
||||
logger_->debug("RefreshData: {}", providerManager->name());
|
||||
logger_->trace("RefreshData: {}", providerManager->name());
|
||||
|
||||
{
|
||||
std::unique_lock lock(providerManager->refreshTimerMutex_);
|
||||
|
|
@ -765,13 +800,18 @@ void RadarProductManagerImpl::RefreshDataSync(
|
|||
|
||||
auto [newObjects, totalObjects] = providerManager->provider_->Refresh();
|
||||
|
||||
std::chrono::milliseconds interval = kFastRetryInterval_;
|
||||
// Level2 chunked data is updated quickly and uses a faster interval
|
||||
const std::chrono::milliseconds fastRetryInterval =
|
||||
providerManager->isChunks_ ? kFastRetryIntervalChunks_ :
|
||||
kFastRetryInterval_;
|
||||
const std::chrono::milliseconds slowRetryInterval =
|
||||
providerManager->isChunks_ ? kSlowRetryIntervalChunks_ :
|
||||
kSlowRetryInterval_;
|
||||
std::chrono::milliseconds interval = fastRetryInterval;
|
||||
|
||||
if (totalObjects > 0)
|
||||
{
|
||||
std::string key = providerManager->provider_->FindLatestKey();
|
||||
auto latestTime = providerManager->provider_->GetTimePointByKey(key);
|
||||
|
||||
auto latestTime = providerManager->provider_->FindLatestTime();
|
||||
auto updatePeriod = providerManager->provider_->update_period();
|
||||
auto lastModified = providerManager->provider_->last_modified();
|
||||
auto sinceLastModified = std::chrono::system_clock::now() - lastModified;
|
||||
|
|
@ -786,12 +826,12 @@ void RadarProductManagerImpl::RefreshDataSync(
|
|||
{
|
||||
// If it has been at least 5 update periods since the file has
|
||||
// been last modified, slow the retry period
|
||||
interval = kSlowRetryInterval_;
|
||||
interval = slowRetryInterval;
|
||||
}
|
||||
else if (interval < std::chrono::milliseconds {kFastRetryInterval_})
|
||||
else if (interval < std::chrono::milliseconds {fastRetryInterval})
|
||||
{
|
||||
// The interval should be no quicker than the fast retry interval
|
||||
interval = kFastRetryInterval_;
|
||||
interval = fastRetryInterval;
|
||||
}
|
||||
|
||||
if (newObjects > 0)
|
||||
|
|
@ -805,14 +845,14 @@ void RadarProductManagerImpl::RefreshDataSync(
|
|||
logger_->info("[{}] No data found", providerManager->name());
|
||||
|
||||
// If no data is found, retry at the slow retry interval
|
||||
interval = kSlowRetryInterval_;
|
||||
interval = slowRetryInterval;
|
||||
}
|
||||
|
||||
std::unique_lock const lock(providerManager->refreshTimerMutex_);
|
||||
|
||||
if (providerManager->refreshEnabled_)
|
||||
{
|
||||
logger_->debug(
|
||||
logger_->trace(
|
||||
"[{}] Scheduled refresh in {:%M:%S}",
|
||||
providerManager->name(),
|
||||
std::chrono::duration_cast<std::chrono::seconds>(interval));
|
||||
|
|
@ -861,10 +901,13 @@ RadarProductManager::GetActiveVolumeTimes(
|
|||
std::shared_lock refreshLock {p->refreshMapMutex_};
|
||||
|
||||
// For each entry in the refresh map (refresh is enabled)
|
||||
for (auto& refreshEntry : p->refreshMap_)
|
||||
for (auto& refreshSet : p->refreshMap_)
|
||||
{
|
||||
for (const auto& refreshEntry : refreshSet.second)
|
||||
{
|
||||
// Add the provider for the current entry
|
||||
providers.insert(refreshEntry.second->provider_);
|
||||
providers.insert(refreshEntry->provider_);
|
||||
}
|
||||
}
|
||||
|
||||
// Unlock the refresh map
|
||||
|
|
@ -923,7 +966,7 @@ void RadarProductManagerImpl::LoadProviderData(
|
|||
std::mutex& loadDataMutex,
|
||||
const std::shared_ptr<request::NexradFileRequest>& request)
|
||||
{
|
||||
logger_->debug("LoadProviderData: {}, {}",
|
||||
logger_->trace("LoadProviderData: {}, {}",
|
||||
providerManager->name(),
|
||||
scwx::util::TimeString(time));
|
||||
|
||||
|
|
@ -943,7 +986,7 @@ void RadarProductManagerImpl::LoadProviderData(
|
|||
|
||||
if (existingRecord != nullptr)
|
||||
{
|
||||
logger_->debug(
|
||||
logger_->trace(
|
||||
"Data previously loaded, loading from data cache");
|
||||
}
|
||||
}
|
||||
|
|
@ -951,13 +994,8 @@ void RadarProductManagerImpl::LoadProviderData(
|
|||
|
||||
if (existingRecord == nullptr)
|
||||
{
|
||||
std::string key = providerManager->provider_->FindKey(time);
|
||||
|
||||
if (!key.empty())
|
||||
{
|
||||
nexradFile = providerManager->provider_->LoadObjectByKey(key);
|
||||
}
|
||||
else
|
||||
nexradFile = providerManager->provider_->LoadObjectByTime(time);
|
||||
if (nexradFile == nullptr)
|
||||
{
|
||||
logger_->warn("Attempting to load object without key: {}",
|
||||
scwx::util::TimeString(time));
|
||||
|
|
@ -979,7 +1017,7 @@ void RadarProductManager::LoadLevel2Data(
|
|||
std::chrono::system_clock::time_point time,
|
||||
const std::shared_ptr<request::NexradFileRequest>& request)
|
||||
{
|
||||
logger_->debug("LoadLevel2Data: {}", scwx::util::TimeString(time));
|
||||
logger_->trace("LoadLevel2Data: {}", scwx::util::TimeString(time));
|
||||
|
||||
p->LoadProviderData(time,
|
||||
p->level2ProviderManager_,
|
||||
|
|
@ -1163,6 +1201,10 @@ void RadarProductManagerImpl::PopulateLevel2ProductTimes(
|
|||
level2ProductRecords_,
|
||||
level2ProductRecordMutex_,
|
||||
time);
|
||||
PopulateProductTimes(level2ChunksProviderManager_,
|
||||
level2ProductRecords_,
|
||||
level2ProductRecordMutex_,
|
||||
time);
|
||||
}
|
||||
|
||||
void RadarProductManagerImpl::PopulateLevel3ProductTimes(
|
||||
|
|
@ -1399,7 +1441,7 @@ std::shared_ptr<types::RadarProductRecord>
|
|||
RadarProductManagerImpl::StoreRadarProductRecord(
|
||||
std::shared_ptr<types::RadarProductRecord> record)
|
||||
{
|
||||
logger_->debug("StoreRadarProductRecord()");
|
||||
logger_->trace("StoreRadarProductRecord()");
|
||||
|
||||
std::shared_ptr<types::RadarProductRecord> storedRecord = nullptr;
|
||||
|
||||
|
|
@ -1418,7 +1460,7 @@ RadarProductManagerImpl::StoreRadarProductRecord(
|
|||
|
||||
if (storedRecord != nullptr)
|
||||
{
|
||||
logger_->debug(
|
||||
logger_->error(
|
||||
"Level 2 product previously loaded, loading from cache");
|
||||
}
|
||||
}
|
||||
|
|
@ -1503,15 +1545,56 @@ RadarProductManager::GetLevel2Data(wsr88d::rda::DataBlockType dataBlockType,
|
|||
std::vector<float> elevationCuts {};
|
||||
std::chrono::system_clock::time_point foundTime {};
|
||||
|
||||
auto records = p->GetLevel2ProductRecords(time);
|
||||
const bool isEpox = time == std::chrono::system_clock::time_point {};
|
||||
bool needArchive = true;
|
||||
static const auto maxChunkDelay = std::chrono::minutes(10);
|
||||
const std::chrono::system_clock::time_point firstValidChunkTime =
|
||||
(isEpox ? std::chrono::system_clock::now() : time) - maxChunkDelay;
|
||||
|
||||
// See if we have this one in the chunk provider.
|
||||
auto chunkFile = std::dynamic_pointer_cast<wsr88d::Ar2vFile>(
|
||||
p->level2ChunksProviderManager_->provider_->LoadObjectByTime(time));
|
||||
if (chunkFile != nullptr)
|
||||
{
|
||||
std::tie(radarData, elevationCut, elevationCuts) =
|
||||
chunkFile->GetElevationScan(dataBlockType, elevation, time);
|
||||
|
||||
if (radarData != nullptr)
|
||||
{
|
||||
auto& radarData0 = (*radarData)[0];
|
||||
foundTime = std::chrono::floor<std::chrono::seconds>(
|
||||
scwx::util::TimePoint(radarData0->modified_julian_date(),
|
||||
radarData0->collection_time()));
|
||||
|
||||
const std::optional<float> incomingElevation =
|
||||
std::dynamic_pointer_cast<provider::AwsLevel2ChunksDataProvider>(
|
||||
p->level2ChunksProviderManager_->provider_)
|
||||
->GetCurrentElevation();
|
||||
if (incomingElevation != p->incomingLevel2Elevation_)
|
||||
{
|
||||
p->incomingLevel2Elevation_ = incomingElevation;
|
||||
Q_EMIT IncomingLevel2ElevationChanged(incomingElevation);
|
||||
}
|
||||
|
||||
if (foundTime >= firstValidChunkTime)
|
||||
{
|
||||
needArchive = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// It is not in the chunk provider, so get it from the archive
|
||||
if (needArchive)
|
||||
{
|
||||
auto records = p->GetLevel2ProductRecords(time);
|
||||
for (auto& recordPair : records)
|
||||
{
|
||||
auto& record = recordPair.second;
|
||||
|
||||
if (record != nullptr)
|
||||
{
|
||||
std::shared_ptr<wsr88d::rda::ElevationScan> recordRadarData = nullptr;
|
||||
std::shared_ptr<wsr88d::rda::ElevationScan> recordRadarData =
|
||||
nullptr;
|
||||
float recordElevationCut = 0.0f;
|
||||
std::vector<float> recordElevationCuts;
|
||||
|
||||
|
|
@ -1528,12 +1611,21 @@ RadarProductManager::GetLevel2Data(wsr88d::rda::DataBlockType dataBlockType,
|
|||
|
||||
// Find the newest radar data, not newer than the selected time
|
||||
if (radarData == nullptr ||
|
||||
(collectionTime <= time && foundTime < collectionTime))
|
||||
(collectionTime <= time && foundTime < collectionTime) ||
|
||||
(isEpox && foundTime < collectionTime))
|
||||
{
|
||||
radarData = recordRadarData;
|
||||
elevationCut = recordElevationCut;
|
||||
elevationCuts = std::move(recordElevationCuts);
|
||||
foundTime = collectionTime;
|
||||
|
||||
if (!p->incomingLevel2Elevation_.has_value())
|
||||
{
|
||||
p->incomingLevel2Elevation_ = {};
|
||||
Q_EMIT IncomingLevel2ElevationChanged(
|
||||
p->incomingLevel2Elevation_);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,8 +44,9 @@ public:
|
|||
[[nodiscard]] const std::vector<float>&
|
||||
coordinates(common::RadialSize radialSize, bool smoothingEnabled) const;
|
||||
[[nodiscard]] const scwx::util::time_zone* default_time_zone() const;
|
||||
[[nodiscard]] bool is_tdwr() const;
|
||||
[[nodiscard]] float gate_size() const;
|
||||
[[nodiscard]] std::optional<float> incoming_level_2_elevation() const;
|
||||
[[nodiscard]] bool is_tdwr() const;
|
||||
[[nodiscard]] std::string radar_id() const;
|
||||
[[nodiscard]] std::shared_ptr<config::RadarSite> radar_site() const;
|
||||
|
||||
|
|
@ -147,7 +148,9 @@ signals:
|
|||
void Level3ProductsChanged();
|
||||
void NewDataAvailable(common::RadarProductGroup group,
|
||||
const std::string& product,
|
||||
bool isChunks,
|
||||
std::chrono::system_clock::time_point latestTime);
|
||||
void IncomingLevel2ElevationChanged(std::optional<float> incomingElevation);
|
||||
|
||||
private:
|
||||
std::unique_ptr<RadarProductManagerImpl> p;
|
||||
|
|
|
|||
|
|
@ -656,6 +656,11 @@ std::vector<float> MapWidget::GetElevationCuts() const
|
|||
}
|
||||
}
|
||||
|
||||
std::optional<float> MapWidget::GetIncomingLevel2Elevation() const
|
||||
{
|
||||
return p->radarProductManager_->incoming_level_2_elevation();
|
||||
}
|
||||
|
||||
common::Level2Product
|
||||
MapWidgetImpl::GetLevel2ProductOrDefault(const std::string& productName) const
|
||||
{
|
||||
|
|
@ -1796,6 +1801,14 @@ void MapWidgetImpl::RadarProductManagerConnect()
|
|||
{
|
||||
if (radarProductManager_ != nullptr)
|
||||
{
|
||||
connect(radarProductManager_.get(),
|
||||
&manager::RadarProductManager::IncomingLevel2ElevationChanged,
|
||||
this,
|
||||
[this](std::optional<float> incomingElevation)
|
||||
{
|
||||
Q_EMIT widget_->IncomingLevel2ElevationChanged(
|
||||
incomingElevation);
|
||||
});
|
||||
connect(radarProductManager_.get(),
|
||||
&manager::RadarProductManager::Level3ProductsChanged,
|
||||
this,
|
||||
|
|
@ -1830,15 +1843,24 @@ void MapWidgetImpl::RadarProductManagerConnect()
|
|||
this,
|
||||
[this](common::RadarProductGroup group,
|
||||
const std::string& product,
|
||||
bool isChunks,
|
||||
std::chrono::system_clock::time_point latestTime)
|
||||
{
|
||||
if (autoRefreshEnabled_ &&
|
||||
context_->radar_product_group() == group &&
|
||||
(group == common::RadarProductGroup::Level2 ||
|
||||
context_->radar_product() == product))
|
||||
{
|
||||
if (isChunks && autoUpdateEnabled_)
|
||||
{
|
||||
// Level 2 products may have multiple time points,
|
||||
// ensure the latest is selected
|
||||
widget_->SelectRadarProduct(group, product);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Create file request
|
||||
std::shared_ptr<request::NexradFileRequest> request =
|
||||
const std::shared_ptr<request::NexradFileRequest> request =
|
||||
std::make_shared<request::NexradFileRequest>(
|
||||
radarProductManager_->radar_id());
|
||||
|
||||
|
|
@ -1849,8 +1871,9 @@ void MapWidgetImpl::RadarProductManagerConnect()
|
|||
request.get(),
|
||||
&request::NexradFileRequest::RequestComplete,
|
||||
this,
|
||||
[=,
|
||||
this](std::shared_ptr<request::NexradFileRequest> request)
|
||||
[group, product, this](
|
||||
const std::shared_ptr<request::NexradFileRequest>&
|
||||
request)
|
||||
{
|
||||
// Select loaded record
|
||||
auto record = request->radar_product_record();
|
||||
|
|
@ -1867,8 +1890,8 @@ void MapWidgetImpl::RadarProductManagerConnect()
|
|||
{
|
||||
if (group == common::RadarProductGroup::Level2)
|
||||
{
|
||||
// Level 2 products may have multiple time points,
|
||||
// ensure the latest is selected
|
||||
// Level 2 products may have multiple time
|
||||
// points, ensure the latest is selected
|
||||
widget_->SelectRadarProduct(group, product);
|
||||
}
|
||||
else
|
||||
|
|
@ -1882,7 +1905,7 @@ void MapWidgetImpl::RadarProductManagerConnect()
|
|||
// Load file
|
||||
boost::asio::post(
|
||||
threadPool_,
|
||||
[=, this]()
|
||||
[group, latestTime, request, product, this]()
|
||||
{
|
||||
try
|
||||
{
|
||||
|
|
@ -1903,6 +1926,7 @@ void MapWidgetImpl::RadarProductManagerConnect()
|
|||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
},
|
||||
Qt::QueuedConnection);
|
||||
}
|
||||
|
|
@ -1916,6 +1940,10 @@ void MapWidgetImpl::RadarProductManagerDisconnect()
|
|||
&manager::RadarProductManager::NewDataAvailable,
|
||||
this,
|
||||
nullptr);
|
||||
disconnect(radarProductManager_.get(),
|
||||
&manager::RadarProductManager::IncomingLevel2ElevationChanged,
|
||||
this,
|
||||
nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ public:
|
|||
GetAvailableLevel3Categories();
|
||||
[[nodiscard]] std::optional<float> GetElevation() const;
|
||||
[[nodiscard]] std::vector<float> GetElevationCuts() const;
|
||||
[[nodiscard]] std::optional<float> GetIncomingLevel2Elevation() const;
|
||||
[[nodiscard]] std::vector<std::string> GetLevel3Products();
|
||||
[[nodiscard]] std::string GetMapStyle() const;
|
||||
[[nodiscard]] common::RadarProductGroup GetRadarProductGroup() const;
|
||||
|
|
@ -184,6 +185,7 @@ signals:
|
|||
void RadarSweepUpdated();
|
||||
void RadarSweepNotUpdated(types::NoUpdateReason reason);
|
||||
void WidgetPainted();
|
||||
void IncomingLevel2ElevationChanged(std::optional<float> incomingElevation);
|
||||
};
|
||||
|
||||
} // namespace map
|
||||
|
|
|
|||
|
|
@ -159,8 +159,6 @@ void RadarProductLayer::Initialize()
|
|||
|
||||
void RadarProductLayer::UpdateSweep()
|
||||
{
|
||||
logger_->debug("UpdateSweep()");
|
||||
|
||||
gl::OpenGLFunctions& gl = context()->gl();
|
||||
|
||||
boost::timer::cpu_timer timer;
|
||||
|
|
@ -172,9 +170,10 @@ void RadarProductLayer::UpdateSweep()
|
|||
std::try_to_lock);
|
||||
if (!sweepLock.owns_lock())
|
||||
{
|
||||
logger_->debug("Sweep locked, deferring update");
|
||||
logger_->trace("Sweep locked, deferring update");
|
||||
return;
|
||||
}
|
||||
logger_->debug("UpdateSweep()");
|
||||
|
||||
p->sweepNeedsUpdate_ = false;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
#include <qlabel.h>
|
||||
#include <scwx/qt/ui/level2_settings_widget.hpp>
|
||||
#include <scwx/qt/ui/flow_layout.hpp>
|
||||
#include <scwx/qt/manager/hotkey_manager.hpp>
|
||||
|
|
@ -29,16 +30,15 @@ public:
|
|||
explicit Level2SettingsWidgetImpl(Level2SettingsWidget* self) :
|
||||
self_ {self},
|
||||
layout_ {new QVBoxLayout(self)},
|
||||
elevationGroupBox_ {},
|
||||
elevationButtons_ {},
|
||||
elevationCuts_ {},
|
||||
elevationButtonsChanged_ {false},
|
||||
resizeElevationButtons_ {false},
|
||||
settingsGroupBox_ {},
|
||||
declutterCheckBox_ {}
|
||||
elevationCuts_ {}
|
||||
{
|
||||
// NOLINTBEGIN(cppcoreguidelines-owning-memory) Qt takes care of this
|
||||
layout_->setContentsMargins(0, 0, 0, 0);
|
||||
|
||||
incomingElevationLabel_ = new QLabel("", self);
|
||||
layout_->addWidget(incomingElevationLabel_);
|
||||
|
||||
elevationGroupBox_ = new QGroupBox(tr("Elevation"), self);
|
||||
new ui::FlowLayout(elevationGroupBox_);
|
||||
layout_->addWidget(elevationGroupBox_);
|
||||
|
|
@ -51,6 +51,7 @@ public:
|
|||
settingsLayout->addWidget(declutterCheckBox_);
|
||||
|
||||
settingsGroupBox_->setVisible(false);
|
||||
// NOLINTEND(cppcoreguidelines-owning-memory) Qt takes care of this
|
||||
|
||||
QObject::connect(hotkeyManager_.get(),
|
||||
&manager::HotkeyManager::HotkeyPressed,
|
||||
|
|
@ -66,14 +67,15 @@ public:
|
|||
Level2SettingsWidget* self_;
|
||||
QLayout* layout_;
|
||||
|
||||
QGroupBox* elevationGroupBox_;
|
||||
QGroupBox* elevationGroupBox_ {};
|
||||
QLabel* incomingElevationLabel_ {};
|
||||
std::list<QToolButton*> elevationButtons_;
|
||||
std::vector<float> elevationCuts_;
|
||||
bool elevationButtonsChanged_;
|
||||
bool resizeElevationButtons_;
|
||||
bool elevationButtonsChanged_ {};
|
||||
bool resizeElevationButtons_ {};
|
||||
|
||||
QGroupBox* settingsGroupBox_;
|
||||
QCheckBox* declutterCheckBox_;
|
||||
QGroupBox* settingsGroupBox_ {};
|
||||
QCheckBox* declutterCheckBox_ {};
|
||||
|
||||
float currentElevation_ {};
|
||||
QToolButton* currentElevationButton_ {nullptr};
|
||||
|
|
@ -240,12 +242,29 @@ void Level2SettingsWidget::UpdateElevationSelection(float elevation)
|
|||
p->currentElevationButton_ = newElevationButton;
|
||||
}
|
||||
|
||||
void Level2SettingsWidget::UpdateIncomingElevation(
|
||||
std::optional<float> incomingElevation)
|
||||
{
|
||||
if (incomingElevation.has_value())
|
||||
{
|
||||
p->incomingElevationLabel_->setText(
|
||||
"Incoming Elevation: " + QString::number(*incomingElevation, 'f', 1) +
|
||||
common::Characters::DEGREE);
|
||||
}
|
||||
else
|
||||
{
|
||||
p->incomingElevationLabel_->setText("Incoming Elevation: None");
|
||||
}
|
||||
}
|
||||
|
||||
void Level2SettingsWidget::UpdateSettings(map::MapWidget* activeMap)
|
||||
{
|
||||
std::optional<float> currentElevationOption = activeMap->GetElevation();
|
||||
const float currentElevation =
|
||||
currentElevationOption.has_value() ? *currentElevationOption : 0.0f;
|
||||
std::vector<float> elevationCuts = activeMap->GetElevationCuts();
|
||||
const std::vector<float> elevationCuts = activeMap->GetElevationCuts();
|
||||
const std::optional<float> incomingElevation =
|
||||
activeMap->GetIncomingLevel2Elevation();
|
||||
|
||||
if (p->elevationCuts_ != elevationCuts)
|
||||
{
|
||||
|
|
@ -279,6 +298,7 @@ void Level2SettingsWidget::UpdateSettings(map::MapWidget* activeMap)
|
|||
}
|
||||
|
||||
UpdateElevationSelection(currentElevation);
|
||||
UpdateIncomingElevation(incomingElevation);
|
||||
}
|
||||
|
||||
} // namespace ui
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@
|
|||
|
||||
#include <scwx/qt/map/map_widget.hpp>
|
||||
|
||||
#include <optional>
|
||||
|
||||
namespace scwx
|
||||
{
|
||||
namespace qt
|
||||
|
|
@ -23,6 +25,7 @@ public:
|
|||
void showEvent(QShowEvent* event) override;
|
||||
|
||||
void UpdateElevationSelection(float elevation);
|
||||
void UpdateIncomingElevation(std::optional<float> incomingElevation);
|
||||
void UpdateSettings(map::MapWidget* activeMap);
|
||||
|
||||
signals:
|
||||
|
|
|
|||
|
|
@ -561,7 +561,7 @@ void Level2ProductView::ComputeSweep()
|
|||
Q_EMIT SweepNotComputed(types::NoUpdateReason::NotLoaded);
|
||||
return;
|
||||
}
|
||||
if (radarData == p->elevationScan_ &&
|
||||
if ((radarData == p->elevationScan_) &&
|
||||
smoothingEnabled == p->lastSmoothingEnabled_ &&
|
||||
(showSmoothedRangeFolding == p->lastShowSmoothedRangeFolding_ ||
|
||||
!smoothingEnabled))
|
||||
|
|
|
|||
|
|
@ -118,6 +118,7 @@ void OverlayProductView::Impl::ConnectRadarProductManager()
|
|||
self_,
|
||||
[this](common::RadarProductGroup group,
|
||||
const std::string& product,
|
||||
bool /*isChunks*/,
|
||||
std::chrono::system_clock::time_point latestTime)
|
||||
{
|
||||
if (autoRefreshEnabled_ &&
|
||||
|
|
|
|||
|
|
@ -0,0 +1,71 @@
|
|||
#pragma once
|
||||
|
||||
#include <scwx/provider/nexrad_data_provider.hpp>
|
||||
#include <scwx/provider/aws_level2_data_provider.hpp>
|
||||
|
||||
#include <optional>
|
||||
|
||||
namespace Aws::S3
|
||||
{
|
||||
class S3Client;
|
||||
} // namespace Aws::S3
|
||||
|
||||
namespace scwx::provider
|
||||
{
|
||||
|
||||
/**
|
||||
* @brief AWS Level 2 Data Provider
|
||||
*/
|
||||
class AwsLevel2ChunksDataProvider : public NexradDataProvider
|
||||
{
|
||||
public:
|
||||
explicit AwsLevel2ChunksDataProvider(const std::string& radarSite);
|
||||
explicit AwsLevel2ChunksDataProvider(const std::string& radarSite,
|
||||
const std::string& bucketName,
|
||||
const std::string& region);
|
||||
~AwsLevel2ChunksDataProvider() override;
|
||||
|
||||
AwsLevel2ChunksDataProvider(const AwsLevel2ChunksDataProvider&) = delete;
|
||||
AwsLevel2ChunksDataProvider&
|
||||
operator=(const AwsLevel2ChunksDataProvider&) = delete;
|
||||
|
||||
AwsLevel2ChunksDataProvider(AwsLevel2ChunksDataProvider&&) noexcept;
|
||||
AwsLevel2ChunksDataProvider&
|
||||
operator=(AwsLevel2ChunksDataProvider&&) noexcept;
|
||||
|
||||
[[nodiscard]] std::chrono::system_clock::time_point
|
||||
GetTimePointByKey(const std::string& key) const override;
|
||||
|
||||
[[nodiscard]] size_t cache_size() const override;
|
||||
|
||||
[[nodiscard]] std::chrono::system_clock::time_point
|
||||
last_modified() const override;
|
||||
[[nodiscard]] std::chrono::seconds update_period() const override;
|
||||
|
||||
std::string FindKey(std::chrono::system_clock::time_point time) override;
|
||||
std::string FindLatestKey() override;
|
||||
std::chrono::system_clock::time_point FindLatestTime() override;
|
||||
std::vector<std::chrono::system_clock::time_point>
|
||||
GetTimePointsByDate(std::chrono::system_clock::time_point date) override;
|
||||
std::tuple<bool, size_t, size_t>
|
||||
ListObjects(std::chrono::system_clock::time_point date) override;
|
||||
std::shared_ptr<wsr88d::NexradFile>
|
||||
LoadObjectByKey(const std::string& key) override;
|
||||
std::shared_ptr<wsr88d::NexradFile>
|
||||
LoadObjectByTime(std::chrono::system_clock::time_point time) override;
|
||||
std::pair<size_t, size_t> Refresh() override;
|
||||
|
||||
void RequestAvailableProducts() override;
|
||||
std::vector<std::string> GetAvailableProducts() override;
|
||||
|
||||
std::optional<float> GetCurrentElevation();
|
||||
|
||||
void SetLevel2DataProvider(
|
||||
const std::shared_ptr<AwsLevel2DataProvider>& provider);
|
||||
|
||||
private:
|
||||
class Impl;
|
||||
std::unique_ptr<Impl> p;
|
||||
};
|
||||
|
||||
} // namespace scwx::provider
|
||||
|
|
@ -39,12 +39,15 @@ public:
|
|||
|
||||
std::string FindKey(std::chrono::system_clock::time_point time) override;
|
||||
std::string FindLatestKey() override;
|
||||
std::chrono::system_clock::time_point FindLatestTime() override;
|
||||
std::vector<std::chrono::system_clock::time_point>
|
||||
GetTimePointsByDate(std::chrono::system_clock::time_point date) override;
|
||||
std::tuple<bool, size_t, size_t>
|
||||
ListObjects(std::chrono::system_clock::time_point date) override;
|
||||
std::shared_ptr<wsr88d::NexradFile>
|
||||
LoadObjectByKey(const std::string& key) override;
|
||||
std::shared_ptr<wsr88d::NexradFile>
|
||||
LoadObjectByTime(std::chrono::system_clock::time_point time) override;
|
||||
std::pair<size_t, size_t> Refresh() override;
|
||||
|
||||
protected:
|
||||
|
|
|
|||
|
|
@ -59,6 +59,13 @@ public:
|
|||
*/
|
||||
virtual std::string FindLatestKey() = 0;
|
||||
|
||||
/**
|
||||
* Finds the most recent time in the cache.
|
||||
*
|
||||
* @return NEXRAD data key
|
||||
*/
|
||||
virtual std::chrono::system_clock::time_point FindLatestTime() = 0;
|
||||
|
||||
/**
|
||||
* Lists NEXRAD objects for the date supplied, and adds them to the cache.
|
||||
*
|
||||
|
|
@ -81,6 +88,16 @@ public:
|
|||
virtual std::shared_ptr<wsr88d::NexradFile>
|
||||
LoadObjectByKey(const std::string& key) = 0;
|
||||
|
||||
/**
|
||||
* Loads a NEXRAD file object at the given time
|
||||
*
|
||||
* @param time NEXRAD time
|
||||
*
|
||||
* @return NEXRAD data
|
||||
*/
|
||||
virtual std::shared_ptr<wsr88d::NexradFile>
|
||||
LoadObjectByTime(std::chrono::system_clock::time_point time) = 0;
|
||||
|
||||
/**
|
||||
* Lists NEXRAD objects for the current date, and adds them to the cache. If
|
||||
* no objects have been added to the cache for the current date, the previous
|
||||
|
|
|
|||
|
|
@ -27,6 +27,9 @@ public:
|
|||
static std::shared_ptr<NexradDataProvider>
|
||||
CreateLevel2DataProvider(const std::string& radarSite);
|
||||
|
||||
static std::shared_ptr<NexradDataProvider>
|
||||
CreateLevel2ChunksDataProvider(const std::string& radarSite);
|
||||
|
||||
static std::shared_ptr<NexradDataProvider>
|
||||
CreateLevel3DataProvider(const std::string& radarSite,
|
||||
const std::string& product);
|
||||
|
|
|
|||
|
|
@ -32,6 +32,9 @@ public:
|
|||
Ar2vFile(Ar2vFile&&) noexcept;
|
||||
Ar2vFile& operator=(Ar2vFile&&) noexcept;
|
||||
|
||||
Ar2vFile(const std::shared_ptr<Ar2vFile>& current,
|
||||
const std::shared_ptr<Ar2vFile>& last);
|
||||
|
||||
std::uint32_t julian_date() const;
|
||||
std::uint32_t milliseconds() const;
|
||||
std::string icao() const;
|
||||
|
|
@ -53,6 +56,9 @@ public:
|
|||
bool LoadFile(const std::string& filename);
|
||||
bool LoadData(std::istream& is);
|
||||
|
||||
bool LoadLDMRecords(std::istream& is);
|
||||
bool IndexFile();
|
||||
|
||||
private:
|
||||
std::unique_ptr<Ar2vFileImpl> p;
|
||||
};
|
||||
|
|
|
|||
783
wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp
Normal file
783
wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp
Normal file
|
|
@ -0,0 +1,783 @@
|
|||
#include "scwx/wsr88d/rda/digital_radar_data.hpp"
|
||||
#include <scwx/provider/aws_level2_chunks_data_provider.hpp>
|
||||
#include <scwx/util/environment.hpp>
|
||||
#include <scwx/util/map.hpp>
|
||||
#include <scwx/util/logger.hpp>
|
||||
#include <scwx/util/time.hpp>
|
||||
#include <scwx/wsr88d/ar2v_file.hpp>
|
||||
|
||||
#include <shared_mutex>
|
||||
#include <utility>
|
||||
|
||||
#include <aws/core/auth/AWSCredentials.h>
|
||||
#include <aws/s3/S3Client.h>
|
||||
#include <aws/s3/model/GetObjectRequest.h>
|
||||
#include <aws/s3/model/ListObjectsV2Request.h>
|
||||
#include <fmt/chrono.h>
|
||||
#include <fmt/format.h>
|
||||
|
||||
// Avoid circular refrence errors in boost
|
||||
// NOLINTBEGIN(misc-header-include-cycle)
|
||||
#if defined(_MSC_VER)
|
||||
# pragma warning(push, 0)
|
||||
#endif
|
||||
|
||||
#include <boost/asio/thread_pool.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/timer/timer.hpp>
|
||||
|
||||
#if defined(_MSC_VER)
|
||||
# pragma warning(pop)
|
||||
#endif
|
||||
// NOLINTEND(misc-header-include-cycle)
|
||||
|
||||
#if (__cpp_lib_chrono < 201907L)
|
||||
# include <date/date.h>
|
||||
#endif
|
||||
|
||||
namespace scwx::provider
|
||||
{
|
||||
static const std::string logPrefix_ =
|
||||
"scwx::provider::aws_level2_chunks_data_provider";
|
||||
static const auto logger_ = util::Logger::Create(logPrefix_);
|
||||
|
||||
static const std::string kDefaultBucketName_ = "unidata-nexrad-level2-chunks";
|
||||
static const std::string kDefaultRegion_ = "us-east-1";
|
||||
|
||||
class AwsLevel2ChunksDataProvider::Impl
|
||||
{
|
||||
public:
|
||||
struct ScanRecord
|
||||
{
|
||||
explicit ScanRecord(std::string prefix, bool valid = true) :
|
||||
valid_ {valid},
|
||||
prefix_ {std::move(prefix)},
|
||||
nexradFile_ {},
|
||||
lastModified_ {},
|
||||
lastKey_ {""}
|
||||
{
|
||||
}
|
||||
~ScanRecord() = default;
|
||||
ScanRecord(const ScanRecord&) = default;
|
||||
ScanRecord(ScanRecord&&) = default;
|
||||
ScanRecord& operator=(const ScanRecord&) = default;
|
||||
ScanRecord& operator=(ScanRecord&&) = default;
|
||||
|
||||
bool valid_;
|
||||
std::string prefix_;
|
||||
std::shared_ptr<wsr88d::Ar2vFile> nexradFile_;
|
||||
std::chrono::system_clock::time_point time_;
|
||||
std::chrono::system_clock::time_point lastModified_;
|
||||
std::chrono::system_clock::time_point secondLastModified_;
|
||||
std::string lastKey_;
|
||||
int nextFile_ {1};
|
||||
bool hasAllFiles_ {false};
|
||||
};
|
||||
|
||||
explicit Impl(AwsLevel2ChunksDataProvider* self,
|
||||
std::string radarSite,
|
||||
std::string bucketName,
|
||||
std::string region) :
|
||||
radarSite_ {std::move(radarSite)},
|
||||
bucketName_ {std::move(bucketName)},
|
||||
region_ {std::move(region)},
|
||||
client_ {nullptr},
|
||||
scanTimes_ {},
|
||||
lastScan_ {"", false},
|
||||
currentScan_ {"", false},
|
||||
scansMutex_ {},
|
||||
lastTimeListed_ {},
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers) about average
|
||||
updatePeriod_ {7},
|
||||
level2DataProvider_ {},
|
||||
self_ {self}
|
||||
{
|
||||
// Disable HTTP request for region
|
||||
util::SetEnvironment("AWS_EC2_METADATA_DISABLED", "true");
|
||||
|
||||
// Use anonymous credentials
|
||||
const Aws::Auth::AWSCredentials credentials {};
|
||||
|
||||
Aws::Client::ClientConfiguration config;
|
||||
config.region = region_;
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers) arbitrary
|
||||
config.connectTimeoutMs = 10000;
|
||||
|
||||
client_ = std::make_shared<Aws::S3::S3Client>(
|
||||
credentials,
|
||||
Aws::MakeShared<Aws::S3::S3EndpointProvider>(
|
||||
Aws::S3::S3Client::GetAllocationTag()),
|
||||
config);
|
||||
}
|
||||
~Impl() = default;
|
||||
Impl(const Impl&) = delete;
|
||||
Impl(Impl&&) = delete;
|
||||
Impl& operator=(const Impl&) = delete;
|
||||
Impl& operator=(Impl&&) = delete;
|
||||
|
||||
std::chrono::system_clock::time_point GetScanTime(const std::string& prefix);
|
||||
int GetScanNumber(const std::string& prefix);
|
||||
|
||||
bool LoadScan(Impl::ScanRecord& scanRecord);
|
||||
std::tuple<bool, size_t, size_t> ListObjects();
|
||||
|
||||
std::string radarSite_;
|
||||
std::string bucketName_;
|
||||
std::string region_;
|
||||
std::shared_ptr<Aws::S3::S3Client> client_;
|
||||
|
||||
std::mutex refreshMutex_;
|
||||
|
||||
std::unordered_map<std::string, std::chrono::system_clock::time_point>
|
||||
scanTimes_;
|
||||
ScanRecord lastScan_;
|
||||
ScanRecord currentScan_;
|
||||
std::shared_mutex scansMutex_;
|
||||
std::chrono::system_clock::time_point lastTimeListed_;
|
||||
|
||||
std::chrono::seconds updatePeriod_;
|
||||
|
||||
std::weak_ptr<AwsLevel2DataProvider> level2DataProvider_;
|
||||
|
||||
AwsLevel2ChunksDataProvider* self_;
|
||||
};
|
||||
|
||||
AwsLevel2ChunksDataProvider::AwsLevel2ChunksDataProvider(
|
||||
const std::string& radarSite) :
|
||||
AwsLevel2ChunksDataProvider(radarSite, kDefaultBucketName_, kDefaultRegion_)
|
||||
{
|
||||
}
|
||||
|
||||
AwsLevel2ChunksDataProvider::AwsLevel2ChunksDataProvider(
|
||||
const std::string& radarSite,
|
||||
const std::string& bucketName,
|
||||
const std::string& region) :
|
||||
p(std::make_unique<Impl>(this, radarSite, bucketName, region))
|
||||
{
|
||||
}
|
||||
|
||||
AwsLevel2ChunksDataProvider::~AwsLevel2ChunksDataProvider() = default;
|
||||
|
||||
std::chrono::system_clock::time_point
|
||||
AwsLevel2ChunksDataProvider::GetTimePointByKey(const std::string& key) const
|
||||
{
|
||||
std::chrono::system_clock::time_point time {};
|
||||
|
||||
const size_t lastSeparator = key.rfind('/');
|
||||
const size_t offset =
|
||||
(lastSeparator == std::string::npos) ? 0 : lastSeparator + 1;
|
||||
|
||||
// Filename format is YYYYMMDD-TTTTTT-AAA-B
|
||||
static const size_t formatSize = std::string("YYYYMMDD-TTTTTT").size();
|
||||
|
||||
if (key.size() >= offset + formatSize)
|
||||
{
|
||||
using namespace std::chrono;
|
||||
|
||||
#if (__cpp_lib_chrono < 201907L)
|
||||
using namespace date;
|
||||
#endif
|
||||
|
||||
static const std::string timeFormat {"%Y%m%d-%H%M%S"};
|
||||
|
||||
std::string timeStr {key.substr(offset, formatSize)};
|
||||
std::istringstream in {timeStr};
|
||||
in >> parse(timeFormat, time);
|
||||
|
||||
if (in.fail())
|
||||
{
|
||||
logger_->warn("Invalid time: \"{}\"", timeStr);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
logger_->warn("Time not parsable from key: \"{}\"", key);
|
||||
}
|
||||
|
||||
return time;
|
||||
}
|
||||
|
||||
size_t AwsLevel2ChunksDataProvider::cache_size() const
|
||||
{
|
||||
return 2;
|
||||
}
|
||||
|
||||
std::chrono::system_clock::time_point
|
||||
AwsLevel2ChunksDataProvider::last_modified() const
|
||||
{
|
||||
// There is a slight delay between the "modified time" and when it is
|
||||
// actually available. Radar product manager uses this as available time
|
||||
static const auto extra = std::chrono::seconds(2);
|
||||
|
||||
const std::shared_lock lock(p->scansMutex_);
|
||||
if (p->currentScan_.valid_ && p->currentScan_.lastModified_ !=
|
||||
std::chrono::system_clock::time_point {})
|
||||
{
|
||||
return p->currentScan_.lastModified_ + extra;
|
||||
}
|
||||
else if (p->lastScan_.valid_ && p->lastScan_.lastModified_ !=
|
||||
std::chrono::system_clock::time_point {})
|
||||
{
|
||||
return p->lastScan_.lastModified_ + extra;
|
||||
}
|
||||
else
|
||||
{
|
||||
return {};
|
||||
}
|
||||
}
|
||||
std::chrono::seconds AwsLevel2ChunksDataProvider::update_period() const
|
||||
{
|
||||
const std::shared_lock lock(p->scansMutex_);
|
||||
// get update period from time between chunks
|
||||
if (p->currentScan_.valid_ && p->currentScan_.nextFile_ > 2)
|
||||
{
|
||||
auto delta =
|
||||
p->currentScan_.lastModified_ - p->currentScan_.secondLastModified_;
|
||||
return std::chrono::duration_cast<std::chrono::seconds>(delta);
|
||||
}
|
||||
else if (p->lastScan_.valid_ && p->lastScan_.nextFile_ > 2)
|
||||
{
|
||||
auto delta =
|
||||
p->lastScan_.lastModified_ - p->lastScan_.secondLastModified_;
|
||||
return std::chrono::duration_cast<std::chrono::seconds>(delta);
|
||||
}
|
||||
|
||||
// default to a set update period
|
||||
return p->updatePeriod_;
|
||||
}
|
||||
|
||||
std::string
|
||||
AwsLevel2ChunksDataProvider::FindKey(std::chrono::system_clock::time_point time)
|
||||
{
|
||||
logger_->debug("FindKey: {}", util::TimeString(time));
|
||||
|
||||
const std::shared_lock lock(p->scansMutex_);
|
||||
if (p->currentScan_.valid_ && time >= p->currentScan_.time_)
|
||||
{
|
||||
return p->currentScan_.prefix_;
|
||||
}
|
||||
else if (p->lastScan_.valid_ && time >= p->lastScan_.time_)
|
||||
{
|
||||
return p->lastScan_.prefix_;
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
std::string AwsLevel2ChunksDataProvider::FindLatestKey()
|
||||
{
|
||||
const std::shared_lock lock(p->scansMutex_);
|
||||
if (!p->currentScan_.valid_)
|
||||
{
|
||||
return "";
|
||||
}
|
||||
|
||||
return p->currentScan_.prefix_;
|
||||
}
|
||||
|
||||
std::chrono::system_clock::time_point
|
||||
AwsLevel2ChunksDataProvider::FindLatestTime()
|
||||
{
|
||||
const std::shared_lock lock(p->scansMutex_);
|
||||
if (!p->currentScan_.valid_)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
return p->currentScan_.time_;
|
||||
}
|
||||
|
||||
std::vector<std::chrono::system_clock::time_point>
|
||||
AwsLevel2ChunksDataProvider::GetTimePointsByDate(
|
||||
std::chrono::system_clock::time_point /*date*/)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
std::chrono::system_clock::time_point
|
||||
AwsLevel2ChunksDataProvider::Impl::GetScanTime(const std::string& prefix)
|
||||
{
|
||||
using namespace std::chrono;
|
||||
const auto& scanTimeIt = scanTimes_.find(prefix); // O(log(n))
|
||||
if (scanTimeIt != scanTimes_.cend())
|
||||
{
|
||||
// If the time is greater than 2 hours ago, it may be a new scan
|
||||
auto replaceBy = system_clock::now() - hours {2};
|
||||
if (scanTimeIt->second > replaceBy)
|
||||
{
|
||||
return scanTimeIt->second;
|
||||
}
|
||||
}
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
request.SetBucket(bucketName_);
|
||||
request.SetPrefix(prefix);
|
||||
request.SetDelimiter("/");
|
||||
request.SetMaxKeys(1);
|
||||
|
||||
auto outcome = client_->ListObjectsV2(request);
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
auto timePoint = self_->GetTimePointByKey(
|
||||
outcome.GetResult().GetContents().at(0).GetKey());
|
||||
scanTimes_.insert_or_assign(prefix, timePoint);
|
||||
return timePoint;
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
std::tuple<bool, size_t, size_t>
|
||||
AwsLevel2ChunksDataProvider::Impl::ListObjects()
|
||||
{
|
||||
size_t newObjects = 0;
|
||||
const size_t totalObjects = 0;
|
||||
|
||||
const std::chrono::system_clock::time_point now =
|
||||
std::chrono::system_clock::now();
|
||||
|
||||
if (currentScan_.valid_ && !currentScan_.hasAllFiles_ &&
|
||||
lastTimeListed_ + std::chrono::minutes(2) > now)
|
||||
{
|
||||
return {true, newObjects, totalObjects};
|
||||
}
|
||||
logger_->trace("ListObjects");
|
||||
lastTimeListed_ = now;
|
||||
|
||||
const std::string prefix = radarSite_ + "/";
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
request.SetBucket(bucketName_);
|
||||
request.SetPrefix(prefix);
|
||||
request.SetDelimiter("/");
|
||||
|
||||
auto outcome = client_->ListObjectsV2(request);
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
auto& scans = outcome.GetResult().GetCommonPrefixes();
|
||||
logger_->trace("Found {} scans", scans.size());
|
||||
|
||||
if (scans.size() > 0)
|
||||
{
|
||||
// find latest scan
|
||||
auto scanNumberMap = std::map<int, std::string>();
|
||||
|
||||
for (auto& scan : scans) // O(n log(n)) n <= 999
|
||||
{
|
||||
const std::string& scanPrefix = scan.GetPrefix();
|
||||
scanNumberMap.insert_or_assign(GetScanNumber(scanPrefix),
|
||||
scanPrefix);
|
||||
}
|
||||
|
||||
// Start with last scan
|
||||
int previousScanNumber = scanNumberMap.crbegin()->first;
|
||||
const int firstScanNumber = scanNumberMap.cbegin()->first;
|
||||
|
||||
// Look for a gap in scan numbers. This indicates that is the latest
|
||||
// scan.
|
||||
|
||||
auto possibleLastNumbers = std::unordered_set<int>();
|
||||
// This indicates that highest number scan may be the last scan
|
||||
// (including if there is only 1 scan)
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers)
|
||||
if (previousScanNumber != 999 || firstScanNumber != 1)
|
||||
{
|
||||
possibleLastNumbers.emplace(previousScanNumber);
|
||||
}
|
||||
// Have already checked scan with highest number, so skip first
|
||||
previousScanNumber = firstScanNumber;
|
||||
bool first = true;
|
||||
for (const auto& scan : scanNumberMap)
|
||||
{
|
||||
if (first)
|
||||
{
|
||||
first = false;
|
||||
continue;
|
||||
}
|
||||
if (scan.first != previousScanNumber + 1)
|
||||
{
|
||||
possibleLastNumbers.emplace(previousScanNumber);
|
||||
}
|
||||
previousScanNumber = scan.first;
|
||||
}
|
||||
|
||||
if (possibleLastNumbers.empty())
|
||||
{
|
||||
logger_->warn("Could not find last scan");
|
||||
return {false, 0, 0};
|
||||
}
|
||||
|
||||
int lastScanNumber = -1;
|
||||
std::chrono::system_clock::time_point lastScanTime = {};
|
||||
std::string lastScanPrefix;
|
||||
|
||||
for (const int scanNumber : possibleLastNumbers)
|
||||
{
|
||||
const std::string& scanPrefix = scanNumberMap.at(scanNumber);
|
||||
auto scanTime = GetScanTime(scanPrefix);
|
||||
if (scanTime > lastScanTime)
|
||||
{
|
||||
lastScanTime = scanTime;
|
||||
lastScanPrefix = scanPrefix;
|
||||
lastScanNumber = scanNumber;
|
||||
}
|
||||
}
|
||||
|
||||
const int secondLastScanNumber =
|
||||
// 999 is the last file possible
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers)
|
||||
lastScanNumber == 1 ? 999 : lastScanNumber - 1;
|
||||
|
||||
const auto& secondLastScanPrefix =
|
||||
scanNumberMap.find(secondLastScanNumber);
|
||||
|
||||
if (!currentScan_.valid_ || currentScan_.prefix_ != lastScanPrefix)
|
||||
{
|
||||
if (currentScan_.valid_ &&
|
||||
(secondLastScanPrefix == scanNumberMap.cend() ||
|
||||
currentScan_.prefix_ == secondLastScanPrefix->second))
|
||||
{
|
||||
lastScan_ = currentScan_;
|
||||
}
|
||||
else if (secondLastScanPrefix != scanNumberMap.cend())
|
||||
{
|
||||
lastScan_.valid_ = true;
|
||||
lastScan_.prefix_ = secondLastScanPrefix->second;
|
||||
lastScan_.nexradFile_ = nullptr;
|
||||
lastScan_.time_ = GetScanTime(secondLastScanPrefix->second);
|
||||
lastScan_.lastModified_ = {};
|
||||
lastScan_.secondLastModified_ = {};
|
||||
lastScan_.lastKey_ = "";
|
||||
lastScan_.nextFile_ = 1;
|
||||
lastScan_.hasAllFiles_ = false;
|
||||
newObjects += 1;
|
||||
}
|
||||
|
||||
currentScan_.valid_ = true;
|
||||
currentScan_.prefix_ = lastScanPrefix;
|
||||
currentScan_.nexradFile_ = nullptr;
|
||||
currentScan_.time_ = lastScanTime;
|
||||
currentScan_.lastModified_ = {};
|
||||
currentScan_.secondLastModified_ = {};
|
||||
currentScan_.lastKey_ = "";
|
||||
currentScan_.nextFile_ = 1;
|
||||
currentScan_.hasAllFiles_ = false;
|
||||
newObjects += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return {true, newObjects, totalObjects};
|
||||
}
|
||||
|
||||
std::tuple<bool, size_t, size_t>
|
||||
AwsLevel2ChunksDataProvider::ListObjects(std::chrono::system_clock::time_point)
|
||||
{
|
||||
return {true, 0, 0};
|
||||
}
|
||||
|
||||
std::shared_ptr<wsr88d::NexradFile>
|
||||
AwsLevel2ChunksDataProvider::LoadObjectByKey(const std::string& /*prefix*/)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
bool AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord)
|
||||
{
|
||||
if (!scanRecord.valid_)
|
||||
{
|
||||
logger_->warn("Tried to load scan which was not listed yet");
|
||||
return false;
|
||||
}
|
||||
else if (scanRecord.hasAllFiles_)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request listRequest;
|
||||
listRequest.SetBucket(bucketName_);
|
||||
listRequest.SetPrefix(scanRecord.prefix_);
|
||||
listRequest.SetDelimiter("/");
|
||||
if (!scanRecord.lastKey_.empty())
|
||||
{
|
||||
listRequest.SetStartAfter(scanRecord.lastKey_);
|
||||
}
|
||||
|
||||
auto listOutcome = client_->ListObjectsV2(listRequest);
|
||||
if (!listOutcome.IsSuccess())
|
||||
{
|
||||
logger_->warn("Could not find scan at {}", scanRecord.prefix_);
|
||||
return false;
|
||||
}
|
||||
|
||||
bool hasNew = false;
|
||||
auto& chunks = listOutcome.GetResult().GetContents();
|
||||
logger_->trace("Found {} new chunks.", chunks.size());
|
||||
for (const auto& chunk : chunks)
|
||||
{
|
||||
const std::string& key = chunk.GetKey();
|
||||
|
||||
// KIND/585/20250324-134727-001-S
|
||||
// KIND/5/20250324-134727-001-S
|
||||
static const size_t firstSlash = std::string("KIND/").size();
|
||||
const size_t secondSlash = key.find('/', firstSlash);
|
||||
static const size_t startNumberPosOffset =
|
||||
std::string("/20250324-134727-").size();
|
||||
const size_t startNumberPos = secondSlash + startNumberPosOffset;
|
||||
const std::string& keyNumberStr = key.substr(startNumberPos, 3);
|
||||
const int keyNumber = std::stoi(keyNumberStr);
|
||||
// As far as order goes, only the first one matters. This may cause some
|
||||
// issues if keys come in out of order, but usually they just skip chunks
|
||||
if (scanRecord.nextFile_ == 1 && keyNumber != scanRecord.nextFile_)
|
||||
{
|
||||
logger_->warn("Chunk found that was not in order {} {}",
|
||||
scanRecord.lastKey_,
|
||||
key);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Now we want the ending char
|
||||
// KIND/585/20250324-134727-001-S
|
||||
static const size_t charPos = std::string("/20250324-134727-001-").size();
|
||||
if (secondSlash + charPos >= key.size())
|
||||
{
|
||||
logger_->warn("Chunk key was not long enough");
|
||||
continue;
|
||||
}
|
||||
const char keyChar = key[secondSlash + charPos];
|
||||
|
||||
Aws::S3::Model::GetObjectRequest objectRequest;
|
||||
objectRequest.SetBucket(bucketName_);
|
||||
objectRequest.SetKey(key);
|
||||
|
||||
auto outcome = client_->GetObject(objectRequest);
|
||||
|
||||
if (!outcome.IsSuccess())
|
||||
{
|
||||
logger_->warn("Could not get object: {}",
|
||||
outcome.GetError().GetMessage());
|
||||
return hasNew;
|
||||
}
|
||||
|
||||
auto& body = outcome.GetResultWithOwnership().GetBody();
|
||||
|
||||
switch (keyChar)
|
||||
{
|
||||
case 'S':
|
||||
{ // First chunk
|
||||
scanRecord.nexradFile_ = std::make_shared<wsr88d::Ar2vFile>();
|
||||
if (!scanRecord.nexradFile_->LoadData(body))
|
||||
{
|
||||
logger_->warn("Failed to load first chunk");
|
||||
return hasNew;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'I':
|
||||
{ // Middle chunk
|
||||
if (!scanRecord.nexradFile_->LoadLDMRecords(body))
|
||||
{
|
||||
logger_->warn("Failed to load middle chunk");
|
||||
return hasNew;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'E':
|
||||
{ // Last chunk
|
||||
if (!scanRecord.nexradFile_->LoadLDMRecords(body))
|
||||
{
|
||||
logger_->warn("Failed to load last chunk");
|
||||
return hasNew;
|
||||
}
|
||||
scanRecord.hasAllFiles_ = true;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
logger_->warn("Could not load chunk with unknown char");
|
||||
return hasNew;
|
||||
}
|
||||
hasNew = true;
|
||||
|
||||
const std::chrono::seconds lastModifiedSeconds {
|
||||
outcome.GetResult().GetLastModified().Seconds()};
|
||||
const std::chrono::system_clock::time_point lastModified {
|
||||
lastModifiedSeconds};
|
||||
|
||||
scanRecord.secondLastModified_ = scanRecord.lastModified_;
|
||||
scanRecord.lastModified_ = lastModified;
|
||||
|
||||
scanRecord.nextFile_ = keyNumber + 1;
|
||||
scanRecord.lastKey_ = key;
|
||||
}
|
||||
|
||||
if (scanRecord.nexradFile_ == nullptr)
|
||||
{
|
||||
logger_->warn("Could not load file");
|
||||
}
|
||||
else if (hasNew)
|
||||
{
|
||||
scanRecord.nexradFile_->IndexFile();
|
||||
}
|
||||
|
||||
return hasNew;
|
||||
}
|
||||
|
||||
std::shared_ptr<wsr88d::NexradFile>
|
||||
AwsLevel2ChunksDataProvider::LoadObjectByTime(
|
||||
std::chrono::system_clock::time_point time)
|
||||
{
|
||||
const std::unique_lock lock(p->scansMutex_);
|
||||
static const std::chrono::system_clock::time_point epoch {};
|
||||
|
||||
if (p->currentScan_.valid_ &&
|
||||
(time == epoch || time >= p->currentScan_.time_))
|
||||
{
|
||||
return std::make_shared<wsr88d::Ar2vFile>(p->currentScan_.nexradFile_,
|
||||
p->lastScan_.nexradFile_);
|
||||
}
|
||||
else if (p->lastScan_.valid_ && time >= p->lastScan_.time_)
|
||||
{
|
||||
return p->lastScan_.nexradFile_;
|
||||
}
|
||||
else
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
int AwsLevel2ChunksDataProvider::Impl::GetScanNumber(const std::string& prefix)
|
||||
{
|
||||
// KIND/585/20250324-134727-001-S
|
||||
static const size_t firstSlash = std::string("KIND/").size();
|
||||
const std::string& prefixNumberStr = prefix.substr(firstSlash, 3);
|
||||
return std::stoi(prefixNumberStr);
|
||||
}
|
||||
|
||||
std::pair<size_t, size_t> AwsLevel2ChunksDataProvider::Refresh()
|
||||
{
|
||||
using namespace std::chrono;
|
||||
|
||||
boost::timer::cpu_timer timer {};
|
||||
timer.start();
|
||||
|
||||
const std::unique_lock lock(p->refreshMutex_);
|
||||
const std::unique_lock scanLock(p->scansMutex_);
|
||||
|
||||
auto [success, newObjects, totalObjects] = p->ListObjects();
|
||||
|
||||
auto threadPool = boost::asio::thread_pool(3);
|
||||
bool newCurrent = false;
|
||||
bool newLast = false;
|
||||
if (p->currentScan_.valid_)
|
||||
{
|
||||
boost::asio::post(threadPool,
|
||||
[this, &newCurrent]()
|
||||
{ newCurrent = p->LoadScan(p->currentScan_); });
|
||||
totalObjects += 1;
|
||||
}
|
||||
|
||||
if (p->lastScan_.valid_)
|
||||
{
|
||||
totalObjects += 1;
|
||||
boost::asio::post(
|
||||
threadPool,
|
||||
[this, &newLast]()
|
||||
{
|
||||
if (!p->lastScan_.hasAllFiles_)
|
||||
{
|
||||
// If we have chunks, use chunks
|
||||
if (p->lastScan_.nextFile_ != 1)
|
||||
{
|
||||
newLast = p->LoadScan(p->lastScan_);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto level2DataProvider = p->level2DataProvider_.lock();
|
||||
if (level2DataProvider != nullptr)
|
||||
{
|
||||
level2DataProvider->ListObjects(p->lastScan_.time_);
|
||||
p->lastScan_.nexradFile_ =
|
||||
std::dynamic_pointer_cast<wsr88d::Ar2vFile>(
|
||||
level2DataProvider->LoadObjectByTime(
|
||||
p->lastScan_.time_));
|
||||
if (p->lastScan_.nexradFile_ != nullptr)
|
||||
{
|
||||
p->lastScan_.hasAllFiles_ = true;
|
||||
}
|
||||
}
|
||||
// Fall back to chunks if files did not load
|
||||
newLast = p->lastScan_.nexradFile_ != nullptr ||
|
||||
p->LoadScan(p->lastScan_);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
threadPool.join();
|
||||
if (newCurrent)
|
||||
{
|
||||
newObjects += 1;
|
||||
}
|
||||
if (newLast)
|
||||
{
|
||||
newObjects += 1;
|
||||
}
|
||||
|
||||
timer.stop();
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers) format to 6 digits
|
||||
logger_->debug("Refresh() in {}", timer.format(6, "%ws"));
|
||||
return std::make_pair(newObjects, totalObjects);
|
||||
}
|
||||
|
||||
void AwsLevel2ChunksDataProvider::RequestAvailableProducts() {}
|
||||
std::vector<std::string> AwsLevel2ChunksDataProvider::GetAvailableProducts()
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
AwsLevel2ChunksDataProvider::AwsLevel2ChunksDataProvider(
|
||||
AwsLevel2ChunksDataProvider&&) noexcept = default;
|
||||
AwsLevel2ChunksDataProvider& AwsLevel2ChunksDataProvider::operator=(
|
||||
AwsLevel2ChunksDataProvider&&) noexcept = default;
|
||||
|
||||
std::optional<float> AwsLevel2ChunksDataProvider::GetCurrentElevation()
|
||||
{
|
||||
if (!p->currentScan_.valid_ || p->currentScan_.nexradFile_ == nullptr)
|
||||
{
|
||||
// Does not have any scan elevation.
|
||||
return {};
|
||||
}
|
||||
|
||||
auto vcpData = p->currentScan_.nexradFile_->vcp_data();
|
||||
auto radarData = p->currentScan_.nexradFile_->radar_data();
|
||||
if (radarData.size() == 0)
|
||||
{
|
||||
// Does not have any scan elevation.
|
||||
return {};
|
||||
}
|
||||
|
||||
const auto& lastElevation = radarData.crbegin();
|
||||
const std::shared_ptr<wsr88d::rda::DigitalRadarData> digitalRadarData0 =
|
||||
std::dynamic_pointer_cast<wsr88d::rda::DigitalRadarData>(
|
||||
lastElevation->second->cbegin()->second);
|
||||
|
||||
if (vcpData != nullptr)
|
||||
{
|
||||
return static_cast<float>(vcpData->elevation_angle(lastElevation->first));
|
||||
}
|
||||
else if (digitalRadarData0 != nullptr)
|
||||
{
|
||||
return digitalRadarData0->elevation_angle().value();
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
void AwsLevel2ChunksDataProvider::SetLevel2DataProvider(
|
||||
const std::shared_ptr<AwsLevel2DataProvider>& provider)
|
||||
{
|
||||
p->level2DataProvider_ = provider;
|
||||
}
|
||||
|
||||
} // namespace scwx::provider
|
||||
|
|
@ -170,6 +170,11 @@ std::string AwsNexradDataProvider::FindLatestKey()
|
|||
return key;
|
||||
}
|
||||
|
||||
std::chrono::system_clock::time_point AwsNexradDataProvider::FindLatestTime()
|
||||
{
|
||||
return GetTimePointByKey(FindLatestKey());
|
||||
}
|
||||
|
||||
std::vector<std::chrono::system_clock::time_point>
|
||||
AwsNexradDataProvider::GetTimePointsByDate(
|
||||
std::chrono::system_clock::time_point date)
|
||||
|
|
@ -327,6 +332,20 @@ AwsNexradDataProvider::LoadObjectByKey(const std::string& key)
|
|||
return nexradFile;
|
||||
}
|
||||
|
||||
std::shared_ptr<wsr88d::NexradFile> AwsNexradDataProvider::LoadObjectByTime(
|
||||
std::chrono::system_clock::time_point time)
|
||||
{
|
||||
const std::string key = FindKey(time);
|
||||
if (key.empty())
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
else
|
||||
{
|
||||
return LoadObjectByKey(key);
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<size_t, size_t> AwsNexradDataProvider::Refresh()
|
||||
{
|
||||
using namespace std::chrono;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
#include <scwx/provider/nexrad_data_provider_factory.hpp>
|
||||
#include <scwx/provider/aws_level2_data_provider.hpp>
|
||||
#include <scwx/provider/aws_level2_chunks_data_provider.hpp>
|
||||
#include <scwx/provider/aws_level3_data_provider.hpp>
|
||||
|
||||
namespace scwx
|
||||
|
|
@ -17,6 +18,13 @@ NexradDataProviderFactory::CreateLevel2DataProvider(
|
|||
return std::make_unique<AwsLevel2DataProvider>(radarSite);
|
||||
}
|
||||
|
||||
std::shared_ptr<NexradDataProvider>
|
||||
NexradDataProviderFactory::CreateLevel2ChunksDataProvider(
|
||||
const std::string& radarSite)
|
||||
{
|
||||
return std::make_unique<AwsLevel2ChunksDataProvider>(radarSite);
|
||||
}
|
||||
|
||||
std::shared_ptr<NexradDataProvider>
|
||||
NexradDataProviderFactory::CreateLevel3DataProvider(
|
||||
const std::string& radarSite, const std::string& product)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
#include <scwx/util/logger.hpp>
|
||||
#include <scwx/util/rangebuf.hpp>
|
||||
#include <scwx/util/time.hpp>
|
||||
#include <scwx/common/geographic.hpp>
|
||||
|
||||
#include <fstream>
|
||||
#include <sstream>
|
||||
|
|
@ -137,7 +138,7 @@ Ar2vFile::GetElevationScan(rda::DataBlockType dataBlockType,
|
|||
float elevation,
|
||||
std::chrono::system_clock::time_point time) const
|
||||
{
|
||||
logger_->debug("GetElevationScan: {} degrees", elevation);
|
||||
logger_->trace("GetElevationScan: {} degrees", elevation);
|
||||
|
||||
std::shared_ptr<rda::ElevationScan> elevationScan = nullptr;
|
||||
float elevationCut = 0.0f;
|
||||
|
|
@ -272,7 +273,7 @@ bool Ar2vFile::LoadData(std::istream& is)
|
|||
|
||||
std::size_t Ar2vFileImpl::DecompressLDMRecords(std::istream& is)
|
||||
{
|
||||
logger_->debug("Decompressing LDM Records");
|
||||
logger_->trace("Decompressing LDM Records");
|
||||
|
||||
std::size_t numRecords = 0;
|
||||
|
||||
|
|
@ -320,14 +321,14 @@ std::size_t Ar2vFileImpl::DecompressLDMRecords(std::istream& is)
|
|||
++numRecords;
|
||||
}
|
||||
|
||||
logger_->debug("Decompressed {} LDM Records", numRecords);
|
||||
logger_->trace("Decompressed {} LDM Records", numRecords);
|
||||
|
||||
return numRecords;
|
||||
}
|
||||
|
||||
void Ar2vFileImpl::ParseLDMRecords()
|
||||
{
|
||||
logger_->debug("Parsing LDM Records");
|
||||
logger_->trace("Parsing LDM Records");
|
||||
|
||||
std::size_t count = 0;
|
||||
|
||||
|
|
@ -444,13 +445,11 @@ void Ar2vFileImpl::ProcessRadarData(
|
|||
|
||||
void Ar2vFileImpl::IndexFile()
|
||||
{
|
||||
logger_->debug("Indexing file");
|
||||
|
||||
constexpr float scaleFactor = 8.0f / 0.043945f;
|
||||
logger_->trace("Indexing file");
|
||||
|
||||
for (auto& elevationCut : radarData_)
|
||||
{
|
||||
std::uint16_t elevationAngle {};
|
||||
float elevationAngle {};
|
||||
rda::WaveformType waveformType = rda::WaveformType::Unknown;
|
||||
|
||||
std::shared_ptr<rda::GenericRadarData>& radial0 =
|
||||
|
|
@ -466,14 +465,15 @@ void Ar2vFileImpl::IndexFile()
|
|||
|
||||
if (vcpData_ != nullptr)
|
||||
{
|
||||
elevationAngle = vcpData_->elevation_angle_raw(elevationCut.first);
|
||||
elevationAngle =
|
||||
static_cast<float>(vcpData_->elevation_angle(elevationCut.first));
|
||||
waveformType = vcpData_->waveform_type(elevationCut.first);
|
||||
}
|
||||
else if ((digitalRadarData0 =
|
||||
std::dynamic_pointer_cast<rda::DigitalRadarData>(radial0)) !=
|
||||
nullptr)
|
||||
{
|
||||
elevationAngle = digitalRadarData0->elevation_angle_raw();
|
||||
elevationAngle = digitalRadarData0->elevation_angle().value();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
@ -501,19 +501,236 @@ void Ar2vFileImpl::IndexFile()
|
|||
auto time = util::TimePoint(radial0->modified_julian_date(),
|
||||
radial0->collection_time());
|
||||
|
||||
// NOLINTNEXTLINE This conversion is accurate
|
||||
float elevationAngleConverted = elevationAngle / scaleFactor;
|
||||
// Any elevation above 90 degrees should be interpreted as a
|
||||
// negative angle
|
||||
// NOLINTBEGIN(cppcoreguidelines-avoid-magic-numbers)
|
||||
if (elevationAngleConverted > 90)
|
||||
{
|
||||
elevationAngleConverted -= 360;
|
||||
index_[dataBlockType][elevationAngle][time] = elevationCut.second;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// NOLINTEND(cppcoreguidelines-avoid-magic-numbers)
|
||||
|
||||
index_[dataBlockType][elevationAngleConverted][time] =
|
||||
elevationCut.second;
|
||||
bool Ar2vFile::LoadLDMRecords(std::istream& is)
|
||||
{
|
||||
const size_t decompressedRecords = p->DecompressLDMRecords(is);
|
||||
if (decompressedRecords == 0)
|
||||
{
|
||||
p->ParseLDMRecord(is);
|
||||
}
|
||||
else
|
||||
{
|
||||
p->ParseLDMRecords();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Ar2vFile::IndexFile()
|
||||
{
|
||||
p->IndexFile();
|
||||
return true;
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE
|
||||
bool IsRadarDataIncomplete(
|
||||
const std::shared_ptr<const rda::ElevationScan>& radarData)
|
||||
{
|
||||
// Assume the data is incomplete when the delta between the first and last
|
||||
// angles is greater than 2.5 degrees.
|
||||
constexpr units::degrees<float> kIncompleteDataAngleThreshold_ {2.5};
|
||||
|
||||
const units::degrees<float> firstAngle =
|
||||
radarData->cbegin()->second->azimuth_angle();
|
||||
const units::degrees<float> lastAngle =
|
||||
radarData->crbegin()->second->azimuth_angle();
|
||||
const units::degrees<float> angleDelta =
|
||||
common::GetAngleDelta(firstAngle, lastAngle);
|
||||
|
||||
return angleDelta > kIncompleteDataAngleThreshold_;
|
||||
}
|
||||
|
||||
Ar2vFile::Ar2vFile(const std::shared_ptr<Ar2vFile>& current,
|
||||
const std::shared_ptr<Ar2vFile>& last) :
|
||||
Ar2vFile()
|
||||
{
|
||||
// This is only used to index right now, so not a huge deal
|
||||
p->vcpData_ = nullptr;
|
||||
|
||||
// Reconstruct index from the other's indexes
|
||||
if (current != nullptr)
|
||||
{
|
||||
for (const auto& type : current->p->index_)
|
||||
{
|
||||
for (const auto& elevation : type.second)
|
||||
{
|
||||
// Get the most recent scan
|
||||
const auto& mostRecent = elevation.second.crbegin();
|
||||
if (mostRecent == elevation.second.crend())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// Add previous scans for stepping back in time
|
||||
for (auto scan = ++(elevation.second.rbegin());
|
||||
scan != elevation.second.rend();
|
||||
++scan)
|
||||
{
|
||||
p->index_[type.first][elevation.first][scan->first] =
|
||||
scan->second;
|
||||
}
|
||||
|
||||
// Merge this scan with the last one if it is incomplete
|
||||
if (IsRadarDataIncomplete(mostRecent->second))
|
||||
{
|
||||
std::shared_ptr<rda::ElevationScan> secondMostRecent = nullptr;
|
||||
|
||||
// check if this volume scan has an earlier elevation scan
|
||||
auto possibleSecondMostRecent = elevation.second.rbegin();
|
||||
++possibleSecondMostRecent;
|
||||
|
||||
if (possibleSecondMostRecent == elevation.second.rend())
|
||||
{
|
||||
if (last == nullptr)
|
||||
{
|
||||
// Nothing to merge with
|
||||
p->index_[type.first][elevation.first][mostRecent->first] =
|
||||
mostRecent->second;
|
||||
continue;
|
||||
}
|
||||
|
||||
// get the scan from the last scan
|
||||
auto elevationScan =
|
||||
std::get<std::shared_ptr<rda::ElevationScan>>(
|
||||
last->GetElevationScan(
|
||||
type.first, elevation.first, {}));
|
||||
if (elevationScan == nullptr)
|
||||
{
|
||||
// Nothing to merge with
|
||||
p->index_[type.first][elevation.first][mostRecent->first] =
|
||||
mostRecent->second;
|
||||
continue;
|
||||
}
|
||||
|
||||
secondMostRecent = elevationScan;
|
||||
}
|
||||
else
|
||||
{
|
||||
secondMostRecent = possibleSecondMostRecent->second;
|
||||
}
|
||||
|
||||
// Make the new scan
|
||||
auto newScan = std::make_shared<rda::ElevationScan>();
|
||||
|
||||
// Copy over the new radials
|
||||
for (const auto& radial : *(mostRecent->second))
|
||||
{
|
||||
(*newScan)[radial.first] = radial.second;
|
||||
}
|
||||
|
||||
/* Correctly order the old radials. The radials need to be in
|
||||
* order for the rendering to work, and the index needs to start
|
||||
* at 0 and increase by one from there. Since the new radial
|
||||
* should have index 0, the old radial needs to be reshaped to
|
||||
* match the new radials indexing.
|
||||
*/
|
||||
|
||||
const double lowestAzm =
|
||||
mostRecent->second->cbegin()->second->azimuth_angle().value();
|
||||
const double heighestAzm = mostRecent->second->crbegin()
|
||||
->second->azimuth_angle()
|
||||
.value();
|
||||
std::uint16_t index = mostRecent->second->crbegin()->first + 1;
|
||||
|
||||
// Sort by the azimuth. Makes the rest of this way easier
|
||||
auto secondMostRecentAzmMap =
|
||||
std::map<float, std::shared_ptr<rda::GenericRadarData>>();
|
||||
for (const auto& radial : *secondMostRecent)
|
||||
{
|
||||
secondMostRecentAzmMap[radial.second->azimuth_angle()
|
||||
.value()] = radial.second;
|
||||
}
|
||||
|
||||
if (lowestAzm <= heighestAzm) // New scan does not contain 0/360
|
||||
{
|
||||
// Get the radials following the new radials
|
||||
for (const auto& radial : secondMostRecentAzmMap)
|
||||
{
|
||||
if (radial.first > heighestAzm)
|
||||
{
|
||||
(*newScan)[index] = radial.second;
|
||||
++index;
|
||||
}
|
||||
}
|
||||
// Get the radials before the new radials
|
||||
for (const auto& radial : secondMostRecentAzmMap)
|
||||
{
|
||||
if (radial.first < lowestAzm)
|
||||
{
|
||||
(*newScan)[index] = radial.second;
|
||||
++index;
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
else // New scan includes 0/360
|
||||
{
|
||||
// The radials will already be in the right order
|
||||
for (const auto& radial : secondMostRecentAzmMap)
|
||||
{
|
||||
if (radial.first > heighestAzm && radial.first < lowestAzm)
|
||||
{
|
||||
(*newScan)[index] = radial.second;
|
||||
++index;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
p->index_[type.first][elevation.first][mostRecent->first] =
|
||||
newScan;
|
||||
}
|
||||
else
|
||||
{
|
||||
p->index_[type.first][elevation.first][mostRecent->first] =
|
||||
mostRecent->second;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Go though last, adding other elevations
|
||||
if (last != nullptr)
|
||||
{
|
||||
for (const auto& type : last->p->index_)
|
||||
{
|
||||
// Find the highest elevation this type has for the current scan
|
||||
// Start below any reasonable elevation
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers)
|
||||
float highestCurrentElevation = -90;
|
||||
const auto& elevationScans = p->index_.find(type.first);
|
||||
if (elevationScans != p->index_.cend())
|
||||
{
|
||||
const auto& highestElevation = elevationScans->second.crbegin();
|
||||
if (highestElevation != elevationScans->second.crend())
|
||||
{
|
||||
// Add a slight offset to ensure good floating point compare.
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers)
|
||||
highestCurrentElevation = highestElevation->first + 0.01f;
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& elevation : type.second)
|
||||
{
|
||||
// Only add elevations above the current scan's elevation
|
||||
if (elevation.first > highestCurrentElevation)
|
||||
{
|
||||
const auto& mostRecent = elevation.second.crbegin();
|
||||
if (mostRecent == elevation.second.crend())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
p->index_[type.first][elevation.first][mostRecent->first] =
|
||||
mostRecent->second;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -154,7 +154,18 @@ std::uint16_t DigitalRadarData::elevation_angle_raw() const
|
|||
|
||||
units::degrees<float> DigitalRadarData::elevation_angle() const
|
||||
{
|
||||
return units::degrees<float> {p->elevationAngle_ * kAngleDataScale};
|
||||
// NOLINTNEXTLINE This conversion is accurate
|
||||
float elevationAngleConverted = p->elevationAngle_ * kAngleDataScale;
|
||||
// Any elevation above 90 degrees should be interpreted as a
|
||||
// negative angle
|
||||
// NOLINTBEGIN(cppcoreguidelines-avoid-magic-numbers)
|
||||
if (elevationAngleConverted > 90)
|
||||
{
|
||||
elevationAngleConverted -= 360;
|
||||
}
|
||||
// NOLINTEND(cppcoreguidelines-avoid-magic-numbers)
|
||||
|
||||
return units::degrees<float> {elevationAngleConverted};
|
||||
}
|
||||
|
||||
std::uint16_t DigitalRadarData::elevation_number() const
|
||||
|
|
|
|||
|
|
@ -220,7 +220,19 @@ uint16_t VolumeCoveragePatternData::number_of_base_tilts() const
|
|||
|
||||
double VolumeCoveragePatternData::elevation_angle(uint16_t e) const
|
||||
{
|
||||
return p->elevationCuts_[e].elevationAngle_ * ANGLE_DATA_SCALE;
|
||||
|
||||
double elevationAngleConverted =
|
||||
p->elevationCuts_[e].elevationAngle_ * ANGLE_DATA_SCALE;
|
||||
// Any elevation above 90 degrees should be interpreted as a
|
||||
// negative angle
|
||||
// NOLINTBEGIN(cppcoreguidelines-avoid-magic-numbers)
|
||||
if (elevationAngleConverted > 90)
|
||||
{
|
||||
elevationAngleConverted -= 360;
|
||||
}
|
||||
// NOLINTEND(cppcoreguidelines-avoid-magic-numbers)
|
||||
|
||||
return elevationAngleConverted;
|
||||
}
|
||||
|
||||
uint16_t VolumeCoveragePatternData::elevation_angle_raw(uint16_t e) const
|
||||
|
|
|
|||
|
|
@ -60,6 +60,7 @@ set(HDR_NETWORK include/scwx/network/cpr.hpp
|
|||
set(SRC_NETWORK source/scwx/network/cpr.cpp
|
||||
source/scwx/network/dir_list.cpp)
|
||||
set(HDR_PROVIDER include/scwx/provider/aws_level2_data_provider.hpp
|
||||
include/scwx/provider/aws_level2_chunks_data_provider.hpp
|
||||
include/scwx/provider/aws_level3_data_provider.hpp
|
||||
include/scwx/provider/aws_nexrad_data_provider.hpp
|
||||
include/scwx/provider/iem_api_provider.hpp
|
||||
|
|
@ -68,6 +69,7 @@ set(HDR_PROVIDER include/scwx/provider/aws_level2_data_provider.hpp
|
|||
include/scwx/provider/nexrad_data_provider_factory.hpp
|
||||
include/scwx/provider/warnings_provider.hpp)
|
||||
set(SRC_PROVIDER source/scwx/provider/aws_level2_data_provider.cpp
|
||||
source/scwx/provider/aws_level2_chunks_data_provider.cpp
|
||||
source/scwx/provider/aws_level3_data_provider.cpp
|
||||
source/scwx/provider/aws_nexrad_data_provider.cpp
|
||||
source/scwx/provider/iem_api_provider.cpp
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue