From 87af6479d6bf07189d76bc4e813b6e6b24d207ed Mon Sep 17 00:00:00 2001 From: Dan Paulat Date: Sat, 1 Feb 2025 00:25:00 -0600 Subject: [PATCH] Rewrite warnings provider to use HEAD requests instead of directory listing to find recent warnings --- .../scwx/qt/manager/text_event_manager.cpp | 46 ++- .../scwx/provider/warnings_provider.hpp | 4 +- .../scwx/provider/warnings_provider.cpp | 305 ++++++++++-------- 3 files changed, 208 insertions(+), 147 deletions(-) diff --git a/scwx-qt/source/scwx/qt/manager/text_event_manager.cpp b/scwx-qt/source/scwx/qt/manager/text_event_manager.cpp index e48ecd48..0a7c66f5 100644 --- a/scwx-qt/source/scwx/qt/manager/text_event_manager.cpp +++ b/scwx-qt/source/scwx/qt/manager/text_event_manager.cpp @@ -22,8 +22,10 @@ namespace manager static const std::string logPrefix_ = "scwx::qt::manager::text_event_manager"; static const auto logger_ = scwx::util::Logger::Create(logPrefix_); -static const std::string& kDefaultWarningsProviderUrl { - "https://warnings.allisonhouse.com"}; +static constexpr std::chrono::hours kInitialLoadHistoryDuration_ = + std::chrono::days {3}; +static constexpr std::chrono::hours kDefaultLoadHistoryDuration_ = + std::chrono::hours {1}; class TextEventManager::Impl { @@ -42,7 +44,9 @@ public: warningsProviderChangedCallbackUuid_ = generalSettings.warnings_provider().RegisterValueChangedCallback( - [this](const std::string& value) { + [this](const std::string& value) + { + loadHistoryDuration_ = kInitialLoadHistoryDuration_; warningsProvider_ = std::make_shared(value); }); @@ -94,6 +98,8 @@ public: std::shared_mutex textEventMutex_; std::shared_ptr warningsProvider_ {nullptr}; + std::chrono::hours loadHistoryDuration_ {kInitialLoadHistoryDuration_}; + std::chrono::sys_time prevLoadTime_ {}; boost::uuids::uuid warningsProviderChangedCallbackUuid_ {}; }; @@ -254,21 +260,33 @@ void TextEventManager::Impl::Refresh() std::shared_ptr warningsProvider = warningsProvider_; - // Update the file listing from the warnings provider - auto [newFiles, totalFiles] = warningsProvider->ListFiles(); + // Load updated files from the warnings provider + // Start time should default to: + // - 3 days of history for the first load + // - 1 hour of history for subsequent loads + // If the time jumps, we should attempt to load from no later than the + // previous load time + auto loadTime = + std::chrono::floor(std::chrono::system_clock::now()); + auto startTime = loadTime - loadHistoryDuration_; - if (newFiles > 0) + if (prevLoadTime_ != std::chrono::sys_time {}) { - // Load new files - auto updatedFiles = warningsProvider->LoadUpdatedFiles(); + startTime = std::min(startTime, prevLoadTime_); + } - // Handle messages - for (auto& file : updatedFiles) + auto updatedFiles = warningsProvider->LoadUpdatedFiles(startTime); + + // Store the load time and reset the load history duration + prevLoadTime_ = loadTime; + loadHistoryDuration_ = kDefaultLoadHistoryDuration_; + + // Handle messages + for (auto& file : updatedFiles) + { + for (auto& message : file->messages()) { - for (auto& message : file->messages()) - { - HandleMessage(message); - } + HandleMessage(message); } } diff --git a/wxdata/include/scwx/provider/warnings_provider.hpp b/wxdata/include/scwx/provider/warnings_provider.hpp index e519ec5d..140b671f 100644 --- a/wxdata/include/scwx/provider/warnings_provider.hpp +++ b/wxdata/include/scwx/provider/warnings_provider.hpp @@ -22,10 +22,8 @@ public: WarningsProvider(WarningsProvider&&) noexcept; WarningsProvider& operator=(WarningsProvider&&) noexcept; - std::pair - ListFiles(std::chrono::system_clock::time_point newerThan = {}); std::vector> - LoadUpdatedFiles(std::chrono::system_clock::time_point newerThan = {}); + LoadUpdatedFiles(std::chrono::sys_time newerThan = {}); private: class Impl; diff --git a/wxdata/source/scwx/provider/warnings_provider.cpp b/wxdata/source/scwx/provider/warnings_provider.cpp index 8cfe9b77..d506fc5c 100644 --- a/wxdata/source/scwx/provider/warnings_provider.cpp +++ b/wxdata/source/scwx/provider/warnings_provider.cpp @@ -1,9 +1,18 @@ -#include -#include -#include +// Prevent redefinition of __cpp_lib_format +#if defined(_MSC_VER) +# include +#endif -#include -#include +// Enable chrono formatters +#ifndef __cpp_lib_format +# define __cpp_lib_format 202110L +#endif + +#include +#include +#include + +#include #if defined(_MSC_VER) # pragma warning(push, 0) @@ -11,8 +20,6 @@ #define LIBXML_HTML_ENABLED #include -#include -#include #if (__cpp_lib_chrono < 201907L) # include @@ -35,13 +42,17 @@ class WarningsProvider::Impl public: struct FileInfoRecord { - std::chrono::system_clock::time_point startTime_ {}; - std::chrono::system_clock::time_point lastModified_ {}; - size_t size_ {}; - bool updated_ {}; + FileInfoRecord(const std::string& contentLength, + const std::string& lastModified) : + contentLengthStr_ {contentLength}, lastModifiedStr_ {lastModified} + { + } + + std::string contentLengthStr_ {}; + std::string lastModifiedStr_ {}; }; - typedef std::map WarningFileMap; + using WarningFileMap = std::map; explicit Impl(const std::string& baseUrl) : baseUrl_ {baseUrl}, files_ {}, filesMutex_ {} @@ -50,10 +61,13 @@ public: ~Impl() {} + bool UpdateFileRecord(const cpr::Response& response, + const std::string& filename); + std::string baseUrl_; - WarningFileMap files_; - std::shared_mutex filesMutex_; + WarningFileMap files_; + std::mutex filesMutex_; }; WarningsProvider::WarningsProvider(const std::string& baseUrl) : @@ -66,145 +80,176 @@ WarningsProvider::WarningsProvider(WarningsProvider&&) noexcept = default; WarningsProvider& WarningsProvider::operator=(WarningsProvider&&) noexcept = default; -std::pair -WarningsProvider::ListFiles(std::chrono::system_clock::time_point newerThan) +std::vector> +WarningsProvider::LoadUpdatedFiles( + std::chrono::sys_time startTime) { using namespace std::chrono; -#if (__cpp_lib_chrono < 201907L) +#if (__cpp_lib_chrono >= 201907L) + namespace date = std::chrono; + namespace df = std; +#else using namespace date; + namespace df = date; #endif - static constexpr LazyRE2 reWarningsFilename = { - "warnings_[0-9]{8}_[0-9]{2}.txt"}; - static const std::string dateTimeFormat {"warnings_%Y%m%d_%H.txt"}; - - logger_->trace("Listing files"); - - size_t updatedObjects = 0; - size_t totalObjects = 0; - - // Perform a directory listing - auto records = network::DirList(p->baseUrl_); - - // Sort records by filename - std::sort(records.begin(), - records.end(), - [](auto& a, auto& b) { return a.filename_ < b.filename_; }); - - // Filter warning records - auto warningRecords = - records | - std::views::filter( - [](auto& record) - { - return record.type_ == std::filesystem::file_type::regular && - RE2::FullMatch(record.filename_, *reWarningsFilename); - }); - - std::unique_lock lock(p->filesMutex_); - - Impl::WarningFileMap warningFileMap; - - // Store records - for (auto& record : warningRecords) - { - // Determine start time - std::chrono::sys_time startTime; - std::istringstream ssFilename {record.filename_}; - - ssFilename >> parse(dateTimeFormat, startTime); - - // If start time is valid - if (!ssFilename.fail()) - { - // Determine if the record should be marked updated - bool updated = true; - auto it = p->files_.find(record.filename_); - if (it != p->files_.cend()) - { - auto& existingRecord = it->second; - - updated = existingRecord.updated_ || - record.size_ != existingRecord.size_ || - record.mtime_ != existingRecord.lastModified_; - } - - // Update object counts, but only if newer than threshold - if (newerThan < startTime) - { - if (updated) - { - ++updatedObjects; - } - ++totalObjects; - } - - // Store record - warningFileMap.emplace( - std::piecewise_construct, - std::forward_as_tuple(record.filename_), - std::forward_as_tuple( - startTime, record.mtime_, record.size_, updated)); - } - } - - p->files_ = std::move(warningFileMap); - - return std::make_pair(updatedObjects, totalObjects); -} - -std::vector> -WarningsProvider::LoadUpdatedFiles( - std::chrono::system_clock::time_point newerThan) -{ - logger_->debug("Loading updated files"); - + std::vector< + std::pair, false>>> + asyncCallbacks; std::vector> updatedFiles; - std::vector> asyncResponses; + std::chrono::sys_time now = + std::chrono::floor(std::chrono::system_clock::now()); + std::chrono::sys_time currentHour = + (startTime != std::chrono::sys_time {}) ? + startTime : + now - std::chrono::hours {1}; - std::unique_lock lock(p->filesMutex_); + logger_->trace("Querying files newer than: {}", util::TimeString(startTime)); - // For each warning file - for (auto& record : p->files_) + while (currentHour <= now) { - // If file is updated, and time is later than the threshold - if (record.second.updated_ && newerThan < record.second.startTime_) - { - // Retrieve warning file - asyncResponses.emplace_back( - record.first, - cpr::GetAsync(cpr::Url {p->baseUrl_ + "/" + record.first})); + static constexpr std::string_view dateTimeFormat { + "warnings_{:%Y%m%d_%H}.txt"}; + const std::string filename = df::format(dateTimeFormat, currentHour); + const std::string url = p->baseUrl_ + "/" + filename; - // Clear updated flag - record.second.updated_ = false; - } + logger_->trace("HEAD request for file: {}", filename); + + asyncCallbacks.emplace_back( + filename, + cpr::HeadCallback( + [url, filename, this]( + cpr::Response headResponse) -> std::optional + { + if (headResponse.status_code == cpr::status::HTTP_OK) + { + bool updated = + p->UpdateFileRecord(headResponse, url); // TODO: filename + + if (updated) + { + logger_->trace("GET request for file: {}", filename); + return cpr::GetAsync(cpr::Url {url}); + } + } + else if (headResponse.status_code != cpr::status::HTTP_NOT_FOUND) + { + logger_->warn("HEAD request for file failed: {} ({})", + url, + headResponse.status_line); + } + + return std::nullopt; + }, + cpr::Url {url})); + + // Query the next hour + currentHour += 1h; } - lock.unlock(); - - // Wait for warning files to load - for (auto& asyncResponse : asyncResponses) + for (auto& asyncCallback : asyncCallbacks) { - cpr::Response response = asyncResponse.second.get(); - if (response.status_code == cpr::status::HTTP_OK) - { - logger_->debug("Loading file: {}", asyncResponse.first); + auto& filename = asyncCallback.first; + auto& callback = asyncCallback.second; - // Load file - std::shared_ptr textProductFile { - std::make_shared()}; - std::istringstream responseBody {response.text}; - if (textProductFile->LoadData(responseBody)) + if (callback.valid()) + { + // Wait for futures to complete + callback.wait(); + auto asyncResponse = callback.get(); + + if (asyncResponse.has_value()) { - updatedFiles.push_back(textProductFile); + auto response = asyncResponse.value().get(); + + if (response.status_code == cpr::status::HTTP_OK) + { + logger_->debug("Loading file: {}", filename); + + // Load file + std::shared_ptr textProductFile { + std::make_shared()}; + std::istringstream responseBody {response.text}; + if (textProductFile->LoadData(responseBody)) + { + updatedFiles.push_back(textProductFile); + } + } + else + { + logger_->warn("Could not load file: {} ({})", + filename, + response.status_line); + } } } + else + { + logger_->error("Invalid future state"); + } } return updatedFiles; } +bool WarningsProvider::Impl::UpdateFileRecord(const cpr::Response& response, + const std::string& filename) +{ + bool updated = false; + + auto contentLengthIt = response.header.find("Content-Length"); + auto lastModifiedIt = response.header.find("Last-Modified"); + + std::string contentLength {}; + std::string lastModified {}; + + if (contentLengthIt != response.header.cend()) + { + contentLength = contentLengthIt->second; + } + if (lastModifiedIt != response.header.cend()) + { + lastModified = lastModifiedIt->second; + } + + std::unique_lock lock(filesMutex_); + + auto it = files_.find(filename); + if (it != files_.cend()) + { + auto& existingRecord = it->second; + + // If the size or last modified changes, request an update + + if (!contentLength.empty() && + contentLength != existingRecord.contentLengthStr_) + { + // Size changed + existingRecord.contentLengthStr_ = contentLengthIt->second; + updated = true; + } + else if (!lastModified.empty() && + lastModified != existingRecord.lastModifiedStr_) + { + // Last modified changed + existingRecord.lastModifiedStr_ = lastModifiedIt->second; + updated = true; + } + } + else + { + // File not found + files_.emplace(std::piecewise_construct, + std::forward_as_tuple(filename), + std::forward_as_tuple(contentLength, lastModified)); + updated = true; + } + + return updated; +} + } // namespace provider } // namespace scwx