From dfb00b96df4a9f1113ff744ab3d14c42147d777d Mon Sep 17 00:00:00 2001 From: Dan Paulat Date: Sun, 10 Aug 2025 18:01:45 -0500 Subject: [PATCH] NTP polling --- test/source/scwx/network/ntp_client.test.cpp | 22 +- wxdata/include/scwx/network/ntp_client.hpp | 15 +- wxdata/source/scwx/network/ntp_client.cpp | 276 +++++++++++++++++-- 3 files changed, 281 insertions(+), 32 deletions(-) diff --git a/test/source/scwx/network/ntp_client.test.cpp b/test/source/scwx/network/ntp_client.test.cpp index cebd8cc2..bdfcb4ae 100644 --- a/test/source/scwx/network/ntp_client.test.cpp +++ b/test/source/scwx/network/ntp_client.test.cpp @@ -11,10 +11,24 @@ TEST(NtpClient, Poll) { NtpClient client {}; - client.Open("time.nist.gov", "123"); - //client.Open("pool.ntp.org", "123"); - //client.Open("time.windows.com", "123"); - client.Poll(); + const std::string firstServer = client.RotateServer(); + std::string currentServer = firstServer; + std::string lastServer = firstServer; + bool error = false; + + do + { + client.RunOnce(); + error = client.error(); + + EXPECT_EQ(error, false); + + // Loop until the current server repeats the first server, or fails to + // rotate + lastServer = currentServer; + currentServer = client.RotateServer(); + } while (currentServer != firstServer && currentServer != lastServer && + !error); } } // namespace network diff --git a/wxdata/include/scwx/network/ntp_client.hpp b/wxdata/include/scwx/network/ntp_client.hpp index 55ef4204..6a0a78b7 100644 --- a/wxdata/include/scwx/network/ntp_client.hpp +++ b/wxdata/include/scwx/network/ntp_client.hpp @@ -1,6 +1,8 @@ #pragma once +#include #include +#include #include namespace scwx::network @@ -21,8 +23,17 @@ public: NtpClient(NtpClient&&) noexcept; NtpClient& operator=(NtpClient&&) noexcept; - void Open(std::string_view host, std::string_view service); - void Poll(); + bool error(); + std::chrono::system_clock::duration time_offset() const; + + void Start(); + void Open(std::string_view host, std::string_view service); + void OpenCurrentServer(); + void Poll(); + std::string RotateServer(); + void RunOnce(); + + static std::shared_ptr Instance(); private: class Impl; diff --git a/wxdata/source/scwx/network/ntp_client.cpp b/wxdata/source/scwx/network/ntp_client.cpp index a20540c4..89aba5d0 100644 --- a/wxdata/source/scwx/network/ntp_client.cpp +++ b/wxdata/source/scwx/network/ntp_client.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -16,6 +17,12 @@ static const auto logger_ = scwx::util::Logger::Create(logPrefix_); static constexpr std::size_t kReceiveBufferSize_ {48u}; +// Reasonable min/max values for polling intervals. We don't want to poll too +// quickly and upset the server, but we don't want to poll too slowly in the +// event of a time jump. +static constexpr std::uint32_t kMinPollInterval_ = 6u; // 2^6 = 64 seconds +static constexpr std::uint32_t kMaxPollInterval_ = 9u; // 2^9 = 512 seconds + class NtpTimestamp { public: @@ -97,28 +104,42 @@ public: Impl(Impl&&) = delete; Impl& operator=(Impl&&) = delete; - void Open(std::string_view host, std::string_view service); - void Poll(); - void ReceivePacket(std::size_t length); + void Open(std::string_view host, std::string_view service); + void OpenCurrentServer(); + void Poll(); + void ReceivePacket(std::size_t length); + std::string RotateServer(); + void Run(); + void RunOnce(); boost::asio::thread_pool threadPool_ {2u}; - bool enabled_; + boost::asio::steady_timer pollTimer_ {threadPool_}; + std::uint32_t pollInterval_ {kMinPollInterval_}; + + bool enabled_ {true}; + bool error_ {false}; + bool disableServer_ {false}; + bool rotateServer_ {false}; types::ntp::NtpPacket transmitPacket_ {}; - boost::asio::ip::udp::socket socket_; + boost::asio::ip::udp::socket socket_ {threadPool_}; std::optional serverEndpoint_ {}; std::array receiveBuffer_ {}; std::chrono::system_clock::duration timeOffset_ {}; - std::vector serverList_ {"time.nist.gov", - "time.cloudflare.com", - "ntp.pool.org", - "time.aws.com", - "time.windows.com", - "time.apple.com"}; + const std::vector serverList_ {"time.nist.gov", + "time.cloudflare.com", + "pool.ntp.org", + "time.aws.com", + "time.windows.com", + "time.apple.com"}; + std::vector disabledServers_ {}; + + std::vector::const_iterator currentServer_ = + serverList_.begin(); }; NtpClient::NtpClient() : p(std::make_unique()) {} @@ -127,17 +148,7 @@ NtpClient::~NtpClient() = default; NtpClient::NtpClient(NtpClient&&) noexcept = default; NtpClient& NtpClient::operator=(NtpClient&&) noexcept = default; -void NtpClient::Open(std::string_view host, std::string_view service) -{ - p->Open(host, service); -} - -void NtpClient::Poll() -{ - p->Poll(); -} - -NtpClient::Impl::Impl() : socket_ {threadPool_} +NtpClient::Impl::Impl() { using namespace std::chrono_literals; @@ -145,8 +156,8 @@ NtpClient::Impl::Impl() : socket_ {threadPool_} std::chrono::floor(std::chrono::system_clock::now()); // The NTP timestamp will overflow in 2036. Overflow is handled in such a way - // that should work until 2106. Additional handling for subsequent eras is - // required. + // that dates prior to 1970 result in a Unix timestamp after 2036. Additional + // handling for the year 2106 and subsequent eras is required. static constexpr auto kMaxYear_ = 2106y; enabled_ = now < kMaxYear_ / 1 / 1; @@ -160,6 +171,51 @@ NtpClient::Impl::~Impl() threadPool_.join(); } +bool NtpClient::error() +{ + bool returnValue = p->error_; + p->error_ = false; + return returnValue; +} + +std::chrono::system_clock::duration NtpClient::time_offset() const +{ + return p->timeOffset_; +} + +void NtpClient::Start() +{ + if (p->enabled_) + { + boost::asio::post(p->threadPool_, [this]() { p->Run(); }); + } +} + +void NtpClient::Open(std::string_view host, std::string_view service) +{ + p->Open(host, service); +} + +void NtpClient::OpenCurrentServer() +{ + p->OpenCurrentServer(); +} + +void NtpClient::Poll() +{ + p->Poll(); +} + +std::string NtpClient::RotateServer() +{ + return p->RotateServer(); +} + +void NtpClient::RunOnce() +{ + p->RunOnce(); +} + void NtpClient::Impl::Open(std::string_view host, std::string_view service) { boost::asio::ip::udp::resolver resolver(threadPool_); @@ -176,9 +232,15 @@ void NtpClient::Impl::Open(std::string_view host, std::string_view service) { serverEndpoint_ = std::nullopt; logger_->warn("Could not resolve host {}: {}", host, ec.message()); + rotateServer_ = true; } } +void NtpClient::Impl::OpenCurrentServer() +{ + Open(*currentServer_, "123"); +} + void NtpClient::Impl::Poll() { using namespace std::chrono_literals; @@ -215,6 +277,7 @@ void NtpClient::Impl::Poll() case std::future_status::deferred: logger_->warn("Timeout waiting for NTP response"); socket_.cancel(); + error_ = true; break; } } @@ -242,17 +305,29 @@ void NtpClient::Impl::ReceivePacket(std::size_t length) if (kod == "DENY" || kod == "RSTR") { - // TODO // The client MUST demobilize any associations to that server and // stop sending packets to that server + disableServer_ = true; } else if (kod == "RATE") { - // TODO // The client MUST immediately reduce its polling interval to that // server and continue to reduce it each time it receives a RATE // kiss code + if (pollInterval_ < kMaxPollInterval_) + { + ++pollInterval_; + } + else + { + // The server wants us to reduce the polling interval lower than + // what we deem useful. Move to the next server. + rotateServer_ = true; + } } + + // Consider a KoD packet an error + error_ = true; } else { @@ -276,12 +351,161 @@ void NtpClient::Impl::ReceivePacket(std::size_t length) timeOffset_ = ((t1 - t0) + (t2 - t3)) / 2; logger_->debug("Time offset updated: {:%jd %T}", timeOffset_); + + // TODO: Signal } } else { logger_->warn("Received too few bytes: {}", length); + error_ = true; } } +std::string NtpClient::Impl::RotateServer() +{ + socket_.close(); + + bool newServerFound = false; + + // Save the current server + const auto oldServer = currentServer_; + + while (!newServerFound) + { + // Increment the current server + ++currentServer_; + + // If we are at the end of the list, start over at the beginning + if (currentServer_ == serverList_.end()) + { + currentServer_ = serverList_.begin(); + } + + // If we have reached the end of the list, give up + if (currentServer_ == oldServer) + { + enabled_ = false; + break; + } + + // If the current server is disabled, continue searching + while (std::find(disabledServers_.cbegin(), + disabledServers_.cend(), + *currentServer_) != disabledServers_.cend()) + { + continue; + } + + // A new server has been found + newServerFound = true; + } + + pollInterval_ = kMinPollInterval_; + rotateServer_ = false; + + return *currentServer_; +} + +void NtpClient::Impl::Run() +{ + RunOnce(); + + if (enabled_) + { + std::chrono::seconds pollIntervalSeconds {1u << pollInterval_}; + pollTimer_.expires_after(pollIntervalSeconds); + pollTimer_.async_wait( + [this](const boost::system::error_code& e) + { + if (e == boost::asio::error::operation_aborted) + { + logger_->debug("Poll timer cancelled"); + } + else if (e != boost::system::errc::success) + { + logger_->warn("Poll timer error: {}", e.message()); + } + else + { + try + { + Run(); + } + catch (const std::exception& ex) + { + logger_->error(ex.what()); + } + } + }); + } +} + +void NtpClient::Impl::RunOnce() +{ + if (disableServer_) + { + // Disable the current server + disabledServers_.push_back(*currentServer_); + + // Disable the NTP client if all servers are disabled + enabled_ = disabledServers_.size() == serverList_.size(); + + if (!enabled_) + { + error_ = true; + } + + disableServer_ = false; + rotateServer_ = enabled_; + } + + if (!enabled_ && socket_.is_open()) + { + // Sockets should be closed if the client is disabled + socket_.close(); + } + + if (rotateServer_) + { + // Rotate the server if requested + RotateServer(); + } + + if (enabled_ && !socket_.is_open()) + { + // Open the current server if it is not open + OpenCurrentServer(); + } + + if (socket_.is_open()) + { + // Send an NTP message to determine the current time offset + Poll(); + } + else if (enabled_) + { + // Did not poll this frame + error_ = true; + } +} + +std::shared_ptr NtpClient::Instance() +{ + static std::weak_ptr ntpClientReference_ {}; + static std::mutex instanceMutex_ {}; + + std::unique_lock lock(instanceMutex_); + + std::shared_ptr ntpClient = ntpClientReference_.lock(); + + if (ntpClient == nullptr) + { + ntpClient = std::make_shared(); + ntpClientReference_ = ntpClient; + } + + return ntpClient; +} + } // namespace scwx::network