Get level2 chunks and archive working together, reduce logging of level2 chunks

This commit is contained in:
AdenKoperczak 2025-04-07 18:23:29 -04:00
parent 094d286b41
commit 63585af26d
No known key found for this signature in database
GPG key ID: 9843017036F62EE7
4 changed files with 43 additions and 25 deletions

View file

@ -68,6 +68,7 @@ static constexpr std::size_t kTimerPlaces_ {6u};
static constexpr std::chrono::seconds kFastRetryInterval_ {15}; static constexpr std::chrono::seconds kFastRetryInterval_ {15};
static constexpr std::chrono::seconds kFastRetryIntervalChunks_ {3}; static constexpr std::chrono::seconds kFastRetryIntervalChunks_ {3};
static constexpr std::chrono::seconds kSlowRetryInterval_ {120}; static constexpr std::chrono::seconds kSlowRetryInterval_ {120};
static constexpr std::chrono::seconds kSlowRetryIntervalChunks_ {20};
static std::unordered_map<std::string, std::weak_ptr<RadarProductManager>> static std::unordered_map<std::string, std::weak_ptr<RadarProductManager>>
instanceMap_; instanceMap_;
@ -774,11 +775,15 @@ void RadarProductManagerImpl::RefreshDataSync(
auto [newObjects, totalObjects] = providerManager->provider_->Refresh(); auto [newObjects, totalObjects] = providerManager->provider_->Refresh();
// Level2 chunked data is updated quickly and uses a fater interval // Level2 chunked data is updated quickly and uses a faster interval
const std::chrono::milliseconds fastRetryInterval = const std::chrono::milliseconds fastRetryInterval =
providerManager->group_ == common::RadarProductGroup::Level2 ? providerManager == level2ChunksProviderManager_ ?
kFastRetryIntervalChunks_ : kFastRetryIntervalChunks_ :
kFastRetryInterval_; kFastRetryInterval_;
const std::chrono::milliseconds slowRetryInterval =
providerManager == level2ChunksProviderManager_ ?
kSlowRetryIntervalChunks_ :
kSlowRetryInterval_;
std::chrono::milliseconds interval = fastRetryInterval; std::chrono::milliseconds interval = fastRetryInterval;
if (totalObjects > 0) if (totalObjects > 0)
@ -798,7 +803,7 @@ void RadarProductManagerImpl::RefreshDataSync(
{ {
// If it has been at least 5 update periods since the file has // If it has been at least 5 update periods since the file has
// been last modified, slow the retry period // been last modified, slow the retry period
interval = kSlowRetryInterval_; interval = slowRetryInterval;
} }
else if (interval < std::chrono::milliseconds {fastRetryInterval}) else if (interval < std::chrono::milliseconds {fastRetryInterval})
{ {
@ -817,7 +822,7 @@ void RadarProductManagerImpl::RefreshDataSync(
logger_->info("[{}] No data found", providerManager->name()); logger_->info("[{}] No data found", providerManager->name());
// If no data is found, retry at the slow retry interval // If no data is found, retry at the slow retry interval
interval = kSlowRetryInterval_; interval = slowRetryInterval;
} }
std::unique_lock const lock(providerManager->refreshTimerMutex_); std::unique_lock const lock(providerManager->refreshTimerMutex_);
@ -953,11 +958,13 @@ void RadarProductManagerImpl::LoadProviderData(
{ {
existingRecord = it->second.lock(); existingRecord = it->second.lock();
/*
if (existingRecord != nullptr) if (existingRecord != nullptr)
{ {
logger_->debug( logger_->debug(
"Data previously loaded, loading from data cache"); "Data previously loaded, loading from data cache");
} }
*/
} }
} }
@ -1416,7 +1423,7 @@ std::shared_ptr<types::RadarProductRecord>
RadarProductManagerImpl::StoreRadarProductRecord( RadarProductManagerImpl::StoreRadarProductRecord(
std::shared_ptr<types::RadarProductRecord> record) std::shared_ptr<types::RadarProductRecord> record)
{ {
logger_->debug("StoreRadarProductRecord()"); //logger_->debug("StoreRadarProductRecord()");
std::shared_ptr<types::RadarProductRecord> storedRecord = nullptr; std::shared_ptr<types::RadarProductRecord> storedRecord = nullptr;
@ -1433,11 +1440,12 @@ RadarProductManagerImpl::StoreRadarProductRecord(
{ {
storedRecord = it->second.lock(); storedRecord = it->second.lock();
/*
if (storedRecord != nullptr) if (storedRecord != nullptr)
{ {
logger_->debug( logger_->debug(
"Level 2 product previously loaded, loading from cache"); "Level 2 product previously loaded, loading from cache");
} }*/
} }
if (storedRecord == nullptr) if (storedRecord == nullptr)
@ -1520,15 +1528,23 @@ RadarProductManager::GetLevel2Data(wsr88d::rda::DataBlockType dataBlockType,
std::vector<float> elevationCuts {}; std::vector<float> elevationCuts {};
std::chrono::system_clock::time_point foundTime {}; std::chrono::system_clock::time_point foundTime {};
//TODO decide when to use chunked vs archived data. // See if we have this one in the chunk provider.
if constexpr (true) auto chunkFile = std::dynamic_pointer_cast<wsr88d::Ar2vFile>(
p->level2ChunksProviderManager_->provider_->LoadObjectByTime(time));
if (chunkFile != nullptr)
{ {
auto currentFile = std::dynamic_pointer_cast<wsr88d::Ar2vFile>(
p->level2ChunksProviderManager_->provider_->LoadLatestObject());
std::tie(radarData, elevationCut, elevationCuts) = std::tie(radarData, elevationCut, elevationCuts) =
currentFile->GetElevationScan(dataBlockType, elevation, time); chunkFile->GetElevationScan(dataBlockType, elevation, time);
if (radarData != nullptr)
{
auto& radarData0 = (*radarData)[0];
foundTime = std::chrono::floor<std::chrono::seconds>(
scwx::util::TimePoint(radarData0->modified_julian_date(),
radarData0->collection_time()));
}
} }
else else // It is not in the chunk provider, so get it from the archive
{ {
auto records = p->GetLevel2ProductRecords(time); auto records = p->GetLevel2ProductRecords(time);
for (auto& recordPair : records) for (auto& recordPair : records)

View file

@ -159,8 +159,6 @@ void RadarProductLayer::Initialize()
void RadarProductLayer::UpdateSweep() void RadarProductLayer::UpdateSweep()
{ {
logger_->debug("UpdateSweep()");
gl::OpenGLFunctions& gl = context()->gl(); gl::OpenGLFunctions& gl = context()->gl();
boost::timer::cpu_timer timer; boost::timer::cpu_timer timer;
@ -172,9 +170,10 @@ void RadarProductLayer::UpdateSweep()
std::try_to_lock); std::try_to_lock);
if (!sweepLock.owns_lock()) if (!sweepLock.owns_lock())
{ {
logger_->debug("Sweep locked, deferring update"); //logger_->debug("Sweep locked, deferring update");
return; return;
} }
logger_->debug("UpdateSweep()");
p->sweepNeedsUpdate_ = false; p->sweepNeedsUpdate_ = false;

View file

@ -618,10 +618,13 @@ AwsLevel2ChunksDataProvider::LoadObjectByTime(
std::chrono::system_clock::time_point time) std::chrono::system_clock::time_point time)
{ {
std::unique_lock lock(p->scansMutex_); std::unique_lock lock(p->scansMutex_);
static const std::chrono::system_clock::time_point epoch {};
if (p->currentScan_.valid_ && time >= p->currentScan_.time_) if (p->currentScan_.valid_ &&
(time == epoch || time >= p->currentScan_.time_))
{ {
return p->currentScan_.nexradFile_; return std::make_shared<wsr88d::Ar2vFile>(p->currentScan_.nexradFile_,
p->lastScan_.nexradFile_);
} }
else if (p->lastScan_.valid_ && time >= p->lastScan_.time_) else if (p->lastScan_.valid_ && time >= p->lastScan_.time_)
{ {
@ -629,7 +632,6 @@ AwsLevel2ChunksDataProvider::LoadObjectByTime(
} }
else else
{ {
logger_->warn("Could not find scan with time");
return nullptr; return nullptr;
} }
} }
@ -637,6 +639,7 @@ AwsLevel2ChunksDataProvider::LoadObjectByTime(
std::shared_ptr<wsr88d::NexradFile> std::shared_ptr<wsr88d::NexradFile>
AwsLevel2ChunksDataProvider::LoadLatestObject() AwsLevel2ChunksDataProvider::LoadLatestObject()
{ {
std::unique_lock lock(p->scansMutex_);
return std::make_shared<wsr88d::Ar2vFile>(p->currentScan_.nexradFile_, return std::make_shared<wsr88d::Ar2vFile>(p->currentScan_.nexradFile_,
p->lastScan_.nexradFile_); p->lastScan_.nexradFile_);
//return p->currentScan_.nexradFile_; //return p->currentScan_.nexradFile_;

View file

@ -138,7 +138,7 @@ Ar2vFile::GetElevationScan(rda::DataBlockType dataBlockType,
float elevation, float elevation,
std::chrono::system_clock::time_point time) const std::chrono::system_clock::time_point time) const
{ {
logger_->debug("GetElevationScan: {} degrees", elevation); //logger_->debug("GetElevationScan: {} degrees", elevation);
std::shared_ptr<rda::ElevationScan> elevationScan = nullptr; std::shared_ptr<rda::ElevationScan> elevationScan = nullptr;
float elevationCut = 0.0f; float elevationCut = 0.0f;
@ -273,7 +273,7 @@ bool Ar2vFile::LoadData(std::istream& is)
std::size_t Ar2vFileImpl::DecompressLDMRecords(std::istream& is) std::size_t Ar2vFileImpl::DecompressLDMRecords(std::istream& is)
{ {
logger_->debug("Decompressing LDM Records"); //logger_->debug("Decompressing LDM Records");
std::size_t numRecords = 0; std::size_t numRecords = 0;
@ -321,22 +321,22 @@ std::size_t Ar2vFileImpl::DecompressLDMRecords(std::istream& is)
++numRecords; ++numRecords;
} }
logger_->debug("Decompressed {} LDM Records", numRecords); //logger_->debug("Decompressed {} LDM Records", numRecords);
return numRecords; return numRecords;
} }
void Ar2vFileImpl::ParseLDMRecords() void Ar2vFileImpl::ParseLDMRecords()
{ {
logger_->debug("Parsing LDM Records"); //logger_->debug("Parsing LDM Records");
std::size_t count = 0; //std::size_t count = 0;
for (auto it = rawRecords_.begin(); it != rawRecords_.end(); it++) for (auto it = rawRecords_.begin(); it != rawRecords_.end(); it++)
{ {
std::stringstream& ss = *it; std::stringstream& ss = *it;
logger_->trace("Record {}", count++); //logger_->trace("Record {}", count++);
ParseLDMRecord(ss); ParseLDMRecord(ss);
} }
@ -445,7 +445,7 @@ void Ar2vFileImpl::ProcessRadarData(
void Ar2vFileImpl::IndexFile() void Ar2vFileImpl::IndexFile()
{ {
logger_->debug("Indexing file"); //logger_->debug("Indexing file");
constexpr float scaleFactor = 8.0f / 0.043945f; constexpr float scaleFactor = 8.0f / 0.043945f;