Refresh AWS level 2 data over multiple days

This commit is contained in:
Dan Paulat 2022-05-22 07:59:25 -05:00
parent 690f3f6216
commit 80310029e5
3 changed files with 54 additions and 12 deletions

View file

@ -24,7 +24,7 @@ TEST(AwsLevel2DataProvider, Refresh)
provider.Refresh(); provider.Refresh();
// TODO: Check object count EXPECT_GT(provider.cache_size(), 0);
} }
TEST(AwsLevel2DataProvider, TimePointValid) TEST(AwsLevel2DataProvider, TimePointValid)

View file

@ -28,7 +28,9 @@ public:
AwsLevel2DataProvider(AwsLevel2DataProvider&&) noexcept; AwsLevel2DataProvider(AwsLevel2DataProvider&&) noexcept;
AwsLevel2DataProvider& operator=(AwsLevel2DataProvider&&) noexcept; AwsLevel2DataProvider& operator=(AwsLevel2DataProvider&&) noexcept;
void ListObjects(std::chrono::system_clock::time_point date); size_t cache_size() const;
size_t ListObjects(std::chrono::system_clock::time_point date);
std::shared_ptr<wsr88d::Ar2vFile> LoadObjectByKey(const std::string& key); std::shared_ptr<wsr88d::Ar2vFile> LoadObjectByKey(const std::string& key);
void Refresh(); void Refresh();

View file

@ -71,11 +71,16 @@ AwsLevel2DataProvider::AwsLevel2DataProvider(AwsLevel2DataProvider&&) noexcept =
AwsLevel2DataProvider& AwsLevel2DataProvider&
AwsLevel2DataProvider::operator=(AwsLevel2DataProvider&&) noexcept = default; AwsLevel2DataProvider::operator=(AwsLevel2DataProvider&&) noexcept = default;
void AwsLevel2DataProvider::ListObjects( size_t AwsLevel2DataProvider::cache_size() const
std::chrono::system_clock::time_point date) {
return p->objects_.size();
}
size_t
AwsLevel2DataProvider::ListObjects(std::chrono::system_clock::time_point date)
{ {
const std::string prefix = const std::string prefix =
fmt::format("{0:%Y/%m/%d}/{1}/", date, p->radarSite_); fmt::format("{0:%Y/%m/%d}/{1}/", fmt::gmtime(date), p->radarSite_);
logger_->debug("ListObjects: {}", prefix); logger_->debug("ListObjects: {}", prefix);
@ -85,6 +90,8 @@ void AwsLevel2DataProvider::ListObjects(
auto outcome = p->client_->ListObjectsV2(request); auto outcome = p->client_->ListObjectsV2(request);
size_t totalObjects = 0;
if (outcome.IsSuccess()) if (outcome.IsSuccess())
{ {
auto& objects = outcome.GetResult().GetContents(); auto& objects = outcome.GetResult().GetContents();
@ -96,13 +103,18 @@ void AwsLevel2DataProvider::ListObjects(
objects.cend(), objects.cend(),
[&](const Aws::S3::Model::Object& object) [&](const Aws::S3::Model::Object& object)
{ {
// TODO: Skip MDM
std::string key = object.GetKey(); std::string key = object.GetKey();
if (!key.ends_with("_MDM"))
{
auto time = GetTimePointFromKey(key); auto time = GetTimePointFromKey(key);
std::unique_lock lock(p->objectsMutex_); std::unique_lock lock(p->objectsMutex_);
p->objects_[time] = key; p->objects_[time] = key;
totalObjects++;
}
}); });
} }
else else
@ -110,6 +122,8 @@ void AwsLevel2DataProvider::ListObjects(
logger_->warn("Could not list objects: {}", logger_->warn("Could not list objects: {}",
outcome.GetError().GetMessage()); outcome.GetError().GetMessage());
} }
return totalObjects;
} }
std::shared_ptr<wsr88d::Ar2vFile> std::shared_ptr<wsr88d::Ar2vFile>
@ -143,10 +157,36 @@ AwsLevel2DataProvider::LoadObjectByKey(const std::string& key)
void AwsLevel2DataProvider::Refresh() void AwsLevel2DataProvider::Refresh()
{ {
using namespace std::chrono;
logger_->debug("Refresh()"); logger_->debug("Refresh()");
// TODO: What if the date just rolled, we might miss from the previous date? static std::mutex refreshMutex;
ListObjects(std::chrono::system_clock::now()); static system_clock::time_point refreshDate {};
auto today = floor<days>(system_clock::now());
auto yesterday = today - days {1};
std::unique_lock lock(refreshMutex);
size_t objectCount;
// If we haven't gotten any objects from today, first list objects for
// yesterday, to ensure we haven't missed any objects near midnight
if (refreshDate < today)
{
objectCount = ListObjects(yesterday);
if (objectCount > 0)
{
refreshDate = yesterday;
}
}
objectCount = ListObjects(today);
if (objectCount > 0)
{
refreshDate = today;
}
} }
std::chrono::system_clock::time_point std::chrono::system_clock::time_point