diff --git a/wxdata/include/scwx/wsr88d/ar2v_file.hpp b/wxdata/include/scwx/wsr88d/ar2v_file.hpp index 0df24765..1f3ab0cc 100644 --- a/wxdata/include/scwx/wsr88d/ar2v_file.hpp +++ b/wxdata/include/scwx/wsr88d/ar2v_file.hpp @@ -32,15 +32,18 @@ public: Ar2vFile(Ar2vFile&&) noexcept; Ar2vFile& operator=(Ar2vFile&&) noexcept; - uint32_t julian_date() const; - uint32_t milliseconds() const; - std::string icao() const; + std::uint32_t julian_date() const; + std::uint32_t milliseconds() const; + std::string icao() const; + + std::size_t message_count() const; std::chrono::system_clock::time_point start_time() const; std::chrono::system_clock::time_point end_time() const; - std::map> radar_data() const; - std::shared_ptr vcp_data() const; + std::map> + radar_data() const; + std::shared_ptr vcp_data() const; std::tuple, float, std::vector> GetElevationScan(rda::DataBlockType dataBlockType, diff --git a/wxdata/source/scwx/wsr88d/ar2v_file.cpp b/wxdata/source/scwx/wsr88d/ar2v_file.cpp index 714e15a1..6d951d6c 100644 --- a/wxdata/source/scwx/wsr88d/ar2v_file.cpp +++ b/wxdata/source/scwx/wsr88d/ar2v_file.cpp @@ -19,6 +19,7 @@ # pragma GCC diagnostic ignored "-Wdeprecated-copy" #endif +#include #include #include #include @@ -42,16 +43,7 @@ static const auto logger_ = util::Logger::Create(logPrefix_); class Ar2vFileImpl { public: - explicit Ar2vFileImpl() : - tapeFilename_ {}, - extensionNumber_ {}, - julianDate_ {0}, - milliseconds_ {0}, - icao_ {}, - vcpData_ {nullptr}, - radarData_ {}, - index_ {}, - rawRecords_ {} {}; + explicit Ar2vFileImpl() {}; ~Ar2vFileImpl() = default; std::size_t DecompressLDMRecords(std::istream& is); @@ -61,20 +53,22 @@ public: void ParseLDMRecord(std::istream& is); void ProcessRadarData(const std::shared_ptr& message); - std::string tapeFilename_; - std::string extensionNumber_; - std::uint32_t julianDate_; - std::uint32_t milliseconds_; - std::string icao_; + std::string tapeFilename_ {}; + std::string extensionNumber_ {}; + std::uint32_t julianDate_ {0}; + std::uint32_t milliseconds_ {0}; + std::string icao_ {}; - std::shared_ptr vcpData_; - std::map> radarData_; + std::size_t messageCount_ {0}; + + std::shared_ptr vcpData_ {}; + std::map> radarData_ {}; std::map>> - index_; + index_ {}; - std::list rawRecords_; + std::list rawRecords_ {}; }; Ar2vFile::Ar2vFile() : p(std::make_unique()) {} @@ -98,6 +92,11 @@ std::string Ar2vFile::icao() const return p->icao_; } +std::size_t Ar2vFile::message_count() const +{ + return p->messageCount_; +} + std::chrono::system_clock::time_point Ar2vFile::start_time() const { return util::TimePoint(p->julianDate_, p->milliseconds_); @@ -235,6 +234,10 @@ bool Ar2vFile::LoadData(std::istream& is) dataValid = false; } + // Trim spaces and null characters from the end of the ICAO + boost::trim_right_if(p->icao_, + [](char x) { return std::isspace(x) || x == '\0'; }); + if (dataValid) { logger_->debug("Filename: {}", p->tapeFilename_); @@ -334,55 +337,71 @@ void Ar2vFileImpl::ParseLDMRecords() void Ar2vFileImpl::ParseLDMRecord(std::istream& is) { + static constexpr std::size_t kDefaultSegmentSize = 2432; + static constexpr std::size_t kCtmHeaderSize = 12; + auto ctx = rda::Level2MessageFactory::CreateContext(); - // The communications manager inserts an extra 12 bytes at the beginning - // of each record - is.seekg(12, std::ios_base::cur); - - while (!is.eof()) + while (!is.eof() && !is.fail()) { - off_t offset = 0; - std::uint16_t nextSize = 0u; - do + // The communications manager inserts an extra 12 bytes at the beginning + // of each record + is.seekg(kCtmHeaderSize, std::ios_base::cur); + + // Each message requires 2432 bytes of storage, with the exception of + // Message Types 29 and 31. + std::size_t messageSize = kDefaultSegmentSize - kCtmHeaderSize; + + // Mark current position + std::streampos messageStart = is.tellg(); + + // Parse the header + rda::Level2MessageHeader messageHeader; + bool headerValid = messageHeader.Parse(is); + is.seekg(messageStart, std::ios_base::beg); + + if (headerValid) { - is.read(reinterpret_cast(&nextSize), 2); - if (nextSize == 0) + std::uint8_t messageType = messageHeader.message_type(); + + // Each message requires 2432 bytes of storage, with the exception of + // Message Types 29 and 31. + if (messageType == 29 || messageType == 31) { - offset += 2; + if (messageHeader.message_size() == 65535) + { + messageSize = (static_cast( + messageHeader.number_of_message_segments()) + << 16) + + messageHeader.message_segment_number(); + } + else + { + messageSize = + static_cast(messageHeader.message_size()) * 2; + } } - else + + // Parse the current message + rda::Level2MessageInfo msgInfo = + rda::Level2MessageFactory::Create(is, ctx); + + if (msgInfo.messageValid) { - is.seekg(-2, std::ios_base::cur); + HandleMessage(msgInfo.message); } - } while (!is.eof() && !is.fail() && nextSize == 0u); - - if (!is.eof() && !is.fail() && offset != 0) - { - logger_->trace("Next record offset by {} bytes", offset); - } - else if (is.eof() || is.fail()) - { - break; } - rda::Level2MessageInfo msgInfo = - rda::Level2MessageFactory::Create(is, ctx); - if (!msgInfo.headerValid) - { - // Invalid message - break; - } - - if (msgInfo.messageValid) - { - HandleMessage(msgInfo.message); - } + // Skip to next message + is.seekg(messageStart + static_cast(messageSize), + std::ios_base::beg); } } void Ar2vFileImpl::HandleMessage(std::shared_ptr& message) { + ++messageCount_; + switch (message->header().message_type()) { case static_cast(rda::MessageId::VolumeCoveragePatternData): diff --git a/wxdata/source/scwx/wsr88d/rda/level2_message_factory.cpp b/wxdata/source/scwx/wsr88d/rda/level2_message_factory.cpp index 56141da9..2a478a2f 100644 --- a/wxdata/source/scwx/wsr88d/rda/level2_message_factory.cpp +++ b/wxdata/source/scwx/wsr88d/rda/level2_message_factory.cpp @@ -70,6 +70,30 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is, info.headerValid = header.Parse(is); info.messageValid = info.headerValid; + std::uint16_t segment = 0; + std::uint16_t totalSegments = 0; + std::size_t dataSize = 0; + + if (info.headerValid) + { + if (header.message_size() == 65535) + { + segment = 1; + totalSegments = 1; + dataSize = + (static_cast(header.number_of_message_segments()) + << 16) + + header.message_segment_number(); + } + else + { + segment = header.message_segment_number(); + totalSegments = header.number_of_message_segments(); + dataSize = static_cast(header.message_size()) * 2 - + Level2MessageHeader::SIZE; + } + } + if (info.headerValid && create_.find(header.message_type()) == create_.end()) { logger_->warn("Unknown message type: {}", @@ -79,10 +103,7 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is, if (info.messageValid) { - uint16_t segment = header.message_segment_number(); - uint16_t totalSegments = header.number_of_message_segments(); - uint8_t messageType = header.message_type(); - size_t dataSize = header.message_size() * 2 - Level2MessageHeader::SIZE; + std::uint8_t messageType = header.message_type(); std::istream* messageStream = nullptr; @@ -165,8 +186,7 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is, else if (info.headerValid) { // Seek to the end of the current message - is.seekg(header.message_size() * 2 - rda::Level2MessageHeader::SIZE, - std::ios_base::cur); + is.seekg(dataSize, std::ios_base::cur); } if (info.message == nullptr) diff --git a/wxdata/source/scwx/wsr88d/rda/level2_message_header.cpp b/wxdata/source/scwx/wsr88d/rda/level2_message_header.cpp index 5aabc20c..a8ed4125 100644 --- a/wxdata/source/scwx/wsr88d/rda/level2_message_header.cpp +++ b/wxdata/source/scwx/wsr88d/rda/level2_message_header.cpp @@ -130,7 +130,10 @@ bool Level2MessageHeader::Parse(std::istream& is) { if (p->messageSize_ < 9) { - logger_->warn("Invalid message size: {}", p->messageSize_); + if (p->messageSize_ != 0) + { + logger_->warn("Invalid message size: {}", p->messageSize_); + } headerValid = false; } if (p->millisecondsOfDay_ > 86'399'999u)