diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py index dca6f9712fb..321edfcbc5b 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py @@ -19,6 +19,7 @@ # Etcd-based transport for the Calico/OpenStack Plugin. import collections +from datetime import datetime, timezone import json from oslo_log import log @@ -26,6 +27,18 @@ from networking_calico import datamodel_v2 from networking_calico import etcdutils from networking_calico.common import config as calico_config +from networking_calico.monotonic import monotonic_time + + +# If a Felix status update we receive from etcd has a "time" field more than this many +# seconds in the past, we are running behind and should warn the operator. Felix writes +# status updates every 30s by default, so anything materially above that indicates a +# processing backlog. +STALE_STATUS_WARN_SECS = 300 + +# Rate-limit stale-status warnings to at most one per this many seconds, to avoid +# flooding the log when every update in a large batch is stale. +STALE_STATUS_WARN_INTERVAL_SECS = 300 LOG = log.getLogger(__name__) @@ -79,6 +92,14 @@ def __init__(self, calico_driver): # deduplicate before passing on to the Neutron DB. self._felix_live_rev = {} + # Monotonic time of the last stale-status WARNING we logged. Used to rate-limit + # the warning so we do not flood the log when the whole cluster is backlogged. + # Initialised to -inf so the first stale-update encountered always passes the + # rate-limit check, regardless of system uptime: on Linux ``monotonic_time()`` + # is seconds-since-boot, so a 0.0 sentinel would suppress the first warning on + # any host with uptime below STALE_STATUS_WARN_INTERVAL_SECS. + self._last_stale_warn = float("-inf") + def _pre_snapshot_hook(self): # Save off current endpoint status, then reset current state, so we # will be able to identify any changes in the new snapshot. @@ -104,6 +125,67 @@ def _post_snapshot_hook(self, old_endpoints_by_host): ) self.processing_snapshot = False + def _check_for_stale_status(self, hostname, value): + """Warn the operator if we are processing materially stale updates. + + If the "time" field inside the status value is significantly older than + wall-clock now, this StatusWatcher is processing events slower than Felix is + producing them, and a backlog is building up. Left unaddressed this causes + Neutron to see agent up/down transitions hours after they actually happened. + Warn the operator so they can tune ReportingIntervalSecs / agent_down_time or + investigate why processing is slow. + + Rate-limited to one warning per ``STALE_STATUS_WARN_INTERVAL_SECS``. + """ + if self.processing_snapshot: + # During an initial-snapshot replay the "time" values will legitimately look + # old: Felix wrote them some time ago and we are only now reading the + # subtree. That is not evidence of a processing backlog -- skip the check + # in this case. + return + + status_time_str = value.get("time") + if not status_time_str: + return + + try: + # Felix writes the time in RFC3339 with a trailing "Z"; convert to a +00:00 + # offset for datetime.fromisoformat (which has only accepted the bare "Z" + # suffix since Python 3.11). + status_time = datetime.fromisoformat(status_time_str.replace("Z", "+00:00")) + except ValueError: + LOG.warning( + "Could not parse status time %r for host %s", + status_time_str, + hostname, + ) + return + + if status_time.tzinfo is None: + # Treat naive timestamps (no timezone info) as UTC so that the subtraction + # below does not raise TypeError. + status_time = status_time.replace(tzinfo=timezone.utc) + + lag = (datetime.now(tz=timezone.utc) - status_time).total_seconds() + if lag <= STALE_STATUS_WARN_SECS: + return + + now_mono = monotonic_time() + if now_mono - self._last_stale_warn < STALE_STATUS_WARN_INTERVAL_SECS: + return + + self._last_stale_warn = now_mono + LOG.warning( + "Processing stale Felix status update for host %s: the update" + " was written %.0fs ago (threshold %ds). StatusWatcher is not" + " keeping up with the rate of updates; consider raising" + " ReportingIntervalSecs and agent_down_time in Neutron / Felix" + " config.", + hostname, + lag, + STALE_STATUS_WARN_SECS, + ) + class AgentStatusWatcher(StatusWatcher): @@ -125,6 +207,7 @@ def _on_status_set(self, response, hostname): except (ValueError, TypeError): LOG.warning("Bad JSON data for key %s: %s", response.key, response.value) else: + self._check_for_stale_status(hostname, value) mod_revision = response.mod_revision if self._felix_live_rev.get(hostname) != mod_revision: self.calico_driver.on_felix_alive( diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py new file mode 100644 index 00000000000..3aedb0978e1 --- /dev/null +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2026 Tigera, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Targeted unit tests for StatusWatcher helpers that do not need the full plugin-etcd test +harness. See test_plugin_etcd.py for end-to-end watcher tests. +""" +from datetime import datetime, timedelta, timezone +import unittest + +import mock + +from networking_calico.plugins.ml2.drivers.calico import status + + +class TestCheckForStaleStatus(unittest.TestCase): + """Exercise StatusWatcher._check_for_stale_status in isolation. + + The real __init__ pulls in config and an EtcdWatcher; we skip it via __new__ and set + only the attributes the method reads. + """ + + def setUp(self): + super(TestCheckForStaleStatus, self).setUp() + self.watcher = status.StatusWatcher.__new__(status.StatusWatcher) + # Match the production sentinel so the rate-limit check does not eat + # the first stale-update warning on hosts whose uptime is below + # STALE_STATUS_WARN_INTERVAL_SECS. + self.watcher._last_stale_warn = float("-inf") + self.watcher.processing_snapshot = False + + def _fmt(self, dt): + return dt.strftime("%Y-%m-%dT%H:%M:%SZ") + + def test_fresh_update_does_not_warn(self): + fresh = self._fmt(datetime.now(tz=timezone.utc)) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": fresh}) + m_warn.assert_not_called() + self.assertEqual(float("-inf"), self.watcher._last_stale_warn) + + def test_stale_update_warns(self): + stale = self._fmt(datetime.now(tz=timezone.utc) - timedelta(hours=1)) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": stale}) + m_warn.assert_called_once() + # First positional arg of the single call is the log format string. + self.assertIn("stale Felix status update", m_warn.call_args.args[0]) + self.assertGreater(self.watcher._last_stale_warn, float("-inf")) + + def test_stale_update_is_rate_limited(self): + stale = self._fmt(datetime.now(tz=timezone.utc) - timedelta(hours=1)) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": stale}) + self.watcher._check_for_stale_status("host2", {"time": stale}) + self.watcher._check_for_stale_status("host3", {"time": stale}) + # Only one warning across several stale updates within the rate-limit window. + self.assertEqual(1, m_warn.call_count) + + def test_snapshot_processing_skips_check(self): + self.watcher.processing_snapshot = True + stale = self._fmt(datetime.now(tz=timezone.utc) - timedelta(hours=1)) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": stale}) + m_warn.assert_not_called() + + def test_missing_time_field_is_silent(self): + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status( + "host1", {"uptime": 10, "first_update": False} + ) + m_warn.assert_not_called() + + def test_stale_naive_timestamp_warns(self): + """A timezone-less timestamp (no trailing Z) should not crash.""" + stale = (datetime.now(tz=timezone.utc) - timedelta(hours=1)).strftime( + "%Y-%m-%dT%H:%M:%S" + ) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": stale}) + m_warn.assert_called_once() + self.assertIn("stale Felix status update", m_warn.call_args.args[0]) + + def test_unparseable_time_logs_separate_warning(self): + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": "not a date"}) + m_warn.assert_called_once() + self.assertIn("Could not parse status time", m_warn.call_args.args[0]) + # An unparseable time does not count as a stale-status warning for rate-limiting + # purposes. + self.assertEqual(float("-inf"), self.watcher._last_stale_warn)