#include #include #include #include #include #include #include #include namespace scwx { namespace qt { namespace manager { static const std::string logPrefix_ = "scwx::qt::manager::text_event_manager"; static const auto logger_ = scwx::util::Logger::Create(logPrefix_); static const std::string& kDefaultWarningsProviderUrl { "https://warnings.allisonhouse.com"}; class TextEventManager::Impl { public: explicit Impl(TextEventManager* self) : self_ {self}, refreshTimer_ {util::io_context()}, refreshMutex_ {}, textEventMap_ {}, textEventMutex_ {}, warningsProvider_ {kDefaultWarningsProviderUrl} { util::async([=]() { Refresh(); }); } ~Impl() { std::unique_lock lock(refreshMutex_); refreshTimer_.cancel(); } void HandleMessage(std::shared_ptr message); void Refresh(); TextEventManager* self_; boost::asio::steady_timer refreshTimer_; std::mutex refreshMutex_; std::unordered_map>, types::TextEventHash> textEventMap_; std::shared_mutex textEventMutex_; provider::WarningsProvider warningsProvider_; }; TextEventManager::TextEventManager() : p(std::make_unique(this)) {} TextEventManager::~TextEventManager() = default; size_t TextEventManager::message_count(const types::TextEventKey& key) const { size_t messageCount = 0u; std::shared_lock lock(p->textEventMutex_); auto it = p->textEventMap_.find(key); if (it != p->textEventMap_.cend()) { messageCount = it->second.size(); } return messageCount; } std::vector> TextEventManager::message_list(const types::TextEventKey& key) const { std::vector> messageList {}; std::shared_lock lock(p->textEventMutex_); auto it = p->textEventMap_.find(key); if (it != p->textEventMap_.cend()) { messageList.assign(it->second.begin(), it->second.end()); } return messageList; } void TextEventManager::LoadFile(const std::string& filename) { logger_->debug("LoadFile: {}", filename); util::async( [=]() { awips::TextProductFile file; // Load file bool fileLoaded = file.LoadFile(filename); if (!fileLoaded) { return; } // Process messages auto messages = file.messages(); for (auto& message : messages) { p->HandleMessage(message); } }); } void TextEventManager::Impl::HandleMessage( std::shared_ptr message) { auto segments = message->segments(); // If there are no segments, skip this message if (segments.empty()) { return; } for (auto& segment : segments) { // If a segment has no header, or if there is no VTEC string, skip this // message. A segmented message corresponding to a text event should have // this information. if (!segment->header_.has_value() || segment->header_->vtecString_.empty()) { return; } } std::unique_lock lock(textEventMutex_); // Find a matching event in the event map auto& vtecString = segments[0]->header_->vtecString_; types::TextEventKey key {vtecString[0].pVtec_}; size_t messageIndex = 0; auto it = textEventMap_.find(key); bool updated = false; if (it == textEventMap_.cend()) { // If there was no matching event, add the message to a new event textEventMap_.emplace(key, std::vector {message}); messageIndex = 0; updated = true; } else if (std::find_if(it->second.cbegin(), it->second.cend(), [=](auto& storedMessage) { return *message->wmo_header().get() == *storedMessage->wmo_header().get(); }) == it->second.cend()) { // If there was a matching event, and this message has not been stored // (WMO header equivalence check), add the updated message to the existing // event messageIndex = it->second.size(); it->second.push_back(message); updated = true; }; lock.unlock(); if (updated) { emit self_->AlertUpdated(key, messageIndex); } } void TextEventManager::Impl::Refresh() { using namespace std::chrono; logger_->trace("Refresh"); // Take a unique lock before refreshing std::unique_lock lock(refreshMutex_); // Set threshold to last 30 hours auto newerThan = std::chrono::system_clock::now() - 30h; // Update the file listing from the warnings provider auto [newFiles, totalFiles] = warningsProvider_.ListFiles(newerThan); if (newFiles > 0) { // Load new files auto updatedFiles = warningsProvider_.LoadUpdatedFiles(newerThan); // Handle messages for (auto& file : updatedFiles) { for (auto& message : file->messages()) { HandleMessage(message); } } } // Schedule another update in 15 seconds using namespace std::chrono; refreshTimer_.expires_after(15s); refreshTimer_.async_wait( [=](const boost::system::error_code& e) { if (e == boost::asio::error::operation_aborted) { logger_->debug("Refresh timer cancelled"); } else if (e != boost::system::errc::success) { logger_->warn("Refresh timer error: {}", e.message()); } else { Refresh(); } }); } std::shared_ptr TextEventManager::Instance() { static std::weak_ptr textEventManagerReference_ {}; static std::mutex instanceMutex_ {}; std::unique_lock lock(instanceMutex_); std::shared_ptr textEventManager = textEventManagerReference_.lock(); if (textEventManager == nullptr) { textEventManager = std::make_shared(); textEventManagerReference_ = textEventManager; } return textEventManager; } } // namespace manager } // namespace qt } // namespace scwx