mirror of
				https://github.com/ciphervance/supercell-wx.git
				synced 2025-11-04 14:10:06 +00:00 
			
		
		
		
	Parallelize the chunks loading and load from archive when possible
This commit is contained in:
		
							parent
							
								
									e10ebdeb5e
								
							
						
					
					
						commit
						759a9e4379
					
				
					 3 changed files with 88 additions and 12 deletions
				
			
		| 
						 | 
					@ -151,6 +151,16 @@ public:
 | 
				
			||||||
      level2ChunksProviderManager_->provider_ =
 | 
					      level2ChunksProviderManager_->provider_ =
 | 
				
			||||||
         provider::NexradDataProviderFactory::CreateLevel2ChunksDataProvider(
 | 
					         provider::NexradDataProviderFactory::CreateLevel2ChunksDataProvider(
 | 
				
			||||||
            radarId);
 | 
					            radarId);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      auto level2ChunksProvider =
 | 
				
			||||||
 | 
					         std::dynamic_pointer_cast<provider::AwsLevel2ChunksDataProvider>(
 | 
				
			||||||
 | 
					            level2ChunksProviderManager_->provider_);
 | 
				
			||||||
 | 
					      if (level2ChunksProvider != nullptr)
 | 
				
			||||||
 | 
					      {
 | 
				
			||||||
 | 
					         level2ChunksProvider->SetLevel2DataProvider(
 | 
				
			||||||
 | 
					            std::dynamic_pointer_cast<provider::AwsLevel2DataProvider>(
 | 
				
			||||||
 | 
					               level2ProviderManager_->provider_));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
   }
 | 
					   }
 | 
				
			||||||
   ~RadarProductManagerImpl()
 | 
					   ~RadarProductManagerImpl()
 | 
				
			||||||
   {
 | 
					   {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,6 +1,7 @@
 | 
				
			||||||
#pragma once
 | 
					#pragma once
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#include <scwx/provider/nexrad_data_provider.hpp>
 | 
					#include <scwx/provider/nexrad_data_provider.hpp>
 | 
				
			||||||
 | 
					#include <scwx/provider/aws_level2_data_provider.hpp>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#include <optional>
 | 
					#include <optional>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -61,6 +62,9 @@ public:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   std::optional<float> GetCurrentElevation();
 | 
					   std::optional<float> GetCurrentElevation();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   void SetLevel2DataProvider(
 | 
				
			||||||
 | 
					      const std::shared_ptr<AwsLevel2DataProvider>& provider);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
private:
 | 
					private:
 | 
				
			||||||
   class Impl;
 | 
					   class Impl;
 | 
				
			||||||
   std::unique_ptr<Impl> p;
 | 
					   std::unique_ptr<Impl> p;
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -13,10 +13,24 @@
 | 
				
			||||||
#include <aws/s3/S3Client.h>
 | 
					#include <aws/s3/S3Client.h>
 | 
				
			||||||
#include <aws/s3/model/GetObjectRequest.h>
 | 
					#include <aws/s3/model/GetObjectRequest.h>
 | 
				
			||||||
#include <aws/s3/model/ListObjectsV2Request.h>
 | 
					#include <aws/s3/model/ListObjectsV2Request.h>
 | 
				
			||||||
#include <boost/timer/timer.hpp>
 | 
					 | 
				
			||||||
#include <fmt/chrono.h>
 | 
					#include <fmt/chrono.h>
 | 
				
			||||||
#include <fmt/format.h>
 | 
					#include <fmt/format.h>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Avoid circular refrence errors in boost
 | 
				
			||||||
 | 
					// NOLINTBEGIN(misc-header-include-cycle)
 | 
				
			||||||
 | 
					#if defined(_MSC_VER)
 | 
				
			||||||
 | 
					#   pragma warning(push, 0)
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#include <boost/asio/thread_pool.hpp>
 | 
				
			||||||
 | 
					#include <boost/asio/post.hpp>
 | 
				
			||||||
 | 
					#include <boost/timer/timer.hpp>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#if defined(_MSC_VER)
 | 
				
			||||||
 | 
					#   pragma warning(pop)
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					// NOLINTEND(misc-header-include-cycle)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#if (__cpp_lib_chrono < 201907L)
 | 
					#if (__cpp_lib_chrono < 201907L)
 | 
				
			||||||
#   include <date/date.h>
 | 
					#   include <date/date.h>
 | 
				
			||||||
#endif
 | 
					#endif
 | 
				
			||||||
| 
						 | 
					@ -75,6 +89,7 @@ public:
 | 
				
			||||||
       lastTimeListed_ {},
 | 
					       lastTimeListed_ {},
 | 
				
			||||||
       // NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers) about average
 | 
					       // NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers) about average
 | 
				
			||||||
       updatePeriod_ {7},
 | 
					       updatePeriod_ {7},
 | 
				
			||||||
 | 
					       level2DataProvider_ {},
 | 
				
			||||||
       self_ {self}
 | 
					       self_ {self}
 | 
				
			||||||
   {
 | 
					   {
 | 
				
			||||||
      // Disable HTTP request for region
 | 
					      // Disable HTTP request for region
 | 
				
			||||||
| 
						 | 
					@ -122,6 +137,8 @@ public:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   std::chrono::seconds updatePeriod_;
 | 
					   std::chrono::seconds updatePeriod_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   std::weak_ptr<AwsLevel2DataProvider> level2DataProvider_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   AwsLevel2ChunksDataProvider* self_;
 | 
					   AwsLevel2ChunksDataProvider* self_;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -634,7 +651,6 @@ AwsLevel2ChunksDataProvider::LoadLatestObject()
 | 
				
			||||||
   const std::unique_lock lock(p->scansMutex_);
 | 
					   const std::unique_lock lock(p->scansMutex_);
 | 
				
			||||||
   return std::make_shared<wsr88d::Ar2vFile>(p->currentScan_.nexradFile_,
 | 
					   return std::make_shared<wsr88d::Ar2vFile>(p->currentScan_.nexradFile_,
 | 
				
			||||||
                                             p->lastScan_.nexradFile_);
 | 
					                                             p->lastScan_.nexradFile_);
 | 
				
			||||||
   // return p->currentScan_.nexradFile_;
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
std::shared_ptr<wsr88d::NexradFile>
 | 
					std::shared_ptr<wsr88d::NexradFile>
 | 
				
			||||||
| 
						 | 
					@ -663,23 +679,63 @@ std::pair<size_t, size_t> AwsLevel2ChunksDataProvider::Refresh()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   auto [success, newObjects, totalObjects] = p->ListObjects();
 | 
					   auto [success, newObjects, totalObjects] = p->ListObjects();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   auto threadPool = boost::asio::thread_pool(3);
 | 
				
			||||||
 | 
					   bool newCurrent = false;
 | 
				
			||||||
 | 
					   bool newLast    = false;
 | 
				
			||||||
   if (p->currentScan_.valid_)
 | 
					   if (p->currentScan_.valid_)
 | 
				
			||||||
   {
 | 
					   {
 | 
				
			||||||
      if (p->LoadScan(p->currentScan_))
 | 
					      boost::asio::post(threadPool,
 | 
				
			||||||
      {
 | 
					                        [this, &newCurrent]()
 | 
				
			||||||
         newObjects += 1;
 | 
					                        { newCurrent = p->LoadScan(p->currentScan_); });
 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
      totalObjects += 1;
 | 
					      totalObjects += 1;
 | 
				
			||||||
   }
 | 
					   }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   if (p->lastScan_.valid_)
 | 
					   if (p->lastScan_.valid_)
 | 
				
			||||||
   {
 | 
					   {
 | 
				
			||||||
      // TODO this is slow when initially loading data. If possible, loading
 | 
					 | 
				
			||||||
      // this from the archive may speed it up a lot.
 | 
					 | 
				
			||||||
      if (p->LoadScan(p->lastScan_))
 | 
					 | 
				
			||||||
      {
 | 
					 | 
				
			||||||
         newObjects += 1;
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
      totalObjects += 1;
 | 
					      totalObjects += 1;
 | 
				
			||||||
 | 
					      boost::asio::post(
 | 
				
			||||||
 | 
					         threadPool,
 | 
				
			||||||
 | 
					         [this, &newLast]()
 | 
				
			||||||
 | 
					         {
 | 
				
			||||||
 | 
					            if (!p->lastScan_.hasAllFiles_)
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					               // If we have chunks, use chunks
 | 
				
			||||||
 | 
					               if (p->lastScan_.nextFile_ != 1)
 | 
				
			||||||
 | 
					               {
 | 
				
			||||||
 | 
					                  newLast = p->LoadScan(p->lastScan_);
 | 
				
			||||||
 | 
					               }
 | 
				
			||||||
 | 
					               else
 | 
				
			||||||
 | 
					               {
 | 
				
			||||||
 | 
					                  auto level2DataProvider = p->level2DataProvider_.lock();
 | 
				
			||||||
 | 
					                  if (level2DataProvider != nullptr)
 | 
				
			||||||
 | 
					                  {
 | 
				
			||||||
 | 
					                     level2DataProvider->ListObjects(p->lastScan_.time_);
 | 
				
			||||||
 | 
					                     p->lastScan_.nexradFile_ =
 | 
				
			||||||
 | 
					                        std::dynamic_pointer_cast<wsr88d::Ar2vFile>(
 | 
				
			||||||
 | 
					                           level2DataProvider->LoadObjectByTime(
 | 
				
			||||||
 | 
					                              p->lastScan_.time_));
 | 
				
			||||||
 | 
					                     if (p->lastScan_.nexradFile_ != nullptr)
 | 
				
			||||||
 | 
					                     {
 | 
				
			||||||
 | 
					                        p->lastScan_.hasAllFiles_ = true;
 | 
				
			||||||
 | 
					                        // TODO maybe set lastModified for timing
 | 
				
			||||||
 | 
					                     }
 | 
				
			||||||
 | 
					                  }
 | 
				
			||||||
 | 
					                  // Fall back to chunks if files did not load
 | 
				
			||||||
 | 
					                  newLast = p->lastScan_.nexradFile_ != nullptr ||
 | 
				
			||||||
 | 
					                            p->LoadScan(p->lastScan_);
 | 
				
			||||||
 | 
					               }
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					         });
 | 
				
			||||||
 | 
					   }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   threadPool.wait();
 | 
				
			||||||
 | 
					   if (newCurrent)
 | 
				
			||||||
 | 
					   {
 | 
				
			||||||
 | 
					      newObjects += 1;
 | 
				
			||||||
 | 
					   }
 | 
				
			||||||
 | 
					   if (newLast)
 | 
				
			||||||
 | 
					   {
 | 
				
			||||||
 | 
					      newObjects += 1;
 | 
				
			||||||
   }
 | 
					   }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   timer.stop();
 | 
					   timer.stop();
 | 
				
			||||||
| 
						 | 
					@ -732,4 +788,10 @@ std::optional<float> AwsLevel2ChunksDataProvider::GetCurrentElevation()
 | 
				
			||||||
   return {};
 | 
					   return {};
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void AwsLevel2ChunksDataProvider::SetLevel2DataProvider(
 | 
				
			||||||
 | 
					   const std::shared_ptr<AwsLevel2DataProvider>& provider)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					   p->level2DataProvider_ = provider;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
} // namespace scwx::provider
 | 
					} // namespace scwx::provider
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue