Fix crash when switching radar sites while loading data from the old site

- Use thread pools owned by radar product manager, unless called statically (#51)
This commit is contained in:
Dan Paulat 2023-06-20 20:12:57 -05:00
parent 16e3d1533f
commit 081b626855

View file

@ -19,7 +19,9 @@
# pragma warning(push, 0) # pragma warning(push, 0)
#endif #endif
#include <boost/asio/post.hpp>
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/container_hash/hash.hpp> #include <boost/container_hash/hash.hpp>
#include <boost/range/irange.hpp> #include <boost/range/irange.hpp>
#include <boost/timer/timer.hpp> #include <boost/timer/timer.hpp>
@ -91,7 +93,7 @@ public:
group_ {group}, group_ {group},
product_ {product}, product_ {product},
refreshEnabled_ {false}, refreshEnabled_ {false},
refreshTimer_ {scwx::util::io_context()}, refreshTimer_ {threadPool_},
refreshTimerMutex_ {}, refreshTimerMutex_ {},
provider_ {nullptr} provider_ {nullptr}
{ {
@ -106,6 +108,8 @@ public:
void Disable(); void Disable();
boost::asio::thread_pool threadPool_ {1u};
const std::string radarId_; const std::string radarId_;
const common::RadarProductGroup group_; const common::RadarProductGroup group_;
const std::string product_; const std::string product_;
@ -179,6 +183,8 @@ public:
RadarProductManager* self_; RadarProductManager* self_;
boost::asio::thread_pool threadPool_ {4u};
std::shared_ptr<ProviderManager> std::shared_ptr<ProviderManager>
GetLevel3ProviderManager(const std::string& product); GetLevel3ProviderManager(const std::string& product);
@ -199,6 +205,10 @@ public:
void UpdateRecentRecords(RadarProductRecordList& recentList, void UpdateRecentRecords(RadarProductRecordList& recentList,
std::shared_ptr<types::RadarProductRecord> record); std::shared_ptr<types::RadarProductRecord> record);
void LoadNexradFileAsync(CreateNexradFileFunction load,
std::shared_ptr<request::NexradFileRequest> request,
std::mutex& mutex,
std::chrono::system_clock::time_point time);
void LoadProviderData(std::chrono::system_clock::time_point time, void LoadProviderData(std::chrono::system_clock::time_point time,
std::shared_ptr<ProviderManager> providerManager, std::shared_ptr<ProviderManager> providerManager,
RadarProductRecordMap& recordMap, RadarProductRecordMap& recordMap,
@ -523,7 +533,8 @@ void RadarProductManager::EnableRefresh(common::RadarProductGroup group,
p->GetLevel3ProviderManager(product); p->GetLevel3ProviderManager(product);
// Only enable refresh on available products // Only enable refresh on available products
scwx::util::async( boost::asio::post(
p->threadPool_,
[=, this]() [=, this]()
{ {
providerManager->provider_->RequestAvailableProducts(); providerManager->provider_->RequestAvailableProducts();
@ -614,7 +625,8 @@ void RadarProductManagerImpl::RefreshData(
providerManager->refreshTimer_.cancel(); providerManager->refreshTimer_.cancel();
} }
scwx::util::async( boost::asio::post(
threadPool_,
[=, this]() [=, this]()
{ {
auto [newObjects, totalObjects] = auto [newObjects, totalObjects] =
@ -774,9 +786,8 @@ void RadarProductManagerImpl::LoadProviderData(
providerManager->name(), providerManager->name(),
scwx::util::TimeString(time)); scwx::util::TimeString(time));
RadarProductManagerImpl::LoadNexradFile( LoadNexradFileAsync(
[=, &recordMap, &recordMutex, &loadDataMutex]() [=, &recordMap, &recordMutex]() -> std::shared_ptr<wsr88d::NexradFile>
-> std::shared_ptr<wsr88d::NexradFile>
{ {
std::shared_ptr<types::RadarProductRecord> existingRecord = nullptr; std::shared_ptr<types::RadarProductRecord> existingRecord = nullptr;
std::shared_ptr<wsr88d::NexradFile> nexradFile = nullptr; std::shared_ptr<wsr88d::NexradFile> nexradFile = nullptr;
@ -874,11 +885,15 @@ void RadarProductManager::LoadData(
{ {
logger_->debug("LoadData()"); logger_->debug("LoadData()");
RadarProductManagerImpl::LoadNexradFile( scwx::util::async(
[=, &is]() -> std::shared_ptr<wsr88d::NexradFile> [=, &is]()
{ return wsr88d::NexradFileFactory::Create(is); }, {
request, RadarProductManagerImpl::LoadNexradFile(
fileLoadMutex_); [=, &is]() -> std::shared_ptr<wsr88d::NexradFile>
{ return wsr88d::NexradFileFactory::Create(is); },
request,
fileLoadMutex_);
});
} }
void RadarProductManager::LoadFile( void RadarProductManager::LoadFile(
@ -915,11 +930,15 @@ void RadarProductManager::LoadFile(
} }
}); });
RadarProductManagerImpl::LoadNexradFile( scwx::util::async(
[=]() -> std::shared_ptr<wsr88d::NexradFile> [=]()
{ return wsr88d::NexradFileFactory::Create(filename); }, {
request, RadarProductManagerImpl::LoadNexradFile(
fileLoadMutex_); [=]() -> std::shared_ptr<wsr88d::NexradFile>
{ return wsr88d::NexradFileFactory::Create(filename); },
request,
fileLoadMutex_);
});
} }
else if (request != nullptr) else if (request != nullptr)
{ {
@ -928,51 +947,58 @@ void RadarProductManager::LoadFile(
} }
} }
void RadarProductManagerImpl::LoadNexradFileAsync(
CreateNexradFileFunction load,
std::shared_ptr<request::NexradFileRequest> request,
std::mutex& mutex,
std::chrono::system_clock::time_point time)
{
boost::asio::post(threadPool_,
[=, &mutex]()
{ LoadNexradFile(load, request, mutex, time); });
}
void RadarProductManagerImpl::LoadNexradFile( void RadarProductManagerImpl::LoadNexradFile(
CreateNexradFileFunction load, CreateNexradFileFunction load,
std::shared_ptr<request::NexradFileRequest> request, std::shared_ptr<request::NexradFileRequest> request,
std::mutex& mutex, std::mutex& mutex,
std::chrono::system_clock::time_point time) std::chrono::system_clock::time_point time)
{ {
scwx::util::async( std::unique_lock lock {mutex};
[=, &mutex]()
std::shared_ptr<wsr88d::NexradFile> nexradFile = load();
std::shared_ptr<types::RadarProductRecord> record = nullptr;
bool fileValid = (nexradFile != nullptr);
if (fileValid)
{
record = types::RadarProductRecord::Create(nexradFile);
// If the time is already determined, override the time in the file.
// Sometimes, level 2 data has been seen to be a few seconds off
// between filename and file data. Overriding this can help prevent
// issues with locating and storing the correct records.
if (time != std::chrono::system_clock::time_point {})
{ {
std::unique_lock lock {mutex}; record->set_time(time);
}
std::shared_ptr<wsr88d::NexradFile> nexradFile = load(); std::shared_ptr<RadarProductManager> manager =
RadarProductManager::Instance(record->radar_id());
std::shared_ptr<types::RadarProductRecord> record = nullptr; manager->Initialize();
record = manager->p->StoreRadarProductRecord(record);
}
bool fileValid = (nexradFile != nullptr); lock.unlock();
if (fileValid) if (request != nullptr)
{ {
record = types::RadarProductRecord::Create(nexradFile); request->set_radar_product_record(record);
Q_EMIT request->RequestComplete(request);
// If the time is already determined, override the time in the file. }
// Sometimes, level 2 data has been seen to be a few seconds off
// between filename and file data. Overriding this can help prevent
// issues with locating and storing the correct records.
if (time != std::chrono::system_clock::time_point {})
{
record->set_time(time);
}
std::shared_ptr<RadarProductManager> manager =
RadarProductManager::Instance(record->radar_id());
manager->Initialize();
record = manager->p->StoreRadarProductRecord(record);
}
lock.unlock();
if (request != nullptr)
{
request->set_radar_product_record(record);
Q_EMIT request->RequestComplete(request);
}
});
} }
void RadarProductManagerImpl::PopulateLevel2ProductTimes( void RadarProductManagerImpl::PopulateLevel2ProductTimes(
@ -1349,7 +1375,8 @@ void RadarProductManager::UpdateAvailableProducts()
logger_->debug("UpdateAvailableProducts()"); logger_->debug("UpdateAvailableProducts()");
scwx::util::async( boost::asio::post(
p->threadPool_,
[this]() [this]()
{ {
auto level3ProviderManager = auto level3ProviderManager =