#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace scwx { namespace qt { namespace manager { static const std::string logPrefix_ = "scwx::qt::manager::text_event_manager"; static const auto logger_ = scwx::util::Logger::Create(logPrefix_); static constexpr std::chrono::hours kInitialLoadHistoryDuration_ = std::chrono::days {3}; static constexpr std::chrono::hours kDefaultLoadHistoryDuration_ = std::chrono::hours {1}; static const std::array kPils_ = { "TOR", "SVR", "SVS", "FFW", "FFS"}; class TextEventManager::Impl { public: explicit Impl(TextEventManager* self) : self_ {self}, refreshTimer_ {threadPool_}, refreshMutex_ {}, textEventMap_ {}, textEventMutex_ {} { auto& generalSettings = settings::GeneralSettings::Instance(); warningsProvider_ = std::make_shared( generalSettings.warnings_provider().GetValue()); warningsProviderChangedCallbackUuid_ = generalSettings.warnings_provider().RegisterValueChangedCallback( [this](const std::string& value) { loadHistoryDuration_ = kInitialLoadHistoryDuration_; warningsProvider_ = std::make_shared(value); }); boost::asio::post(threadPool_, [this]() { try { main::Application::WaitForInitialization(); logger_->debug("Start Refresh"); Refresh(); } catch (const std::exception& ex) { logger_->error(ex.what()); } }); } ~Impl() { settings::GeneralSettings::Instance() .warnings_provider() .UnregisterValueChangedCallback(warningsProviderChangedCallbackUuid_); std::unique_lock lock(refreshMutex_); refreshTimer_.cancel(); lock.unlock(); threadPool_.join(); } void HandleMessage(const std::shared_ptr& message); void LoadArchives(ranges::any_view dates); void RefreshAsync(); void Refresh(); void UpdateArchiveDates(ranges::any_view dates); // Thread pool sized for: // - Live Refresh (1x) // - Archive Loading (1x) boost::asio::thread_pool threadPool_ {2u}; TextEventManager* self_; boost::asio::steady_timer refreshTimer_; std::mutex refreshMutex_; std::unordered_map>, types::TextEventHash> textEventMap_; std::shared_mutex textEventMutex_; std::unique_ptr iemApiProvider_ { std::make_unique()}; std::shared_ptr warningsProvider_ {nullptr}; std::chrono::hours loadHistoryDuration_ {kInitialLoadHistoryDuration_}; std::chrono::sys_time prevLoadTime_ {}; std::chrono::sys_days archiveLimit_ {}; std::mutex archiveMutex_ {}; std::list archiveDates_ {}; std::map>> archiveMap_; boost::uuids::uuid warningsProviderChangedCallbackUuid_ {}; }; TextEventManager::TextEventManager() : p(std::make_unique(this)) {} TextEventManager::~TextEventManager() = default; size_t TextEventManager::message_count(const types::TextEventKey& key) const { size_t messageCount = 0u; std::shared_lock lock(p->textEventMutex_); auto it = p->textEventMap_.find(key); if (it != p->textEventMap_.cend()) { messageCount = it->second.size(); } return messageCount; } std::vector> TextEventManager::message_list(const types::TextEventKey& key) const { std::vector> messageList {}; std::shared_lock lock(p->textEventMutex_); auto it = p->textEventMap_.find(key); if (it != p->textEventMap_.cend()) { messageList.assign(it->second.begin(), it->second.end()); } return messageList; } void TextEventManager::LoadFile(const std::string& filename) { logger_->debug("LoadFile: {}", filename); boost::asio::post(p->threadPool_, [=, this]() { try { awips::TextProductFile file; // Load file bool fileLoaded = file.LoadFile(filename); if (!fileLoaded) { return; } // Process messages auto messages = file.messages(); for (auto& message : messages) { p->HandleMessage(message); } } catch (const std::exception& ex) { logger_->error(ex.what()); } }); } void TextEventManager::SelectTime( std::chrono::system_clock::time_point dateTime) { if (dateTime == std::chrono::system_clock::time_point {}) { // Ignore a default date/time selection return; } logger_->trace("Select Time: {}", util::TimeString(dateTime)); const auto today = std::chrono::floor(dateTime); const auto yesterday = today - std::chrono::days {1}; const auto tomorrow = today + std::chrono::days {1}; const auto dateArray = std::array {today, yesterday, tomorrow}; const ranges::any_view dates = dateArray | ranges::views::filter( [this](const auto& date) { return p->archiveLimit_ == std::chrono::sys_days {} || date < p->archiveLimit_; }); p->LoadArchives(dates); } void TextEventManager::Impl::HandleMessage( const std::shared_ptr& message) { auto segments = message->segments(); // If there are no segments, skip this message if (segments.empty()) { return; } for (auto& segment : segments) { // If a segment has no header, or if there is no VTEC string, skip this // message. A segmented message corresponding to a text event should have // this information. if (!segment->header_.has_value() || segment->header_->vtecString_.empty()) { return; } } std::unique_lock lock(textEventMutex_); // Find a matching event in the event map auto& vtecString = segments[0]->header_->vtecString_; types::TextEventKey key {vtecString[0].pVtec_}; size_t messageIndex = 0; auto it = textEventMap_.find(key); bool updated = false; if (it == textEventMap_.cend()) { // If there was no matching event, add the message to a new event textEventMap_.emplace(key, std::vector {message}); messageIndex = 0; updated = true; } else if (std::find_if(it->second.cbegin(), it->second.cend(), [=](auto& storedMessage) { return *message->wmo_header().get() == *storedMessage->wmo_header().get(); }) == it->second.cend()) { // If there was a matching event, and this message has not been stored // (WMO header equivalence check), add the updated message to the existing // event // Determine the chronological sequence of the message. Note, if there // were no time hints given to the WMO header, this will place the message // at the end of the vector. auto insertionPoint = std::upper_bound( it->second.begin(), it->second.end(), message, [](const std::shared_ptr& a, const std::shared_ptr& b) { return a->wmo_header()->GetDateTime() < b->wmo_header()->GetDateTime(); }); // Insert the message in chronological order messageIndex = std::distance(it->second.begin(), insertionPoint); it->second.insert(insertionPoint, message); updated = true; }; lock.unlock(); if (updated) { Q_EMIT self_->AlertUpdated(key, messageIndex, message->uuid()); } } void TextEventManager::Impl::LoadArchives( ranges::any_view dates) { UpdateArchiveDates(dates); std::unique_lock lock {archiveMutex_}; // Don't reload data that has already been loaded const ranges::any_view filteredDates = dates | ranges::views::filter([this](const auto& date) { return !archiveMap_.contains(date); }); lock.unlock(); // Query for products const auto& productIds = iemApiProvider_->ListTextProducts(filteredDates, {}, kPils_); if (productIds.has_value()) { logger_->debug("Loading {} products", productIds.value().size()); // Load listed products auto products = iemApiProvider_->LoadTextProducts(productIds.value()); for (auto& product : products) { const auto& messages = product->messages(); for (auto& message : messages) { HandleMessage(message); } } lock.lock(); for (const auto& date : dates) { archiveMap_[date]; // TODO: Store the products in the archive } lock.unlock(); } } void TextEventManager::Impl::RefreshAsync() { boost::asio::post(threadPool_, [this]() { try { Refresh(); } catch (const std::exception& ex) { logger_->error(ex.what()); } }); } void TextEventManager::Impl::Refresh() { logger_->trace("Refresh"); // Take a unique lock before refreshing std::unique_lock lock(refreshMutex_); // Take a copy of the current warnings provider to protect against change std::shared_ptr warningsProvider = warningsProvider_; // Load updated files from the warnings provider // Start time should default to: // - 3 days of history for the first load // - 1 hour of history for subsequent loads // If the time jumps, we should attempt to load from no later than the // previous load time auto loadTime = std::chrono::floor(std::chrono::system_clock::now()); auto startTime = loadTime - loadHistoryDuration_; if (prevLoadTime_ != std::chrono::sys_time {}) { startTime = std::min(startTime, prevLoadTime_); } if (archiveLimit_ == std::chrono::sys_days {}) { archiveLimit_ = std::chrono::ceil(startTime); } auto updatedFiles = warningsProvider->LoadUpdatedFiles(startTime); // Store the load time and reset the load history duration prevLoadTime_ = loadTime; loadHistoryDuration_ = kDefaultLoadHistoryDuration_; // Handle messages for (auto& file : updatedFiles) { for (auto& message : file->messages()) { HandleMessage(message); } } // Schedule another update in 15 seconds using namespace std::chrono; refreshTimer_.expires_after(15s); refreshTimer_.async_wait( [this](const boost::system::error_code& e) { if (e == boost::asio::error::operation_aborted) { logger_->debug("Refresh timer cancelled"); } else if (e != boost::system::errc::success) { logger_->warn("Refresh timer error: {}", e.message()); } else { RefreshAsync(); } }); } void TextEventManager::Impl::UpdateArchiveDates( ranges::any_view dates) { std::unique_lock lock {archiveMutex_}; for (const auto& date : dates) { // Remove any existing occurrences of day, and add to the back of the list archiveDates_.remove(date); archiveDates_.push_back(date); } } std::shared_ptr TextEventManager::Instance() { static std::weak_ptr textEventManagerReference_ {}; static std::mutex instanceMutex_ {}; std::unique_lock lock(instanceMutex_); std::shared_ptr textEventManager = textEventManagerReference_.lock(); if (textEventManager == nullptr) { textEventManager = std::make_shared(); textEventManagerReference_ = textEventManager; } return textEventManager; } } // namespace manager } // namespace qt } // namespace scwx