From 85ccd257054e41350a9472c6a2067df4cca1be92 Mon Sep 17 00:00:00 2001 From: Dan Paulat Date: Sun, 13 Jun 2021 18:06:57 -0500 Subject: [PATCH] Parse RDA message headers --- .../scwx/wsr88d/rda/message_header.hpp | 44 +++++ .../source/scwx/wsr88d/rda/message_header.cpp | 163 ++++++++++++++++++ wxdata/source/scwx/wsr88d/rpg/ar2v_file.cpp | 82 +++++++-- wxdata/wxdata.cmake | 6 + 4 files changed, 284 insertions(+), 11 deletions(-) create mode 100644 wxdata/include/scwx/wsr88d/rda/message_header.hpp create mode 100644 wxdata/source/scwx/wsr88d/rda/message_header.cpp diff --git a/wxdata/include/scwx/wsr88d/rda/message_header.hpp b/wxdata/include/scwx/wsr88d/rda/message_header.hpp new file mode 100644 index 00000000..c1273a7a --- /dev/null +++ b/wxdata/include/scwx/wsr88d/rda/message_header.hpp @@ -0,0 +1,44 @@ +#include +#include + +namespace scwx +{ +namespace wsr88d +{ +namespace rda +{ + +class MessageHeaderImpl; + +class MessageHeader +{ +public: + explicit MessageHeader(); + ~MessageHeader(); + + MessageHeader(const MessageHeader&) = delete; + MessageHeader& operator=(const MessageHeader&) = delete; + + MessageHeader(MessageHeader&&) noexcept; + MessageHeader& operator=(MessageHeader&&); + + uint16_t message_size() const; + uint8_t rda_redundant_channel() const; + uint8_t message_type() const; + uint16_t id_sequence_number() const; + uint16_t julian_date() const; + uint32_t milliseconds_of_day() const; + uint16_t number_of_message_segments() const; + uint16_t message_segment_number() const; + + bool Parse(std::istream& is); + + static const size_t SIZE = 16u; + +private: + std::unique_ptr p; +}; + +} // namespace rda +} // namespace wsr88d +} // namespace scwx diff --git a/wxdata/source/scwx/wsr88d/rda/message_header.cpp b/wxdata/source/scwx/wsr88d/rda/message_header.cpp new file mode 100644 index 00000000..1a3a1c34 --- /dev/null +++ b/wxdata/source/scwx/wsr88d/rda/message_header.cpp @@ -0,0 +1,163 @@ +#include + +#include +#include + +#include + +#ifdef WIN32 +# include +#else +# include +#endif + +namespace scwx +{ +namespace wsr88d +{ +namespace rda +{ + +static const std::string logPrefix_ = "[scwx::wsr88d::rda::message_header] "; + +class MessageHeaderImpl +{ +public: + explicit MessageHeaderImpl() : + messageSize_(), + rdaRedundantChannel_(), + messageType_(), + idSequenceNumber_(), + julianDate_(), + millisecondsOfDay_(), + numberOfMessageSegments_(), + messageSegmentNumber_() {}; + ~MessageHeaderImpl() = default; + + uint16_t messageSize_; + uint8_t rdaRedundantChannel_; + uint8_t messageType_; + uint16_t idSequenceNumber_; + uint16_t julianDate_; + uint32_t millisecondsOfDay_; + uint16_t numberOfMessageSegments_; + uint16_t messageSegmentNumber_; +}; + +MessageHeader::MessageHeader() : + p(std::make_unique()) +{ +} +MessageHeader::~MessageHeader() = default; + +MessageHeader::MessageHeader(MessageHeader&&) noexcept = default; +MessageHeader& MessageHeader::operator=(MessageHeader&&) = default; + +uint16_t MessageHeader::message_size() const +{ + return p->messageSize_; +} + +uint8_t MessageHeader::rda_redundant_channel() const +{ + return p->rdaRedundantChannel_; +} + +uint8_t MessageHeader::message_type() const +{ + return p->messageType_; +} + +uint16_t MessageHeader::id_sequence_number() const +{ + return p->idSequenceNumber_; +} + +uint16_t MessageHeader::julian_date() const +{ + return p->julianDate_; +} + +uint32_t MessageHeader::milliseconds_of_day() const +{ + return p->millisecondsOfDay_; +} + +uint16_t MessageHeader::number_of_message_segments() const +{ + return p->numberOfMessageSegments_; +} + +uint16_t MessageHeader::message_segment_number() const +{ + return p->messageSegmentNumber_; +} + +bool MessageHeader::Parse(std::istream& is) +{ + bool headerValid = true; + + is.read(reinterpret_cast(&p->messageSize_), 2); + is.read(reinterpret_cast(&p->rdaRedundantChannel_), 1); + is.read(reinterpret_cast(&p->messageType_), 1); + is.read(reinterpret_cast(&p->idSequenceNumber_), 2); + is.read(reinterpret_cast(&p->julianDate_), 2); + is.read(reinterpret_cast(&p->millisecondsOfDay_), 4); + is.read(reinterpret_cast(&p->numberOfMessageSegments_), 2); + is.read(reinterpret_cast(&p->messageSegmentNumber_), 2); + + p->messageSize_ = htons(p->messageSize_); + p->idSequenceNumber_ = htons(p->idSequenceNumber_); + p->julianDate_ = htons(p->julianDate_); + p->millisecondsOfDay_ = htonl(p->millisecondsOfDay_); + p->numberOfMessageSegments_ = htons(p->numberOfMessageSegments_); + p->messageSegmentNumber_ = htons(p->messageSegmentNumber_); + + if (is.eof()) + { + BOOST_LOG_TRIVIAL(debug) << logPrefix_ << "Reached end of file"; + headerValid = false; + } + else + { + if (p->messageSize_ < 9) + { + BOOST_LOG_TRIVIAL(warning) + << logPrefix_ << "Invalid message size: " << p->messageSize_; + headerValid = false; + } + if (p->julianDate_ < 1) + { + BOOST_LOG_TRIVIAL(warning) + << logPrefix_ << "Invalid date: " << p->julianDate_; + headerValid = false; + } + if (p->millisecondsOfDay_ > 86'399'999u) + { + BOOST_LOG_TRIVIAL(warning) + << logPrefix_ << "Invalid milliseconds: " << p->millisecondsOfDay_; + headerValid = false; + } + if (p->messageSize_ < 65534 && + p->messageSegmentNumber_ > p->numberOfMessageSegments_) + { + BOOST_LOG_TRIVIAL(warning) + << logPrefix_ << "Invalid segment = " << p->messageSegmentNumber_ + << "/" << p->numberOfMessageSegments_; + headerValid = false; + } + } + + if (headerValid) + { + BOOST_LOG_TRIVIAL(trace) + << logPrefix_ + << "Message type: " << static_cast(p->messageType_); + } + + return headerValid; +} + +} // namespace rda +} // namespace wsr88d +} // namespace scwx diff --git a/wxdata/source/scwx/wsr88d/rpg/ar2v_file.cpp b/wxdata/source/scwx/wsr88d/rpg/ar2v_file.cpp index 382ed076..f3202cae 100644 --- a/wxdata/source/scwx/wsr88d/rpg/ar2v_file.cpp +++ b/wxdata/source/scwx/wsr88d/rpg/ar2v_file.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -33,18 +34,22 @@ public: julianDate_ {0}, milliseconds_ {0}, icao_(), - numRecords_ {0} {}; + numRecords_ {0}, + rawRecords_() {}; ~Ar2vFileImpl() = default; - void ParseLDMRecords(std::ifstream& f); + void LoadLDMRecords(std::ifstream& f); + void ParseLDMRecords(); std::string tapeFilename_; std::string extensionNumber_; - int32_t julianDate_; - int32_t milliseconds_; + uint32_t julianDate_; + uint32_t milliseconds_; std::string icao_; size_t numRecords_; + + std::list rawRecords_; }; Ar2vFile::Ar2vFile() : p(std::make_unique()) {} @@ -101,13 +106,13 @@ bool Ar2vFile::LoadFile(const std::string& filename) << logPrefix_ << "Time: " << p->milliseconds_; BOOST_LOG_TRIVIAL(debug) << logPrefix_ << "ICAO: " << p->icao_; - p->ParseLDMRecords(f); + p->LoadLDMRecords(f); } return fileValid; } -void Ar2vFileImpl::ParseLDMRecords(std::ifstream& f) +void Ar2vFileImpl::LoadLDMRecords(std::ifstream& f) { numRecords_ = 0; @@ -122,7 +127,7 @@ void Ar2vFileImpl::ParseLDMRecords(std::ifstream& f) controlWord = htonl(controlWord); recordSize = std::abs(controlWord); - BOOST_LOG_TRIVIAL(debug) + BOOST_LOG_TRIVIAL(trace) << logPrefix_ << "LDM Record Found: Size = " << recordSize << " bytes"; boost::iostreams::filtering_streambuf in; @@ -130,14 +135,15 @@ void Ar2vFileImpl::ParseLDMRecords(std::ifstream& f) in.push(boost::iostreams::bzip2_decompressor()); in.push(r); - std::ostringstream of; - try { - std::streamsize bytesCopied = boost::iostreams::copy(in, of); - BOOST_LOG_TRIVIAL(debug) + std::stringstream ss; + std::streamsize bytesCopied = boost::iostreams::copy(in, ss); + BOOST_LOG_TRIVIAL(trace) << logPrefix_ << "Decompressed record size = " << bytesCopied << " bytes"; + + rawRecords_.push_back(std::move(ss)); } catch (const boost::iostreams::bzip2_error& ex) { @@ -151,10 +157,64 @@ void Ar2vFileImpl::ParseLDMRecords(std::ifstream& f) ++numRecords_; } + ParseLDMRecords(); + BOOST_LOG_TRIVIAL(debug) << logPrefix_ << "Found " << numRecords_ << " LDM Records"; } +void Ar2vFileImpl::ParseLDMRecords() +{ + size_t count = 0; + + for (auto it = rawRecords_.begin(); it != rawRecords_.end(); it++) + { + std::stringstream& ss = *it; + + BOOST_LOG_TRIVIAL(trace) << logPrefix_ << "Record " << count++; + + // The communications manager inserts an extra 12 bytes at the beginning + // of each record + ss.seekg(12); + + while (!ss.eof()) + { + // TODO: Parse message, not just header + rda::MessageHeader header; + if (!header.Parse(ss)) + { + // Invalid header + break; + } + + // Seek to the end of the current message + ss.seekg(header.message_size() * 2 - rda::MessageHeader::SIZE, + std::ios_base::cur); + + off_t offset = 0; + uint16_t nextSize = 0u; + do + { + ss.read(reinterpret_cast(&nextSize), 2); + if (nextSize == 0) + { + offset += 2; + } + else + { + ss.seekg(-2, std::ios_base::cur); + } + } while (!ss.eof() && nextSize == 0u); + + if (!ss.eof() && offset != 0) + { + BOOST_LOG_TRIVIAL(trace) + << logPrefix_ << "Next record offset by " << offset << " bytes"; + } + } + } +} + } // namespace rpg } // namespace wsr88d } // namespace scwx diff --git a/wxdata/wxdata.cmake b/wxdata/wxdata.cmake index d4631139..3335dd05 100644 --- a/wxdata/wxdata.cmake +++ b/wxdata/wxdata.cmake @@ -6,14 +6,20 @@ set(HDR_UTIL include/scwx/util/rangebuf.hpp) set(SRC_UTIL source/scwx/util/rangebuf.cpp) set(HDR_WSR88D_RPG include/scwx/wsr88d/rpg/ar2v_file.hpp) set(SRC_WSR88D_RPG source/scwx/wsr88d/rpg/ar2v_file.cpp) +set(HDR_WSR88D_RDA include/scwx/wsr88d/rda/message_header.hpp) +set(SRC_WSR88D_RDA source/scwx/wsr88d/rda/message_header.cpp) add_library(wxdata OBJECT ${HDR_UTIL} ${SRC_UTIL} + ${HDR_WSR88D_RDA} + ${SRC_WSR88D_RDA} ${HDR_WSR88D_RPG} ${SRC_WSR88D_RPG}) source_group("Header Files\\util" FILES ${HDR_UTIL}) source_group("Source Files\\util" FILES ${SRC_UTIL}) +source_group("Header Files\\wsr88d\\rda" FILES ${HDR_WSR88D_RDA}) +source_group("Source Files\\wsr88d\\rda" FILES ${SRC_WSR88D_RDA}) source_group("Header Files\\wsr88d\\rpg" FILES ${HDR_WSR88D_RPG}) source_group("Source Files\\wsr88d\\rpg" FILES ${SRC_WSR88D_RPG})