NTP polling

This commit is contained in:
Dan Paulat 2025-08-10 18:01:45 -05:00
parent 258466e02c
commit dfb00b96df
3 changed files with 281 additions and 32 deletions

View file

@ -1,6 +1,8 @@
#pragma once
#include <chrono>
#include <memory>
#include <string>
#include <string_view>
namespace scwx::network
@ -21,8 +23,17 @@ public:
NtpClient(NtpClient&&) noexcept;
NtpClient& operator=(NtpClient&&) noexcept;
void Open(std::string_view host, std::string_view service);
void Poll();
bool error();
std::chrono::system_clock::duration time_offset() const;
void Start();
void Open(std::string_view host, std::string_view service);
void OpenCurrentServer();
void Poll();
std::string RotateServer();
void RunOnce();
static std::shared_ptr<NtpClient> Instance();
private:
class Impl;

View file

@ -4,6 +4,7 @@
#include <scwx/util/threads.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/use_future.hpp>
#include <fmt/chrono.h>
@ -16,6 +17,12 @@ static const auto logger_ = scwx::util::Logger::Create(logPrefix_);
static constexpr std::size_t kReceiveBufferSize_ {48u};
// Reasonable min/max values for polling intervals. We don't want to poll too
// quickly and upset the server, but we don't want to poll too slowly in the
// event of a time jump.
static constexpr std::uint32_t kMinPollInterval_ = 6u; // 2^6 = 64 seconds
static constexpr std::uint32_t kMaxPollInterval_ = 9u; // 2^9 = 512 seconds
class NtpTimestamp
{
public:
@ -97,28 +104,42 @@ public:
Impl(Impl&&) = delete;
Impl& operator=(Impl&&) = delete;
void Open(std::string_view host, std::string_view service);
void Poll();
void ReceivePacket(std::size_t length);
void Open(std::string_view host, std::string_view service);
void OpenCurrentServer();
void Poll();
void ReceivePacket(std::size_t length);
std::string RotateServer();
void Run();
void RunOnce();
boost::asio::thread_pool threadPool_ {2u};
bool enabled_;
boost::asio::steady_timer pollTimer_ {threadPool_};
std::uint32_t pollInterval_ {kMinPollInterval_};
bool enabled_ {true};
bool error_ {false};
bool disableServer_ {false};
bool rotateServer_ {false};
types::ntp::NtpPacket transmitPacket_ {};
boost::asio::ip::udp::socket socket_;
boost::asio::ip::udp::socket socket_ {threadPool_};
std::optional<boost::asio::ip::udp::endpoint> serverEndpoint_ {};
std::array<std::uint8_t, kReceiveBufferSize_> receiveBuffer_ {};
std::chrono::system_clock::duration timeOffset_ {};
std::vector<std::string> serverList_ {"time.nist.gov",
"time.cloudflare.com",
"ntp.pool.org",
"time.aws.com",
"time.windows.com",
"time.apple.com"};
const std::vector<std::string> serverList_ {"time.nist.gov",
"time.cloudflare.com",
"pool.ntp.org",
"time.aws.com",
"time.windows.com",
"time.apple.com"};
std::vector<std::string> disabledServers_ {};
std::vector<std::string>::const_iterator currentServer_ =
serverList_.begin();
};
NtpClient::NtpClient() : p(std::make_unique<Impl>()) {}
@ -127,17 +148,7 @@ NtpClient::~NtpClient() = default;
NtpClient::NtpClient(NtpClient&&) noexcept = default;
NtpClient& NtpClient::operator=(NtpClient&&) noexcept = default;
void NtpClient::Open(std::string_view host, std::string_view service)
{
p->Open(host, service);
}
void NtpClient::Poll()
{
p->Poll();
}
NtpClient::Impl::Impl() : socket_ {threadPool_}
NtpClient::Impl::Impl()
{
using namespace std::chrono_literals;
@ -145,8 +156,8 @@ NtpClient::Impl::Impl() : socket_ {threadPool_}
std::chrono::floor<std::chrono::days>(std::chrono::system_clock::now());
// The NTP timestamp will overflow in 2036. Overflow is handled in such a way
// that should work until 2106. Additional handling for subsequent eras is
// required.
// that dates prior to 1970 result in a Unix timestamp after 2036. Additional
// handling for the year 2106 and subsequent eras is required.
static constexpr auto kMaxYear_ = 2106y;
enabled_ = now < kMaxYear_ / 1 / 1;
@ -160,6 +171,51 @@ NtpClient::Impl::~Impl()
threadPool_.join();
}
bool NtpClient::error()
{
bool returnValue = p->error_;
p->error_ = false;
return returnValue;
}
std::chrono::system_clock::duration NtpClient::time_offset() const
{
return p->timeOffset_;
}
void NtpClient::Start()
{
if (p->enabled_)
{
boost::asio::post(p->threadPool_, [this]() { p->Run(); });
}
}
void NtpClient::Open(std::string_view host, std::string_view service)
{
p->Open(host, service);
}
void NtpClient::OpenCurrentServer()
{
p->OpenCurrentServer();
}
void NtpClient::Poll()
{
p->Poll();
}
std::string NtpClient::RotateServer()
{
return p->RotateServer();
}
void NtpClient::RunOnce()
{
p->RunOnce();
}
void NtpClient::Impl::Open(std::string_view host, std::string_view service)
{
boost::asio::ip::udp::resolver resolver(threadPool_);
@ -176,9 +232,15 @@ void NtpClient::Impl::Open(std::string_view host, std::string_view service)
{
serverEndpoint_ = std::nullopt;
logger_->warn("Could not resolve host {}: {}", host, ec.message());
rotateServer_ = true;
}
}
void NtpClient::Impl::OpenCurrentServer()
{
Open(*currentServer_, "123");
}
void NtpClient::Impl::Poll()
{
using namespace std::chrono_literals;
@ -215,6 +277,7 @@ void NtpClient::Impl::Poll()
case std::future_status::deferred:
logger_->warn("Timeout waiting for NTP response");
socket_.cancel();
error_ = true;
break;
}
}
@ -242,17 +305,29 @@ void NtpClient::Impl::ReceivePacket(std::size_t length)
if (kod == "DENY" || kod == "RSTR")
{
// TODO
// The client MUST demobilize any associations to that server and
// stop sending packets to that server
disableServer_ = true;
}
else if (kod == "RATE")
{
// TODO
// The client MUST immediately reduce its polling interval to that
// server and continue to reduce it each time it receives a RATE
// kiss code
if (pollInterval_ < kMaxPollInterval_)
{
++pollInterval_;
}
else
{
// The server wants us to reduce the polling interval lower than
// what we deem useful. Move to the next server.
rotateServer_ = true;
}
}
// Consider a KoD packet an error
error_ = true;
}
else
{
@ -276,12 +351,161 @@ void NtpClient::Impl::ReceivePacket(std::size_t length)
timeOffset_ = ((t1 - t0) + (t2 - t3)) / 2;
logger_->debug("Time offset updated: {:%jd %T}", timeOffset_);
// TODO: Signal
}
}
else
{
logger_->warn("Received too few bytes: {}", length);
error_ = true;
}
}
std::string NtpClient::Impl::RotateServer()
{
socket_.close();
bool newServerFound = false;
// Save the current server
const auto oldServer = currentServer_;
while (!newServerFound)
{
// Increment the current server
++currentServer_;
// If we are at the end of the list, start over at the beginning
if (currentServer_ == serverList_.end())
{
currentServer_ = serverList_.begin();
}
// If we have reached the end of the list, give up
if (currentServer_ == oldServer)
{
enabled_ = false;
break;
}
// If the current server is disabled, continue searching
while (std::find(disabledServers_.cbegin(),
disabledServers_.cend(),
*currentServer_) != disabledServers_.cend())
{
continue;
}
// A new server has been found
newServerFound = true;
}
pollInterval_ = kMinPollInterval_;
rotateServer_ = false;
return *currentServer_;
}
void NtpClient::Impl::Run()
{
RunOnce();
if (enabled_)
{
std::chrono::seconds pollIntervalSeconds {1u << pollInterval_};
pollTimer_.expires_after(pollIntervalSeconds);
pollTimer_.async_wait(
[this](const boost::system::error_code& e)
{
if (e == boost::asio::error::operation_aborted)
{
logger_->debug("Poll timer cancelled");
}
else if (e != boost::system::errc::success)
{
logger_->warn("Poll timer error: {}", e.message());
}
else
{
try
{
Run();
}
catch (const std::exception& ex)
{
logger_->error(ex.what());
}
}
});
}
}
void NtpClient::Impl::RunOnce()
{
if (disableServer_)
{
// Disable the current server
disabledServers_.push_back(*currentServer_);
// Disable the NTP client if all servers are disabled
enabled_ = disabledServers_.size() == serverList_.size();
if (!enabled_)
{
error_ = true;
}
disableServer_ = false;
rotateServer_ = enabled_;
}
if (!enabled_ && socket_.is_open())
{
// Sockets should be closed if the client is disabled
socket_.close();
}
if (rotateServer_)
{
// Rotate the server if requested
RotateServer();
}
if (enabled_ && !socket_.is_open())
{
// Open the current server if it is not open
OpenCurrentServer();
}
if (socket_.is_open())
{
// Send an NTP message to determine the current time offset
Poll();
}
else if (enabled_)
{
// Did not poll this frame
error_ = true;
}
}
std::shared_ptr<NtpClient> NtpClient::Instance()
{
static std::weak_ptr<NtpClient> ntpClientReference_ {};
static std::mutex instanceMutex_ {};
std::unique_lock lock(instanceMutex_);
std::shared_ptr<NtpClient> ntpClient = ntpClientReference_.lock();
if (ntpClient == nullptr)
{
ntpClient = std::make_shared<NtpClient>();
ntpClientReference_ = ntpClient;
}
return ntpClient;
}
} // namespace scwx::network