#include #include #include #include #include #include #include #include #include #include #include namespace scwx { namespace provider { static const std::string logPrefix_ = "scwx::provider::aws_level2_data_provider"; static const auto logger_ = util::Logger::Create(logPrefix_); static const std::string kDefaultBucketName_ = "noaa-nexrad-level2"; static const std::string kDefaultRegion_ = "us-east-1"; class AwsLevel2DataProviderImpl { public: explicit AwsLevel2DataProviderImpl(const std::string& radarSite, const std::string& bucketName, const std::string& region) : radarSite_ {radarSite}, bucketName_ {bucketName}, region_ {region}, client_ {nullptr}, objects_ {}, objectsMutex_ {} { Aws::Client::ClientConfiguration config; config.region = region_; client_ = std::make_unique(config); } ~AwsLevel2DataProviderImpl() {} std::string radarSite_; std::string bucketName_; std::string region_; std::unique_ptr client_; std::map objects_; std::shared_mutex objectsMutex_; }; AwsLevel2DataProvider::AwsLevel2DataProvider(const std::string& radarSite) : AwsLevel2DataProvider(radarSite, kDefaultBucketName_, kDefaultRegion_) { } AwsLevel2DataProvider::AwsLevel2DataProvider(const std::string& radarSite, const std::string& bucketName, const std::string& region) : p(std::make_unique( radarSite, bucketName, region)) { } AwsLevel2DataProvider::~AwsLevel2DataProvider() = default; AwsLevel2DataProvider::AwsLevel2DataProvider(AwsLevel2DataProvider&&) noexcept = default; AwsLevel2DataProvider& AwsLevel2DataProvider::operator=(AwsLevel2DataProvider&&) noexcept = default; size_t AwsLevel2DataProvider::cache_size() const { return p->objects_.size(); } std::string AwsLevel2DataProvider::FindKey(std::chrono::system_clock::time_point time) { logger_->debug("FindKey: {}", util::TimeString(time)); std::string key {}; std::optional element = util::GetBoundedElement(p->objects_, time); if (element.has_value()) { key = *element; } return key; } size_t AwsLevel2DataProvider::ListObjects(std::chrono::system_clock::time_point date) { const std::string prefix = fmt::format("{0:%Y/%m/%d}/{1}/", fmt::gmtime(date), p->radarSite_); logger_->debug("ListObjects: {}", prefix); Aws::S3::Model::ListObjectsV2Request request; request.SetBucket(p->bucketName_); request.SetPrefix(prefix); auto outcome = p->client_->ListObjectsV2(request); size_t totalObjects = 0; if (outcome.IsSuccess()) { auto& objects = outcome.GetResult().GetContents(); logger_->debug("Found {} objects", objects.size()); // Store objects std::for_each(objects.cbegin(), objects.cend(), [&](const Aws::S3::Model::Object& object) { std::string key = object.GetKey(); if (!key.ends_with("_MDM")) { auto time = GetTimePointFromKey(key); std::unique_lock lock(p->objectsMutex_); p->objects_[time] = key; totalObjects++; } }); } else { logger_->warn("Could not list objects: {}", outcome.GetError().GetMessage()); } return totalObjects; } std::shared_ptr AwsLevel2DataProvider::LoadObjectByKey(const std::string& key) { std::shared_ptr level2File = nullptr; Aws::S3::Model::GetObjectRequest request; request.SetBucket(p->bucketName_); request.SetKey(key); auto outcome = p->client_->GetObject(request); if (outcome.IsSuccess()) { auto& body = outcome.GetResultWithOwnership().GetBody(); std::shared_ptr nexradFile = wsr88d::NexradFileFactory::Create(body); level2File = std::dynamic_pointer_cast(nexradFile); } else { logger_->warn("Could not get object: {}", outcome.GetError().GetMessage()); } return level2File; } void AwsLevel2DataProvider::Refresh() { using namespace std::chrono; logger_->debug("Refresh()"); static std::mutex refreshMutex; static system_clock::time_point refreshDate {}; auto today = floor(system_clock::now()); auto yesterday = today - days {1}; std::unique_lock lock(refreshMutex); size_t objectCount; // If we haven't gotten any objects from today, first list objects for // yesterday, to ensure we haven't missed any objects near midnight if (refreshDate < today) { objectCount = ListObjects(yesterday); if (objectCount > 0) { refreshDate = yesterday; } } objectCount = ListObjects(today); if (objectCount > 0) { refreshDate = today; } } std::chrono::system_clock::time_point AwsLevel2DataProvider::GetTimePointFromKey(const std::string& key) { std::chrono::system_clock::time_point time {}; const size_t lastSeparator = key.rfind('/'); const size_t offset = (lastSeparator == std::string::npos) ? 0 : lastSeparator + 5; // Filename format is GGGGYYYYMMDD_TTTTTT(_V##).gz static constexpr size_t formatSize = std::string("YYYYMMDD_TTTTTT").size(); if (key.size() >= offset + formatSize) { using namespace std::chrono; static const std::string timeFormat {"%Y%m%d_%H%M%S"}; std::string timeStr {key.substr(offset, formatSize)}; std::istringstream in {timeStr}; in >> parse(timeFormat, time); if (in.fail()) { logger_->warn("Invalid time: \"{}\"", timeStr); } } else { logger_->warn("Time not parsable from key: \"{}\"", key); } return time; } } // namespace provider } // namespace scwx