Rewrite warnings provider to use HEAD requests instead of directory listing to find recent warnings

This commit is contained in:
Dan Paulat 2025-02-01 00:25:00 -06:00
parent 6ecb3f6ffb
commit 87af6479d6
3 changed files with 208 additions and 147 deletions

View file

@ -22,10 +22,8 @@ public:
WarningsProvider(WarningsProvider&&) noexcept;
WarningsProvider& operator=(WarningsProvider&&) noexcept;
std::pair<size_t, size_t>
ListFiles(std::chrono::system_clock::time_point newerThan = {});
std::vector<std::shared_ptr<awips::TextProductFile>>
LoadUpdatedFiles(std::chrono::system_clock::time_point newerThan = {});
LoadUpdatedFiles(std::chrono::sys_time<std::chrono::hours> newerThan = {});
private:
class Impl;

View file

@ -1,9 +1,18 @@
#include <scwx/provider/warnings_provider.hpp>
#include <scwx/network/dir_list.hpp>
#include <scwx/util/logger.hpp>
// Prevent redefinition of __cpp_lib_format
#if defined(_MSC_VER)
# include <yvals_core.h>
#endif
#include <ranges>
#include <shared_mutex>
// Enable chrono formatters
#ifndef __cpp_lib_format
# define __cpp_lib_format 202110L
#endif
#include <scwx/provider/warnings_provider.hpp>
#include <scwx/util/logger.hpp>
#include <scwx/util/time.hpp>
#include <mutex>
#if defined(_MSC_VER)
# pragma warning(push, 0)
@ -11,8 +20,6 @@
#define LIBXML_HTML_ENABLED
#include <cpr/cpr.h>
#include <libxml/HTMLparser.h>
#include <re2/re2.h>
#if (__cpp_lib_chrono < 201907L)
# include <date/date.h>
@ -35,13 +42,17 @@ class WarningsProvider::Impl
public:
struct FileInfoRecord
{
std::chrono::system_clock::time_point startTime_ {};
std::chrono::system_clock::time_point lastModified_ {};
size_t size_ {};
bool updated_ {};
FileInfoRecord(const std::string& contentLength,
const std::string& lastModified) :
contentLengthStr_ {contentLength}, lastModifiedStr_ {lastModified}
{
}
std::string contentLengthStr_ {};
std::string lastModifiedStr_ {};
};
typedef std::map<std::string, FileInfoRecord> WarningFileMap;
using WarningFileMap = std::map<std::string, FileInfoRecord>;
explicit Impl(const std::string& baseUrl) :
baseUrl_ {baseUrl}, files_ {}, filesMutex_ {}
@ -50,10 +61,13 @@ public:
~Impl() {}
bool UpdateFileRecord(const cpr::Response& response,
const std::string& filename);
std::string baseUrl_;
WarningFileMap files_;
std::shared_mutex filesMutex_;
WarningFileMap files_;
std::mutex filesMutex_;
};
WarningsProvider::WarningsProvider(const std::string& baseUrl) :
@ -66,145 +80,176 @@ WarningsProvider::WarningsProvider(WarningsProvider&&) noexcept = default;
WarningsProvider&
WarningsProvider::operator=(WarningsProvider&&) noexcept = default;
std::pair<size_t, size_t>
WarningsProvider::ListFiles(std::chrono::system_clock::time_point newerThan)
std::vector<std::shared_ptr<awips::TextProductFile>>
WarningsProvider::LoadUpdatedFiles(
std::chrono::sys_time<std::chrono::hours> startTime)
{
using namespace std::chrono;
#if (__cpp_lib_chrono < 201907L)
#if (__cpp_lib_chrono >= 201907L)
namespace date = std::chrono;
namespace df = std;
#else
using namespace date;
namespace df = date;
#endif
static constexpr LazyRE2 reWarningsFilename = {
"warnings_[0-9]{8}_[0-9]{2}.txt"};
static const std::string dateTimeFormat {"warnings_%Y%m%d_%H.txt"};
logger_->trace("Listing files");
size_t updatedObjects = 0;
size_t totalObjects = 0;
// Perform a directory listing
auto records = network::DirList(p->baseUrl_);
// Sort records by filename
std::sort(records.begin(),
records.end(),
[](auto& a, auto& b) { return a.filename_ < b.filename_; });
// Filter warning records
auto warningRecords =
records |
std::views::filter(
[](auto& record)
{
return record.type_ == std::filesystem::file_type::regular &&
RE2::FullMatch(record.filename_, *reWarningsFilename);
});
std::unique_lock lock(p->filesMutex_);
Impl::WarningFileMap warningFileMap;
// Store records
for (auto& record : warningRecords)
{
// Determine start time
std::chrono::sys_time<hours> startTime;
std::istringstream ssFilename {record.filename_};
ssFilename >> parse(dateTimeFormat, startTime);
// If start time is valid
if (!ssFilename.fail())
{
// Determine if the record should be marked updated
bool updated = true;
auto it = p->files_.find(record.filename_);
if (it != p->files_.cend())
{
auto& existingRecord = it->second;
updated = existingRecord.updated_ ||
record.size_ != existingRecord.size_ ||
record.mtime_ != existingRecord.lastModified_;
}
// Update object counts, but only if newer than threshold
if (newerThan < startTime)
{
if (updated)
{
++updatedObjects;
}
++totalObjects;
}
// Store record
warningFileMap.emplace(
std::piecewise_construct,
std::forward_as_tuple(record.filename_),
std::forward_as_tuple(
startTime, record.mtime_, record.size_, updated));
}
}
p->files_ = std::move(warningFileMap);
return std::make_pair(updatedObjects, totalObjects);
}
std::vector<std::shared_ptr<awips::TextProductFile>>
WarningsProvider::LoadUpdatedFiles(
std::chrono::system_clock::time_point newerThan)
{
logger_->debug("Loading updated files");
std::vector<
std::pair<std::string,
cpr::AsyncWrapper<std::optional<cpr::AsyncResponse>, false>>>
asyncCallbacks;
std::vector<std::shared_ptr<awips::TextProductFile>> updatedFiles;
std::vector<std::pair<std::string, cpr::AsyncResponse>> asyncResponses;
std::chrono::sys_time<std::chrono::hours> now =
std::chrono::floor<std::chrono::hours>(std::chrono::system_clock::now());
std::chrono::sys_time<std::chrono::hours> currentHour =
(startTime != std::chrono::sys_time<std::chrono::hours> {}) ?
startTime :
now - std::chrono::hours {1};
std::unique_lock lock(p->filesMutex_);
logger_->trace("Querying files newer than: {}", util::TimeString(startTime));
// For each warning file
for (auto& record : p->files_)
while (currentHour <= now)
{
// If file is updated, and time is later than the threshold
if (record.second.updated_ && newerThan < record.second.startTime_)
{
// Retrieve warning file
asyncResponses.emplace_back(
record.first,
cpr::GetAsync(cpr::Url {p->baseUrl_ + "/" + record.first}));
static constexpr std::string_view dateTimeFormat {
"warnings_{:%Y%m%d_%H}.txt"};
const std::string filename = df::format(dateTimeFormat, currentHour);
const std::string url = p->baseUrl_ + "/" + filename;
// Clear updated flag
record.second.updated_ = false;
}
logger_->trace("HEAD request for file: {}", filename);
asyncCallbacks.emplace_back(
filename,
cpr::HeadCallback(
[url, filename, this](
cpr::Response headResponse) -> std::optional<cpr::AsyncResponse>
{
if (headResponse.status_code == cpr::status::HTTP_OK)
{
bool updated =
p->UpdateFileRecord(headResponse, url); // TODO: filename
if (updated)
{
logger_->trace("GET request for file: {}", filename);
return cpr::GetAsync(cpr::Url {url});
}
}
else if (headResponse.status_code != cpr::status::HTTP_NOT_FOUND)
{
logger_->warn("HEAD request for file failed: {} ({})",
url,
headResponse.status_line);
}
return std::nullopt;
},
cpr::Url {url}));
// Query the next hour
currentHour += 1h;
}
lock.unlock();
// Wait for warning files to load
for (auto& asyncResponse : asyncResponses)
for (auto& asyncCallback : asyncCallbacks)
{
cpr::Response response = asyncResponse.second.get();
if (response.status_code == cpr::status::HTTP_OK)
{
logger_->debug("Loading file: {}", asyncResponse.first);
auto& filename = asyncCallback.first;
auto& callback = asyncCallback.second;
// Load file
std::shared_ptr<awips::TextProductFile> textProductFile {
std::make_shared<awips::TextProductFile>()};
std::istringstream responseBody {response.text};
if (textProductFile->LoadData(responseBody))
if (callback.valid())
{
// Wait for futures to complete
callback.wait();
auto asyncResponse = callback.get();
if (asyncResponse.has_value())
{
updatedFiles.push_back(textProductFile);
auto response = asyncResponse.value().get();
if (response.status_code == cpr::status::HTTP_OK)
{
logger_->debug("Loading file: {}", filename);
// Load file
std::shared_ptr<awips::TextProductFile> textProductFile {
std::make_shared<awips::TextProductFile>()};
std::istringstream responseBody {response.text};
if (textProductFile->LoadData(responseBody))
{
updatedFiles.push_back(textProductFile);
}
}
else
{
logger_->warn("Could not load file: {} ({})",
filename,
response.status_line);
}
}
}
else
{
logger_->error("Invalid future state");
}
}
return updatedFiles;
}
bool WarningsProvider::Impl::UpdateFileRecord(const cpr::Response& response,
const std::string& filename)
{
bool updated = false;
auto contentLengthIt = response.header.find("Content-Length");
auto lastModifiedIt = response.header.find("Last-Modified");
std::string contentLength {};
std::string lastModified {};
if (contentLengthIt != response.header.cend())
{
contentLength = contentLengthIt->second;
}
if (lastModifiedIt != response.header.cend())
{
lastModified = lastModifiedIt->second;
}
std::unique_lock lock(filesMutex_);
auto it = files_.find(filename);
if (it != files_.cend())
{
auto& existingRecord = it->second;
// If the size or last modified changes, request an update
if (!contentLength.empty() &&
contentLength != existingRecord.contentLengthStr_)
{
// Size changed
existingRecord.contentLengthStr_ = contentLengthIt->second;
updated = true;
}
else if (!lastModified.empty() &&
lastModified != existingRecord.lastModifiedStr_)
{
// Last modified changed
existingRecord.lastModifiedStr_ = lastModifiedIt->second;
updated = true;
}
}
else
{
// File not found
files_.emplace(std::piecewise_construct,
std::forward_as_tuple(filename),
std::forward_as_tuple(contentLength, lastModified));
updated = true;
}
return updated;
}
} // namespace provider
} // namespace scwx