Merge pull request #230 from dpaulat/feature/thread-error-handling

Catch exceptions in background threads
This commit is contained in:
Dan Paulat 2024-06-15 23:45:05 -05:00 committed by GitHub
commit a47c77e4a2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 866 additions and 665 deletions

View file

@ -612,20 +612,27 @@ void MainWindow::on_actionCheckForUpdates_triggered()
p->threadPool_,
[this]()
{
if (!p->updateManager_->CheckForUpdates(main::kVersionString_))
try
{
QMetaObject::invokeMethod(
this,
[this]()
{
QMessageBox* messageBox = new QMessageBox(this);
messageBox->setIcon(QMessageBox::Icon::Information);
messageBox->setWindowTitle(tr("Check for Updates"));
messageBox->setText(tr("Supercell Wx is up to date."));
messageBox->setStandardButtons(
QMessageBox::StandardButton::Ok);
messageBox->show();
});
if (!p->updateManager_->CheckForUpdates(main::kVersionString_))
{
QMetaObject::invokeMethod(
this,
[this]()
{
QMessageBox* messageBox = new QMessageBox(this);
messageBox->setIcon(QMessageBox::Icon::Information);
messageBox->setWindowTitle(tr("Check for Updates"));
messageBox->setText(tr("Supercell Wx is up to date."));
messageBox->setStandardButtons(
QMessageBox::StandardButton::Ok);
messageBox->show();
});
}
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
@ -663,9 +670,16 @@ void MainWindowImpl::AsyncSetup()
boost::asio::post(threadPool_,
[this]()
{
manager::UpdateManager::RemoveTemporaryReleases();
updateManager_->CheckForUpdates(
main::kVersionString_);
try
{
manager::UpdateManager::RemoveTemporaryReleases();
updateManager_->CheckForUpdates(
main::kVersionString_);
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
}

View file

@ -42,7 +42,17 @@ public:
[this](const types::TextEventKey& key, size_t messageIndex)
{
boost::asio::post(threadPool_,
[=, this]() { HandleAlert(key, messageIndex); });
[=, this]()
{
try
{
HandleAlert(key, messageIndex);
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
});
}

View file

@ -25,6 +25,8 @@ public:
~Impl() { threadPool_.join(); }
void DownloadSync(const std::shared_ptr<request::DownloadRequest>& request);
boost::asio::thread_pool threadPool_ {1u};
DownloadManager* self_;
@ -36,224 +38,229 @@ DownloadManager::~DownloadManager() = default;
void DownloadManager::Download(
const std::shared_ptr<request::DownloadRequest>& request)
{
boost::asio::post(
p->threadPool_,
[=]()
boost::asio::post(p->threadPool_,
[=]()
{
try
{
p->DownloadSync(request);
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
void DownloadManager::Impl::DownloadSync(
const std::shared_ptr<request::DownloadRequest>& request)
{
// Prepare destination file
const std::filesystem::path& destinationPath = request->destination_path();
if (!destinationPath.has_parent_path())
{
logger_->error("Destination has no parent path: \"{}\"");
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::IOError);
return;
}
const std::filesystem::path parentPath = destinationPath.parent_path();
// Create directory if it doesn't exist
if (!std::filesystem::exists(parentPath))
{
if (!std::filesystem::create_directories(parentPath))
{
// Prepare destination file
const std::filesystem::path& destinationPath =
request->destination_path();
logger_->error("Unable to create download directory: \"{}\"",
parentPath.string());
if (!destinationPath.has_parent_path())
{
logger_->error("Destination has no parent path: \"{}\"");
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::IOError);
return;
}
const std::filesystem::path parentPath = destinationPath.parent_path();
// Create directory if it doesn't exist
if (!std::filesystem::exists(parentPath))
{
if (!std::filesystem::create_directories(parentPath))
{
logger_->error("Unable to create download directory: \"{}\"",
parentPath.string());
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::IOError);
return;
}
}
// Remove file if it exists
if (std::filesystem::exists(destinationPath))
{
std::error_code error;
if (!std::filesystem::remove(destinationPath, error))
{
logger_->error(
"Unable to remove existing destination file ({}): \"{}\"",
error.message(),
destinationPath.string());
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::IOError);
return;
}
}
// Open file for writing
std::ofstream ofs {destinationPath,
std::ios_base::out | std::ios_base::binary |
std::ios_base::trunc};
if (!ofs.is_open() || !ofs.good())
{
logger_->error(
"Unable to open destination file for writing: \"{}\"",
destinationPath.string());
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::IOError);
return;
}
std::chrono::system_clock::time_point lastUpdated {};
cpr::cpr_off_t lastDownloadNow {};
cpr::cpr_off_t lastDownloadTotal {};
// Download file
cpr::Response response =
cpr::Get(cpr::Url {request->url()},
cpr::ProgressCallback(
[&](cpr::cpr_off_t downloadTotal,
cpr::cpr_off_t downloadNow,
cpr::cpr_off_t /* uploadTotal */,
cpr::cpr_off_t /* uploadNow */,
std::intptr_t /* userdata */)
{
using namespace std::chrono_literals;
std::chrono::system_clock::time_point now =
std::chrono::system_clock::now();
// Only emit an update every 100ms
if ((now > lastUpdated + 100ms ||
downloadNow == downloadTotal) &&
(downloadNow != lastDownloadNow ||
downloadTotal != lastDownloadTotal))
{
logger_->trace("Downloaded: {} / {}",
downloadNow,
downloadTotal);
Q_EMIT request->ProgressUpdated(downloadNow,
downloadTotal);
lastUpdated = now;
lastDownloadNow = downloadNow;
lastDownloadTotal = downloadTotal;
}
return !request->IsCanceled();
}),
cpr::WriteCallback(
[&](std::string data, std::intptr_t /* userdata */)
{
// Write file
ofs << data;
return !request->IsCanceled();
}));
bool ofsGood = ofs.good();
ofs.close();
// Handle error response
if (response.error.code != cpr::ErrorCode::OK ||
request->IsCanceled() || !ofsGood)
{
request::DownloadRequest::CompleteReason reason =
request::DownloadRequest::CompleteReason::IOError;
if (request->IsCanceled())
{
logger_->info("Download request cancelled: {}", request->url());
reason = request::DownloadRequest::CompleteReason::Canceled;
}
else if (response.error.code != cpr::ErrorCode::OK)
{
logger_->error("Error downloading file ({}): {}",
response.error.message,
request->url());
reason = request::DownloadRequest::CompleteReason::RemoteError;
}
else if (!ofsGood)
{
logger_->error("File I/O error: {}", destinationPath.string());
reason = request::DownloadRequest::CompleteReason::IOError;
}
std::error_code error;
if (!std::filesystem::remove(destinationPath, error))
{
logger_->error("Unable to remove destination file: {}, {}",
destinationPath.string(),
error.message());
}
Q_EMIT request->RequestComplete(reason);
return;
}
// Handle response
const auto contentMd5 = response.header.find("content-md5");
if (contentMd5 != response.header.cend() &&
!contentMd5->second.empty())
{
// Open file for reading
std::ifstream is {destinationPath,
std::ios_base::in | std::ios_base::binary};
if (!is.is_open() || !is.good())
{
logger_->error("Unable to open destination file for reading: {}",
destinationPath.string());
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::IOError);
return;
}
// Compute MD5
std::vector<std::uint8_t> digest {};
if (!util::ComputeDigest(EVP_md5(), is, digest))
{
logger_->error("Failed to compute MD5: {}",
destinationPath.string());
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::IOError);
return;
}
// Compare calculated MD5 with digest in response header
QByteArray expectedDigestArray =
QByteArray::fromBase64(contentMd5->second.c_str());
std::vector<std::uint8_t> expectedDigest(
expectedDigestArray.cbegin(), expectedDigestArray.cend());
if (digest != expectedDigest)
{
QByteArray calculatedDigest(
reinterpret_cast<char*>(digest.data()), digest.size());
logger_->error("Digest mismatch: {} != {}",
calculatedDigest.toBase64().toStdString(),
contentMd5->second);
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::DigestError);
return;
}
}
logger_->info("Download complete: {}", request->url());
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::OK);
});
request::DownloadRequest::CompleteReason::IOError);
return;
}
}
// Remove file if it exists
if (std::filesystem::exists(destinationPath))
{
std::error_code error;
if (!std::filesystem::remove(destinationPath, error))
{
logger_->error(
"Unable to remove existing destination file ({}): \"{}\"",
error.message(),
destinationPath.string());
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::IOError);
return;
}
}
// Open file for writing
std::ofstream ofs {destinationPath,
std::ios_base::out | std::ios_base::binary |
std::ios_base::trunc};
if (!ofs.is_open() || !ofs.good())
{
logger_->error("Unable to open destination file for writing: \"{}\"",
destinationPath.string());
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::IOError);
return;
}
std::chrono::system_clock::time_point lastUpdated {};
cpr::cpr_off_t lastDownloadNow {};
cpr::cpr_off_t lastDownloadTotal {};
// Download file
cpr::Response response = cpr::Get(
cpr::Url {request->url()},
cpr::ProgressCallback(
[&](cpr::cpr_off_t downloadTotal,
cpr::cpr_off_t downloadNow,
cpr::cpr_off_t /* uploadTotal */,
cpr::cpr_off_t /* uploadNow */,
std::intptr_t /* userdata */)
{
using namespace std::chrono_literals;
std::chrono::system_clock::time_point now =
std::chrono::system_clock::now();
// Only emit an update every 100ms
if ((now > lastUpdated + 100ms || downloadNow == downloadTotal) &&
(downloadNow != lastDownloadNow ||
downloadTotal != lastDownloadTotal))
{
logger_->trace(
"Downloaded: {} / {}", downloadNow, downloadTotal);
Q_EMIT request->ProgressUpdated(downloadNow, downloadTotal);
lastUpdated = now;
lastDownloadNow = downloadNow;
lastDownloadTotal = downloadTotal;
}
return !request->IsCanceled();
}),
cpr::WriteCallback(
[&](std::string data, std::intptr_t /* userdata */)
{
// Write file
ofs << data;
return !request->IsCanceled();
}));
bool ofsGood = ofs.good();
ofs.close();
// Handle error response
if (response.error.code != cpr::ErrorCode::OK || request->IsCanceled() ||
!ofsGood)
{
request::DownloadRequest::CompleteReason reason =
request::DownloadRequest::CompleteReason::IOError;
if (request->IsCanceled())
{
logger_->info("Download request cancelled: {}", request->url());
reason = request::DownloadRequest::CompleteReason::Canceled;
}
else if (response.error.code != cpr::ErrorCode::OK)
{
logger_->error("Error downloading file ({}): {}",
response.error.message,
request->url());
reason = request::DownloadRequest::CompleteReason::RemoteError;
}
else if (!ofsGood)
{
logger_->error("File I/O error: {}", destinationPath.string());
reason = request::DownloadRequest::CompleteReason::IOError;
}
std::error_code error;
if (!std::filesystem::remove(destinationPath, error))
{
logger_->error("Unable to remove destination file: {}, {}",
destinationPath.string(),
error.message());
}
Q_EMIT request->RequestComplete(reason);
return;
}
// Handle response
const auto contentMd5 = response.header.find("content-md5");
if (contentMd5 != response.header.cend() && !contentMd5->second.empty())
{
// Open file for reading
std::ifstream is {destinationPath,
std::ios_base::in | std::ios_base::binary};
if (!is.is_open() || !is.good())
{
logger_->error("Unable to open destination file for reading: {}",
destinationPath.string());
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::IOError);
return;
}
// Compute MD5
std::vector<std::uint8_t> digest {};
if (!util::ComputeDigest(EVP_md5(), is, digest))
{
logger_->error("Failed to compute MD5: {}", destinationPath.string());
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::IOError);
return;
}
// Compare calculated MD5 with digest in response header
QByteArray expectedDigestArray =
QByteArray::fromBase64(contentMd5->second.c_str());
std::vector<std::uint8_t> expectedDigest(expectedDigestArray.cbegin(),
expectedDigestArray.cend());
if (digest != expectedDigest)
{
QByteArray calculatedDigest(reinterpret_cast<char*>(digest.data()),
digest.size());
logger_->error("Digest mismatch: {} != {}",
calculatedDigest.toBase64().toStdString(),
contentMd5->second);
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::DigestError);
return;
}
}
logger_->info("Download complete: {}", request->url());
Q_EMIT request->RequestComplete(
request::DownloadRequest::CompleteReason::OK);
}
std::shared_ptr<DownloadManager> DownloadManager::Instance()

View file

@ -157,12 +157,19 @@ PlacefileManager::PlacefileManager() : p(std::make_unique<Impl>(this))
boost::asio::post(p->threadPool_,
[this]()
{
p->InitializePlacefileSettings();
try
{
p->InitializePlacefileSettings();
// Read placefile settings on startup
main::Application::WaitForInitialization();
p->ReadPlacefileSettings();
Q_EMIT PlacefilesInitialized();
// Read placefile settings on startup
main::Application::WaitForInitialization();
p->ReadPlacefileSettings();
Q_EMIT PlacefilesInitialized();
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
@ -678,7 +685,7 @@ void PlacefileManager::Impl::PlacefileRecord::ScheduleRefresh()
}
else
{
Update();
UpdateAsync();
}
});
}
@ -691,7 +698,18 @@ void PlacefileManager::Impl::PlacefileRecord::CancelRefresh()
void PlacefileManager::Impl::PlacefileRecord::UpdateAsync()
{
boost::asio::post(threadPool_, [this]() { Update(); });
boost::asio::post(threadPool_,
[this]()
{
try
{
Update();
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
std::shared_ptr<PlacefileManager> PlacefileManager::Instance()

View file

@ -196,6 +196,7 @@ public:
std::shared_ptr<ProviderManager> providerManager,
bool enabled);
void RefreshData(std::shared_ptr<ProviderManager> providerManager);
void RefreshDataSync(std::shared_ptr<ProviderManager> providerManager);
std::tuple<std::shared_ptr<types::RadarProductRecord>,
std::chrono::system_clock::time_point>
@ -225,6 +226,8 @@ public:
void PopulateLevel3ProductTimes(const std::string& product,
std::chrono::system_clock::time_point time);
void UpdateAvailableProductsSync();
static void
PopulateProductTimes(std::shared_ptr<ProviderManager> providerManager,
RadarProductRecordMap& productRecordMap,
@ -576,16 +579,23 @@ void RadarProductManager::EnableRefresh(common::RadarProductGroup group,
p->threadPool_,
[=, this]()
{
providerManager->provider_->RequestAvailableProducts();
auto availableProducts =
providerManager->provider_->GetAvailableProducts();
if (std::find(std::execution::par_unseq,
availableProducts.cbegin(),
availableProducts.cend(),
product) != availableProducts.cend())
try
{
p->EnableRefresh(uuid, providerManager, enabled);
providerManager->provider_->RequestAvailableProducts();
auto availableProducts =
providerManager->provider_->GetAvailableProducts();
if (std::find(std::execution::par_unseq,
availableProducts.cbegin(),
availableProducts.cend(),
product) != availableProducts.cend())
{
p->EnableRefresh(uuid, providerManager, enabled);
}
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
@ -664,95 +674,102 @@ void RadarProductManagerImpl::RefreshData(
providerManager->refreshTimer_.cancel();
}
boost::asio::post(
threadPool_,
[=, this]()
boost::asio::post(threadPool_,
[=, this]()
{
try
{
RefreshDataSync(providerManager);
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
void RadarProductManagerImpl::RefreshDataSync(
std::shared_ptr<ProviderManager> providerManager)
{
using namespace std::chrono_literals;
auto [newObjects, totalObjects] = providerManager->provider_->Refresh();
std::chrono::milliseconds interval = kFastRetryInterval_;
if (totalObjects > 0)
{
std::string key = providerManager->provider_->FindLatestKey();
auto latestTime = providerManager->provider_->GetTimePointByKey(key);
auto updatePeriod = providerManager->provider_->update_period();
auto lastModified = providerManager->provider_->last_modified();
auto sinceLastModified = std::chrono::system_clock::now() - lastModified;
// For the default interval, assume products are updated at a
// constant rate. Expect the next product at a time based on the
// previous two.
interval = std::chrono::duration_cast<std::chrono::milliseconds>(
updatePeriod - sinceLastModified);
if (updatePeriod > 0s && sinceLastModified > updatePeriod * 5)
{
using namespace std::chrono_literals;
// If it has been at least 5 update periods since the file has
// been last modified, slow the retry period
interval = kSlowRetryInterval_;
}
else if (interval < std::chrono::milliseconds {kFastRetryInterval_})
{
// The interval should be no quicker than the fast retry interval
interval = kFastRetryInterval_;
}
auto [newObjects, totalObjects] =
providerManager->provider_->Refresh();
if (newObjects > 0)
{
Q_EMIT providerManager->NewDataAvailable(
providerManager->group_, providerManager->product_, latestTime);
}
}
else if (providerManager->refreshEnabled_)
{
logger_->info("[{}] No data found", providerManager->name());
std::chrono::milliseconds interval = kFastRetryInterval_;
// If no data is found, retry at the slow retry interval
interval = kSlowRetryInterval_;
}
if (totalObjects > 0)
{
std::string key = providerManager->provider_->FindLatestKey();
auto latestTime =
providerManager->provider_->GetTimePointByKey(key);
if (providerManager->refreshEnabled_)
{
std::unique_lock lock(providerManager->refreshTimerMutex_);
auto updatePeriod = providerManager->provider_->update_period();
auto lastModified = providerManager->provider_->last_modified();
auto sinceLastModified =
std::chrono::system_clock::now() - lastModified;
logger_->debug(
"[{}] Scheduled refresh in {:%M:%S}",
providerManager->name(),
std::chrono::duration_cast<std::chrono::seconds>(interval));
// For the default interval, assume products are updated at a
// constant rate. Expect the next product at a time based on the
// previous two.
interval = std::chrono::duration_cast<std::chrono::milliseconds>(
updatePeriod - sinceLastModified);
if (updatePeriod > 0s && sinceLastModified > updatePeriod * 5)
{
providerManager->refreshTimer_.expires_after(interval);
providerManager->refreshTimer_.async_wait(
[=, this](const boost::system::error_code& e)
{
// If it has been at least 5 update periods since the file has
// been last modified, slow the retry period
interval = kSlowRetryInterval_;
}
else if (interval < std::chrono::milliseconds {kFastRetryInterval_})
{
// The interval should be no quicker than the fast retry interval
interval = kFastRetryInterval_;
}
if (newObjects > 0)
{
Q_EMIT providerManager->NewDataAvailable(
providerManager->group_,
providerManager->product_,
latestTime);
}
}
else if (providerManager->refreshEnabled_)
{
logger_->info("[{}] No data found", providerManager->name());
// If no data is found, retry at the slow retry interval
interval = kSlowRetryInterval_;
}
if (providerManager->refreshEnabled_)
{
std::unique_lock lock(providerManager->refreshTimerMutex_);
logger_->debug(
"[{}] Scheduled refresh in {:%M:%S}",
providerManager->name(),
std::chrono::duration_cast<std::chrono::seconds>(interval));
{
providerManager->refreshTimer_.expires_after(interval);
providerManager->refreshTimer_.async_wait(
[=, this](const boost::system::error_code& e)
{
if (e == boost::system::errc::success)
{
RefreshData(providerManager);
}
else if (e == boost::asio::error::operation_aborted)
{
logger_->debug("[{}] Data refresh timer cancelled",
providerManager->name());
}
else
{
logger_->warn("[{}] Data refresh timer error: {}",
providerManager->name(),
e.message());
}
});
}
}
});
if (e == boost::system::errc::success)
{
RefreshData(providerManager);
}
else if (e == boost::asio::error::operation_aborted)
{
logger_->debug("[{}] Data refresh timer cancelled",
providerManager->name());
}
else
{
logger_->warn("[{}] Data refresh timer error: {}",
providerManager->name(),
e.message());
}
});
}
}
}
std::set<std::chrono::system_clock::time_point>
@ -1009,7 +1026,16 @@ void RadarProductManagerImpl::LoadNexradFileAsync(
{
boost::asio::post(threadPool_,
[=, &mutex]()
{ LoadNexradFile(load, request, mutex, time); });
{
try
{
LoadNexradFile(load, request, mutex, time);
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
void RadarProductManagerImpl::LoadNexradFile(
@ -1441,64 +1467,73 @@ void RadarProductManager::UpdateAvailableProducts()
logger_->debug("UpdateAvailableProducts()");
boost::asio::post(
p->threadPool_,
[this]()
boost::asio::post(p->threadPool_,
[this]()
{
try
{
p->UpdateAvailableProductsSync();
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
void RadarProductManagerImpl::UpdateAvailableProductsSync()
{
auto level3ProviderManager =
GetLevel3ProviderManager(kDefaultLevel3Product_);
level3ProviderManager->provider_->RequestAvailableProducts();
auto updatedAwipsIdList =
level3ProviderManager->provider_->GetAvailableProducts();
std::unique_lock lock {availableCategoryMutex_};
for (common::Level3ProductCategory category :
common::Level3ProductCategoryIterator())
{
const auto& products = common::GetLevel3ProductsByCategory(category);
std::unordered_map<std::string, std::vector<std::string>>
availableProducts;
for (const auto& product : products)
{
auto level3ProviderManager =
p->GetLevel3ProviderManager(kDefaultLevel3Product_);
level3ProviderManager->provider_->RequestAvailableProducts();
auto updatedAwipsIdList =
level3ProviderManager->provider_->GetAvailableProducts();
const auto& awipsIds = common::GetLevel3AwipsIdsByProduct(product);
std::unique_lock lock {p->availableCategoryMutex_};
std::vector<std::string> availableAwipsIds;
for (common::Level3ProductCategory category :
common::Level3ProductCategoryIterator())
for (const auto& awipsId : awipsIds)
{
const auto& products =
common::GetLevel3ProductsByCategory(category);
std::unordered_map<std::string, std::vector<std::string>>
availableProducts;
for (const auto& product : products)
if (std::find(updatedAwipsIdList.cbegin(),
updatedAwipsIdList.cend(),
awipsId) != updatedAwipsIdList.cend())
{
const auto& awipsIds =
common::GetLevel3AwipsIdsByProduct(product);
std::vector<std::string> availableAwipsIds;
for (const auto& awipsId : awipsIds)
{
if (std::find(updatedAwipsIdList.cbegin(),
updatedAwipsIdList.cend(),
awipsId) != updatedAwipsIdList.cend())
{
availableAwipsIds.push_back(awipsId);
}
}
if (!availableAwipsIds.empty())
{
availableProducts.insert_or_assign(
product, std::move(availableAwipsIds));
}
}
if (!availableProducts.empty())
{
p->availableCategoryMap_.insert_or_assign(
category, std::move(availableProducts));
}
else
{
p->availableCategoryMap_.erase(category);
availableAwipsIds.push_back(awipsId);
}
}
Q_EMIT Level3ProductsChanged();
});
if (!availableAwipsIds.empty())
{
availableProducts.insert_or_assign(product,
std::move(availableAwipsIds));
}
}
if (!availableProducts.empty())
{
availableCategoryMap_.insert_or_assign(category,
std::move(availableProducts));
}
else
{
availableCategoryMap_.erase(category);
}
}
Q_EMIT self_->Level3ProductsChanged();
}
std::shared_ptr<RadarProductManager>

View file

@ -50,9 +50,16 @@ public:
boost::asio::post(threadPool_,
[this]()
{
main::Application::WaitForInitialization();
logger_->debug("Start Refresh");
Refresh();
try
{
main::Application::WaitForInitialization();
logger_->debug("Start Refresh");
Refresh();
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
@ -70,6 +77,7 @@ public:
}
void HandleMessage(std::shared_ptr<awips::TextProductMessage> message);
void RefreshAsync();
void Refresh();
boost::asio::thread_pool threadPool_ {1u};
@ -131,20 +139,27 @@ void TextEventManager::LoadFile(const std::string& filename)
boost::asio::post(p->threadPool_,
[=, this]()
{
awips::TextProductFile file;
// Load file
bool fileLoaded = file.LoadFile(filename);
if (!fileLoaded)
try
{
return;
awips::TextProductFile file;
// Load file
bool fileLoaded = file.LoadFile(filename);
if (!fileLoaded)
{
return;
}
// Process messages
auto messages = file.messages();
for (auto& message : messages)
{
p->HandleMessage(message);
}
}
// Process messages
auto messages = file.messages();
for (auto& message : messages)
catch (const std::exception& ex)
{
p->HandleMessage(message);
logger_->error(ex.what());
}
});
}
@ -212,6 +227,22 @@ void TextEventManager::Impl::HandleMessage(
}
}
void TextEventManager::Impl::RefreshAsync()
{
boost::asio::post(threadPool_,
[this]()
{
try
{
Refresh();
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
void TextEventManager::Impl::Refresh()
{
logger_->trace("Refresh");
@ -257,7 +288,7 @@ void TextEventManager::Impl::Refresh()
}
else
{
Refresh();
RefreshAsync();
}
});
}

View file

@ -69,11 +69,13 @@ public:
void Pause();
void Play();
void PlaySync();
void
SelectTimeAsync(std::chrono::system_clock::time_point selectedTime = {});
std::pair<bool, bool>
SelectTime(std::chrono::system_clock::time_point selectedTime = {});
void StepAsync(Direction direction);
void Step(Direction direction);
boost::asio::thread_pool playThreadPool_ {1};
boost::asio::thread_pool selectThreadPool_ {1};
@ -405,8 +407,6 @@ void TimelineManager::Impl::UpdateCacheLimit(
void TimelineManager::Impl::Play()
{
using namespace std::chrono_literals;
if (animationState_ != types::AnimationState::Play)
{
animationState_ = types::AnimationState::Play;
@ -418,92 +418,105 @@ void TimelineManager::Impl::Play()
animationTimer_.cancel();
}
boost::asio::post(
playThreadPool_,
[this]()
boost::asio::post(playThreadPool_,
[this]()
{
try
{
PlaySync();
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
void TimelineManager::Impl::PlaySync()
{
using namespace std::chrono_literals;
// Take a lock for time selection
std::unique_lock lock {selectTimeMutex_};
auto [startTime, endTime] = GetLoopStartAndEndTimes();
std::chrono::system_clock::time_point currentTime = selectedTime_;
std::chrono::system_clock::time_point newTime;
if (currentTime < startTime || currentTime >= endTime)
{
// If the currently selected time is out of the loop, select the
// start time
newTime = startTime;
}
else
{
// If the currently selected time is in the loop, increment
newTime = currentTime + 1min;
}
// Unlock prior to selecting time
lock.unlock();
// Lock radar sweep monitor
std::unique_lock radarSweepMonitorLock {radarSweepMonitorMutex_};
// Reset radar sweep monitor in preparation for update
RadarSweepMonitorReset();
// Select the time
auto selectTimeStart = std::chrono::steady_clock::now();
auto [volumeTimeUpdated, selectedTimeUpdated] = SelectTime(newTime);
auto selectTimeEnd = std::chrono::steady_clock::now();
auto elapsedTime = selectTimeEnd - selectTimeStart;
if (volumeTimeUpdated)
{
// Wait for radar sweeps to update
RadarSweepMonitorWait(radarSweepMonitorLock);
}
else
{
// Disable radar sweep monitor
RadarSweepMonitorDisable();
}
// Calculate the interval until the next update, prior to selecting
std::chrono::milliseconds interval;
if (newTime != endTime)
{
// Determine repeat interval (speed of 1.0 is 1 minute per second)
interval = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::milliseconds(std::lroundl(1000.0 / loopSpeed_)) -
elapsedTime);
}
else
{
// Pause at the end of the loop
interval = std::chrono::duration_cast<std::chrono::milliseconds>(
loopDelay_ - elapsedTime);
}
std::unique_lock animationTimerLock {animationTimerMutex_};
animationTimer_.expires_after(interval);
animationTimer_.async_wait(
[this](const boost::system::error_code& e)
{
// Take a lock for time selection
std::unique_lock lock {selectTimeMutex_};
auto [startTime, endTime] = GetLoopStartAndEndTimes();
std::chrono::system_clock::time_point currentTime = selectedTime_;
std::chrono::system_clock::time_point newTime;
if (currentTime < startTime || currentTime >= endTime)
if (e == boost::system::errc::success)
{
// If the currently selected time is out of the loop, select the
// start time
newTime = startTime;
}
else
{
// If the currently selected time is in the loop, increment
newTime = currentTime + 1min;
}
// Unlock prior to selecting time
lock.unlock();
// Lock radar sweep monitor
std::unique_lock radarSweepMonitorLock {radarSweepMonitorMutex_};
// Reset radar sweep monitor in preparation for update
RadarSweepMonitorReset();
// Select the time
auto selectTimeStart = std::chrono::steady_clock::now();
auto [volumeTimeUpdated, selectedTimeUpdated] = SelectTime(newTime);
auto selectTimeEnd = std::chrono::steady_clock::now();
auto elapsedTime = selectTimeEnd - selectTimeStart;
if (volumeTimeUpdated)
{
// Wait for radar sweeps to update
RadarSweepMonitorWait(radarSweepMonitorLock);
}
else
{
// Disable radar sweep monitor
RadarSweepMonitorDisable();
}
// Calculate the interval until the next update, prior to selecting
std::chrono::milliseconds interval;
if (newTime != endTime)
{
// Determine repeat interval (speed of 1.0 is 1 minute per second)
interval = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::milliseconds(std::lroundl(1000.0 / loopSpeed_)) -
elapsedTime);
}
else
{
// Pause at the end of the loop
interval = std::chrono::duration_cast<std::chrono::milliseconds>(
loopDelay_ - elapsedTime);
}
std::unique_lock animationTimerLock {animationTimerMutex_};
animationTimer_.expires_after(interval);
animationTimer_.async_wait(
[this](const boost::system::error_code& e)
if (animationState_ == types::AnimationState::Play)
{
if (e == boost::system::errc::success)
{
if (animationState_ == types::AnimationState::Play)
{
Play();
}
}
else if (e == boost::asio::error::operation_aborted)
{
logger_->debug("Play timer cancelled");
}
else
{
logger_->warn("Play timer error: {}", e.message());
}
});
Play();
}
}
else if (e == boost::asio::error::operation_aborted)
{
logger_->debug("Play timer cancelled");
}
else
{
logger_->warn("Play timer error: {}", e.message());
}
});
}
@ -511,7 +524,17 @@ void TimelineManager::Impl::SelectTimeAsync(
std::chrono::system_clock::time_point selectedTime)
{
boost::asio::post(selectThreadPool_,
[=, this]() { SelectTime(selectedTime); });
[=, this]()
{
try
{
SelectTime(selectedTime);
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
std::pair<bool, bool> TimelineManager::Impl::SelectTime(
@ -597,91 +620,100 @@ std::pair<bool, bool> TimelineManager::Impl::SelectTime(
void TimelineManager::Impl::StepAsync(Direction direction)
{
boost::asio::post(
selectThreadPool_,
[=, this]()
boost::asio::post(selectThreadPool_,
[=, this]()
{
try
{
Step(direction);
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
void TimelineManager::Impl::Step(Direction direction)
{
// Take a lock for time selection
std::unique_lock lock {selectTimeMutex_};
// Determine time to get active volume times
std::chrono::system_clock::time_point queryTime = adjustedTime_;
if (queryTime == std::chrono::system_clock::time_point {})
{
queryTime = std::chrono::system_clock::now();
}
// Request active volume times
auto radarProductManager =
manager::RadarProductManager::Instance(radarSite_);
auto volumeTimes = radarProductManager->GetActiveVolumeTimes(queryTime);
if (volumeTimes.empty())
{
logger_->debug("No products to step through");
return;
}
// Dynamically update maximum cached volume scans
UpdateCacheLimit(radarProductManager, volumeTimes);
std::set<std::chrono::system_clock::time_point>::const_iterator it;
if (adjustedTime_ == std::chrono::system_clock::time_point {})
{
// If the adjusted time is live, get the last element in the set
it = std::prev(volumeTimes.cend());
}
else
{
// Get the current element in the set
it = scwx::util::GetBoundedElementIterator(volumeTimes, adjustedTime_);
}
if (it == volumeTimes.cend())
{
// Should not get here, but protect against an error
logger_->error("No suitable volume time found");
return;
}
if (direction == Direction::Back)
{
// Only if we aren't at the beginning of the volume times set
if (it != volumeTimes.cbegin())
{
// Take a lock for time selection
std::unique_lock lock {selectTimeMutex_};
// Select the previous time
adjustedTime_ = *(--it);
selectedTime_ = adjustedTime_;
// Determine time to get active volume times
std::chrono::system_clock::time_point queryTime = adjustedTime_;
if (queryTime == std::chrono::system_clock::time_point {})
{
queryTime = std::chrono::system_clock::now();
}
logger_->debug("Volume time updated: {}",
scwx::util::TimeString(adjustedTime_));
// Request active volume times
auto radarProductManager =
manager::RadarProductManager::Instance(radarSite_);
auto volumeTimes =
radarProductManager->GetActiveVolumeTimes(queryTime);
Q_EMIT self_->LiveStateUpdated(false);
Q_EMIT self_->VolumeTimeUpdated(adjustedTime_);
Q_EMIT self_->SelectedTimeUpdated(adjustedTime_);
}
}
else
{
// Only if we aren't at the end of the volume times set
if (it != std::prev(volumeTimes.cend()))
{
// Select the next time
adjustedTime_ = *(++it);
selectedTime_ = adjustedTime_;
if (volumeTimes.empty())
{
logger_->debug("No products to step through");
return;
}
logger_->debug("Volume time updated: {}",
scwx::util::TimeString(adjustedTime_));
// Dynamically update maximum cached volume scans
UpdateCacheLimit(radarProductManager, volumeTimes);
std::set<std::chrono::system_clock::time_point>::const_iterator it;
if (adjustedTime_ == std::chrono::system_clock::time_point {})
{
// If the adjusted time is live, get the last element in the set
it = std::prev(volumeTimes.cend());
}
else
{
// Get the current element in the set
it = scwx::util::GetBoundedElementIterator(volumeTimes,
adjustedTime_);
}
if (it == volumeTimes.cend())
{
// Should not get here, but protect against an error
logger_->error("No suitable volume time found");
return;
}
if (direction == Direction::Back)
{
// Only if we aren't at the beginning of the volume times set
if (it != volumeTimes.cbegin())
{
// Select the previous time
adjustedTime_ = *(--it);
selectedTime_ = adjustedTime_;
logger_->debug("Volume time updated: {}",
scwx::util::TimeString(adjustedTime_));
Q_EMIT self_->LiveStateUpdated(false);
Q_EMIT self_->VolumeTimeUpdated(adjustedTime_);
Q_EMIT self_->SelectedTimeUpdated(adjustedTime_);
}
}
else
{
// Only if we aren't at the end of the volume times set
if (it != std::prev(volumeTimes.cend()))
{
// Select the next time
adjustedTime_ = *(++it);
selectedTime_ = adjustedTime_;
logger_->debug("Volume time updated: {}",
scwx::util::TimeString(adjustedTime_));
Q_EMIT self_->LiveStateUpdated(false);
Q_EMIT self_->VolumeTimeUpdated(adjustedTime_);
Q_EMIT self_->SelectedTimeUpdated(adjustedTime_);
}
}
});
Q_EMIT self_->LiveStateUpdated(false);
Q_EMIT self_->VolumeTimeUpdated(adjustedTime_);
Q_EMIT self_->SelectedTimeUpdated(adjustedTime_);
}
}
}
std::shared_ptr<TimelineManager> TimelineManager::Instance()

View file

@ -326,7 +326,14 @@ void AlertLayerHandler::UpdateAlerts()
}
else
{
UpdateAlerts();
try
{
UpdateAlerts();
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
}
});
}

View file

@ -1638,15 +1638,22 @@ void MapWidgetImpl::RadarProductManagerConnect()
threadPool_,
[=, this]()
{
if (group == common::RadarProductGroup::Level2)
try
{
radarProductManager_->LoadLevel2Data(latestTime,
request);
if (group == common::RadarProductGroup::Level2)
{
radarProductManager_->LoadLevel2Data(latestTime,
request);
}
else
{
radarProductManager_->LoadLevel3Data(
product, latestTime, request);
}
}
else
catch (const std::exception& ex)
{
radarProductManager_->LoadLevel3Data(
product, latestTime, request);
logger_->error(ex.what());
}
});
}
@ -1672,22 +1679,30 @@ void MapWidgetImpl::InitializeNewRadarProductView(
boost::asio::post(threadPool_,
[=, this]()
{
auto radarProductView = context_->radar_product_view();
std::string colorTableFile =
settings::PaletteSettings::Instance()
.palette(colorPalette)
.GetValue();
if (!colorTableFile.empty())
try
{
std::unique_ptr<std::istream> colorTableStream =
util::OpenFile(colorTableFile);
std::shared_ptr<common::ColorTable> colorTable =
common::ColorTable::Load(*colorTableStream);
radarProductView->LoadColorTable(colorTable);
}
auto radarProductView =
context_->radar_product_view();
radarProductView->Initialize();
std::string colorTableFile =
settings::PaletteSettings::Instance()
.palette(colorPalette)
.GetValue();
if (!colorTableFile.empty())
{
std::unique_ptr<std::istream> colorTableStream =
util::OpenFile(colorTableFile);
std::shared_ptr<common::ColorTable> colorTable =
common::ColorTable::Load(*colorTableStream);
radarProductView->LoadColorTable(colorTable);
}
radarProductView->Initialize();
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
if (map_ != nullptr)

View file

@ -45,6 +45,7 @@ public:
~Impl() { threadPool_.join(); }
void ConnectSignals();
void ReloadDataSync();
boost::asio::thread_pool threadPool_ {1};
@ -170,91 +171,95 @@ void PlacefileLayer::Deinitialize()
void PlacefileLayer::ReloadData()
{
boost::asio::post(
p->threadPool_,
[this]()
boost::asio::post(p->threadPool_,
[this]()
{
try
{
p->ReloadDataSync();
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
void PlacefileLayer::Impl::ReloadDataSync()
{
logger_->debug("ReloadData: {}", placefileName_);
std::unique_lock lock {dataMutex_};
std::shared_ptr<manager::PlacefileManager> placefileManager =
manager::PlacefileManager::Instance();
auto placefile = placefileManager->placefile(placefileName_);
if (placefile == nullptr)
{
return;
}
// Start draw items
placefileIcons_->StartIcons();
placefileImages_->StartImages(placefile->name());
placefileLines_->StartLines();
placefilePolygons_->StartPolygons();
placefileTriangles_->StartTriangles();
placefileText_->StartText();
placefileIcons_->SetIconFiles(placefile->icon_files(), placefile->name());
placefileText_->SetFonts(placefileManager->placefile_fonts(placefileName_));
for (auto& drawItem : placefile->GetDrawItems())
{
switch (drawItem->itemType_)
{
logger_->debug("ReloadData: {}", p->placefileName_);
case gr::Placefile::ItemType::Text:
placefileText_->AddText(
std::static_pointer_cast<gr::Placefile::TextDrawItem>(drawItem));
break;
std::unique_lock lock {p->dataMutex_};
case gr::Placefile::ItemType::Icon:
placefileIcons_->AddIcon(
std::static_pointer_cast<gr::Placefile::IconDrawItem>(drawItem));
break;
std::shared_ptr<manager::PlacefileManager> placefileManager =
manager::PlacefileManager::Instance();
case gr::Placefile::ItemType::Line:
placefileLines_->AddLine(
std::static_pointer_cast<gr::Placefile::LineDrawItem>(drawItem));
break;
auto placefile = placefileManager->placefile(p->placefileName_);
if (placefile == nullptr)
{
return;
}
case gr::Placefile::ItemType::Polygon:
placefilePolygons_->AddPolygon(
std::static_pointer_cast<gr::Placefile::PolygonDrawItem>(drawItem));
break;
// Start draw items
p->placefileIcons_->StartIcons();
p->placefileImages_->StartImages(placefile->name());
p->placefileLines_->StartLines();
p->placefilePolygons_->StartPolygons();
p->placefileTriangles_->StartTriangles();
p->placefileText_->StartText();
case gr::Placefile::ItemType::Image:
placefileImages_->AddImage(
std::static_pointer_cast<gr::Placefile::ImageDrawItem>(drawItem));
break;
p->placefileIcons_->SetIconFiles(placefile->icon_files(),
placefile->name());
p->placefileText_->SetFonts(
placefileManager->placefile_fonts(p->placefileName_));
case gr::Placefile::ItemType::Triangles:
placefileTriangles_->AddTriangles(
std::static_pointer_cast<gr::Placefile::TrianglesDrawItem>(
drawItem));
break;
for (auto& drawItem : placefile->GetDrawItems())
{
switch (drawItem->itemType_)
{
case gr::Placefile::ItemType::Text:
p->placefileText_->AddText(
std::static_pointer_cast<gr::Placefile::TextDrawItem>(
drawItem));
break;
default:
break;
}
}
case gr::Placefile::ItemType::Icon:
p->placefileIcons_->AddIcon(
std::static_pointer_cast<gr::Placefile::IconDrawItem>(
drawItem));
break;
// Finish draw items
placefileIcons_->FinishIcons();
placefileImages_->FinishImages();
placefileLines_->FinishLines();
placefilePolygons_->FinishPolygons();
placefileTriangles_->FinishTriangles();
placefileText_->FinishText();
case gr::Placefile::ItemType::Line:
p->placefileLines_->AddLine(
std::static_pointer_cast<gr::Placefile::LineDrawItem>(
drawItem));
break;
case gr::Placefile::ItemType::Polygon:
p->placefilePolygons_->AddPolygon(
std::static_pointer_cast<gr::Placefile::PolygonDrawItem>(
drawItem));
break;
case gr::Placefile::ItemType::Image:
p->placefileImages_->AddImage(
std::static_pointer_cast<gr::Placefile::ImageDrawItem>(
drawItem));
break;
case gr::Placefile::ItemType::Triangles:
p->placefileTriangles_->AddTriangles(
std::static_pointer_cast<gr::Placefile::TrianglesDrawItem>(
drawItem));
break;
default:
break;
}
}
// Finish draw items
p->placefileIcons_->FinishIcons();
p->placefileImages_->FinishImages();
p->placefileLines_->FinishLines();
p->placefilePolygons_->FinishPolygons();
p->placefileTriangles_->FinishTriangles();
p->placefileText_->FinishText();
Q_EMIT DataReloaded();
});
Q_EMIT self_->DataReloaded();
}
} // namespace map

View file

@ -120,7 +120,14 @@ void AlertProxyModelImpl::UpdateAlerts()
}
else
{
UpdateAlerts();
try
{
UpdateAlerts();
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
}
});
}

View file

@ -237,10 +237,19 @@ void OverlayProductView::Impl::LoadProduct(
}
// Load file
boost::asio::post(
threadPool_,
[=, this]()
{ radarProductManager_->LoadLevel3Data(product, time, request); });
boost::asio::post(threadPool_,
[=, this]()
{
try
{
radarProductManager_->LoadLevel3Data(
product, time, request);
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
void OverlayProductView::Impl::ResetProducts()

View file

@ -121,7 +121,18 @@ void RadarProductView::SelectTime(std::chrono::system_clock::time_point time)
void RadarProductView::Update()
{
boost::asio::post(thread_pool(), [this]() { ComputeSweep(); });
boost::asio::post(thread_pool(),
[this]()
{
try
{
ComputeSweep();
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
});
}
bool RadarProductView::IsInitialized() const