Merge pull request #297 from dpaulat/feature/level2-sweeps

Intermediate Level 2 Sweeps
This commit is contained in:
Dan Paulat 2024-11-28 06:51:04 -06:00 committed by GitHub
commit dbfacdfd28
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 248 additions and 221 deletions

View file

@ -91,13 +91,7 @@ public:
const std::string& radarId,
common::RadarProductGroup group,
const std::string& product) :
radarId_ {radarId},
group_ {group},
product_ {product},
refreshEnabled_ {false},
refreshTimer_ {threadPool_},
refreshTimerMutex_ {},
provider_ {nullptr}
radarId_ {radarId}, group_ {group}, product_ {product}
{
connect(this,
&ProviderManager::NewDataAvailable,
@ -115,10 +109,10 @@ public:
const std::string radarId_;
const common::RadarProductGroup group_;
const std::string product_;
bool refreshEnabled_;
boost::asio::steady_timer refreshTimer_;
std::mutex refreshTimerMutex_;
std::shared_ptr<provider::NexradDataProvider> provider_;
bool refreshEnabled_ {false};
boost::asio::steady_timer refreshTimer_ {threadPool_};
std::mutex refreshTimerMutex_ {};
std::shared_ptr<provider::NexradDataProvider> provider_ {nullptr};
signals:
void NewDataAvailable(common::RadarProductGroup group,
@ -136,24 +130,8 @@ public:
initialized_ {false},
level3ProductsInitialized_ {false},
radarSite_ {config::RadarSite::Get(radarId)},
coordinates0_5Degree_ {},
coordinates1Degree_ {},
level2ProductRecords_ {},
level2ProductRecentRecords_ {},
level3ProductRecordsMap_ {},
level3ProductRecentRecordsMap_ {},
level2ProductRecordMutex_ {},
level3ProductRecordMutex_ {},
level2ProviderManager_ {std::make_shared<ProviderManager>(
self_, radarId_, common::RadarProductGroup::Level2)},
level3ProviderManagerMap_ {},
level3ProviderManagerMutex_ {},
initializeMutex_ {},
level3ProductsInitializeMutex_ {},
loadLevel2DataMutex_ {},
loadLevel3DataMutex_ {},
availableCategoryMap_ {},
availableCategoryMutex_ {}
self_, radarId_, common::RadarProductGroup::Level2)}
{
if (radarSite_ == nullptr)
{
@ -198,9 +176,9 @@ public:
void RefreshData(std::shared_ptr<ProviderManager> providerManager);
void RefreshDataSync(std::shared_ptr<ProviderManager> providerManager);
std::tuple<std::shared_ptr<types::RadarProductRecord>,
std::chrono::system_clock::time_point>
GetLevel2ProductRecord(std::chrono::system_clock::time_point time);
std::map<std::chrono::system_clock::time_point,
std::shared_ptr<types::RadarProductRecord>>
GetLevel2ProductRecords(std::chrono::system_clock::time_point time);
std::tuple<std::shared_ptr<types::RadarProductRecord>,
std::chrono::system_clock::time_point>
GetLevel3ProductRecord(const std::string& product,
@ -247,30 +225,30 @@ public:
std::shared_ptr<config::RadarSite> radarSite_;
std::size_t cacheLimit_ {6u};
std::vector<float> coordinates0_5Degree_;
std::vector<float> coordinates1Degree_;
std::vector<float> coordinates0_5Degree_ {};
std::vector<float> coordinates1Degree_ {};
RadarProductRecordMap level2ProductRecords_;
RadarProductRecordList level2ProductRecentRecords_;
RadarProductRecordMap level2ProductRecords_ {};
RadarProductRecordList level2ProductRecentRecords_ {};
std::unordered_map<std::string, RadarProductRecordMap>
level3ProductRecordsMap_;
level3ProductRecordsMap_ {};
std::unordered_map<std::string, RadarProductRecordList>
level3ProductRecentRecordsMap_;
std::shared_mutex level2ProductRecordMutex_;
std::shared_mutex level3ProductRecordMutex_;
level3ProductRecentRecordsMap_ {};
std::shared_mutex level2ProductRecordMutex_ {};
std::shared_mutex level3ProductRecordMutex_ {};
std::shared_ptr<ProviderManager> level2ProviderManager_;
std::unordered_map<std::string, std::shared_ptr<ProviderManager>>
level3ProviderManagerMap_;
std::shared_mutex level3ProviderManagerMutex_;
level3ProviderManagerMap_ {};
std::shared_mutex level3ProviderManagerMutex_ {};
std::mutex initializeMutex_;
std::mutex level3ProductsInitializeMutex_;
std::mutex loadLevel2DataMutex_;
std::mutex loadLevel3DataMutex_;
std::mutex initializeMutex_ {};
std::mutex level3ProductsInitializeMutex_ {};
std::mutex loadLevel2DataMutex_ {};
std::mutex loadLevel3DataMutex_ {};
common::Level3ProductCategoryMap availableCategoryMap_;
std::shared_mutex availableCategoryMutex_;
common::Level3ProductCategoryMap availableCategoryMap_ {};
std::shared_mutex availableCategoryMutex_ {};
std::unordered_map<boost::uuids::uuid,
std::shared_ptr<ProviderManager>,
@ -1173,60 +1151,91 @@ void RadarProductManagerImpl::PopulateProductTimes(
});
}
std::tuple<std::shared_ptr<types::RadarProductRecord>,
std::chrono::system_clock::time_point>
RadarProductManagerImpl::GetLevel2ProductRecord(
std::map<std::chrono::system_clock::time_point,
std::shared_ptr<types::RadarProductRecord>>
RadarProductManagerImpl::GetLevel2ProductRecords(
std::chrono::system_clock::time_point time)
{
std::shared_ptr<types::RadarProductRecord> record {nullptr};
RadarProductRecordMap::const_pointer recordPtr {nullptr};
std::chrono::system_clock::time_point recordTime {time};
std::map<std::chrono::system_clock::time_point,
std::shared_ptr<types::RadarProductRecord>>
records {};
std::vector<RadarProductRecordMap::const_pointer> recordPtrs {};
// Ensure Level 2 product records are updated
PopulateLevel2ProductTimes(time);
if (!level2ProductRecords_.empty() &&
time == std::chrono::system_clock::time_point {})
{
// If a default-initialized time point is given, return the latest record
recordPtr = &(*level2ProductRecords_.rbegin());
}
else
{
recordPtr =
scwx::util::GetBoundedElementPointer(level2ProductRecords_, time);
}
std::shared_lock lock {level2ProductRecordMutex_};
if (recordPtr != nullptr)
{
// Don't check for an exact time match for level 2 products
recordTime = recordPtr->first;
record = recordPtr->second.lock();
}
if (!level2ProductRecords_.empty() &&
time == std::chrono::system_clock::time_point {})
{
// If a default-initialized time point is given, return the latest
// record
recordPtrs.push_back(&(*level2ProductRecords_.rbegin()));
}
else
{
// Get the requested record
auto recordIt =
scwx::util::GetBoundedElementIterator(level2ProductRecords_, time);
if (recordPtr != nullptr && record == nullptr &&
recordTime != std::chrono::system_clock::time_point {})
{
// Product is expired, reload it
std::shared_ptr<request::NexradFileRequest> request =
std::make_shared<request::NexradFileRequest>(radarId_);
QObject::connect(
request.get(),
&request::NexradFileRequest::RequestComplete,
self_,
[this](std::shared_ptr<request::NexradFileRequest> request)
if (recordIt != level2ProductRecords_.cend())
{
if (request->radar_product_record() != nullptr)
{
Q_EMIT self_->DataReloaded(request->radar_product_record());
}
});
recordPtrs.push_back(&(*(recordIt)));
self_->LoadLevel2Data(recordTime, request);
// The requested time may be in the previous record, so get that too
if (recordIt != level2ProductRecords_.cbegin())
{
recordPtrs.push_back(&(*(--recordIt)));
}
}
}
}
return {record, recordTime};
// For each record pointer
for (auto& recordPtr : recordPtrs)
{
std::shared_ptr<types::RadarProductRecord> record {nullptr};
std::chrono::system_clock::time_point recordTime {time};
if (recordPtr != nullptr)
{
// Don't check for an exact time match for level 2 products
recordTime = recordPtr->first;
record = recordPtr->second.lock();
}
if (recordPtr != nullptr && record == nullptr &&
recordTime != std::chrono::system_clock::time_point {})
{
// Product is expired, reload it
std::shared_ptr<request::NexradFileRequest> request =
std::make_shared<request::NexradFileRequest>(radarId_);
QObject::connect(
request.get(),
&request::NexradFileRequest::RequestComplete,
self_,
[this](std::shared_ptr<request::NexradFileRequest> request)
{
if (request->radar_product_record() != nullptr)
{
Q_EMIT self_->DataReloaded(request->radar_product_record());
}
});
self_->LoadLevel2Data(recordTime, request);
}
if (record != nullptr)
{
// Return valid records
records.insert_or_assign(recordTime, record);
}
}
return records;
}
std::tuple<std::shared_ptr<types::RadarProductRecord>,
@ -1399,19 +1408,46 @@ RadarProductManager::GetLevel2Data(wsr88d::rda::DataBlockType dataBlockType,
{
std::shared_ptr<wsr88d::rda::ElevationScan> radarData = nullptr;
float elevationCut = 0.0f;
std::vector<float> elevationCuts;
std::vector<float> elevationCuts {};
std::chrono::system_clock::time_point foundTime {};
std::shared_ptr<types::RadarProductRecord> record;
std::tie(record, time) = p->GetLevel2ProductRecord(time);
auto records = p->GetLevel2ProductRecords(time);
if (record != nullptr)
for (auto& recordPair : records)
{
std::tie(radarData, elevationCut, elevationCuts) =
record->level2_file()->GetElevationScan(
dataBlockType, elevation, time);
auto& record = recordPair.second;
if (record != nullptr)
{
std::shared_ptr<wsr88d::rda::ElevationScan> recordRadarData = nullptr;
float recordElevationCut = 0.0f;
std::vector<float> recordElevationCuts;
std::tie(recordRadarData, recordElevationCut, recordElevationCuts) =
record->level2_file()->GetElevationScan(
dataBlockType, elevation, time);
if (recordRadarData != nullptr)
{
auto& radarData0 = (*recordRadarData)[0];
auto collectionTime =
scwx::util::TimePoint(radarData0->modified_julian_date(),
radarData0->collection_time());
// Find the newest radar data, not newer than the selected time
if (radarData == nullptr ||
(collectionTime <= time && foundTime < collectionTime))
{
radarData = recordRadarData;
elevationCut = recordElevationCut;
elevationCuts = std::move(recordElevationCuts);
foundTime = collectionTime;
}
}
}
}
return {radarData, elevationCut, elevationCuts, time};
return {radarData, elevationCut, elevationCuts, foundTime};
}
std::tuple<std::shared_ptr<wsr88d::rpg::Level3Message>,
@ -1449,7 +1485,7 @@ std::vector<std::string> RadarProductManager::GetLevel3Products()
void RadarProductManager::SetCacheLimit(size_t cacheLimit)
{
p->cacheLimit_ = cacheLimit;
p->cacheLimit_ = std::max<std::size_t>(cacheLimit, 6u);
}
void RadarProductManager::UpdateAvailableProducts()

View file

@ -132,6 +132,7 @@ public:
/**
* @brief Set the maximum number of products of each type that may be cached.
* The cache limit cannot be set lower than 6.
*
* @param [in] cacheLimit The maximum number of products of each type
*/

View file

@ -336,10 +336,10 @@ void TimelineManager::ReceiveMapWidgetPainted(std::size_t mapIndex)
std::unique_lock lock {p->radarSweepMonitorMutex_};
// If the radar sweep has been updated
if (p->radarSweepsUpdated_.contains(mapIndex))
if (p->radarSweepsUpdated_.contains(mapIndex) &&
!p->radarSweepsComplete_.contains(mapIndex))
{
// Mark the radar sweep complete
p->radarSweepsUpdated_.erase(mapIndex);
p->radarSweepsComplete_.insert(mapIndex);
// If all sweeps have completed rendering
@ -466,20 +466,12 @@ void TimelineManager::Impl::PlaySync()
// Select the time
auto selectTimeStart = std::chrono::steady_clock::now();
auto [volumeTimeUpdated, selectedTimeUpdated] = SelectTime(newTime);
SelectTime(newTime);
auto selectTimeEnd = std::chrono::steady_clock::now();
auto elapsedTime = selectTimeEnd - selectTimeStart;
if (volumeTimeUpdated)
{
// Wait for radar sweeps to update
RadarSweepMonitorWait(radarSweepMonitorLock);
}
else
{
// Disable radar sweep monitor
RadarSweepMonitorDisable();
}
// Wait for radar sweeps to update
RadarSweepMonitorWait(radarSweepMonitorLock);
// Calculate the interval until the next update, prior to selecting
std::chrono::milliseconds interval;
@ -639,79 +631,63 @@ void TimelineManager::Impl::Step(Direction direction)
// Take a lock for time selection
std::unique_lock lock {selectTimeMutex_};
// Determine time to get active volume times
std::chrono::system_clock::time_point queryTime = adjustedTime_;
if (queryTime == std::chrono::system_clock::time_point {})
std::chrono::system_clock::time_point newTime = selectedTime_;
if (newTime == std::chrono::system_clock::time_point {})
{
queryTime = std::chrono::system_clock::now();
}
// Request active volume times
auto radarProductManager =
manager::RadarProductManager::Instance(radarSite_);
auto volumeTimes = radarProductManager->GetActiveVolumeTimes(queryTime);
if (volumeTimes.empty())
{
logger_->debug("No products to step through");
return;
}
// Dynamically update maximum cached volume scans
UpdateCacheLimit(radarProductManager, volumeTimes);
std::set<std::chrono::system_clock::time_point>::const_iterator it;
if (adjustedTime_ == std::chrono::system_clock::time_point {})
{
// If the adjusted time is live, get the last element in the set
it = std::prev(volumeTimes.cend());
}
else
{
// Get the current element in the set
it = scwx::util::GetBoundedElementIterator(volumeTimes, adjustedTime_);
}
if (it == volumeTimes.cend())
{
// Should not get here, but protect against an error
logger_->error("No suitable volume time found");
return;
}
if (direction == Direction::Back)
{
// Only if we aren't at the beginning of the volume times set
if (it != volumeTimes.cbegin())
if (direction == Direction::Back)
{
// Select the previous time
adjustedTime_ = *(--it);
selectedTime_ = adjustedTime_;
logger_->debug("Volume time updated: {}",
scwx::util::TimeString(adjustedTime_));
Q_EMIT self_->LiveStateUpdated(false);
Q_EMIT self_->VolumeTimeUpdated(adjustedTime_);
Q_EMIT self_->SelectedTimeUpdated(adjustedTime_);
newTime = std::chrono::floor<std::chrono::minutes>(
std::chrono::system_clock::now());
}
else
{
// Cannot step forward any further
return;
}
}
else
// Unlock prior to selecting time
lock.unlock();
// Lock radar sweep monitor
std::unique_lock radarSweepMonitorLock {radarSweepMonitorMutex_};
// Attempt to step forward or backward up to 30 minutes until an update is
// received on at least one map
for (std::size_t i = 0; i < 30; ++i)
{
// Only if we aren't at the end of the volume times set
if (it != std::prev(volumeTimes.cend()))
using namespace std::chrono_literals;
// Increment/decrement selected time by one minute
if (direction == Direction::Back)
{
// Select the next time
adjustedTime_ = *(++it);
selectedTime_ = adjustedTime_;
newTime -= 1min;
}
else
{
newTime += 1min;
logger_->debug("Volume time updated: {}",
scwx::util::TimeString(adjustedTime_));
// If the new time is more than 2 minutes in the future, stop stepping
if (newTime > std::chrono::system_clock::now() + 2min)
{
break;
}
}
Q_EMIT self_->LiveStateUpdated(false);
Q_EMIT self_->VolumeTimeUpdated(adjustedTime_);
Q_EMIT self_->SelectedTimeUpdated(adjustedTime_);
// Reset radar sweep monitor in preparation for update
RadarSweepMonitorReset();
// Select the time
SelectTime(newTime);
// Wait for radar sweeps to update
RadarSweepMonitorWait(radarSweepMonitorLock);
// Check for updates
if (!radarSweepsUpdated_.empty())
{
break;
}
}
}