mirror of
https://github.com/ciphervance/supercell-wx.git
synced 2025-10-30 14:10:06 +00:00
working level2 chunks with auto rerendering
This commit is contained in:
parent
7c99bbc185
commit
fc83a7a36f
3 changed files with 232 additions and 125 deletions
|
|
@ -53,6 +53,7 @@ public:
|
|||
std::shared_ptr<wsr88d::Ar2vFile> nexradFile_;
|
||||
std::chrono::system_clock::time_point time_;
|
||||
std::chrono::system_clock::time_point lastModified_;
|
||||
std::chrono::system_clock::time_point secondLastModified_;
|
||||
std::string lastKey_;
|
||||
int nextFile_ {1};
|
||||
bool hasAllFiles_ {false};
|
||||
|
|
@ -70,7 +71,8 @@ public:
|
|||
lastScan_ {"", false},
|
||||
currentScan_ {"", false},
|
||||
scansMutex_ {},
|
||||
updatePeriod_ {15},
|
||||
lastTimeListed_ {},
|
||||
updatePeriod_ {7},
|
||||
self_ {self}
|
||||
{
|
||||
// Disable HTTP request for region
|
||||
|
|
@ -96,11 +98,14 @@ public:
|
|||
Impl& operator=(Impl&&) = delete;
|
||||
|
||||
std::chrono::system_clock::time_point GetScanTime(const std::string& prefix);
|
||||
std::string GetScanKey(const std::string& prefix,
|
||||
const std::chrono::system_clock::time_point& time,
|
||||
int last);
|
||||
std::shared_ptr<wsr88d::NexradFile> LoadScan(Impl::ScanRecord& scanRecord);
|
||||
int GetScanNumber(const std::string& prefix);
|
||||
int GetScanNumber(const std::string& prefix);
|
||||
std::string GetScanKey(const std::string& prefix,
|
||||
const std::chrono::system_clock::time_point& time,
|
||||
int last);
|
||||
|
||||
bool LoadScan(Impl::ScanRecord& scanRecord);
|
||||
std::tuple<bool, size_t, size_t> ListObjects();
|
||||
|
||||
|
||||
std::string radarSite_;
|
||||
std::string bucketName_;
|
||||
|
|
@ -110,10 +115,11 @@ public:
|
|||
std::mutex refreshMutex_;
|
||||
|
||||
std::unordered_map<std::string, std::chrono::system_clock::time_point>
|
||||
scanTimes_;
|
||||
ScanRecord lastScan_;
|
||||
ScanRecord currentScan_;
|
||||
std::shared_mutex scansMutex_;
|
||||
scanTimes_;
|
||||
ScanRecord lastScan_;
|
||||
ScanRecord currentScan_;
|
||||
std::shared_mutex scansMutex_;
|
||||
std::chrono::system_clock::time_point lastTimeListed_;
|
||||
|
||||
std::chrono::seconds updatePeriod_;
|
||||
|
||||
|
|
@ -187,6 +193,24 @@ AwsLevel2ChunksDataProvider::last_modified() const
|
|||
}
|
||||
std::chrono::seconds AwsLevel2ChunksDataProvider::update_period() const
|
||||
{
|
||||
std::shared_lock lock(p->scansMutex_);
|
||||
// Add an extra second of delay
|
||||
static const auto extra = std::chrono::seconds(1);
|
||||
// get update period from time between chunks
|
||||
if (p->currentScan_.valid_ && p->currentScan_.nextFile_ > 2)
|
||||
{
|
||||
auto delta =
|
||||
p->currentScan_.lastModified_ - p->currentScan_.secondLastModified_;
|
||||
return std::chrono::duration_cast<std::chrono::seconds>(delta) + extra;
|
||||
}
|
||||
else if (p->lastScan_.valid_ && p->lastScan_.nextFile_ > 2)
|
||||
{
|
||||
auto delta =
|
||||
p->lastScan_.lastModified_ - p->lastScan_.secondLastModified_;
|
||||
return std::chrono::duration_cast<std::chrono::seconds>(delta) + extra;
|
||||
}
|
||||
|
||||
// default to a set update period
|
||||
return p->updatePeriod_;
|
||||
}
|
||||
|
||||
|
|
@ -241,10 +265,16 @@ AwsLevel2ChunksDataProvider::GetTimePointsByDate(
|
|||
std::chrono::system_clock::time_point
|
||||
AwsLevel2ChunksDataProvider::Impl::GetScanTime(const std::string& prefix)
|
||||
{
|
||||
using namespace std::chrono;
|
||||
const auto& scanTimeIt = scanTimes_.find(prefix); // O(log(n))
|
||||
if (scanTimeIt != scanTimes_.cend())
|
||||
{
|
||||
return scanTimeIt->second;
|
||||
// If the time is greater than 2 hours ago, it may be a new scan
|
||||
auto replaceBy = system_clock::now() - hours {2};
|
||||
if (scanTimeIt->second > replaceBy)
|
||||
{
|
||||
return scanTimeIt->second;
|
||||
}
|
||||
}
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
|
|
@ -258,6 +288,7 @@ AwsLevel2ChunksDataProvider::Impl::GetScanTime(const std::string& prefix)
|
|||
{
|
||||
auto timePoint = self_->GetTimePointByKey(
|
||||
outcome.GetResult().GetContents().at(0).GetKey());
|
||||
scanTimes_.insert_or_assign(prefix, timePoint);
|
||||
return timePoint;
|
||||
}
|
||||
|
||||
|
|
@ -277,6 +308,135 @@ std::string AwsLevel2ChunksDataProvider::Impl::GetScanKey(
|
|||
"{0}/{1:%Y%m%d-%H%M%S}-{2}", prefix, fmt::gmtime(time), last - 1);
|
||||
}
|
||||
|
||||
std::tuple<bool, size_t, size_t>
|
||||
AwsLevel2ChunksDataProvider::Impl::ListObjects()
|
||||
{
|
||||
size_t newObjects = 0;
|
||||
size_t totalObjects = 0;
|
||||
|
||||
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
|
||||
|
||||
if (currentScan_.valid_ && !currentScan_.hasAllFiles_ &&
|
||||
lastTimeListed_ + std::chrono::minutes(7) > now)
|
||||
{
|
||||
return {true, newObjects, totalObjects};
|
||||
}
|
||||
logger_->debug("ListObjects");
|
||||
lastTimeListed_ = now;
|
||||
|
||||
const std::string prefix = radarSite_ + "/";
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
request.SetBucket(bucketName_);
|
||||
request.SetPrefix(prefix);
|
||||
request.SetDelimiter("/");
|
||||
|
||||
auto outcome = client_->ListObjectsV2(request);
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
auto& scans = outcome.GetResult().GetCommonPrefixes();
|
||||
logger_->debug("Found {} scans", scans.size());
|
||||
|
||||
if (scans.size() > 0)
|
||||
{
|
||||
// find latest scan
|
||||
auto scanNumberMap = std::map<int, std::string>();
|
||||
|
||||
for (auto& scan : scans) // O(n log(n)) n <= 999
|
||||
{
|
||||
const std::string& scanPrefix = scan.GetPrefix();
|
||||
scanNumberMap.insert_or_assign(GetScanNumber(scanPrefix),
|
||||
scanPrefix);
|
||||
}
|
||||
|
||||
// TODO ensure not out of range
|
||||
int lastScanNumber = -1;
|
||||
// Start with last scan
|
||||
int previousScanNumber = scanNumberMap.crbegin()->first;
|
||||
const int firstScanNumber = scanNumberMap.cbegin()->first;
|
||||
|
||||
// This indicates that highest number scan is the last scan
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers)
|
||||
if (previousScanNumber != 999 || firstScanNumber != 1)
|
||||
{
|
||||
lastScanNumber = previousScanNumber;
|
||||
}
|
||||
else
|
||||
{
|
||||
// have already checked scan with highest number, so skip first
|
||||
previousScanNumber = firstScanNumber;
|
||||
bool first = true;
|
||||
for (const auto& scan : scanNumberMap)
|
||||
{
|
||||
if (first)
|
||||
{
|
||||
first = false;
|
||||
continue;
|
||||
}
|
||||
if (scan.first != previousScanNumber + 1)
|
||||
{
|
||||
lastScanNumber = previousScanNumber;
|
||||
break;
|
||||
}
|
||||
previousScanNumber = scan.first;
|
||||
}
|
||||
}
|
||||
|
||||
if (lastScanNumber == -1)
|
||||
{
|
||||
logger_->warn("Could not find last scan");
|
||||
// TODO make sure this makes sence
|
||||
return {false, 0, 0};
|
||||
}
|
||||
|
||||
std::string& lastScanPrefix = scanNumberMap.at(lastScanNumber);
|
||||
int secondLastScanNumber =
|
||||
lastScanNumber == 1 ? 999 : lastScanNumber - 1;
|
||||
|
||||
const auto& secondLastScanPrefix =
|
||||
scanNumberMap.find(secondLastScanNumber);
|
||||
|
||||
if (!currentScan_.valid_ ||
|
||||
currentScan_.prefix_ != lastScanPrefix)
|
||||
{
|
||||
if (currentScan_.valid_ &&
|
||||
(secondLastScanPrefix == scanNumberMap.cend() ||
|
||||
currentScan_.prefix_ == secondLastScanPrefix->second))
|
||||
{
|
||||
lastScan_ = currentScan_;
|
||||
}
|
||||
else if (secondLastScanPrefix != scanNumberMap.cend())
|
||||
{
|
||||
lastScan_.valid_ = true;
|
||||
lastScan_.prefix_ = secondLastScanPrefix->second;
|
||||
lastScan_.nexradFile_ = nullptr;
|
||||
lastScan_.time_ = GetScanTime(secondLastScanPrefix->second);
|
||||
lastScan_.lastModified_ = {};
|
||||
lastScan_.secondLastModified_ = {};
|
||||
lastScan_.lastKey_ = "";
|
||||
lastScan_.nextFile_ = 1;
|
||||
lastScan_.hasAllFiles_ = false;
|
||||
newObjects += 1;
|
||||
}
|
||||
|
||||
currentScan_.valid_ = true;
|
||||
currentScan_.prefix_ = lastScanPrefix;
|
||||
currentScan_.nexradFile_ = nullptr;
|
||||
currentScan_.time_ = GetScanTime(lastScanPrefix);
|
||||
currentScan_.lastModified_ = {};
|
||||
currentScan_.secondLastModified_ = {};
|
||||
currentScan_.lastKey_ = "";
|
||||
currentScan_.nextFile_ = 1;
|
||||
currentScan_.hasAllFiles_ = false;
|
||||
newObjects += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return {true, newObjects, totalObjects};
|
||||
}
|
||||
|
||||
std::tuple<bool, size_t, size_t>
|
||||
AwsLevel2ChunksDataProvider::ListObjects(std::chrono::system_clock::time_point)
|
||||
{
|
||||
|
|
@ -289,16 +449,16 @@ AwsLevel2ChunksDataProvider::LoadObjectByKey(const std::string& /*prefix*/)
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
std::shared_ptr<wsr88d::NexradFile>
|
||||
AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord)
|
||||
bool AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord)
|
||||
{
|
||||
if (!scanRecord.valid_)
|
||||
{
|
||||
return nullptr;
|
||||
logger_->warn("Tried to load scan which was not listed yet");
|
||||
return false;
|
||||
}
|
||||
else if (scanRecord.hasAllFiles_)
|
||||
{
|
||||
return scanRecord.nexradFile_;
|
||||
return false;
|
||||
}
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request listRequest;
|
||||
|
|
@ -314,19 +474,23 @@ AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord)
|
|||
if (!listOutcome.IsSuccess())
|
||||
{
|
||||
logger_->warn("Could not find scan at {}", scanRecord.prefix_);
|
||||
return nullptr;
|
||||
return false;
|
||||
}
|
||||
|
||||
bool hasNew = false;
|
||||
auto& chunks = listOutcome.GetResult().GetContents();
|
||||
for (const auto& chunk : chunks)
|
||||
{
|
||||
const std::string& key = chunk.GetKey();
|
||||
|
||||
// TODO this is wrong, 1st number can be 1-3 digits
|
||||
// We just want the number of this chunk for now
|
||||
// KIND/585/20250324-134727-001-S
|
||||
static const size_t startNumberPos =
|
||||
std::string("KIND/585/20250324-134727-").size();
|
||||
// KIND/5/20250324-134727-001-S
|
||||
static const size_t firstSlash = std::string("KIND/").size();
|
||||
const size_t secondSlash = key.find('/', firstSlash);
|
||||
static const size_t startNumberPosOffset =
|
||||
std::string("/20250324-134727-").size();
|
||||
const size_t startNumberPos =
|
||||
secondSlash + startNumberPosOffset;
|
||||
const std::string& keyNumberStr = key.substr(startNumberPos, 3);
|
||||
const int keyNumber = std::stoi(keyNumberStr);
|
||||
if (keyNumber != scanRecord.nextFile_)
|
||||
|
|
@ -334,12 +498,11 @@ AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord)
|
|||
continue;
|
||||
}
|
||||
|
||||
// TODO this is wrong, 1st number can be 1-3 digits
|
||||
// Now we want the ending char
|
||||
// KIND/585/20250324-134727-001-S
|
||||
static const size_t charPos =
|
||||
std::string("KIND/585/20250324-134727-001-").size();
|
||||
const char keyChar = key[charPos];
|
||||
std::string("/20250324-134727-001-").size();
|
||||
const char keyChar = key[secondSlash + charPos];
|
||||
|
||||
Aws::S3::Model::GetObjectRequest objectRequest;
|
||||
objectRequest.SetBucket(bucketName_);
|
||||
|
|
@ -351,7 +514,7 @@ AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord)
|
|||
{
|
||||
logger_->warn("Could not get object: {}",
|
||||
outcome.GetError().GetMessage());
|
||||
return nullptr;
|
||||
return hasNew;
|
||||
}
|
||||
|
||||
auto& body = outcome.GetResultWithOwnership().GetBody();
|
||||
|
|
@ -364,7 +527,7 @@ AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord)
|
|||
if (!scanRecord.nexradFile_->LoadData(body))
|
||||
{
|
||||
logger_->warn("Failed to load first chunk");
|
||||
return nullptr;
|
||||
return hasNew;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
@ -373,7 +536,7 @@ AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord)
|
|||
if (!scanRecord.nexradFile_->LoadLDMRecords(body))
|
||||
{
|
||||
logger_->warn("Failed to load middle chunk");
|
||||
return nullptr;
|
||||
return hasNew;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
@ -382,31 +545,38 @@ AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord)
|
|||
if (!scanRecord.nexradFile_->LoadLDMRecords(body))
|
||||
{
|
||||
logger_->warn("Failed to load last chunk");
|
||||
return nullptr;
|
||||
return hasNew;
|
||||
}
|
||||
scanRecord.hasAllFiles_ = true;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
return nullptr;
|
||||
logger_->warn("Could not load chunk with unknown char");
|
||||
return hasNew;
|
||||
}
|
||||
hasNew = true;
|
||||
|
||||
std::chrono::seconds lastModifiedSeconds {
|
||||
outcome.GetResult().GetLastModified().Seconds()};
|
||||
std::chrono::system_clock::time_point lastModified {lastModifiedSeconds};
|
||||
|
||||
scanRecord.secondLastModified_ = scanRecord.lastModified_;
|
||||
scanRecord.lastModified_ = lastModified;
|
||||
|
||||
scanRecord.nextFile_ += 1;
|
||||
scanRecord.lastKey_ = key;
|
||||
}
|
||||
|
||||
if (scanRecord.nexradFile_ != nullptr)
|
||||
if (scanRecord.nexradFile_ == nullptr)
|
||||
{
|
||||
logger_->warn("Could not load file");
|
||||
}
|
||||
else
|
||||
{
|
||||
scanRecord.nexradFile_->IndexFile();
|
||||
}
|
||||
|
||||
return scanRecord.nexradFile_;
|
||||
return hasNew;
|
||||
}
|
||||
|
||||
std::shared_ptr<wsr88d::NexradFile>
|
||||
|
|
@ -417,14 +587,15 @@ AwsLevel2ChunksDataProvider::LoadObjectByTime(
|
|||
|
||||
if (p->currentScan_.valid_ && time >= p->currentScan_.time_)
|
||||
{
|
||||
return p->LoadScan(p->currentScan_);
|
||||
return p->currentScan_.nexradFile_;
|
||||
}
|
||||
else if (p->lastScan_.valid_ && time >= p->lastScan_.time_)
|
||||
{
|
||||
return p->LoadScan(p->lastScan_);
|
||||
return p->lastScan_.nexradFile_;
|
||||
}
|
||||
else
|
||||
{
|
||||
logger_->warn("Could not find scan with time");
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
|
@ -440,8 +611,8 @@ int AwsLevel2ChunksDataProvider::Impl::GetScanNumber(const std::string& prefix)
|
|||
|
||||
// We just want the number of this chunk for now
|
||||
// KIND/585/20250324-134727-001-S
|
||||
static const size_t startNumberPos = std::string("KIND/").size();
|
||||
const std::string& prefixNumberStr = prefix.substr(startNumberPos, 3);
|
||||
static const size_t firstSlash = std::string("KIND/").size();
|
||||
const std::string& prefixNumberStr = prefix.substr(firstSlash, 3);
|
||||
return std::stoi(prefixNumberStr);
|
||||
}
|
||||
|
||||
|
|
@ -449,106 +620,35 @@ std::pair<size_t, size_t> AwsLevel2ChunksDataProvider::Refresh()
|
|||
{
|
||||
using namespace std::chrono;
|
||||
|
||||
boost::timer::cpu_timer timer {};
|
||||
timer.start();
|
||||
|
||||
std::unique_lock lock(p->refreshMutex_);
|
||||
std::unique_lock scanLock(p->scansMutex_);
|
||||
|
||||
|
||||
size_t newObjects = 0;
|
||||
size_t totalObjects = 0;
|
||||
|
||||
const std::string prefix = p->radarSite_ + "/";
|
||||
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
request.SetBucket(p->bucketName_);
|
||||
request.SetPrefix(prefix);
|
||||
request.SetDelimiter("/");
|
||||
|
||||
auto outcome = p->client_->ListObjectsV2(request);
|
||||
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
auto& scans = outcome.GetResult().GetCommonPrefixes();
|
||||
logger_->debug("Found {} scans", scans.size());
|
||||
|
||||
boost::timer::cpu_timer timer {};
|
||||
timer.start();
|
||||
if (scans.size() > 0)
|
||||
{
|
||||
|
||||
// TODO this cannot be done by getting things form the network.
|
||||
// Use index number instead.
|
||||
|
||||
// find latest scan
|
||||
std::chrono::system_clock::time_point latestTime = {};
|
||||
std::chrono::system_clock::time_point secondLatestTime = {};
|
||||
size_t latestIndex = 0;
|
||||
size_t secondLatestIndex = 0;
|
||||
|
||||
|
||||
for (size_t i = 0; i < scans.size(); i++) // O(n log(n)) n <= 999
|
||||
{
|
||||
auto time = p->GetScanTime(scans[i].GetPrefix());
|
||||
if (time > latestTime)
|
||||
{
|
||||
secondLatestTime = latestTime;
|
||||
latestTime = time;
|
||||
secondLatestIndex = latestIndex;
|
||||
latestIndex = i;
|
||||
}
|
||||
}
|
||||
|
||||
const auto& last = scans.at(secondLatestIndex).GetPrefix();
|
||||
if (secondLatestTime != std::chrono::system_clock::time_point {})
|
||||
{
|
||||
p->lastScan_ = p->currentScan_;
|
||||
}
|
||||
else if (!p->lastScan_.valid_ || p->lastScan_.prefix_ != last)
|
||||
{
|
||||
p->lastScan_.valid_ = true;
|
||||
p->lastScan_.prefix_ = last;
|
||||
p->lastScan_.nexradFile_ = nullptr;
|
||||
p->lastScan_.time_ = secondLatestTime;
|
||||
p->lastScan_.lastModified_ = {};
|
||||
p->lastScan_.lastKey_ = "";
|
||||
p->lastScan_.nextFile_ = 1;
|
||||
p->lastScan_.hasAllFiles_ = false;
|
||||
newObjects += 1;
|
||||
}
|
||||
|
||||
const auto& current = scans.at(latestIndex).GetPrefix();
|
||||
if (!p->currentScan_.valid_ || p->currentScan_.prefix_ != current)
|
||||
{
|
||||
p->currentScan_.valid_ = true;
|
||||
p->currentScan_.prefix_ = current;
|
||||
p->currentScan_.nexradFile_ = nullptr;
|
||||
p->currentScan_.time_ = latestTime;
|
||||
p->currentScan_.lastModified_ = {};
|
||||
p->currentScan_.lastKey_ = "";
|
||||
p->currentScan_.nextFile_ = 1;
|
||||
p->currentScan_.hasAllFiles_ = false;
|
||||
newObjects += 1;
|
||||
}
|
||||
}
|
||||
|
||||
timer.stop();
|
||||
logger_->debug("Updated current scans in {}", timer.format(6, "%ws"));
|
||||
}
|
||||
|
||||
logger_->debug("Loading scans");
|
||||
auto [success, newObjects, totalObjects] = p->ListObjects();
|
||||
|
||||
if (p->currentScan_.valid_)
|
||||
{
|
||||
p->LoadScan(p->currentScan_);
|
||||
if (p->LoadScan(p->currentScan_))
|
||||
{
|
||||
newObjects += 1;
|
||||
}
|
||||
totalObjects += 1;
|
||||
}
|
||||
if (p->lastScan_.valid_)
|
||||
{
|
||||
p->LoadScan(p->lastScan_);
|
||||
/*
|
||||
if (p->LoadScan(p->lastScan_))
|
||||
{
|
||||
newObjects += 1;
|
||||
}
|
||||
*/
|
||||
totalObjects += 1;
|
||||
}
|
||||
|
||||
|
||||
timer.stop();
|
||||
logger_->debug("Refresh() in {}", timer.format(6, "%ws"));
|
||||
return std::make_pair(newObjects, totalObjects);
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue