Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,26 @@
# Etcd-based transport for the Calico/OpenStack Plugin.

import collections
from datetime import datetime, timezone
import json

from oslo_log import log

from networking_calico import datamodel_v2
from networking_calico import etcdutils
from networking_calico.common import config as calico_config
from networking_calico.monotonic import monotonic_time


# If a Felix status update we receive from etcd has a "time" field more than this many
# seconds in the past, we are running behind and should warn the operator. Felix writes
# status updates every 30s by default, so anything materially above that indicates a
# processing backlog.
STALE_STATUS_WARN_SECS = 300

# Rate-limit stale-status warnings to at most one per this many seconds, to avoid
# flooding the log when every update in a large batch is stale.
STALE_STATUS_WARN_INTERVAL_SECS = 300


LOG = log.getLogger(__name__)
Expand Down Expand Up @@ -79,6 +92,14 @@ def __init__(self, calico_driver):
# deduplicate before passing on to the Neutron DB.
self._felix_live_rev = {}

# Monotonic time of the last stale-status WARNING we logged. Used to rate-limit
# the warning so we do not flood the log when the whole cluster is backlogged.
# Initialised to -inf so the first stale-update encountered always passes the
# rate-limit check, regardless of system uptime: on Linux ``monotonic_time()``
# is seconds-since-boot, so a 0.0 sentinel would suppress the first warning on
# any host with uptime below STALE_STATUS_WARN_INTERVAL_SECS.
self._last_stale_warn = float("-inf")

def _pre_snapshot_hook(self):
# Save off current endpoint status, then reset current state, so we
# will be able to identify any changes in the new snapshot.
Expand All @@ -104,6 +125,67 @@ def _post_snapshot_hook(self, old_endpoints_by_host):
)
self.processing_snapshot = False

def _check_for_stale_status(self, hostname, value):
"""Warn the operator if we are processing materially stale updates.

If the "time" field inside the status value is significantly older than
wall-clock now, this StatusWatcher is processing events slower than Felix is
producing them, and a backlog is building up. Left unaddressed this causes
Neutron to see agent up/down transitions hours after they actually happened.
Warn the operator so they can tune ReportingIntervalSecs / agent_down_time or
investigate why processing is slow.

Rate-limited to one warning per ``STALE_STATUS_WARN_INTERVAL_SECS``.
"""
if self.processing_snapshot:
# During an initial-snapshot replay the "time" values will legitimately look
# old: Felix wrote them some time ago and we are only now reading the
# subtree. That is not evidence of a processing backlog -- skip the check
# in this case.
return

status_time_str = value.get("time")
if not status_time_str:
return

try:
# Felix writes the time in RFC3339 with a trailing "Z"; convert to a +00:00
# offset for datetime.fromisoformat (which has only accepted the bare "Z"
# suffix since Python 3.11).
status_time = datetime.fromisoformat(status_time_str.replace("Z", "+00:00"))
except ValueError:
LOG.warning(
"Could not parse status time %r for host %s",
status_time_str,
hostname,
)
return

if status_time.tzinfo is None:
# Treat naive timestamps (no timezone info) as UTC so that the subtraction
# below does not raise TypeError.
status_time = status_time.replace(tzinfo=timezone.utc)

lag = (datetime.now(tz=timezone.utc) - status_time).total_seconds()
if lag <= STALE_STATUS_WARN_SECS:
return

now_mono = monotonic_time()
if now_mono - self._last_stale_warn < STALE_STATUS_WARN_INTERVAL_SECS:
return

self._last_stale_warn = now_mono
LOG.warning(
"Processing stale Felix status update for host %s: the update"
" was written %.0fs ago (threshold %ds). StatusWatcher is not"
" keeping up with the rate of updates; consider raising"
" ReportingIntervalSecs and agent_down_time in Neutron / Felix"
" config.",
hostname,
lag,
STALE_STATUS_WARN_SECS,
)


class AgentStatusWatcher(StatusWatcher):

Expand All @@ -125,6 +207,7 @@ def _on_status_set(self, response, hostname):
except (ValueError, TypeError):
LOG.warning("Bad JSON data for key %s: %s", response.key, response.value)
else:
self._check_for_stale_status(hostname, value)
mod_revision = response.mod_revision
if self._felix_live_rev.get(hostname) != mod_revision:
self.calico_driver.on_felix_alive(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2026 Tigera, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Targeted unit tests for StatusWatcher helpers that do not need the full plugin-etcd test
harness. See test_plugin_etcd.py for end-to-end watcher tests.
"""
from datetime import datetime, timedelta, timezone
import unittest

import mock

from networking_calico.plugins.ml2.drivers.calico import status


class TestCheckForStaleStatus(unittest.TestCase):
"""Exercise StatusWatcher._check_for_stale_status in isolation.

The real __init__ pulls in config and an EtcdWatcher; we skip it via __new__ and set
only the attributes the method reads.
"""

def setUp(self):
super(TestCheckForStaleStatus, self).setUp()
self.watcher = status.StatusWatcher.__new__(status.StatusWatcher)
# Match the production sentinel so the rate-limit check does not eat
# the first stale-update warning on hosts whose uptime is below
# STALE_STATUS_WARN_INTERVAL_SECS.
self.watcher._last_stale_warn = float("-inf")
self.watcher.processing_snapshot = False

def _fmt(self, dt):
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")

def test_fresh_update_does_not_warn(self):
fresh = self._fmt(datetime.now(tz=timezone.utc))
with mock.patch.object(status.LOG, "warning") as m_warn:
self.watcher._check_for_stale_status("host1", {"time": fresh})
m_warn.assert_not_called()
self.assertEqual(float("-inf"), self.watcher._last_stale_warn)

def test_stale_update_warns(self):
stale = self._fmt(datetime.now(tz=timezone.utc) - timedelta(hours=1))
with mock.patch.object(status.LOG, "warning") as m_warn:
self.watcher._check_for_stale_status("host1", {"time": stale})
m_warn.assert_called_once()
# First positional arg of the single call is the log format string.
self.assertIn("stale Felix status update", m_warn.call_args.args[0])
self.assertGreater(self.watcher._last_stale_warn, float("-inf"))

def test_stale_update_is_rate_limited(self):
stale = self._fmt(datetime.now(tz=timezone.utc) - timedelta(hours=1))
with mock.patch.object(status.LOG, "warning") as m_warn:
self.watcher._check_for_stale_status("host1", {"time": stale})
self.watcher._check_for_stale_status("host2", {"time": stale})
self.watcher._check_for_stale_status("host3", {"time": stale})
# Only one warning across several stale updates within the rate-limit window.
self.assertEqual(1, m_warn.call_count)

def test_snapshot_processing_skips_check(self):
self.watcher.processing_snapshot = True
stale = self._fmt(datetime.now(tz=timezone.utc) - timedelta(hours=1))
with mock.patch.object(status.LOG, "warning") as m_warn:
self.watcher._check_for_stale_status("host1", {"time": stale})
m_warn.assert_not_called()

def test_missing_time_field_is_silent(self):
with mock.patch.object(status.LOG, "warning") as m_warn:
self.watcher._check_for_stale_status(
"host1", {"uptime": 10, "first_update": False}
)
m_warn.assert_not_called()

def test_stale_naive_timestamp_warns(self):
"""A timezone-less timestamp (no trailing Z) should not crash."""
stale = (datetime.now(tz=timezone.utc) - timedelta(hours=1)).strftime(
"%Y-%m-%dT%H:%M:%S"
)
with mock.patch.object(status.LOG, "warning") as m_warn:
self.watcher._check_for_stale_status("host1", {"time": stale})
m_warn.assert_called_once()
self.assertIn("stale Felix status update", m_warn.call_args.args[0])

def test_unparseable_time_logs_separate_warning(self):
with mock.patch.object(status.LOG, "warning") as m_warn:
self.watcher._check_for_stale_status("host1", {"time": "not a date"})
m_warn.assert_called_once()
self.assertIn("Could not parse status time", m_warn.call_args.args[0])
# An unparseable time does not count as a stale-status warning for rate-limiting
# purposes.
self.assertEqual(float("-inf"), self.watcher._last_stale_warn)
Loading