Ignore message segments when first segment doesn't start at 1

This commit is contained in:
Dan Paulat 2024-01-19 23:37:06 -06:00
parent dbda117284
commit 53717434a6

View file

@ -53,6 +53,7 @@ struct Level2MessageFactory::Context
size_t bufferedSize_;
util::vectorbuf messageBuffer_;
std::istream messageBufferStream_;
bool bufferingData_ {false};
};
std::shared_ptr<Level2MessageFactory::Context>
@ -102,38 +103,51 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is,
// Estimate total message size
ctx->messageData_.resize(dataSize * totalSegments);
ctx->messageBufferStream_.clear();
ctx->bufferedSize_ = 0;
ctx->bufferedSize_ = 0;
ctx->bufferingData_ = true;
}
if (ctx->messageData_.capacity() < ctx->bufferedSize_ + dataSize)
else if (!ctx->bufferingData_)
{
logger_->debug("Bad size estimate, increasing size");
// Estimate remaining size
uint16_t remainingSegments =
std::max<uint16_t>(totalSegments - segment + 1, 100u);
size_t remainingSize = remainingSegments * dataSize;
ctx->messageData_.resize(ctx->bufferedSize_ + remainingSize);
}
is.read(ctx->messageData_.data() + ctx->bufferedSize_, dataSize);
ctx->bufferedSize_ += dataSize;
if (is.eof())
{
logger_->warn("End of file reached trying to buffer message");
// Segment number did not start at 1
logger_->trace("Ignoring Segment {}/{}, did not start at 1",
segment,
totalSegments);
info.messageValid = false;
ctx->messageData_.shrink_to_fit();
ctx->bufferedSize_ = 0;
}
else if (segment == totalSegments)
{
ctx->messageBuffer_.update_read_pointers(ctx->bufferedSize_);
header.set_message_size(static_cast<uint16_t>(
ctx->bufferedSize_ / 2 + Level2MessageHeader::SIZE));
messageStream = &ctx->messageBufferStream_;
if (ctx->bufferingData_)
{
if (ctx->messageData_.capacity() < ctx->bufferedSize_ + dataSize)
{
logger_->debug("Bad size estimate, increasing size");
// Estimate remaining size
uint16_t remainingSegments =
std::max<uint16_t>(totalSegments - segment + 1, 100u);
size_t remainingSize = remainingSegments * dataSize;
ctx->messageData_.resize(ctx->bufferedSize_ + remainingSize);
}
is.read(ctx->messageData_.data() + ctx->bufferedSize_, dataSize);
ctx->bufferedSize_ += dataSize;
if (is.eof())
{
logger_->warn("End of file reached trying to buffer message");
info.messageValid = false;
ctx->messageData_.shrink_to_fit();
ctx->bufferedSize_ = 0;
ctx->bufferingData_ = false;
}
else if (segment == totalSegments)
{
ctx->messageBuffer_.update_read_pointers(ctx->bufferedSize_);
header.set_message_size(static_cast<uint16_t>(
ctx->bufferedSize_ / 2 + Level2MessageHeader::SIZE));
messageStream = &ctx->messageBufferStream_;
}
}
}
@ -144,7 +158,8 @@ Level2MessageInfo Level2MessageFactory::Create(std::istream& is,
ctx->messageData_.resize(0);
ctx->messageData_.shrink_to_fit();
ctx->messageBufferStream_.clear();
ctx->bufferedSize_ = 0;
ctx->bufferedSize_ = 0;
ctx->bufferingData_ = false;
}
}
else if (info.headerValid)