From 4b9d12a7ef6bc9e8e90c0631ce605f9ad7a4650e Mon Sep 17 00:00:00 2001 From: Dan Paulat Date: Sat, 28 May 2022 02:04:59 -0500 Subject: [PATCH] Additional level 2 data provider functionality for display of latest data --- .../provider/aws_level2_data_provider.hpp | 7 + .../scwx/provider/level2_data_provider.hpp | 34 +++++ .../provider/aws_level2_data_provider.cpp | 130 ++++++++++++++---- 3 files changed, 147 insertions(+), 24 deletions(-) diff --git a/wxdata/include/scwx/provider/aws_level2_data_provider.hpp b/wxdata/include/scwx/provider/aws_level2_data_provider.hpp index 2d99f801..b0f89d50 100644 --- a/wxdata/include/scwx/provider/aws_level2_data_provider.hpp +++ b/wxdata/include/scwx/provider/aws_level2_data_provider.hpp @@ -29,12 +29,19 @@ public: size_t cache_size() const; + std::chrono::system_clock::time_point last_modified() const; + std::chrono::seconds update_period() const; + std::string FindKey(std::chrono::system_clock::time_point time); + std::string FindLatestKey(); std::pair ListObjects(std::chrono::system_clock::time_point date); std::shared_ptr LoadObjectByKey(const std::string& key); size_t Refresh(); + std::chrono::system_clock::time_point + GetTimePointByKey(const std::string& key) const; + static std::chrono::system_clock::time_point GetTimePointFromKey(const std::string& key); diff --git a/wxdata/include/scwx/provider/level2_data_provider.hpp b/wxdata/include/scwx/provider/level2_data_provider.hpp index 976020b4..2b8b4ca7 100644 --- a/wxdata/include/scwx/provider/level2_data_provider.hpp +++ b/wxdata/include/scwx/provider/level2_data_provider.hpp @@ -27,6 +27,23 @@ public: virtual size_t cache_size() const = 0; + /** + * Gets the last modified time. This is equal to the most recent object's + * modification time. If there are no objects, the epoch is returned. + * + * @return Last modified time + */ + virtual std::chrono::system_clock::time_point last_modified() const = 0; + + /** + * Gets the current update period. This is equal to the difference between + * the last two objects' modification times. If there are less than two + * objects, an update period of 0 is returned. + * + * @return Update period + */ + virtual std::chrono::seconds update_period() const = 0; + /** * Finds the most recent key in the cache, no later than the time provided. * @@ -36,6 +53,13 @@ public: */ virtual std::string FindKey(std::chrono::system_clock::time_point time) = 0; + /** + * Finds the most recent key in the cache. + * + * @return Level 2 data key + */ + virtual std::string FindLatestKey() = 0; + /** * Lists level 2 objects for the date supplied, and adds them to the cache. * @@ -66,6 +90,16 @@ public: */ virtual size_t Refresh() = 0; + /** + * Convert the object key to a time point. + * + * @key Level 2 data key + * + * @return Level 2 data time point + */ + virtual std::chrono::system_clock::time_point + GetTimePointByKey(const std::string& key) const = 0; + private: std::unique_ptr p; }; diff --git a/wxdata/source/scwx/provider/aws_level2_data_provider.cpp b/wxdata/source/scwx/provider/aws_level2_data_provider.cpp index 9088dae5..aac6da3f 100644 --- a/wxdata/source/scwx/provider/aws_level2_data_provider.cpp +++ b/wxdata/source/scwx/provider/aws_level2_data_provider.cpp @@ -27,6 +27,20 @@ static const std::string kDefaultRegion_ = "us-east-1"; class AwsLevel2DataProviderImpl { public: + struct ObjectRecord + { + explicit ObjectRecord( + const std::string& key, + std::chrono::system_clock::time_point lastModified) : + key_ {key}, lastModified_ {lastModified} + { + } + ~ObjectRecord() = default; + + std::string key_; + std::chrono::system_clock::time_point lastModified_; + }; + explicit AwsLevel2DataProviderImpl(const std::string& radarSite, const std::string& bucketName, const std::string& region) : @@ -35,7 +49,9 @@ public: region_ {region}, client_ {nullptr}, objects_ {}, - objectsMutex_ {} + objectsMutex_ {}, + lastModified_ {}, + updatePeriod_ {} { Aws::Client::ClientConfiguration config; config.region = region_; @@ -45,14 +61,19 @@ public: ~AwsLevel2DataProviderImpl() {} + void UpdateMetadata(); + std::string radarSite_; std::string bucketName_; std::string region_; std::unique_ptr client_; - std::map objects_; - std::shared_mutex objectsMutex_; + std::map objects_; + std::shared_mutex objectsMutex_; + + std::chrono::system_clock::time_point lastModified_; + std::chrono::seconds updatePeriod_; }; AwsLevel2DataProvider::AwsLevel2DataProvider(const std::string& radarSite) : @@ -78,6 +99,17 @@ size_t AwsLevel2DataProvider::cache_size() const return p->objects_.size(); } +std::chrono::seconds AwsLevel2DataProvider::update_period() const +{ + return p->updatePeriod_; +} + +std::chrono::system_clock::time_point +AwsLevel2DataProvider::last_modified() const +{ + return p->lastModified_; +} + std::string AwsLevel2DataProvider::FindKey(std::chrono::system_clock::time_point time) { @@ -87,12 +119,27 @@ AwsLevel2DataProvider::FindKey(std::chrono::system_clock::time_point time) std::shared_lock lock(p->objectsMutex_); - std::optional element = - util::GetBoundedElement(p->objects_, time); + auto element = util::GetBoundedElement(p->objects_, time); if (element.has_value()) { - key = *element; + key = element->key_; + } + + return key; +} + +std::string AwsLevel2DataProvider::FindLatestKey() +{ + logger_->debug("FindLatestKey()"); + + std::string key {}; + + std::shared_lock lock(p->objectsMutex_); + + if (!p->objects_.empty()) + { + key = p->objects_.crbegin()->second.key_; } return key; @@ -122,29 +169,36 @@ AwsLevel2DataProvider::ListObjects(std::chrono::system_clock::time_point date) logger_->debug("Found {} objects", objects.size()); // Store objects - std::for_each(objects.cbegin(), - objects.cend(), - [&](const Aws::S3::Model::Object& object) - { - std::string key = object.GetKey(); + std::for_each( // + objects.cbegin(), + objects.cend(), + [&](const Aws::S3::Model::Object& object) + { + std::string key = object.GetKey(); - if (!key.ends_with("_MDM")) - { - auto time = GetTimePointFromKey(key); + if (!key.ends_with("_MDM")) + { + auto time = GetTimePointFromKey(key); - std::unique_lock lock(p->objectsMutex_); + std::chrono::seconds lastModifiedSeconds { + object.GetLastModified().Seconds()}; + std::chrono::system_clock::time_point lastModified { + lastModifiedSeconds}; - auto [it, inserted] = - p->objects_.insert_or_assign(time, key); + std::unique_lock lock(p->objectsMutex_); - if (inserted) - { - newObjects++; - } + auto [it, inserted] = p->objects_.insert_or_assign( + time, + AwsLevel2DataProviderImpl::ObjectRecord {key, lastModified}); - totalObjects++; - } - }); + if (inserted) + { + newObjects++; + } + + totalObjects++; + } + }); } else { @@ -152,6 +206,8 @@ AwsLevel2DataProvider::ListObjects(std::chrono::system_clock::time_point date) outcome.GetError().GetMessage()); } + p->UpdateMetadata(); + return std::make_pair(newObjects, totalObjects); } @@ -222,6 +278,32 @@ size_t AwsLevel2DataProvider::Refresh() return totalNewObjects; } +void AwsLevel2DataProviderImpl::UpdateMetadata() +{ + std::shared_lock lock(objectsMutex_); + + if (!objects_.empty()) + { + lastModified_ = objects_.crbegin()->second.lastModified_; + } + + if (objects_.size() >= 2) + { + auto it = objects_.crbegin(); + auto lastModified = it->second.lastModified_; + auto prevModified = (++it)->second.lastModified_; + auto delta = lastModified - prevModified; + + updatePeriod_ = std::chrono::duration_cast(delta); + } +} + +std::chrono::system_clock::time_point +AwsLevel2DataProvider::GetTimePointByKey(const std::string& key) const +{ + return GetTimePointFromKey(key); +} + std::chrono::system_clock::time_point AwsLevel2DataProvider::GetTimePointFromKey(const std::string& key) {