Fixing concurrency issue when parsing level 2 data

This commit is contained in:
Dan Paulat 2022-05-28 01:32:35 -05:00
parent 078b9c407c
commit be2f9fe674
3 changed files with 48 additions and 22 deletions

View file

@ -313,6 +313,8 @@ void Ar2vFileImpl::ParseLDMRecords()
void Ar2vFileImpl::ParseLDMRecord(std::istream& is)
{
auto ctx = rda::Level2MessageFactory::CreateContext();
// The communications manager inserts an extra 12 bytes at the beginning
// of each record
is.seekg(12, std::ios_base::cur);
@ -343,7 +345,8 @@ void Ar2vFileImpl::ParseLDMRecord(std::istream& is)
break;
}
rda::Level2MessageInfo msgInfo = rda::Level2MessageFactory::Create(is);
rda::Level2MessageInfo msgInfo =
rda::Level2MessageFactory::Create(is, ctx);
if (!msgInfo.headerValid)
{
// Invalid message

View file

@ -37,12 +37,30 @@ static const std::unordered_map<uint8_t, CreateLevel2MessageFunction> create_ {
{18, RdaAdaptationData::Create},
{31, DigitalRadarData::Create}};
static std::vector<char> messageData_;
static size_t bufferedSize_;
static util::vectorbuf messageBuffer_(messageData_);
static std::istream messageBufferStream_(&messageBuffer_);
struct Level2MessageFactory::Context
{
Context() :
messageData_ {},
bufferedSize_ {},
messageBuffer_ {messageData_},
messageBufferStream_ {&messageBuffer_}
{
}
Level2MessageInfo Level2MessageFactory::Create(std::istream& is)
std::vector<char> messageData_;
size_t bufferedSize_;
util::vectorbuf messageBuffer_;
std::istream messageBufferStream_;
};
std::shared_ptr<Level2MessageFactory::Context>
Level2MessageFactory::CreateContext()
{
return std::make_shared<Context>();
}
Level2MessageInfo Level2MessageFactory::Create(std::istream& is,
std::shared_ptr<Context> ctx)
{
Level2MessageInfo info;
Level2MessageHeader header;
@ -80,12 +98,12 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is)
if (segment == 1)
{
// Estimate total message size
messageData_.resize(dataSize * totalSegments);
messageBufferStream_.clear();
bufferedSize_ = 0;
ctx->messageData_.resize(dataSize * totalSegments);
ctx->messageBufferStream_.clear();
ctx->bufferedSize_ = 0;
}
if (messageData_.capacity() < bufferedSize_ + dataSize)
if (ctx->messageData_.capacity() < ctx->bufferedSize_ + dataSize)
{
logger_->debug("Bad size estimate, increasing size");
@ -94,26 +112,26 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is)
std::max<uint16_t>(totalSegments - segment + 1, 100u);
size_t remainingSize = remainingSegments * dataSize;
messageData_.resize(bufferedSize_ + remainingSize);
ctx->messageData_.resize(ctx->bufferedSize_ + remainingSize);
}
is.read(messageData_.data() + bufferedSize_, dataSize);
bufferedSize_ += dataSize;
is.read(ctx->messageData_.data() + ctx->bufferedSize_, dataSize);
ctx->bufferedSize_ += dataSize;
if (is.eof())
{
logger_->warn("End of file reached trying to buffer message");
info.messageValid = false;
messageData_.shrink_to_fit();
bufferedSize_ = 0;
ctx->messageData_.shrink_to_fit();
ctx->bufferedSize_ = 0;
}
else if (segment == totalSegments)
{
messageBuffer_.update_read_pointers(bufferedSize_);
ctx->messageBuffer_.update_read_pointers(ctx->bufferedSize_);
header.set_message_size(static_cast<uint16_t>(
bufferedSize_ / 2 + Level2MessageHeader::SIZE));
ctx->bufferedSize_ / 2 + Level2MessageHeader::SIZE));
messageStream = &messageBufferStream_;
messageStream = &ctx->messageBufferStream_;
}
}
@ -121,9 +139,10 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is)
{
info.message =
create_.at(messageType)(std::move(header), *messageStream);
messageData_.shrink_to_fit();
messageBufferStream_.clear();
bufferedSize_ = 0;
ctx->messageData_.resize(0);
ctx->messageData_.shrink_to_fit();
ctx->messageBufferStream_.clear();
ctx->bufferedSize_ = 0;
}
}
else if (info.headerValid)