diff --git a/wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp b/wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp index 5dccf149..236b97ee 100644 --- a/wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp +++ b/wxdata/source/scwx/provider/aws_level2_chunks_data_provider.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -33,11 +34,12 @@ class AwsLevel2ChunksDataProvider::Impl public: struct ScanRecord { - explicit ScanRecord(std::string prefix) : + explicit ScanRecord(std::string prefix, bool valid = true) : + valid_ {valid}, prefix_ {std::move(prefix)}, nexradFile_ {}, lastModified_ {}, - secondLastModified_ {} + lastKey_ {""} { } ~ScanRecord() = default; @@ -46,10 +48,12 @@ public: ScanRecord& operator=(const ScanRecord&) = default; ScanRecord& operator=(ScanRecord&&) = default; + bool valid_; std::string prefix_; std::shared_ptr nexradFile_; + std::chrono::system_clock::time_point time_; std::chrono::system_clock::time_point lastModified_; - std::chrono::system_clock::time_point secondLastModified_; + std::string lastKey_; int nextFile_ {1}; bool hasAllFiles_ {false}; }; @@ -62,10 +66,11 @@ public: bucketName_ {std::move(bucketName)}, region_ {std::move(region)}, client_ {nullptr}, - scans_ {}, + scanTimes_ {}, + lastScan_ {"", false}, + currentScan_ {"", false}, scansMutex_ {}, - lastModified_ {}, - updatePeriod_ {}, + updatePeriod_ {15}, self_ {self} { // Disable HTTP request for region @@ -95,6 +100,7 @@ public: const std::chrono::system_clock::time_point& time, int last); std::shared_ptr LoadScan(Impl::ScanRecord& scanRecord); + int GetScanNumber(const std::string& prefix); std::string radarSite_; std::string bucketName_; @@ -103,11 +109,13 @@ public: std::mutex refreshMutex_; - std::map scans_; - std::shared_mutex scansMutex_; + std::unordered_map + scanTimes_; + ScanRecord lastScan_; + ScanRecord currentScan_; + std::shared_mutex scansMutex_; - std::chrono::system_clock::time_point lastModified_; - std::chrono::seconds updatePeriod_; + std::chrono::seconds updatePeriod_; AwsLevel2ChunksDataProvider* self_; }; @@ -169,13 +177,13 @@ AwsLevel2ChunksDataProvider::GetTimePointByKey(const std::string& key) const size_t AwsLevel2ChunksDataProvider::cache_size() const { - return p->scans_.size(); + return 2; } std::chrono::system_clock::time_point AwsLevel2ChunksDataProvider::last_modified() const { - return p->lastModified_; + return p->currentScan_.lastModified_; } std::chrono::seconds AwsLevel2ChunksDataProvider::update_period() const { @@ -188,12 +196,13 @@ AwsLevel2ChunksDataProvider::FindKey(std::chrono::system_clock::time_point time) logger_->debug("FindKey: {}", util::TimeString(time)); std::shared_lock lock(p->scansMutex_); - - auto element = util::GetBoundedElement(p->scans_, time); - - if (element.has_value()) + if (p->currentScan_.valid_ && time >= p->currentScan_.time_) { - return element->prefix_; + return p->currentScan_.prefix_; + } + else if (p->lastScan_.valid_ && time >= p->lastScan_.time_) + { + return p->lastScan_.prefix_; } return {}; @@ -202,24 +211,24 @@ AwsLevel2ChunksDataProvider::FindKey(std::chrono::system_clock::time_point time) std::string AwsLevel2ChunksDataProvider::FindLatestKey() { std::shared_lock lock(p->scansMutex_); - if (p->scans_.empty()) + if (!p->currentScan_.valid_) { return ""; } - return p->scans_.crbegin()->second.prefix_; + return p->currentScan_.prefix_; } std::chrono::system_clock::time_point AwsLevel2ChunksDataProvider::FindLatestTime() { std::shared_lock lock(p->scansMutex_); - if (p->scans_.empty()) + if (!p->currentScan_.valid_) { return {}; } - return p->scans_.crbegin()->first; + return p->currentScan_.time_; } std::vector @@ -232,6 +241,12 @@ AwsLevel2ChunksDataProvider::GetTimePointsByDate( std::chrono::system_clock::time_point AwsLevel2ChunksDataProvider::Impl::GetScanTime(const std::string& prefix) { + const auto& scanTimeIt = scanTimes_.find(prefix); // O(log(n)) + if (scanTimeIt != scanTimes_.cend()) + { + return scanTimeIt->second; + } + Aws::S3::Model::ListObjectsV2Request request; request.SetBucket(bucketName_); request.SetPrefix(prefix); @@ -241,8 +256,9 @@ AwsLevel2ChunksDataProvider::Impl::GetScanTime(const std::string& prefix) auto outcome = client_->ListObjectsV2(request); if (outcome.IsSuccess()) { - return self_->GetTimePointByKey( + auto timePoint = self_->GetTimePointByKey( outcome.GetResult().GetContents().at(0).GetKey()); + return timePoint; } return {}; @@ -264,44 +280,7 @@ std::string AwsLevel2ChunksDataProvider::Impl::GetScanKey( std::tuple AwsLevel2ChunksDataProvider::ListObjects(std::chrono::system_clock::time_point) { - // TODO this is slow. It could probably be speed up by not reloading every - // scan every time. - const std::string prefix = p->radarSite_ + "/"; - - logger_->debug("ListObjects: {}", prefix); - - Aws::S3::Model::ListObjectsV2Request request; - request.SetBucket(p->bucketName_); - request.SetPrefix(prefix); - request.SetDelimiter("/"); - - auto outcome = p->client_->ListObjectsV2(request); - - size_t newObjects = 0; - size_t totalObjects = 0; - - if (outcome.IsSuccess()) - { - auto& scans = outcome.GetResult().GetCommonPrefixes(); - logger_->debug("Found {} scans", scans.size()); - - for (const auto& scan : scans) - { - const std::string& prefixScan = scan.GetPrefix(); - - auto time = p->GetScanTime(prefixScan); - - if (!p->scans_.contains(time)) - { - p->scans_.insert_or_assign(time, Impl::ScanRecord {prefixScan}); - newObjects++; - } - - totalObjects++; - } - } - - return {outcome.IsSuccess(), newObjects, totalObjects}; + return {true, 0, 0}; } std::shared_ptr @@ -313,16 +292,23 @@ AwsLevel2ChunksDataProvider::LoadObjectByKey(const std::string& /*prefix*/) std::shared_ptr AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord) { - if (scanRecord.hasAllFiles_) + if (!scanRecord.valid_) + { + return nullptr; + } + else if (scanRecord.hasAllFiles_) { return scanRecord.nexradFile_; } - // TODO can get only new records using scanRecords last Aws::S3::Model::ListObjectsV2Request listRequest; listRequest.SetBucket(bucketName_); listRequest.SetPrefix(scanRecord.prefix_); listRequest.SetDelimiter("/"); + if (!scanRecord.lastKey_.empty()) + { + listRequest.SetStartAfter(scanRecord.lastKey_); + } auto listOutcome = client_->ListObjectsV2(listRequest); if (!listOutcome.IsSuccess()) @@ -336,6 +322,7 @@ AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord) { const std::string& key = chunk.GetKey(); + // TODO this is wrong, 1st number can be 1-3 digits // We just want the number of this chunk for now // KIND/585/20250324-134727-001-S static const size_t startNumberPos = @@ -347,6 +334,7 @@ AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord) continue; } + // TODO this is wrong, 1st number can be 1-3 digits // Now we want the ending char // KIND/585/20250324-134727-001-S static const size_t charPos = @@ -407,24 +395,15 @@ AwsLevel2ChunksDataProvider::Impl::LoadScan(Impl::ScanRecord& scanRecord) outcome.GetResult().GetLastModified().Seconds()}; std::chrono::system_clock::time_point lastModified {lastModifiedSeconds}; - scanRecord.secondLastModified_ = scanRecord.lastModified_; - scanRecord.lastModified_ = lastModified; + scanRecord.lastModified_ = lastModified; scanRecord.nextFile_ += 1; + scanRecord.lastKey_ = key; } - scanRecord.nexradFile_->IndexFile(); - if (!scans_.empty()) + if (scanRecord.nexradFile_ != nullptr) { - auto& lastScan = scans_.crend()->second; - lastModified_ = lastScan.lastModified_; - if (lastScan.secondLastModified_ != - std::chrono::system_clock::time_point()) - { - auto delta = lastScan.lastModified_ - lastScan.secondLastModified_; - updatePeriod_ = - std::chrono::duration_cast(delta); - } + scanRecord.nexradFile_->IndexFile(); } return scanRecord.nexradFile_; @@ -434,19 +413,20 @@ std::shared_ptr AwsLevel2ChunksDataProvider::LoadObjectByTime( std::chrono::system_clock::time_point time) { - std::shared_lock lock(p->scansMutex_); + std::unique_lock lock(p->scansMutex_); - logger_->error("LoadObjectByTime({})", time); - - auto scanRecord = util::GetBoundedElementPointer(p->scans_, time); - if (scanRecord == nullptr) + if (p->currentScan_.valid_ && time >= p->currentScan_.time_) + { + return p->LoadScan(p->currentScan_); + } + else if (p->lastScan_.valid_ && time >= p->lastScan_.time_) + { + return p->LoadScan(p->lastScan_); + } + else { - logger_->warn("Could not find object at time {}", time); return nullptr; } - - // The scanRecord must be a reference - return p->LoadScan(p->scans_.at(scanRecord->first)); } std::shared_ptr @@ -455,23 +435,120 @@ AwsLevel2ChunksDataProvider::LoadLatestObject() return LoadObjectByTime(FindLatestTime()); } +int AwsLevel2ChunksDataProvider::Impl::GetScanNumber(const std::string& prefix) +{ + + // We just want the number of this chunk for now + // KIND/585/20250324-134727-001-S + static const size_t startNumberPos = std::string("KIND/").size(); + const std::string& prefixNumberStr = prefix.substr(startNumberPos, 3); + return std::stoi(prefixNumberStr); +} + std::pair AwsLevel2ChunksDataProvider::Refresh() { using namespace std::chrono; std::unique_lock lock(p->refreshMutex_); + std::unique_lock scanLock(p->scansMutex_); - auto [success, newObjects, totalObjects] = ListObjects({}); - for (auto& scanRecord : p->scans_) + size_t newObjects = 0; + size_t totalObjects = 0; + + const std::string prefix = p->radarSite_ + "/"; + + Aws::S3::Model::ListObjectsV2Request request; + request.SetBucket(p->bucketName_); + request.SetPrefix(prefix); + request.SetDelimiter("/"); + + auto outcome = p->client_->ListObjectsV2(request); + + + if (outcome.IsSuccess()) { - if (scanRecord.second.nexradFile_ != nullptr) + auto& scans = outcome.GetResult().GetCommonPrefixes(); + logger_->debug("Found {} scans", scans.size()); + + boost::timer::cpu_timer timer {}; + timer.start(); + if (scans.size() > 0) { - p->LoadScan(scanRecord.second); - newObjects += 1; + + // TODO this cannot be done by getting things form the network. + // Use index number instead. + + // find latest scan + std::chrono::system_clock::time_point latestTime = {}; + std::chrono::system_clock::time_point secondLatestTime = {}; + size_t latestIndex = 0; + size_t secondLatestIndex = 0; + + + for (size_t i = 0; i < scans.size(); i++) // O(n log(n)) n <= 999 + { + auto time = p->GetScanTime(scans[i].GetPrefix()); + if (time > latestTime) + { + secondLatestTime = latestTime; + latestTime = time; + secondLatestIndex = latestIndex; + latestIndex = i; + } + } + + const auto& last = scans.at(secondLatestIndex).GetPrefix(); + if (secondLatestTime != std::chrono::system_clock::time_point {}) + { + p->lastScan_ = p->currentScan_; + } + else if (!p->lastScan_.valid_ || p->lastScan_.prefix_ != last) + { + p->lastScan_.valid_ = true; + p->lastScan_.prefix_ = last; + p->lastScan_.nexradFile_ = nullptr; + p->lastScan_.time_ = secondLatestTime; + p->lastScan_.lastModified_ = {}; + p->lastScan_.lastKey_ = ""; + p->lastScan_.nextFile_ = 1; + p->lastScan_.hasAllFiles_ = false; + newObjects += 1; + } + + const auto& current = scans.at(latestIndex).GetPrefix(); + if (!p->currentScan_.valid_ || p->currentScan_.prefix_ != current) + { + p->currentScan_.valid_ = true; + p->currentScan_.prefix_ = current; + p->currentScan_.nexradFile_ = nullptr; + p->currentScan_.time_ = latestTime; + p->currentScan_.lastModified_ = {}; + p->currentScan_.lastKey_ = ""; + p->currentScan_.nextFile_ = 1; + p->currentScan_.hasAllFiles_ = false; + newObjects += 1; + } } + + timer.stop(); + logger_->debug("Updated current scans in {}", timer.format(6, "%ws")); } + logger_->debug("Loading scans"); + + if (p->currentScan_.valid_) + { + p->LoadScan(p->currentScan_); + totalObjects += 1; + } + if (p->lastScan_.valid_) + { + p->LoadScan(p->lastScan_); + totalObjects += 1; + } + + return std::make_pair(newObjects, totalObjects); }