mirror of
https://github.com/ciphervance/supercell-wx.git
synced 2025-10-30 20:50:06 +00:00
Refactoring AWS level 2 data provider, pushing up common functions
This commit is contained in:
parent
394aba2d83
commit
c5c54fbfa2
13 changed files with 512 additions and 439 deletions
|
|
@ -1,14 +1,7 @@
|
|||
#include <scwx/provider/aws_level2_data_provider.hpp>
|
||||
#include <scwx/util/logger.hpp>
|
||||
#include <scwx/util/map.hpp>
|
||||
#include <scwx/util/time.hpp>
|
||||
#include <scwx/wsr88d/nexrad_file_factory.hpp>
|
||||
|
||||
#include <shared_mutex>
|
||||
|
||||
#include <aws/s3/S3Client.h>
|
||||
#include <aws/s3/model/GetObjectRequest.h>
|
||||
#include <aws/s3/model/ListObjectsV2Request.h>
|
||||
#include <fmt/chrono.h>
|
||||
#include <fmt/format.h>
|
||||
|
||||
|
|
@ -24,63 +17,14 @@ static const auto logger_ = util::Logger::Create(logPrefix_);
|
|||
static const std::string kDefaultBucketName_ = "noaa-nexrad-level2";
|
||||
static const std::string kDefaultRegion_ = "us-east-1";
|
||||
|
||||
// Keep at least today, yesterday, and one more date
|
||||
static const size_t kMinDatesBeforePruning_ = 4;
|
||||
static const size_t kMaxObjects_ = 2500;
|
||||
|
||||
class AwsLevel2DataProviderImpl
|
||||
class AwsLevel2DataProvider::Impl
|
||||
{
|
||||
public:
|
||||
struct ObjectRecord
|
||||
{
|
||||
explicit ObjectRecord(
|
||||
const std::string& key,
|
||||
std::chrono::system_clock::time_point lastModified) :
|
||||
key_ {key}, lastModified_ {lastModified}
|
||||
{
|
||||
}
|
||||
~ObjectRecord() = default;
|
||||
explicit Impl(const std::string& radarSite) : radarSite_ {radarSite} {}
|
||||
|
||||
std::string key_;
|
||||
std::chrono::system_clock::time_point lastModified_;
|
||||
};
|
||||
|
||||
explicit AwsLevel2DataProviderImpl(const std::string& radarSite,
|
||||
const std::string& bucketName,
|
||||
const std::string& region) :
|
||||
radarSite_ {radarSite},
|
||||
bucketName_ {bucketName},
|
||||
region_ {region},
|
||||
client_ {nullptr},
|
||||
objects_ {},
|
||||
objectsMutex_ {},
|
||||
lastModified_ {},
|
||||
updatePeriod_ {}
|
||||
{
|
||||
Aws::Client::ClientConfiguration config;
|
||||
config.region = region_;
|
||||
|
||||
client_ = std::make_unique<Aws::S3::S3Client>(config);
|
||||
}
|
||||
|
||||
~AwsLevel2DataProviderImpl() {}
|
||||
|
||||
void PruneObjects();
|
||||
void UpdateMetadata();
|
||||
void UpdateObjectDates(std::chrono::system_clock::time_point date);
|
||||
~Impl() {}
|
||||
|
||||
std::string radarSite_;
|
||||
std::string bucketName_;
|
||||
std::string region_;
|
||||
|
||||
std::unique_ptr<Aws::S3::S3Client> client_;
|
||||
|
||||
std::map<std::chrono::system_clock::time_point, ObjectRecord> objects_;
|
||||
std::shared_mutex objectsMutex_;
|
||||
std::list<std::chrono::system_clock::time_point> objectDates_;
|
||||
|
||||
std::chrono::system_clock::time_point lastModified_;
|
||||
std::chrono::seconds updatePeriod_;
|
||||
};
|
||||
|
||||
AwsLevel2DataProvider::AwsLevel2DataProvider(const std::string& radarSite) :
|
||||
|
|
@ -90,8 +34,8 @@ AwsLevel2DataProvider::AwsLevel2DataProvider(const std::string& radarSite) :
|
|||
AwsLevel2DataProvider::AwsLevel2DataProvider(const std::string& radarSite,
|
||||
const std::string& bucketName,
|
||||
const std::string& region) :
|
||||
p(std::make_unique<AwsLevel2DataProviderImpl>(
|
||||
radarSite, bucketName, region))
|
||||
AwsNexradDataProvider(radarSite, bucketName, region),
|
||||
p(std::make_unique<Impl>(radarSite))
|
||||
{
|
||||
}
|
||||
AwsLevel2DataProvider::~AwsLevel2DataProvider() = default;
|
||||
|
|
@ -101,255 +45,10 @@ AwsLevel2DataProvider::AwsLevel2DataProvider(AwsLevel2DataProvider&&) noexcept =
|
|||
AwsLevel2DataProvider&
|
||||
AwsLevel2DataProvider::operator=(AwsLevel2DataProvider&&) noexcept = default;
|
||||
|
||||
size_t AwsLevel2DataProvider::cache_size() const
|
||||
{
|
||||
return p->objects_.size();
|
||||
}
|
||||
|
||||
std::chrono::seconds AwsLevel2DataProvider::update_period() const
|
||||
{
|
||||
return p->updatePeriod_;
|
||||
}
|
||||
|
||||
std::chrono::system_clock::time_point
|
||||
AwsLevel2DataProvider::last_modified() const
|
||||
{
|
||||
return p->lastModified_;
|
||||
}
|
||||
|
||||
std::string
|
||||
AwsLevel2DataProvider::FindKey(std::chrono::system_clock::time_point time)
|
||||
AwsLevel2DataProvider::GetPrefix(std::chrono::system_clock::time_point date)
|
||||
{
|
||||
logger_->debug("FindKey: {}", util::TimeString(time));
|
||||
|
||||
std::string key {};
|
||||
|
||||
std::shared_lock lock(p->objectsMutex_);
|
||||
|
||||
auto element = util::GetBoundedElement(p->objects_, time);
|
||||
|
||||
if (element.has_value())
|
||||
{
|
||||
key = element->key_;
|
||||
}
|
||||
|
||||
return key;
|
||||
}
|
||||
|
||||
std::string AwsLevel2DataProvider::FindLatestKey()
|
||||
{
|
||||
logger_->debug("FindLatestKey()");
|
||||
|
||||
std::string key {};
|
||||
|
||||
std::shared_lock lock(p->objectsMutex_);
|
||||
|
||||
if (!p->objects_.empty())
|
||||
{
|
||||
key = p->objects_.crbegin()->second.key_;
|
||||
}
|
||||
|
||||
return key;
|
||||
}
|
||||
|
||||
std::pair<size_t, size_t>
|
||||
AwsLevel2DataProvider::ListObjects(std::chrono::system_clock::time_point date)
|
||||
{
|
||||
const std::string prefix =
|
||||
fmt::format("{0:%Y/%m/%d}/{1}/", fmt::gmtime(date), p->radarSite_);
|
||||
|
||||
logger_->debug("ListObjects: {}", prefix);
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
request.SetBucket(p->bucketName_);
|
||||
request.SetPrefix(prefix);
|
||||
|
||||
auto outcome = p->client_->ListObjectsV2(request);
|
||||
|
||||
size_t newObjects = 0;
|
||||
size_t totalObjects = 0;
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
auto& objects = outcome.GetResult().GetContents();
|
||||
|
||||
logger_->debug("Found {} objects", objects.size());
|
||||
|
||||
// Store objects
|
||||
std::for_each( //
|
||||
objects.cbegin(),
|
||||
objects.cend(),
|
||||
[&](const Aws::S3::Model::Object& object)
|
||||
{
|
||||
std::string key = object.GetKey();
|
||||
|
||||
if (!key.ends_with("_MDM"))
|
||||
{
|
||||
auto time = GetTimePointFromKey(key);
|
||||
|
||||
std::chrono::seconds lastModifiedSeconds {
|
||||
object.GetLastModified().Seconds()};
|
||||
std::chrono::system_clock::time_point lastModified {
|
||||
lastModifiedSeconds};
|
||||
|
||||
std::unique_lock lock(p->objectsMutex_);
|
||||
|
||||
auto [it, inserted] = p->objects_.insert_or_assign(
|
||||
time,
|
||||
AwsLevel2DataProviderImpl::ObjectRecord {key, lastModified});
|
||||
|
||||
if (inserted)
|
||||
{
|
||||
newObjects++;
|
||||
}
|
||||
|
||||
totalObjects++;
|
||||
}
|
||||
});
|
||||
|
||||
if (newObjects > 0)
|
||||
{
|
||||
p->UpdateObjectDates(date);
|
||||
p->PruneObjects();
|
||||
p->UpdateMetadata();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
logger_->warn("Could not list objects: {}",
|
||||
outcome.GetError().GetMessage());
|
||||
}
|
||||
|
||||
return std::make_pair(newObjects, totalObjects);
|
||||
}
|
||||
|
||||
std::shared_ptr<wsr88d::Ar2vFile>
|
||||
AwsLevel2DataProvider::LoadObjectByKey(const std::string& key)
|
||||
{
|
||||
std::shared_ptr<wsr88d::Ar2vFile> level2File = nullptr;
|
||||
|
||||
Aws::S3::Model::GetObjectRequest request;
|
||||
request.SetBucket(p->bucketName_);
|
||||
request.SetKey(key);
|
||||
|
||||
auto outcome = p->client_->GetObject(request);
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
auto& body = outcome.GetResultWithOwnership().GetBody();
|
||||
|
||||
std::shared_ptr<wsr88d::NexradFile> nexradFile =
|
||||
wsr88d::NexradFileFactory::Create(body);
|
||||
|
||||
level2File = std::dynamic_pointer_cast<wsr88d::Ar2vFile>(nexradFile);
|
||||
}
|
||||
else
|
||||
{
|
||||
logger_->warn("Could not get object: {}",
|
||||
outcome.GetError().GetMessage());
|
||||
}
|
||||
|
||||
return level2File;
|
||||
}
|
||||
|
||||
size_t AwsLevel2DataProvider::Refresh()
|
||||
{
|
||||
using namespace std::chrono;
|
||||
|
||||
logger_->debug("Refresh()");
|
||||
|
||||
static std::mutex refreshMutex;
|
||||
static system_clock::time_point refreshDate {};
|
||||
|
||||
auto today = floor<days>(system_clock::now());
|
||||
auto yesterday = today - days {1};
|
||||
|
||||
std::unique_lock lock(refreshMutex);
|
||||
|
||||
size_t totalNewObjects = 0;
|
||||
|
||||
// If we haven't gotten any objects from today, first list objects for
|
||||
// yesterday, to ensure we haven't missed any objects near midnight
|
||||
if (refreshDate < today)
|
||||
{
|
||||
auto [newObjects, totalObjects] = ListObjects(yesterday);
|
||||
totalNewObjects = newObjects;
|
||||
if (totalObjects > 0)
|
||||
{
|
||||
refreshDate = yesterday;
|
||||
}
|
||||
}
|
||||
|
||||
auto [newObjects, totalObjects] = ListObjects(today);
|
||||
totalNewObjects += newObjects;
|
||||
if (totalObjects > 0)
|
||||
{
|
||||
refreshDate = today;
|
||||
}
|
||||
|
||||
return totalNewObjects;
|
||||
}
|
||||
|
||||
void AwsLevel2DataProviderImpl::PruneObjects()
|
||||
{
|
||||
using namespace std::chrono;
|
||||
|
||||
auto today = floor<days>(system_clock::now());
|
||||
auto yesterday = today - days {1};
|
||||
|
||||
std::unique_lock lock(objectsMutex_);
|
||||
|
||||
for (auto it = objectDates_.cbegin();
|
||||
it != objectDates_.cend() && objects_.size() > kMaxObjects_ &&
|
||||
objectDates_.size() >= kMinDatesBeforePruning_;)
|
||||
{
|
||||
if (*it < yesterday)
|
||||
{
|
||||
// Erase oldest keys from objects list
|
||||
auto eraseBegin = objects_.lower_bound(*it);
|
||||
auto eraseEnd = objects_.lower_bound(*it + days {1});
|
||||
objects_.erase(eraseBegin, eraseEnd);
|
||||
|
||||
// Remove oldest date from object dates list
|
||||
it = objectDates_.erase(it);
|
||||
}
|
||||
else
|
||||
{
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void AwsLevel2DataProviderImpl::UpdateMetadata()
|
||||
{
|
||||
std::shared_lock lock(objectsMutex_);
|
||||
|
||||
if (!objects_.empty())
|
||||
{
|
||||
lastModified_ = objects_.crbegin()->second.lastModified_;
|
||||
}
|
||||
|
||||
if (objects_.size() >= 2)
|
||||
{
|
||||
auto it = objects_.crbegin();
|
||||
auto lastModified = it->second.lastModified_;
|
||||
auto prevModified = (++it)->second.lastModified_;
|
||||
auto delta = lastModified - prevModified;
|
||||
|
||||
updatePeriod_ = std::chrono::duration_cast<std::chrono::seconds>(delta);
|
||||
}
|
||||
}
|
||||
|
||||
void AwsLevel2DataProviderImpl::UpdateObjectDates(
|
||||
std::chrono::system_clock::time_point date)
|
||||
{
|
||||
auto day = std::chrono::floor<std::chrono::days>(date);
|
||||
|
||||
std::unique_lock lock(objectsMutex_);
|
||||
|
||||
// Remove any existing occurrences of day, and add to the back of the list
|
||||
objectDates_.remove(day);
|
||||
objectDates_.push_back(day);
|
||||
return fmt::format("{0:%Y/%m/%d}/{1}/", fmt::gmtime(date), p->radarSite_);
|
||||
}
|
||||
|
||||
std::chrono::system_clock::time_point
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue