From 759a9e43798b98d7b0ef8c514e58f1e3254a36a3 Mon Sep 17 00:00:00 2001 From: AdenKoperczak Date: Sun, 20 Apr 2025 13:02:02 -0400 Subject: [PATCH] Parallelize the chunks loading and load from archive when possible --- .../scwx/qt/manager/radar_product_manager.cpp | 10 +++ .../aws_level2_chunks_data_provider.hpp | 4 + .../aws_level2_chunks_data_provider.cpp | 86 ++++++++++++++++--- 3 files changed, 88 insertions(+), 12 deletions(-) diff --git a/scwx-qt/source/scwx/qt/manager/radar_product_manager.cpp b/scwx-qt/source/scwx/qt/manager/radar_product_manager.cpp index 1814905d..e56be177 100644 --- a/scwx-qt/source/scwx/qt/manager/radar_product_manager.cpp +++ b/scwx-qt/source/scwx/qt/manager/radar_product_manager.cpp @@ -151,6 +151,16 @@ public: level2ChunksProviderManager_->provider_ = provider::NexradDataProviderFactory::CreateLevel2ChunksDataProvider( radarId); + + auto level2ChunksProvider = + std::dynamic_pointer_cast( + level2ChunksProviderManager_->provider_); + if (level2ChunksProvider != nullptr) + { + level2ChunksProvider->SetLevel2DataProvider( + std::dynamic_pointer_cast( + level2ProviderManager_->provider_)); + } } ~RadarProductManagerImpl() { diff --git a/wxdata/include/scwx/provider/aws_level2_chunks_data_provider.hpp b/wxdata/include/scwx/provider/aws_level2_chunks_data_provider.hpp index 976f0663..476ff111 100644 --- a/wxdata/include/scwx/provider/aws_level2_chunks_data_provider.hpp +++ b/wxdata/include/scwx/provider/aws_level2_chunks_data_provider.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include @@ -61,6 +62,9 @@ public: std::optional GetCurrentElevation(); + void SetLevel2DataProvider( + const std::shared_ptr& provider); + private: class Impl; std::unique_ptr p; diff --git a/wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp b/wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp index ddac82e5..bb1b31c1 100644 --- a/wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp +++ b/wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp @@ -13,10 +13,24 @@ #include #include #include -#include #include #include +// Avoid circular refrence errors in boost +// NOLINTBEGIN(misc-header-include-cycle) +#if defined(_MSC_VER) +# pragma warning(push, 0) +#endif + +#include +#include +#include + +#if defined(_MSC_VER) +# pragma warning(pop) +#endif +// NOLINTEND(misc-header-include-cycle) + #if (__cpp_lib_chrono < 201907L) # include #endif @@ -75,6 +89,7 @@ public: lastTimeListed_ {}, // NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers) about average updatePeriod_ {7}, + level2DataProvider_ {}, self_ {self} { // Disable HTTP request for region @@ -122,6 +137,8 @@ public: std::chrono::seconds updatePeriod_; + std::weak_ptr level2DataProvider_; + AwsLevel2ChunksDataProvider* self_; }; @@ -634,7 +651,6 @@ AwsLevel2ChunksDataProvider::LoadLatestObject() const std::unique_lock lock(p->scansMutex_); return std::make_shared(p->currentScan_.nexradFile_, p->lastScan_.nexradFile_); - // return p->currentScan_.nexradFile_; } std::shared_ptr @@ -663,23 +679,63 @@ std::pair AwsLevel2ChunksDataProvider::Refresh() auto [success, newObjects, totalObjects] = p->ListObjects(); + auto threadPool = boost::asio::thread_pool(3); + bool newCurrent = false; + bool newLast = false; if (p->currentScan_.valid_) { - if (p->LoadScan(p->currentScan_)) - { - newObjects += 1; - } + boost::asio::post(threadPool, + [this, &newCurrent]() + { newCurrent = p->LoadScan(p->currentScan_); }); totalObjects += 1; } + if (p->lastScan_.valid_) { - // TODO this is slow when initially loading data. If possible, loading - // this from the archive may speed it up a lot. - if (p->LoadScan(p->lastScan_)) - { - newObjects += 1; - } totalObjects += 1; + boost::asio::post( + threadPool, + [this, &newLast]() + { + if (!p->lastScan_.hasAllFiles_) + { + // If we have chunks, use chunks + if (p->lastScan_.nextFile_ != 1) + { + newLast = p->LoadScan(p->lastScan_); + } + else + { + auto level2DataProvider = p->level2DataProvider_.lock(); + if (level2DataProvider != nullptr) + { + level2DataProvider->ListObjects(p->lastScan_.time_); + p->lastScan_.nexradFile_ = + std::dynamic_pointer_cast( + level2DataProvider->LoadObjectByTime( + p->lastScan_.time_)); + if (p->lastScan_.nexradFile_ != nullptr) + { + p->lastScan_.hasAllFiles_ = true; + // TODO maybe set lastModified for timing + } + } + // Fall back to chunks if files did not load + newLast = p->lastScan_.nexradFile_ != nullptr || + p->LoadScan(p->lastScan_); + } + } + }); + } + + threadPool.wait(); + if (newCurrent) + { + newObjects += 1; + } + if (newLast) + { + newObjects += 1; } timer.stop(); @@ -732,4 +788,10 @@ std::optional AwsLevel2ChunksDataProvider::GetCurrentElevation() return {}; } +void AwsLevel2ChunksDataProvider::SetLevel2DataProvider( + const std::shared_ptr& provider) +{ + p->level2DataProvider_ = provider; +} + } // namespace scwx::provider