diff --git a/scwx-qt/source/scwx/qt/manager/radar_product_manager.cpp b/scwx-qt/source/scwx/qt/manager/radar_product_manager.cpp index dc350e96..d6b0d20b 100644 --- a/scwx-qt/source/scwx/qt/manager/radar_product_manager.cpp +++ b/scwx-qt/source/scwx/qt/manager/radar_product_manager.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include #include @@ -78,7 +78,8 @@ public: level2DataRefreshTimer_ {util::io_context()}, level2DataRefreshTimerMutex_ {}, level2DataProvider_ { - provider::Level2DataProviderFactory::Create(radarId)}, + provider::NexradDataProviderFactory::CreateLevel2DataProvider( + radarId)}, initializeMutex_ {}, loadLevel2DataMutex_ {} { @@ -131,7 +132,7 @@ public: bool level2DataRefreshEnabled_; boost::asio::steady_timer level2DataRefreshTimer_; std::mutex level2DataRefreshTimerMutex_; - std::shared_ptr level2DataProvider_; + std::shared_ptr level2DataProvider_; std::mutex initializeMutex_; std::mutex loadLevel2DataMutex_; diff --git a/wxdata/include/scwx/provider/aws_level2_data_provider.hpp b/wxdata/include/scwx/provider/aws_level2_data_provider.hpp index b0f89d50..fd9682eb 100644 --- a/wxdata/include/scwx/provider/aws_level2_data_provider.hpp +++ b/wxdata/include/scwx/provider/aws_level2_data_provider.hpp @@ -1,18 +1,16 @@ #pragma once -#include +#include namespace scwx { namespace provider { -class AwsLevel2DataProviderImpl; - /** * @brief AWS Level 2 Data Provider */ -class AwsLevel2DataProvider : public Level2DataProvider +class AwsLevel2DataProvider : public AwsNexradDataProvider { public: explicit AwsLevel2DataProvider(const std::string& radarSite); @@ -27,26 +25,18 @@ public: AwsLevel2DataProvider(AwsLevel2DataProvider&&) noexcept; AwsLevel2DataProvider& operator=(AwsLevel2DataProvider&&) noexcept; - size_t cache_size() const; - - std::chrono::system_clock::time_point last_modified() const; - std::chrono::seconds update_period() const; - - std::string FindKey(std::chrono::system_clock::time_point time); - std::string FindLatestKey(); - std::pair - ListObjects(std::chrono::system_clock::time_point date); - std::shared_ptr LoadObjectByKey(const std::string& key); - size_t Refresh(); - std::chrono::system_clock::time_point GetTimePointByKey(const std::string& key) const; static std::chrono::system_clock::time_point GetTimePointFromKey(const std::string& key); +protected: + std::string GetPrefix(std::chrono::system_clock::time_point date); + private: - std::unique_ptr p; + class Impl; + std::unique_ptr p; }; } // namespace provider diff --git a/wxdata/include/scwx/provider/aws_nexrad_data_provider.hpp b/wxdata/include/scwx/provider/aws_nexrad_data_provider.hpp new file mode 100644 index 00000000..d56132ce --- /dev/null +++ b/wxdata/include/scwx/provider/aws_nexrad_data_provider.hpp @@ -0,0 +1,49 @@ +#pragma once + +#include + +namespace scwx +{ +namespace provider +{ + +/** + * @brief AWS NEXRAD Data Provider + */ +class AwsNexradDataProvider : public NexradDataProvider +{ +public: + explicit AwsNexradDataProvider(const std::string& radarSite, + const std::string& bucketName, + const std::string& region); + ~AwsNexradDataProvider(); + + AwsNexradDataProvider(const AwsNexradDataProvider&) = delete; + AwsNexradDataProvider& operator=(const AwsNexradDataProvider&) = delete; + + AwsNexradDataProvider(AwsNexradDataProvider&&) noexcept; + AwsNexradDataProvider& operator=(AwsNexradDataProvider&&) noexcept; + + size_t cache_size() const; + + std::chrono::system_clock::time_point last_modified() const; + std::chrono::seconds update_period() const; + + std::string FindKey(std::chrono::system_clock::time_point time); + std::string FindLatestKey(); + std::pair + ListObjects(std::chrono::system_clock::time_point date); + std::shared_ptr LoadObjectByKey(const std::string& key); + size_t Refresh(); + +protected: + virtual std::string + GetPrefix(std::chrono::system_clock::time_point date) = 0; + +private: + class Impl; + std::unique_ptr p; +}; + +} // namespace provider +} // namespace scwx diff --git a/wxdata/include/scwx/provider/level2_data_provider_factory.hpp b/wxdata/include/scwx/provider/level2_data_provider_factory.hpp deleted file mode 100644 index 5061933f..00000000 --- a/wxdata/include/scwx/provider/level2_data_provider_factory.hpp +++ /dev/null @@ -1,29 +0,0 @@ -#pragma once - -#include - -#include - -namespace scwx -{ -namespace provider -{ - -class Level2DataProviderFactory -{ -private: - explicit Level2DataProviderFactory() = delete; - ~Level2DataProviderFactory() = delete; - - Level2DataProviderFactory(const Level2DataProviderFactory&) = delete; - Level2DataProviderFactory& operator=(const Level2DataProviderFactory&) = delete; - - Level2DataProviderFactory(Level2DataProviderFactory&&) noexcept = delete; - Level2DataProviderFactory& operator=(Level2DataProviderFactory&&) noexcept = delete; - -public: - static std::shared_ptr Create(const std::string& radarSite); -}; - -} // namespace provider -} // namespace scwx diff --git a/wxdata/include/scwx/provider/level2_data_provider.hpp b/wxdata/include/scwx/provider/nexrad_data_provider.hpp similarity index 68% rename from wxdata/include/scwx/provider/level2_data_provider.hpp rename to wxdata/include/scwx/provider/nexrad_data_provider.hpp index 2b8b4ca7..cb5dfe75 100644 --- a/wxdata/include/scwx/provider/level2_data_provider.hpp +++ b/wxdata/include/scwx/provider/nexrad_data_provider.hpp @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -11,19 +11,17 @@ namespace scwx namespace provider { -class Level2DataProviderImpl; - -class Level2DataProvider +class NexradDataProvider { public: - explicit Level2DataProvider(); - ~Level2DataProvider(); + explicit NexradDataProvider(); + ~NexradDataProvider(); - Level2DataProvider(const Level2DataProvider&) = delete; - Level2DataProvider& operator=(const Level2DataProvider&) = delete; + NexradDataProvider(const NexradDataProvider&) = delete; + NexradDataProvider& operator=(const NexradDataProvider&) = delete; - Level2DataProvider(Level2DataProvider&&) noexcept; - Level2DataProvider& operator=(Level2DataProvider&&) noexcept; + NexradDataProvider(NexradDataProvider&&) noexcept; + NexradDataProvider& operator=(NexradDataProvider&&) noexcept; virtual size_t cache_size() const = 0; @@ -49,19 +47,19 @@ public: * * @param time Upper-bound time for the key search * - * @return Level 2 data key + * @return NEXRAD data key */ virtual std::string FindKey(std::chrono::system_clock::time_point time) = 0; /** * Finds the most recent key in the cache. * - * @return Level 2 data key + * @return NEXRAD data key */ virtual std::string FindLatestKey() = 0; /** - * Lists level 2 objects for the date supplied, and adds them to the cache. + * Lists NEXRAD objects for the date supplied, and adds them to the cache. * * @param date Date for which to list objects * @@ -72,17 +70,17 @@ public: ListObjects(std::chrono::system_clock::time_point date) = 0; /** - * Loads a level 2 object by the given key. + * Loads a NEXRAD file object by the given key. * - * @param key Level 2 data key + * @param key NEXRAD data key * - * @return Level 2 data + * @return NEXRAD data */ - virtual std::shared_ptr + virtual std::shared_ptr LoadObjectByKey(const std::string& key) = 0; /** - * Lists level 2 objects for the current date, and adds them to the cache. If + * Lists NEXRAD objects for the current date, and adds them to the cache. If * no objects have been added to the cache for the current date, the previous * date is also queried for data. * @@ -93,15 +91,16 @@ public: /** * Convert the object key to a time point. * - * @key Level 2 data key + * @key NEXRAD data key * - * @return Level 2 data time point + * @return NEXRAD data time point */ virtual std::chrono::system_clock::time_point GetTimePointByKey(const std::string& key) const = 0; private: - std::unique_ptr p; + class Impl; + std::unique_ptr p; }; } // namespace provider diff --git a/wxdata/include/scwx/provider/nexrad_data_provider_factory.hpp b/wxdata/include/scwx/provider/nexrad_data_provider_factory.hpp new file mode 100644 index 00000000..fd49e07a --- /dev/null +++ b/wxdata/include/scwx/provider/nexrad_data_provider_factory.hpp @@ -0,0 +1,32 @@ +#pragma once + +#include + +#include + +namespace scwx +{ +namespace provider +{ + +class NexradDataProviderFactory +{ +private: + explicit NexradDataProviderFactory() = delete; + ~NexradDataProviderFactory() = delete; + + NexradDataProviderFactory(const NexradDataProviderFactory&) = delete; + NexradDataProviderFactory& + operator=(const NexradDataProviderFactory&) = delete; + + NexradDataProviderFactory(NexradDataProviderFactory&&) noexcept = delete; + NexradDataProviderFactory& + operator=(NexradDataProviderFactory&&) noexcept = delete; + +public: + static std::shared_ptr + CreateLevel2DataProvider(const std::string& radarSite); +}; + +} // namespace provider +} // namespace scwx diff --git a/wxdata/source/scwx/provider/aws_level2_data_provider.cpp b/wxdata/source/scwx/provider/aws_level2_data_provider.cpp index 6120e419..dced937d 100644 --- a/wxdata/source/scwx/provider/aws_level2_data_provider.cpp +++ b/wxdata/source/scwx/provider/aws_level2_data_provider.cpp @@ -1,14 +1,7 @@ #include #include -#include #include -#include -#include - -#include -#include -#include #include #include @@ -24,63 +17,14 @@ static const auto logger_ = util::Logger::Create(logPrefix_); static const std::string kDefaultBucketName_ = "noaa-nexrad-level2"; static const std::string kDefaultRegion_ = "us-east-1"; -// Keep at least today, yesterday, and one more date -static const size_t kMinDatesBeforePruning_ = 4; -static const size_t kMaxObjects_ = 2500; - -class AwsLevel2DataProviderImpl +class AwsLevel2DataProvider::Impl { public: - struct ObjectRecord - { - explicit ObjectRecord( - const std::string& key, - std::chrono::system_clock::time_point lastModified) : - key_ {key}, lastModified_ {lastModified} - { - } - ~ObjectRecord() = default; + explicit Impl(const std::string& radarSite) : radarSite_ {radarSite} {} - std::string key_; - std::chrono::system_clock::time_point lastModified_; - }; - - explicit AwsLevel2DataProviderImpl(const std::string& radarSite, - const std::string& bucketName, - const std::string& region) : - radarSite_ {radarSite}, - bucketName_ {bucketName}, - region_ {region}, - client_ {nullptr}, - objects_ {}, - objectsMutex_ {}, - lastModified_ {}, - updatePeriod_ {} - { - Aws::Client::ClientConfiguration config; - config.region = region_; - - client_ = std::make_unique(config); - } - - ~AwsLevel2DataProviderImpl() {} - - void PruneObjects(); - void UpdateMetadata(); - void UpdateObjectDates(std::chrono::system_clock::time_point date); + ~Impl() {} std::string radarSite_; - std::string bucketName_; - std::string region_; - - std::unique_ptr client_; - - std::map objects_; - std::shared_mutex objectsMutex_; - std::list objectDates_; - - std::chrono::system_clock::time_point lastModified_; - std::chrono::seconds updatePeriod_; }; AwsLevel2DataProvider::AwsLevel2DataProvider(const std::string& radarSite) : @@ -90,8 +34,8 @@ AwsLevel2DataProvider::AwsLevel2DataProvider(const std::string& radarSite) : AwsLevel2DataProvider::AwsLevel2DataProvider(const std::string& radarSite, const std::string& bucketName, const std::string& region) : - p(std::make_unique( - radarSite, bucketName, region)) + AwsNexradDataProvider(radarSite, bucketName, region), + p(std::make_unique(radarSite)) { } AwsLevel2DataProvider::~AwsLevel2DataProvider() = default; @@ -101,255 +45,10 @@ AwsLevel2DataProvider::AwsLevel2DataProvider(AwsLevel2DataProvider&&) noexcept = AwsLevel2DataProvider& AwsLevel2DataProvider::operator=(AwsLevel2DataProvider&&) noexcept = default; -size_t AwsLevel2DataProvider::cache_size() const -{ - return p->objects_.size(); -} - -std::chrono::seconds AwsLevel2DataProvider::update_period() const -{ - return p->updatePeriod_; -} - -std::chrono::system_clock::time_point -AwsLevel2DataProvider::last_modified() const -{ - return p->lastModified_; -} - std::string -AwsLevel2DataProvider::FindKey(std::chrono::system_clock::time_point time) +AwsLevel2DataProvider::GetPrefix(std::chrono::system_clock::time_point date) { - logger_->debug("FindKey: {}", util::TimeString(time)); - - std::string key {}; - - std::shared_lock lock(p->objectsMutex_); - - auto element = util::GetBoundedElement(p->objects_, time); - - if (element.has_value()) - { - key = element->key_; - } - - return key; -} - -std::string AwsLevel2DataProvider::FindLatestKey() -{ - logger_->debug("FindLatestKey()"); - - std::string key {}; - - std::shared_lock lock(p->objectsMutex_); - - if (!p->objects_.empty()) - { - key = p->objects_.crbegin()->second.key_; - } - - return key; -} - -std::pair -AwsLevel2DataProvider::ListObjects(std::chrono::system_clock::time_point date) -{ - const std::string prefix = - fmt::format("{0:%Y/%m/%d}/{1}/", fmt::gmtime(date), p->radarSite_); - - logger_->debug("ListObjects: {}", prefix); - - Aws::S3::Model::ListObjectsV2Request request; - request.SetBucket(p->bucketName_); - request.SetPrefix(prefix); - - auto outcome = p->client_->ListObjectsV2(request); - - size_t newObjects = 0; - size_t totalObjects = 0; - - if (outcome.IsSuccess()) - { - auto& objects = outcome.GetResult().GetContents(); - - logger_->debug("Found {} objects", objects.size()); - - // Store objects - std::for_each( // - objects.cbegin(), - objects.cend(), - [&](const Aws::S3::Model::Object& object) - { - std::string key = object.GetKey(); - - if (!key.ends_with("_MDM")) - { - auto time = GetTimePointFromKey(key); - - std::chrono::seconds lastModifiedSeconds { - object.GetLastModified().Seconds()}; - std::chrono::system_clock::time_point lastModified { - lastModifiedSeconds}; - - std::unique_lock lock(p->objectsMutex_); - - auto [it, inserted] = p->objects_.insert_or_assign( - time, - AwsLevel2DataProviderImpl::ObjectRecord {key, lastModified}); - - if (inserted) - { - newObjects++; - } - - totalObjects++; - } - }); - - if (newObjects > 0) - { - p->UpdateObjectDates(date); - p->PruneObjects(); - p->UpdateMetadata(); - } - } - else - { - logger_->warn("Could not list objects: {}", - outcome.GetError().GetMessage()); - } - - return std::make_pair(newObjects, totalObjects); -} - -std::shared_ptr -AwsLevel2DataProvider::LoadObjectByKey(const std::string& key) -{ - std::shared_ptr level2File = nullptr; - - Aws::S3::Model::GetObjectRequest request; - request.SetBucket(p->bucketName_); - request.SetKey(key); - - auto outcome = p->client_->GetObject(request); - - if (outcome.IsSuccess()) - { - auto& body = outcome.GetResultWithOwnership().GetBody(); - - std::shared_ptr nexradFile = - wsr88d::NexradFileFactory::Create(body); - - level2File = std::dynamic_pointer_cast(nexradFile); - } - else - { - logger_->warn("Could not get object: {}", - outcome.GetError().GetMessage()); - } - - return level2File; -} - -size_t AwsLevel2DataProvider::Refresh() -{ - using namespace std::chrono; - - logger_->debug("Refresh()"); - - static std::mutex refreshMutex; - static system_clock::time_point refreshDate {}; - - auto today = floor(system_clock::now()); - auto yesterday = today - days {1}; - - std::unique_lock lock(refreshMutex); - - size_t totalNewObjects = 0; - - // If we haven't gotten any objects from today, first list objects for - // yesterday, to ensure we haven't missed any objects near midnight - if (refreshDate < today) - { - auto [newObjects, totalObjects] = ListObjects(yesterday); - totalNewObjects = newObjects; - if (totalObjects > 0) - { - refreshDate = yesterday; - } - } - - auto [newObjects, totalObjects] = ListObjects(today); - totalNewObjects += newObjects; - if (totalObjects > 0) - { - refreshDate = today; - } - - return totalNewObjects; -} - -void AwsLevel2DataProviderImpl::PruneObjects() -{ - using namespace std::chrono; - - auto today = floor(system_clock::now()); - auto yesterday = today - days {1}; - - std::unique_lock lock(objectsMutex_); - - for (auto it = objectDates_.cbegin(); - it != objectDates_.cend() && objects_.size() > kMaxObjects_ && - objectDates_.size() >= kMinDatesBeforePruning_;) - { - if (*it < yesterday) - { - // Erase oldest keys from objects list - auto eraseBegin = objects_.lower_bound(*it); - auto eraseEnd = objects_.lower_bound(*it + days {1}); - objects_.erase(eraseBegin, eraseEnd); - - // Remove oldest date from object dates list - it = objectDates_.erase(it); - } - else - { - ++it; - } - } -} - -void AwsLevel2DataProviderImpl::UpdateMetadata() -{ - std::shared_lock lock(objectsMutex_); - - if (!objects_.empty()) - { - lastModified_ = objects_.crbegin()->second.lastModified_; - } - - if (objects_.size() >= 2) - { - auto it = objects_.crbegin(); - auto lastModified = it->second.lastModified_; - auto prevModified = (++it)->second.lastModified_; - auto delta = lastModified - prevModified; - - updatePeriod_ = std::chrono::duration_cast(delta); - } -} - -void AwsLevel2DataProviderImpl::UpdateObjectDates( - std::chrono::system_clock::time_point date) -{ - auto day = std::chrono::floor(date); - - std::unique_lock lock(objectsMutex_); - - // Remove any existing occurrences of day, and add to the back of the list - objectDates_.remove(day); - objectDates_.push_back(day); + return fmt::format("{0:%Y/%m/%d}/{1}/", fmt::gmtime(date), p->radarSite_); } std::chrono::system_clock::time_point diff --git a/wxdata/source/scwx/provider/aws_nexrad_data_provider.cpp b/wxdata/source/scwx/provider/aws_nexrad_data_provider.cpp new file mode 100644 index 00000000..90555e5b --- /dev/null +++ b/wxdata/source/scwx/provider/aws_nexrad_data_provider.cpp @@ -0,0 +1,341 @@ +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +namespace scwx +{ +namespace provider +{ + +static const std::string logPrefix_ = + "scwx::provider::aws_nexrad_data_provider"; +static const auto logger_ = util::Logger::Create(logPrefix_); + +// Keep at least today, yesterday, and one more date +static const size_t kMinDatesBeforePruning_ = 4; +static const size_t kMaxObjects_ = 2500; + +class AwsNexradDataProvider::Impl +{ +public: + struct ObjectRecord + { + explicit ObjectRecord( + const std::string& key, + std::chrono::system_clock::time_point lastModified) : + key_ {key}, lastModified_ {lastModified} + { + } + ~ObjectRecord() = default; + + std::string key_; + std::chrono::system_clock::time_point lastModified_; + }; + + explicit Impl(const std::string& radarSite, + const std::string& bucketName, + const std::string& region) : + radarSite_ {radarSite}, + bucketName_ {bucketName}, + region_ {region}, + client_ {nullptr}, + objects_ {}, + objectsMutex_ {}, + lastModified_ {}, + updatePeriod_ {} + { + Aws::Client::ClientConfiguration config; + config.region = region_; + + client_ = std::make_unique(config); + } + + ~Impl() {} + + void PruneObjects(); + void UpdateMetadata(); + void UpdateObjectDates(std::chrono::system_clock::time_point date); + + std::string radarSite_; + std::string bucketName_; + std::string region_; + + std::unique_ptr client_; + + std::map objects_; + std::shared_mutex objectsMutex_; + std::list objectDates_; + + std::chrono::system_clock::time_point lastModified_; + std::chrono::seconds updatePeriod_; +}; + +AwsNexradDataProvider::AwsNexradDataProvider(const std::string& radarSite, + const std::string& bucketName, + const std::string& region) : + p(std::make_unique(radarSite, bucketName, region)) +{ +} +AwsNexradDataProvider::~AwsNexradDataProvider() = default; + +AwsNexradDataProvider::AwsNexradDataProvider(AwsNexradDataProvider&&) noexcept = + default; +AwsNexradDataProvider& +AwsNexradDataProvider::operator=(AwsNexradDataProvider&&) noexcept = default; + +size_t AwsNexradDataProvider::cache_size() const +{ + return p->objects_.size(); +} + +std::chrono::seconds AwsNexradDataProvider::update_period() const +{ + return p->updatePeriod_; +} + +std::chrono::system_clock::time_point +AwsNexradDataProvider::last_modified() const +{ + return p->lastModified_; +} + +std::string +AwsNexradDataProvider::FindKey(std::chrono::system_clock::time_point time) +{ + logger_->debug("FindKey: {}", util::TimeString(time)); + + std::string key {}; + + std::shared_lock lock(p->objectsMutex_); + + auto element = util::GetBoundedElement(p->objects_, time); + + if (element.has_value()) + { + key = element->key_; + } + + return key; +} + +std::string AwsNexradDataProvider::FindLatestKey() +{ + logger_->debug("FindLatestKey()"); + + std::string key {}; + + std::shared_lock lock(p->objectsMutex_); + + if (!p->objects_.empty()) + { + key = p->objects_.crbegin()->second.key_; + } + + return key; +} + +std::pair +AwsNexradDataProvider::ListObjects(std::chrono::system_clock::time_point date) +{ + const std::string prefix {GetPrefix(date)}; + + logger_->debug("ListObjects: {}", prefix); + + Aws::S3::Model::ListObjectsV2Request request; + request.SetBucket(p->bucketName_); + request.SetPrefix(prefix); + + auto outcome = p->client_->ListObjectsV2(request); + + size_t newObjects = 0; + size_t totalObjects = 0; + + if (outcome.IsSuccess()) + { + auto& objects = outcome.GetResult().GetContents(); + + logger_->debug("Found {} objects", objects.size()); + + // Store objects + std::for_each( // + objects.cbegin(), + objects.cend(), + [&](const Aws::S3::Model::Object& object) + { + std::string key = object.GetKey(); + + if (!key.ends_with("_MDM")) + { + auto time = GetTimePointByKey(key); + + std::chrono::seconds lastModifiedSeconds { + object.GetLastModified().Seconds()}; + std::chrono::system_clock::time_point lastModified { + lastModifiedSeconds}; + + std::unique_lock lock(p->objectsMutex_); + + auto [it, inserted] = p->objects_.insert_or_assign( + time, Impl::ObjectRecord {key, lastModified}); + + if (inserted) + { + newObjects++; + } + + totalObjects++; + } + }); + + if (newObjects > 0) + { + p->UpdateObjectDates(date); + p->PruneObjects(); + p->UpdateMetadata(); + } + } + else + { + logger_->warn("Could not list objects: {}", + outcome.GetError().GetMessage()); + } + + return std::make_pair(newObjects, totalObjects); +} + +std::shared_ptr +AwsNexradDataProvider::LoadObjectByKey(const std::string& key) +{ + std::shared_ptr nexradFile = nullptr; + + Aws::S3::Model::GetObjectRequest request; + request.SetBucket(p->bucketName_); + request.SetKey(key); + + auto outcome = p->client_->GetObject(request); + + if (outcome.IsSuccess()) + { + auto& body = outcome.GetResultWithOwnership().GetBody(); + + nexradFile = wsr88d::NexradFileFactory::Create(body); + } + else + { + logger_->warn("Could not get object: {}", + outcome.GetError().GetMessage()); + } + + return nexradFile; +} + +size_t AwsNexradDataProvider::Refresh() +{ + using namespace std::chrono; + + logger_->debug("Refresh()"); + + static std::mutex refreshMutex; + static system_clock::time_point refreshDate {}; + + auto today = floor(system_clock::now()); + auto yesterday = today - days {1}; + + std::unique_lock lock(refreshMutex); + + size_t totalNewObjects = 0; + + // If we haven't gotten any objects from today, first list objects for + // yesterday, to ensure we haven't missed any objects near midnight + if (refreshDate < today) + { + auto [newObjects, totalObjects] = ListObjects(yesterday); + totalNewObjects = newObjects; + if (totalObjects > 0) + { + refreshDate = yesterday; + } + } + + auto [newObjects, totalObjects] = ListObjects(today); + totalNewObjects += newObjects; + if (totalObjects > 0) + { + refreshDate = today; + } + + return totalNewObjects; +} + +void AwsNexradDataProvider::Impl::PruneObjects() +{ + using namespace std::chrono; + + auto today = floor(system_clock::now()); + auto yesterday = today - days {1}; + + std::unique_lock lock(objectsMutex_); + + for (auto it = objectDates_.cbegin(); + it != objectDates_.cend() && objects_.size() > kMaxObjects_ && + objectDates_.size() >= kMinDatesBeforePruning_;) + { + if (*it < yesterday) + { + // Erase oldest keys from objects list + auto eraseBegin = objects_.lower_bound(*it); + auto eraseEnd = objects_.lower_bound(*it + days {1}); + objects_.erase(eraseBegin, eraseEnd); + + // Remove oldest date from object dates list + it = objectDates_.erase(it); + } + else + { + ++it; + } + } +} + +void AwsNexradDataProvider::Impl::UpdateMetadata() +{ + std::shared_lock lock(objectsMutex_); + + if (!objects_.empty()) + { + lastModified_ = objects_.crbegin()->second.lastModified_; + } + + if (objects_.size() >= 2) + { + auto it = objects_.crbegin(); + auto lastModified = it->second.lastModified_; + auto prevModified = (++it)->second.lastModified_; + auto delta = lastModified - prevModified; + + updatePeriod_ = std::chrono::duration_cast(delta); + } +} + +void AwsNexradDataProvider::Impl::UpdateObjectDates( + std::chrono::system_clock::time_point date) +{ + auto day = std::chrono::floor(date); + + std::unique_lock lock(objectsMutex_); + + // Remove any existing occurrences of day, and add to the back of the list + objectDates_.remove(day); + objectDates_.push_back(day); +} + +} // namespace provider +} // namespace scwx diff --git a/wxdata/source/scwx/provider/level2_data_provider.cpp b/wxdata/source/scwx/provider/level2_data_provider.cpp deleted file mode 100644 index 934cf781..00000000 --- a/wxdata/source/scwx/provider/level2_data_provider.cpp +++ /dev/null @@ -1,29 +0,0 @@ -#include - -namespace scwx -{ -namespace provider -{ - -static const std::string logPrefix_ = "scwx::provider::level2_data_provider"; - -class Level2DataProviderImpl -{ -public: - explicit Level2DataProviderImpl() {} - - ~Level2DataProviderImpl() {} -}; - -Level2DataProvider::Level2DataProvider() : - p(std::make_unique()) -{ -} -Level2DataProvider::~Level2DataProvider() = default; - -Level2DataProvider::Level2DataProvider(Level2DataProvider&&) noexcept = default; -Level2DataProvider& -Level2DataProvider::operator=(Level2DataProvider&&) noexcept = default; - -} // namespace provider -} // namespace scwx diff --git a/wxdata/source/scwx/provider/level2_data_provider_factory.cpp b/wxdata/source/scwx/provider/level2_data_provider_factory.cpp deleted file mode 100644 index 3b04badb..00000000 --- a/wxdata/source/scwx/provider/level2_data_provider_factory.cpp +++ /dev/null @@ -1,28 +0,0 @@ -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace scwx -{ -namespace provider -{ - -static const std::string logPrefix_ = - "scwx::provider::level2_data_provider_factory"; - -std::shared_ptr -Level2DataProviderFactory::Create(const std::string& radarSite) -{ - return std::make_unique(radarSite); -} - -} // namespace provider -} // namespace scwx diff --git a/wxdata/source/scwx/provider/nexrad_data_provider.cpp b/wxdata/source/scwx/provider/nexrad_data_provider.cpp new file mode 100644 index 00000000..74274fe8 --- /dev/null +++ b/wxdata/source/scwx/provider/nexrad_data_provider.cpp @@ -0,0 +1,26 @@ +#include + +namespace scwx +{ +namespace provider +{ + +static const std::string logPrefix_ = "scwx::provider::nexrad_data_provider"; + +class NexradDataProvider::Impl +{ +public: + explicit Impl() {} + + ~Impl() {} +}; + +NexradDataProvider::NexradDataProvider() : p(std::make_unique()) {} +NexradDataProvider::~NexradDataProvider() = default; + +NexradDataProvider::NexradDataProvider(NexradDataProvider&&) noexcept = default; +NexradDataProvider& +NexradDataProvider::operator=(NexradDataProvider&&) noexcept = default; + +} // namespace provider +} // namespace scwx diff --git a/wxdata/source/scwx/provider/nexrad_data_provider_factory.cpp b/wxdata/source/scwx/provider/nexrad_data_provider_factory.cpp new file mode 100644 index 00000000..58779ca6 --- /dev/null +++ b/wxdata/source/scwx/provider/nexrad_data_provider_factory.cpp @@ -0,0 +1,20 @@ +#include +#include + +namespace scwx +{ +namespace provider +{ + +static const std::string logPrefix_ = + "scwx::provider::nexrad_data_provider_factory"; + +std::shared_ptr +NexradDataProviderFactory::CreateLevel2DataProvider( + const std::string& radarSite) +{ + return std::make_unique(radarSite); +} + +} // namespace provider +} // namespace scwx diff --git a/wxdata/wxdata.cmake b/wxdata/wxdata.cmake index ee41fba6..b3852a84 100644 --- a/wxdata/wxdata.cmake +++ b/wxdata/wxdata.cmake @@ -35,11 +35,13 @@ set(SRC_COMMON source/scwx/common/color_table.cpp source/scwx/common/sites.cpp source/scwx/common/vcp.cpp) set(HDR_PROVIDER include/scwx/provider/aws_level2_data_provider.hpp - include/scwx/provider/level2_data_provider.hpp - include/scwx/provider/level2_data_provider_factory.hpp) + include/scwx/provider/aws_nexrad_data_provider.hpp + include/scwx/provider/nexrad_data_provider.hpp + include/scwx/provider/nexrad_data_provider_factory.hpp) set(SRC_PROVIDER source/scwx/provider/aws_level2_data_provider.cpp - source/scwx/provider/level2_data_provider.cpp - source/scwx/provider/level2_data_provider_factory.cpp) + source/scwx/provider/aws_nexrad_data_provider.cpp + source/scwx/provider/nexrad_data_provider.cpp + source/scwx/provider/nexrad_data_provider_factory.cpp) set(HDR_UTIL include/scwx/util/float.hpp include/scwx/util/iterator.hpp include/scwx/util/logger.hpp