From 9570dcf20e26bfd2db576ff871f9a2121684cd23 Mon Sep 17 00:00:00 2001 From: AdenKoperczak Date: Thu, 27 Mar 2025 11:22:44 -0400 Subject: [PATCH] Initial working level2 chunks data provider --- .../aws_level2_chunks_data_provider.hpp | 62 +++ .../aws_level2_chunks_data_provider.cpp | 490 ++++++++++++++++++ wxdata/wxdata.cmake | 2 + 3 files changed, 554 insertions(+) create mode 100644 wxdata/include/scwx/provider/aws_level2_chunks_data_provider.hpp create mode 100644 wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp diff --git a/wxdata/include/scwx/provider/aws_level2_chunks_data_provider.hpp b/wxdata/include/scwx/provider/aws_level2_chunks_data_provider.hpp new file mode 100644 index 00000000..247ff346 --- /dev/null +++ b/wxdata/include/scwx/provider/aws_level2_chunks_data_provider.hpp @@ -0,0 +1,62 @@ +#pragma once + +#include + +namespace Aws::S3 +{ +class S3Client; +} // namespace Aws::S3 + +namespace scwx::provider +{ + +/** + * @brief AWS Level 2 Data Provider + */ +class AwsLevel2ChunksDataProvider : public NexradDataProvider +{ +public: + explicit AwsLevel2ChunksDataProvider(const std::string& radarSite); + explicit AwsLevel2ChunksDataProvider(const std::string& radarSite, + const std::string& bucketName, + const std::string& region); + ~AwsLevel2ChunksDataProvider() override; + + AwsLevel2ChunksDataProvider(const AwsLevel2ChunksDataProvider&) = delete; + AwsLevel2ChunksDataProvider& operator=(const AwsLevel2ChunksDataProvider&) = delete; + + AwsLevel2ChunksDataProvider(AwsLevel2ChunksDataProvider&&) noexcept; + AwsLevel2ChunksDataProvider& operator=(AwsLevel2ChunksDataProvider&&) noexcept; + + [[nodiscard]] std::chrono::system_clock::time_point + GetTimePointByKey(const std::string& key) const override; + + [[nodiscard]] size_t cache_size() const override; + + [[nodiscard]] std::chrono::system_clock::time_point + last_modified() const override; + [[nodiscard]] std::chrono::seconds update_period() const override; + + std::string FindKey(std::chrono::system_clock::time_point time) override; + std::string FindLatestKey() override; + std::chrono::system_clock::time_point FindLatestTime() override; + std::vector + GetTimePointsByDate(std::chrono::system_clock::time_point date) override; + std::tuple + ListObjects(std::chrono::system_clock::time_point date) override; + std::shared_ptr + LoadObjectByKey(const std::string& key) override; + std::shared_ptr + LoadObjectByTime(std::chrono::system_clock::time_point time) override; + std::shared_ptr LoadLatestObject() override; + std::pair Refresh() override; + + void RequestAvailableProducts() override; + std::vector GetAvailableProducts() override; + +private: + class Impl; + std::unique_ptr p; +}; + +} // namespace scwx::provider diff --git a/wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp b/wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp new file mode 100644 index 00000000..4d641d8a --- /dev/null +++ b/wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp @@ -0,0 +1,490 @@ +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +#if (__cpp_lib_chrono < 201907L) +# include +#endif + +namespace scwx::provider +{ + +static const std::string logPrefix_ = + "scwx::provider::aws_level2_chunks_data_provider"; +static const auto logger_ = util::Logger::Create(logPrefix_); + +static const std::string kDefaultBucketName_ = "unidata-nexrad-level2-chunks"; +static const std::string kDefaultRegion_ = "us-east-1"; + +class AwsLevel2ChunksDataProvider::Impl +{ +public: + struct ScanRecord + { + explicit ScanRecord(std::string prefix) : + prefix_ {std::move(prefix)}, + nexradFile_ {}, + lastModified_ {}, + secondLastModified_ {} + { + } + ~ScanRecord() = default; + ScanRecord(const ScanRecord&) = default; + ScanRecord(ScanRecord&&) = default; + ScanRecord& operator=(const ScanRecord&) = default; + ScanRecord& operator=(ScanRecord&&) = default; + + std::string prefix_; + std::shared_ptr nexradFile_; + std::chrono::system_clock::time_point lastModified_; + std::chrono::system_clock::time_point secondLastModified_; + int nextFile_{1}; + bool hasAllFiles_{false}; + }; + + explicit Impl(AwsLevel2ChunksDataProvider* self, + std::string radarSite, + std::string bucketName, + std::string region) : + radarSite_ {std::move(radarSite)}, + bucketName_ {std::move(bucketName)}, + region_ {std::move(region)}, + client_ {nullptr}, + scans_ {}, + scansMutex_ {}, + lastModified_ {}, + updatePeriod_ {}, + self_ {self} + { + // Disable HTTP request for region + util::SetEnvironment("AWS_EC2_METADATA_DISABLED", "true"); + + // Use anonymous credentials + Aws::Auth::AWSCredentials credentials {}; + + Aws::Client::ClientConfiguration config; + config.region = region_; + config.connectTimeoutMs = 10000; + + client_ = std::make_shared( + credentials, + Aws::MakeShared( + Aws::S3::S3Client::GetAllocationTag()), + config); + } + ~Impl() = default; + Impl(const Impl&) = delete; + Impl(Impl&&) = delete; + Impl& operator=(const Impl&) = delete; + Impl& operator=(Impl&&) = delete; + + std::chrono::system_clock::time_point GetScanTime(const std::string& prefix); + std::string GetScanKey(const std::string& prefix, + const std::chrono::system_clock::time_point& time, + int last); + std::shared_ptr LoadScan(Impl::ScanRecord& scanRecord); + + std::string radarSite_; + std::string bucketName_; + std::string region_; + std::shared_ptr client_; + + std::mutex refreshMutex_; + + std::map scans_; + std::shared_mutex scansMutex_; + + std::chrono::system_clock::time_point lastModified_; + std::chrono::seconds updatePeriod_; + + AwsLevel2ChunksDataProvider* self_; + }; + +AwsLevel2ChunksDataProvider::AwsLevel2ChunksDataProvider( + const std::string& radarSite) : + AwsLevel2ChunksDataProvider(radarSite, kDefaultBucketName_, kDefaultRegion_) +{ +} + +AwsLevel2ChunksDataProvider::AwsLevel2ChunksDataProvider( + const std::string& radarSite, + const std::string& bucketName, + const std::string& region) : + p(std::make_unique(this, radarSite, bucketName, region)) +{ +} + +AwsLevel2ChunksDataProvider::~AwsLevel2ChunksDataProvider() = default; + +std::chrono::system_clock::time_point +AwsLevel2ChunksDataProvider::GetTimePointByKey(const std::string& key) const +{ + std::chrono::system_clock::time_point time {}; + + const size_t lastSeparator = key.rfind('/'); + const size_t offset = + (lastSeparator == std::string::npos) ? 0 : lastSeparator + 1; + + // Filename format is YYYYMMDD-TTTTTT-AAA-B + static const size_t formatSize = std::string("YYYYMMDD-TTTTTT").size(); + + if (key.size() >= offset + formatSize) + { + using namespace std::chrono; + +#if (__cpp_lib_chrono < 201907L) + using namespace date; +#endif + + static const std::string timeFormat {"%Y%m%d-%H%M%S"}; + + std::string timeStr {key.substr(offset, formatSize)}; + std::istringstream in {timeStr}; + in >> parse(timeFormat, time); + + if (in.fail()) + { + logger_->warn("Invalid time: \"{}\"", timeStr); + } + } + else + { + logger_->warn("Time not parsable from key: \"{}\"", key); + } + + return time; +} + +size_t AwsLevel2ChunksDataProvider::cache_size() const +{ + return p->scans_.size(); +} + +std::chrono::system_clock::time_point +AwsLevel2ChunksDataProvider::last_modified() const +{ + return p->lastModified_; +} +std::chrono::seconds AwsLevel2ChunksDataProvider::update_period() const +{ + return p->updatePeriod_; +} + +std::string +AwsLevel2ChunksDataProvider::FindKey(std::chrono::system_clock::time_point time) +{ + logger_->debug("FindKey: {}", util::TimeString(time)); + + std::shared_lock lock(p->scansMutex_); + + auto element = util::GetBoundedElement(p->scans_, time); + + if (element.has_value()) + { + return element->prefix_; + } + + return {}; +} + +std::string AwsLevel2ChunksDataProvider::FindLatestKey() +{ + std::shared_lock lock(p->scansMutex_); + if (p->scans_.empty()) + { + return ""; + } + + return p->scans_.crbegin()->second.prefix_; +} + +std::chrono::system_clock::time_point +AwsLevel2ChunksDataProvider::FindLatestTime() +{ + std::shared_lock lock(p->scansMutex_); + if (p->scans_.empty()) + { + return {}; + } + + return p->scans_.crbegin()->first; +} + +std::vector +AwsLevel2ChunksDataProvider::GetTimePointsByDate( + std::chrono::system_clock::time_point /*date*/) +{ + return {}; +} + +std::chrono::system_clock::time_point +AwsLevel2ChunksDataProvider::Impl::GetScanTime(const std::string& prefix) +{ + Aws::S3::Model::ListObjectsV2Request request; + request.SetBucket(bucketName_); + request.SetPrefix(prefix); + request.SetDelimiter("/"); + request.SetMaxKeys(1); + + auto outcome = client_->ListObjectsV2(request); + if (outcome.IsSuccess()) + { + return self_->GetTimePointByKey( + outcome.GetResult().GetContents().at(0).GetKey()); + } + + return {}; +} + +std::string AwsLevel2ChunksDataProvider::Impl::GetScanKey( + const std::string& prefix, + const std::chrono::system_clock::time_point& time, + int last) +{ + + static const std::string timeFormat {"%Y%m%d-%H%M%S"}; + + //TODO + return fmt::format( + "{0}/{1:%Y%m%d-%H%M%S}-{2}", prefix, fmt::gmtime(time), last - 1); +} + +std::tuple +AwsLevel2ChunksDataProvider::ListObjects(std::chrono::system_clock::time_point) +{ + // TODO this is slow. It could probably be speed up by not reloading every + // scan every time. + const std::string prefix = p->radarSite_ + "/"; + + logger_->debug("ListObjects: {}", prefix); + + Aws::S3::Model::ListObjectsV2Request request; + request.SetBucket(p->bucketName_); + request.SetPrefix(prefix); + request.SetDelimiter("/"); + + auto outcome = p->client_->ListObjectsV2(request); + + size_t newObjects = 0; + size_t totalObjects = 0; + + if (outcome.IsSuccess()) + { + auto& scans = outcome.GetResult().GetCommonPrefixes(); + logger_->debug("Found {} scans", scans.size()); + + for (const auto& scan : scans) + { + const std::string& prefix = scan.GetPrefix(); + + auto time = p->GetScanTime(prefix); + + if (!p->scans_.contains(time)) + { + p->scans_.insert_or_assign(time, Impl::ScanRecord {prefix}); + newObjects++; + } + + totalObjects++; + } + } + + return {outcome.IsSuccess(), newObjects, totalObjects}; +} + +std::shared_ptr +AwsLevel2ChunksDataProvider::LoadObjectByKey(const std::string& /*prefix*/) +{ + return nullptr; +} + +std::shared_ptr +AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord) +{ + if (scanRecord.hasAllFiles_) + { + return scanRecord.nexradFile_; + } + + // TODO can get only new records using scanRecords last + Aws::S3::Model::ListObjectsV2Request listRequest; + listRequest.SetBucket(bucketName_); + listRequest.SetPrefix(scanRecord.prefix_); + listRequest.SetDelimiter("/"); + + auto listOutcome = client_->ListObjectsV2(listRequest); + if (!listOutcome.IsSuccess()) + { + logger_->warn("Could not find scan at {}", scanRecord.prefix_); + return nullptr; + } + + auto& chunks = listOutcome.GetResult().GetContents(); + for (const auto& chunk : chunks) + { + const std::string& key = chunk.GetKey(); + + // We just want the number of this chunk for now + // KIND/585/20250324-134727-001-S + constexpr size_t startNumberPos = + std::string("KIND/585/20250324-134727-").size(); + const std::string& keyNumberStr = key.substr(startNumberPos, 3); + const int keyNumber = std::stoi(keyNumberStr); + if (keyNumber != scanRecord.nextFile_) + { + continue; + } + + // Now we want the ending char + // KIND/585/20250324-134727-001-S + constexpr size_t charPos = + std::string("KIND/585/20250324-134727-001-").size(); + const char keyChar = key[charPos]; + + Aws::S3::Model::GetObjectRequest objectRequest; + objectRequest.SetBucket(bucketName_); + objectRequest.SetKey(key); + + auto outcome = client_->GetObject(objectRequest); + + if (!outcome.IsSuccess()) + { + logger_->warn("Could not get object: {}", + outcome.GetError().GetMessage()); + return nullptr; + } + + auto& body = outcome.GetResultWithOwnership().GetBody(); + + switch (keyChar) { + case 'S': + { // First chunk + scanRecord.nexradFile_ = std::make_shared(); + if (!scanRecord.nexradFile_->LoadData(body)) + { + logger_->warn("Failed to load first chunk"); + return nullptr; + } + break; + } + case 'I': + { // Middle chunk + if (!scanRecord.nexradFile_->LoadLDMRecords(body)) + { + logger_->warn("Failed to load middle chunk"); + return nullptr; + } + break; + } + case 'E': + { // Last chunk + if (!scanRecord.nexradFile_->LoadLDMRecords(body)) + { + logger_->warn("Failed to load last chunk"); + return nullptr; + } + scanRecord.hasAllFiles_ = true; + break; + } + default: + return nullptr; + } + + std::chrono::seconds lastModifiedSeconds { + outcome.GetResult().GetLastModified().Seconds()}; + std::chrono::system_clock::time_point lastModified { + lastModifiedSeconds}; + + scanRecord.secondLastModified_ = scanRecord.lastModified_; + scanRecord.lastModified_ = lastModified; + + scanRecord.nextFile_ += 1; + } + scanRecord.nexradFile_->IndexFile(); + + if (!scans_.empty()) + { + auto& lastScan = scans_.crend()->second; + lastModified_ = lastScan.lastModified_; + if (lastScan.secondLastModified_ != + std::chrono::system_clock::time_point()) + { + auto delta = lastScan.lastModified_ - lastScan.secondLastModified_; + updatePeriod_ = + std::chrono::duration_cast(delta); + } + } + + return scanRecord.nexradFile_; +} + +std::shared_ptr +AwsLevel2ChunksDataProvider::LoadObjectByTime( + std::chrono::system_clock::time_point time) +{ + std::shared_lock lock(p->scansMutex_); + + logger_->error("LoadObjectByTime({})", time); + + auto scanRecord = util::GetBoundedElementPointer(p->scans_, time); + if (scanRecord == nullptr) + { + logger_->warn("Could not find object at time {}", time); + return nullptr; + } + + // The scanRecord must be a reference + return p->LoadScan(p->scans_.at(scanRecord->first)); +} + +std::shared_ptr +AwsLevel2ChunksDataProvider::LoadLatestObject() +{ + return LoadObjectByTime(FindLatestTime()); +} + +std::pair AwsLevel2ChunksDataProvider::Refresh() +{ + using namespace std::chrono; + + std::unique_lock lock(p->refreshMutex_); + + auto [success, newObjects, totalObjects] = ListObjects({}); + + for (auto& scanRecord : p->scans_) + { + if (scanRecord.second.nexradFile_ != nullptr) + { + p->LoadScan(scanRecord.second); + newObjects += 1; + } + } + + return std::make_pair(newObjects, totalObjects); +} + +void AwsLevel2ChunksDataProvider::RequestAvailableProducts() {} +std::vector AwsLevel2ChunksDataProvider::GetAvailableProducts() +{ + return {}; +} + +AwsLevel2ChunksDataProvider::AwsLevel2ChunksDataProvider( + AwsLevel2ChunksDataProvider&&) noexcept = default; +AwsLevel2ChunksDataProvider& AwsLevel2ChunksDataProvider::operator=( + AwsLevel2ChunksDataProvider&&) noexcept = default; + +} // namespace scwx::provider diff --git a/wxdata/wxdata.cmake b/wxdata/wxdata.cmake index 8d2e15b4..2c062f4b 100644 --- a/wxdata/wxdata.cmake +++ b/wxdata/wxdata.cmake @@ -60,6 +60,7 @@ set(HDR_NETWORK include/scwx/network/cpr.hpp set(SRC_NETWORK source/scwx/network/cpr.cpp source/scwx/network/dir_list.cpp) set(HDR_PROVIDER include/scwx/provider/aws_level2_data_provider.hpp + include/scwx/provider/aws_level2_chunks_data_provider.hpp include/scwx/provider/aws_level3_data_provider.hpp include/scwx/provider/aws_nexrad_data_provider.hpp include/scwx/provider/iem_api_provider.hpp @@ -68,6 +69,7 @@ set(HDR_PROVIDER include/scwx/provider/aws_level2_data_provider.hpp include/scwx/provider/nexrad_data_provider_factory.hpp include/scwx/provider/warnings_provider.hpp) set(SRC_PROVIDER source/scwx/provider/aws_level2_data_provider.cpp + source/scwx/provider/aws_level2_chunks_data_provider.cpp source/scwx/provider/aws_level3_data_provider.cpp source/scwx/provider/aws_nexrad_data_provider.cpp source/scwx/provider/iem_api_provider.cpp