From be2f9fe6748ec3c84c0efcdcacfafbafba9b6db6 Mon Sep 17 00:00:00 2001 From: Dan Paulat Date: Sat, 28 May 2022 01:32:35 -0500 Subject: [PATCH] Fixing concurrency issue when parsing level 2 data --- .../wsr88d/rda/level2_message_factory.hpp | 6 +- wxdata/source/scwx/wsr88d/ar2v_file.cpp | 5 +- .../wsr88d/rda/level2_message_factory.cpp | 59 ++++++++++++------- 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/wxdata/include/scwx/wsr88d/rda/level2_message_factory.hpp b/wxdata/include/scwx/wsr88d/rda/level2_message_factory.hpp index 2e1690ec..a6d87c54 100644 --- a/wxdata/include/scwx/wsr88d/rda/level2_message_factory.hpp +++ b/wxdata/include/scwx/wsr88d/rda/level2_message_factory.hpp @@ -34,7 +34,11 @@ private: Level2MessageFactory& operator=(Level2MessageFactory&&) noexcept = delete; public: - static Level2MessageInfo Create(std::istream& is); + struct Context; + + static std::shared_ptr CreateContext(); + static Level2MessageInfo Create(std::istream& is, + std::shared_ptr ctx); }; } // namespace rda diff --git a/wxdata/source/scwx/wsr88d/ar2v_file.cpp b/wxdata/source/scwx/wsr88d/ar2v_file.cpp index 7b064de9..72f501ff 100644 --- a/wxdata/source/scwx/wsr88d/ar2v_file.cpp +++ b/wxdata/source/scwx/wsr88d/ar2v_file.cpp @@ -313,6 +313,8 @@ void Ar2vFileImpl::ParseLDMRecords() void Ar2vFileImpl::ParseLDMRecord(std::istream& is) { + auto ctx = rda::Level2MessageFactory::CreateContext(); + // The communications manager inserts an extra 12 bytes at the beginning // of each record is.seekg(12, std::ios_base::cur); @@ -343,7 +345,8 @@ void Ar2vFileImpl::ParseLDMRecord(std::istream& is) break; } - rda::Level2MessageInfo msgInfo = rda::Level2MessageFactory::Create(is); + rda::Level2MessageInfo msgInfo = + rda::Level2MessageFactory::Create(is, ctx); if (!msgInfo.headerValid) { // Invalid message diff --git a/wxdata/source/scwx/wsr88d/rda/level2_message_factory.cpp b/wxdata/source/scwx/wsr88d/rda/level2_message_factory.cpp index f56df578..520a3c43 100644 --- a/wxdata/source/scwx/wsr88d/rda/level2_message_factory.cpp +++ b/wxdata/source/scwx/wsr88d/rda/level2_message_factory.cpp @@ -37,12 +37,30 @@ static const std::unordered_map create_ { {18, RdaAdaptationData::Create}, {31, DigitalRadarData::Create}}; -static std::vector messageData_; -static size_t bufferedSize_; -static util::vectorbuf messageBuffer_(messageData_); -static std::istream messageBufferStream_(&messageBuffer_); +struct Level2MessageFactory::Context +{ + Context() : + messageData_ {}, + bufferedSize_ {}, + messageBuffer_ {messageData_}, + messageBufferStream_ {&messageBuffer_} + { + } -Level2MessageInfo Level2MessageFactory::Create(std::istream& is) + std::vector messageData_; + size_t bufferedSize_; + util::vectorbuf messageBuffer_; + std::istream messageBufferStream_; +}; + +std::shared_ptr +Level2MessageFactory::CreateContext() +{ + return std::make_shared(); +} + +Level2MessageInfo Level2MessageFactory::Create(std::istream& is, + std::shared_ptr ctx) { Level2MessageInfo info; Level2MessageHeader header; @@ -80,12 +98,12 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is) if (segment == 1) { // Estimate total message size - messageData_.resize(dataSize * totalSegments); - messageBufferStream_.clear(); - bufferedSize_ = 0; + ctx->messageData_.resize(dataSize * totalSegments); + ctx->messageBufferStream_.clear(); + ctx->bufferedSize_ = 0; } - if (messageData_.capacity() < bufferedSize_ + dataSize) + if (ctx->messageData_.capacity() < ctx->bufferedSize_ + dataSize) { logger_->debug("Bad size estimate, increasing size"); @@ -94,26 +112,26 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is) std::max(totalSegments - segment + 1, 100u); size_t remainingSize = remainingSegments * dataSize; - messageData_.resize(bufferedSize_ + remainingSize); + ctx->messageData_.resize(ctx->bufferedSize_ + remainingSize); } - is.read(messageData_.data() + bufferedSize_, dataSize); - bufferedSize_ += dataSize; + is.read(ctx->messageData_.data() + ctx->bufferedSize_, dataSize); + ctx->bufferedSize_ += dataSize; if (is.eof()) { logger_->warn("End of file reached trying to buffer message"); info.messageValid = false; - messageData_.shrink_to_fit(); - bufferedSize_ = 0; + ctx->messageData_.shrink_to_fit(); + ctx->bufferedSize_ = 0; } else if (segment == totalSegments) { - messageBuffer_.update_read_pointers(bufferedSize_); + ctx->messageBuffer_.update_read_pointers(ctx->bufferedSize_); header.set_message_size(static_cast( - bufferedSize_ / 2 + Level2MessageHeader::SIZE)); + ctx->bufferedSize_ / 2 + Level2MessageHeader::SIZE)); - messageStream = &messageBufferStream_; + messageStream = &ctx->messageBufferStream_; } } @@ -121,9 +139,10 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is) { info.message = create_.at(messageType)(std::move(header), *messageStream); - messageData_.shrink_to_fit(); - messageBufferStream_.clear(); - bufferedSize_ = 0; + ctx->messageData_.resize(0); + ctx->messageData_.shrink_to_fit(); + ctx->messageBufferStream_.clear(); + ctx->bufferedSize_ = 0; } } else if (info.headerValid)