Update Archive II processing to better handle older data sets

This commit is contained in:
Dan Paulat 2024-01-27 01:17:34 -06:00
parent fdd09011f4
commit 243a7c870c
4 changed files with 110 additions and 65 deletions

View file

@ -32,14 +32,17 @@ public:
Ar2vFile(Ar2vFile&&) noexcept; Ar2vFile(Ar2vFile&&) noexcept;
Ar2vFile& operator=(Ar2vFile&&) noexcept; Ar2vFile& operator=(Ar2vFile&&) noexcept;
uint32_t julian_date() const; std::uint32_t julian_date() const;
uint32_t milliseconds() const; std::uint32_t milliseconds() const;
std::string icao() const; std::string icao() const;
std::size_t message_count() const;
std::chrono::system_clock::time_point start_time() const; std::chrono::system_clock::time_point start_time() const;
std::chrono::system_clock::time_point end_time() const; std::chrono::system_clock::time_point end_time() const;
std::map<uint16_t, std::shared_ptr<rda::ElevationScan>> radar_data() const; std::map<std::uint16_t, std::shared_ptr<rda::ElevationScan>>
radar_data() const;
std::shared_ptr<const rda::VolumeCoveragePatternData> vcp_data() const; std::shared_ptr<const rda::VolumeCoveragePatternData> vcp_data() const;
std::tuple<std::shared_ptr<rda::ElevationScan>, float, std::vector<float>> std::tuple<std::shared_ptr<rda::ElevationScan>, float, std::vector<float>>

View file

@ -19,6 +19,7 @@
# pragma GCC diagnostic ignored "-Wdeprecated-copy" # pragma GCC diagnostic ignored "-Wdeprecated-copy"
#endif #endif
#include <boost/algorithm/string/trim.hpp>
#include <boost/iostreams/copy.hpp> #include <boost/iostreams/copy.hpp>
#include <boost/iostreams/filtering_streambuf.hpp> #include <boost/iostreams/filtering_streambuf.hpp>
#include <boost/iostreams/filter/bzip2.hpp> #include <boost/iostreams/filter/bzip2.hpp>
@ -42,16 +43,7 @@ static const auto logger_ = util::Logger::Create(logPrefix_);
class Ar2vFileImpl class Ar2vFileImpl
{ {
public: public:
explicit Ar2vFileImpl() : explicit Ar2vFileImpl() {};
tapeFilename_ {},
extensionNumber_ {},
julianDate_ {0},
milliseconds_ {0},
icao_ {},
vcpData_ {nullptr},
radarData_ {},
index_ {},
rawRecords_ {} {};
~Ar2vFileImpl() = default; ~Ar2vFileImpl() = default;
std::size_t DecompressLDMRecords(std::istream& is); std::size_t DecompressLDMRecords(std::istream& is);
@ -61,20 +53,22 @@ public:
void ParseLDMRecord(std::istream& is); void ParseLDMRecord(std::istream& is);
void ProcessRadarData(const std::shared_ptr<rda::GenericRadarData>& message); void ProcessRadarData(const std::shared_ptr<rda::GenericRadarData>& message);
std::string tapeFilename_; std::string tapeFilename_ {};
std::string extensionNumber_; std::string extensionNumber_ {};
std::uint32_t julianDate_; std::uint32_t julianDate_ {0};
std::uint32_t milliseconds_; std::uint32_t milliseconds_ {0};
std::string icao_; std::string icao_ {};
std::shared_ptr<rda::VolumeCoveragePatternData> vcpData_; std::size_t messageCount_ {0};
std::map<std::uint16_t, std::shared_ptr<rda::ElevationScan>> radarData_;
std::shared_ptr<rda::VolumeCoveragePatternData> vcpData_ {};
std::map<std::uint16_t, std::shared_ptr<rda::ElevationScan>> radarData_ {};
std::map<rda::DataBlockType, std::map<rda::DataBlockType,
std::map<std::uint16_t, std::shared_ptr<rda::ElevationScan>>> std::map<std::uint16_t, std::shared_ptr<rda::ElevationScan>>>
index_; index_ {};
std::list<std::stringstream> rawRecords_; std::list<std::stringstream> rawRecords_ {};
}; };
Ar2vFile::Ar2vFile() : p(std::make_unique<Ar2vFileImpl>()) {} Ar2vFile::Ar2vFile() : p(std::make_unique<Ar2vFileImpl>()) {}
@ -98,6 +92,11 @@ std::string Ar2vFile::icao() const
return p->icao_; return p->icao_;
} }
std::size_t Ar2vFile::message_count() const
{
return p->messageCount_;
}
std::chrono::system_clock::time_point Ar2vFile::start_time() const std::chrono::system_clock::time_point Ar2vFile::start_time() const
{ {
return util::TimePoint(p->julianDate_, p->milliseconds_); return util::TimePoint(p->julianDate_, p->milliseconds_);
@ -235,6 +234,10 @@ bool Ar2vFile::LoadData(std::istream& is)
dataValid = false; dataValid = false;
} }
// Trim spaces and null characters from the end of the ICAO
boost::trim_right_if(p->icao_,
[](char x) { return std::isspace(x) || x == '\0'; });
if (dataValid) if (dataValid)
{ {
logger_->debug("Filename: {}", p->tapeFilename_); logger_->debug("Filename: {}", p->tapeFilename_);
@ -334,55 +337,71 @@ void Ar2vFileImpl::ParseLDMRecords()
void Ar2vFileImpl::ParseLDMRecord(std::istream& is) void Ar2vFileImpl::ParseLDMRecord(std::istream& is)
{ {
static constexpr std::size_t kDefaultSegmentSize = 2432;
static constexpr std::size_t kCtmHeaderSize = 12;
auto ctx = rda::Level2MessageFactory::CreateContext(); auto ctx = rda::Level2MessageFactory::CreateContext();
while (!is.eof() && !is.fail())
{
// The communications manager inserts an extra 12 bytes at the beginning // The communications manager inserts an extra 12 bytes at the beginning
// of each record // of each record
is.seekg(12, std::ios_base::cur); is.seekg(kCtmHeaderSize, std::ios_base::cur);
while (!is.eof()) // Each message requires 2432 bytes of storage, with the exception of
// Message Types 29 and 31.
std::size_t messageSize = kDefaultSegmentSize - kCtmHeaderSize;
// Mark current position
std::streampos messageStart = is.tellg();
// Parse the header
rda::Level2MessageHeader messageHeader;
bool headerValid = messageHeader.Parse(is);
is.seekg(messageStart, std::ios_base::beg);
if (headerValid)
{ {
off_t offset = 0; std::uint8_t messageType = messageHeader.message_type();
std::uint16_t nextSize = 0u;
do // Each message requires 2432 bytes of storage, with the exception of
// Message Types 29 and 31.
if (messageType == 29 || messageType == 31)
{ {
is.read(reinterpret_cast<char*>(&nextSize), 2); if (messageHeader.message_size() == 65535)
if (nextSize == 0)
{ {
offset += 2; messageSize = (static_cast<std::size_t>(
messageHeader.number_of_message_segments())
<< 16) +
messageHeader.message_segment_number();
} }
else else
{ {
is.seekg(-2, std::ios_base::cur); messageSize =
static_cast<std::size_t>(messageHeader.message_size()) * 2;
} }
} while (!is.eof() && !is.fail() && nextSize == 0u);
if (!is.eof() && !is.fail() && offset != 0)
{
logger_->trace("Next record offset by {} bytes", offset);
}
else if (is.eof() || is.fail())
{
break;
} }
// Parse the current message
rda::Level2MessageInfo msgInfo = rda::Level2MessageInfo msgInfo =
rda::Level2MessageFactory::Create(is, ctx); rda::Level2MessageFactory::Create(is, ctx);
if (!msgInfo.headerValid)
{
// Invalid message
break;
}
if (msgInfo.messageValid) if (msgInfo.messageValid)
{ {
HandleMessage(msgInfo.message); HandleMessage(msgInfo.message);
} }
} }
// Skip to next message
is.seekg(messageStart + static_cast<std::streampos>(messageSize),
std::ios_base::beg);
}
} }
void Ar2vFileImpl::HandleMessage(std::shared_ptr<rda::Level2Message>& message) void Ar2vFileImpl::HandleMessage(std::shared_ptr<rda::Level2Message>& message)
{ {
++messageCount_;
switch (message->header().message_type()) switch (message->header().message_type())
{ {
case static_cast<std::uint8_t>(rda::MessageId::VolumeCoveragePatternData): case static_cast<std::uint8_t>(rda::MessageId::VolumeCoveragePatternData):

View file

@ -70,6 +70,30 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is,
info.headerValid = header.Parse(is); info.headerValid = header.Parse(is);
info.messageValid = info.headerValid; info.messageValid = info.headerValid;
std::uint16_t segment = 0;
std::uint16_t totalSegments = 0;
std::size_t dataSize = 0;
if (info.headerValid)
{
if (header.message_size() == 65535)
{
segment = 1;
totalSegments = 1;
dataSize =
(static_cast<std::size_t>(header.number_of_message_segments())
<< 16) +
header.message_segment_number();
}
else
{
segment = header.message_segment_number();
totalSegments = header.number_of_message_segments();
dataSize = static_cast<std::size_t>(header.message_size()) * 2 -
Level2MessageHeader::SIZE;
}
}
if (info.headerValid && create_.find(header.message_type()) == create_.end()) if (info.headerValid && create_.find(header.message_type()) == create_.end())
{ {
logger_->warn("Unknown message type: {}", logger_->warn("Unknown message type: {}",
@ -79,10 +103,7 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is,
if (info.messageValid) if (info.messageValid)
{ {
uint16_t segment = header.message_segment_number(); std::uint8_t messageType = header.message_type();
uint16_t totalSegments = header.number_of_message_segments();
uint8_t messageType = header.message_type();
size_t dataSize = header.message_size() * 2 - Level2MessageHeader::SIZE;
std::istream* messageStream = nullptr; std::istream* messageStream = nullptr;
@ -165,8 +186,7 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is,
else if (info.headerValid) else if (info.headerValid)
{ {
// Seek to the end of the current message // Seek to the end of the current message
is.seekg(header.message_size() * 2 - rda::Level2MessageHeader::SIZE, is.seekg(dataSize, std::ios_base::cur);
std::ios_base::cur);
} }
if (info.message == nullptr) if (info.message == nullptr)

View file

@ -129,8 +129,11 @@ bool Level2MessageHeader::Parse(std::istream& is)
else else
{ {
if (p->messageSize_ < 9) if (p->messageSize_ < 9)
{
if (p->messageSize_ != 0)
{ {
logger_->warn("Invalid message size: {}", p->messageSize_); logger_->warn("Invalid message size: {}", p->messageSize_);
}
headerValid = false; headerValid = false;
} }
if (p->millisecondsOfDay_ > 86'399'999u) if (p->millisecondsOfDay_ > 86'399'999u)