From 5cffab3c12c989728158113aeee8cdf73aef501c Mon Sep 17 00:00:00 2001 From: Nell Jerram Date: Mon, 13 Apr 2026 16:45:26 +0100 Subject: [PATCH 1/6] networking-calico: defensive fixes for silent elector death and status backlog For CORE-12651 and CI-1892 Three fixes for the issues investigated under CI-1892. None of them changes the election wire protocol or the existing behaviour in the common case; they add extra checks that catch known failure modes. 1. Elector.healthy() self._master is a local Python flag that only gets cleared if the election greenlet exits normally via _attempt_step_down() or the refresh-loop finally clause. If the greenlet dies silently - e.g. due to an eventlet-level issue that drops the frame without unwinding Python exceptions - _master stays True indefinitely and the rest of the driver keeps running master-only work (periodic resync, compaction, StatusWatcher) on a node whose etcd lease has long since expired, causing split-brain. Elector now stamps self._last_refresh on each successful lease refresh, and healthy() returns True only if the greenlet is still alive AND the lease has been refreshed within the last TTL. mech_calico.py callsites that decide whether to do master-only work switch from master() to healthy(). 2. Elector.confirmed_master() Before each iteration of periodic_resync_thread, also re-read the election key from etcd and confirm the value matches our id_string. If etcd disagrees, clear _master locally and skip the resync cycle. This defends against any residual disagreement between our in-process flag and etcd's ground truth, at the cost of one extra etcd GET per resync tick (negligible vs. the resync itself). 3. StatusWatcher stale-update WARNING When an incoming Felix status update has a "time" field more than 5 mins in the past, log a rate-limited WARNING so operators see the backlog building up long before it grows to hours. Skipped during initial-snapshot replay to avoid false positives. Complements the ReportingIntervalSecs / agent_down_time config tuning that was recommended to the customer. Also promote the surviving LOG.debug calls in election.py to LOG.info so election state transitions and refresh activity are visible in production logs without toggling debug logging. Unit tests added for healthy(), confirmed_master() and the stale status check. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../plugins/ml2/drivers/calico/election.py | 117 ++++++++++++++-- .../plugins/ml2/drivers/calico/mech_calico.py | 22 +-- .../plugins/ml2/drivers/calico/status.py | 70 ++++++++++ .../plugins/ml2/drivers/calico/test/lib.py | 6 + .../ml2/drivers/calico/test/test_election.py | 131 ++++++++++++++++++ .../calico/test/test_monitor_thread.py | 20 ++- .../drivers/calico/test/test_plugin_etcd.py | 6 + .../ml2/drivers/calico/test/test_status.py | 91 ++++++++++++ 8 files changed, 445 insertions(+), 18 deletions(-) create mode 100644 networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py index 1fd573027b4..0c75f10c7a8 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py @@ -36,6 +36,7 @@ from networking_calico import etcdv3 from networking_calico.common import config as calico_config +from networking_calico.monotonic import monotonic_time LOG = log.getLogger(__name__) @@ -101,6 +102,13 @@ def __init__(self, server_id, election_key, old_key=None, interval=30, ttl=60): # Is this the master? To start with, no self._master = False + # Monotonic timestamp of the last successful lease refresh while + # master. Used by healthy() to detect a silently dead election + # greenlet - if _master stays True but the greenlet has stopped + # refreshing the lease, we are no longer actually the master even + # though self._master says we are. + self._last_refresh = 0.0 + # Keep the greenlet ID handy to ease UT. self._greenlet = eventlet.spawn(self._run) @@ -147,7 +155,7 @@ def _vote(self): value, mod_revision = etcdv3.get(self._key) mod_revision = int(mod_revision) except etcdv3.KeyNotFound: - LOG.debug("Try to become the master - key not found") + LOG.info("Try to become the master - key not found") self._become_master() assert False, "_become_master() should not return." except Etcd3Exception as e: @@ -155,7 +163,7 @@ def _vote(self): self._log_exception("read current master", e) return - LOG.debug("ID of elected master is : %s", value) + LOG.info("ID of elected master is : %s", value) if value: # If we happen to be on the same server, check if the master # process is still alive. @@ -186,7 +194,7 @@ def _vote(self): timeout=self._interval * 2, start_revision=mod_revision + 1, ) - LOG.debug("election event: %s", event) + LOG.info("election event: %s", event) action = event.get("type", "SET").lower() value = event["kv"].get("value") mod_revision = int(event["kv"].get("mod_revision", "0")) @@ -199,7 +207,7 @@ def _vote(self): # Something bad and unexpected. Log and reconnect. self._log_exception("wait for master change", e) return - LOG.debug("Election key action: %s; new value %s", action, value) + LOG.info("Election key action: %s; new value %s", action, value) if action in ETCD_DELETE_ACTIONS or value is None: # Deleted - try and become the master. LOG.info( @@ -223,12 +231,12 @@ def _check_master_process(self, master_id): return host = match.group("host") pid = int(match.group("pid")) - LOG.debug("Parsed key as host = %s, PID = %s", host, pid) + LOG.info("Parsed key as host = %s, PID = %s", host, pid) if host == self._server_id: # Check if the PID is still running. - LOG.debug("Previous master was on this server %s", host) + LOG.info("Previous master was on this server %s", host) if os.path.exists("/proc/%s" % pid): - LOG.debug("Master still running") + LOG.info("Master still running") else: LOG.warning( "Master was on this server but cannot find its " @@ -271,6 +279,10 @@ def _become_master(self): LOG.info("Race: someone else beat us to be master") raise RestartElection() + # We are now master; start the healthy() watchdog clock. This must + # be kept up to date by the lease-refresh loop below. + self._last_refresh = monotonic_time() + LOG.info( "Successfully become master - key %s, value %s", self._key, self.id_string ) @@ -281,7 +293,7 @@ def _become_master(self): try: while not self._stopped: try: - LOG.debug("Refreshing master role") + LOG.info("Refreshing master role") # Refresh the lease. ttl = ttl_lease.refresh() # Also rewrite the key, so that non-masters see an event on @@ -294,7 +306,10 @@ def _become_master(self): ): LOG.warning("Key changed or deleted; restart election") raise RestartElection() - LOG.debug("Refreshed master role, TTL now is %d", ttl) + LOG.info("Refreshed master role, TTL now is %d", ttl) + # Record that the refresh succeeded. healthy() uses this + # to detect if the refresh loop silently stops running. + self._last_refresh = monotonic_time() except RestartElection: raise except Exception as e: @@ -362,6 +377,90 @@ def master(self): """ return self._master and not self._stopped + def confirmed_master(self): + """Am I healthily the master AND does etcd still agree? + + Performs a healthy() check first (cheap, local). If that passes, + also re-reads the election key from etcd and confirms that its + value matches our id_string. Intended for callers that are about + to start expensive master-only work (e.g. a periodic resync) + and want an extra belt-and-braces check against an in-process + state disagreement with etcd. + + This involves a synchronous etcd GET, so do not call in a hot + loop - use healthy() for that. + + returns: True if we are confirmed master according to both our + own local state and etcd's current view. + """ + if not self.healthy(): + return False + try: + value, _mod_revision = etcdv3.get(self._key) + except etcdv3.KeyNotFound: + LOG.warning( + "Election key %s not present in etcd but _master is True; " + "treating as no longer master", + self._key, + ) + self._master = False + return False + except Etcd3Exception as e: + # Treat a transient etcd error as "don't know"; be conservative + # and skip master-only work this time. We will retry soon. + self._log_exception("confirm master", e) + return False + if value != self.id_string: + LOG.warning( + "Election key %s in etcd has value %r but we expected %r; " + "treating as no longer master", + self._key, + value, + self.id_string, + ) + self._master = False + return False + return True + + def healthy(self): + """Am I healthily the master? + + Stricter than master(). Returns True only if (a) _master is set, + (b) we have not been stopped, (c) the election greenlet is still + alive, and (d) the lease was refreshed within the last self._ttl + seconds. + + master() alone is unsafe because self._master is a local Python + flag that is set to True when we win the election and only cleared + if the greenlet exits normally via _attempt_step_down() or the + refresh loop's finally clause. If the greenlet dies silently - + e.g. due to an eventlet-level issue that drops the frame without + unwinding Python exceptions - _master stays True indefinitely. + healthy() catches that case by cross-checking the greenlet state + and the refresh timestamp. + + returns: True if this is the master and the election greenlet is + confirmed to still be working. + """ + if not self._master or self._stopped: + return False + if self._greenlet is None or self._greenlet.dead: + LOG.warning( + "Election greenlet is dead but _master is still True; " + "treating as no longer master" + ) + return False + since_refresh = monotonic_time() - self._last_refresh + if since_refresh > self._ttl: + LOG.warning( + "Election lease has not been refreshed for %.1fs (ttl %ds); " + "treating as no longer master", + since_refresh, + self._ttl, + ) + return False + return True + def stop(self): self._stopped = True if self._greenlet and not self._greenlet.dead: diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index b49c0bdc4be..e31768d80d3 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -493,8 +493,9 @@ def _status_updating_thread(self, expected_epoch): TrackTask("STATUS_UPDATING") LOG.info("Status updating thread started.") while self._epoch == expected_epoch: - # Only handle updates if we are the master node. - if self.elector.master(): + # Only handle updates if we are healthily the master node. See + # Elector.healthy() for why we use healthy() rather than master(). + if self.elector.healthy(): if self._etcd_watcher is None: LOG.info("Became the master, starting StatusWatcher") self._etcd_watcher = StatusWatcher(self) @@ -1232,8 +1233,8 @@ def resync_monitor_thread(self, launch_epoch): LOG.info("Resync monitor thread started") while self._epoch == launch_epoch: - # Only monitor the resync if we are the master node. - if self.elector.master(): + # Only monitor the resync if we are healthily the master node. + if self.elector.healthy(): LOG.info("I am master: monitoring periodic resync") curr_time = datetime.now() @@ -1276,8 +1277,13 @@ def periodic_resync_thread(self, launch_epoch): try: LOG.info("Periodic resync thread started") while self._epoch == launch_epoch: - # Only do the resync if we are the master node. - if self.elector.master(): + # Only do the resync if we are healthily the master node AND + # etcd still agrees that we are the master. The extra etcd + # read (vs. plain healthy()) defends against the in-process + # flag disagreeing with etcd's ground truth; resync is much + # more expensive than a single etcd GET, so the extra check + # is cheap by comparison. + if self.elector.confirmed_master(): LOG.info("I am master: doing periodic resync") start_time = datetime.now() @@ -1343,8 +1349,8 @@ def periodic_compaction_thread(self, launch_epoch): try: LOG.info("Periodic compaction thread started") while self._epoch == launch_epoch: - # Only do the compaction if we are the master node. - if self.elector.master(): + # Only do the compaction if we are healthily the master node. + if self.elector.healthy(): LOG.info("I am master: doing periodic compaction") try: diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py index bdcd88a4c6e..0adf755886d 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py @@ -19,6 +19,7 @@ # Etcd-based transport for the Calico/OpenStack Plugin. import collections +from datetime import datetime, timezone import json from oslo_log import log @@ -26,6 +27,18 @@ from networking_calico import datamodel_v2 from networking_calico import etcdutils from networking_calico.common import config as calico_config +from networking_calico.monotonic import monotonic_time + + +# If a Felix status update we receive from etcd has a "time" field more than +# this many seconds in the past, we are running behind and should warn the +# operator. Felix writes status updates every 30s by default, so anything +# materially above that indicates a processing backlog. +STALE_STATUS_WARN_SECS = 300 + +# Rate-limit stale-status warnings to at most one per this many seconds, to +# avoid flooding the log when every update in a large batch is stale. +STALE_STATUS_WARN_INTERVAL_SECS = 300 LOG = log.getLogger(__name__) @@ -79,6 +92,11 @@ def __init__(self, calico_driver): # deduplicate before passing on to the Neutron DB. self._felix_live_rev = {} + # Monotonic time of the last stale-status WARNING we logged. Used to + # rate-limit the warning so we do not flood the log when the whole + # cluster is backlogged. + self._last_stale_warn = 0.0 + # Register for felix uptime updates. self.register_path( status_path + "//status", @@ -127,6 +145,7 @@ def _on_status_set(self, response, hostname): except (ValueError, TypeError): LOG.warning("Bad JSON data for key %s: %s", response.key, response.value) else: + self._check_for_stale_status(hostname, value) mod_revision = response.mod_revision if self._felix_live_rev.get(hostname) != mod_revision: self.calico_driver.on_felix_alive( @@ -135,6 +154,57 @@ def _on_status_set(self, response, hostname): ) self._felix_live_rev[hostname] = mod_revision + def _check_for_stale_status(self, hostname, value): + """Warn the operator if we are processing materially stale updates. + + If the "time" field inside the status value is significantly older + than wall-clock now, this StatusWatcher is processing events slower + than Felix is producing them, and a backlog is building up. Left + unaddressed this causes neutron to see agent up/down transitions + hours after they actually happened. Warn the operator so they can + tune ReportingIntervalSecs / agent_down_time or investigate why + processing is slow. + + Rate-limited to one warning per STALE_STATUS_WARN_INTERVAL_SECS. + """ + if self.processing_snapshot: + # During an initial-snapshot replay the "time" values will + # legitimately look old: Felix wrote them some time ago and + # we're only now reading the subtree. That is not evidence of + # a processing backlog - skip the check in this case. + return + status_time_str = value.get("time") + if not status_time_str: + return + try: + # Felix writes the time in RFC3339 with a trailing "Z"; convert + # to a +00:00 offset for datetime.fromisoformat (which has only + # accepted the bare "Z" suffix since Python 3.11). + status_time = datetime.fromisoformat(status_time_str.replace("Z", "+00:00")) + except ValueError: + LOG.warning( + "Could not parse status time %r for host %s", + status_time_str, + hostname, + ) + return + lag = (datetime.now(tz=timezone.utc) - status_time).total_seconds() + if lag <= STALE_STATUS_WARN_SECS: + return + now_mono = monotonic_time() + if now_mono - self._last_stale_warn < STALE_STATUS_WARN_INTERVAL_SECS: + return + self._last_stale_warn = now_mono + LOG.warning( + "Processing stale Felix status update for host %s: the update was" + " written %.0fs ago (threshold %ds). StatusWatcher is not keeping" + " up with the rate of updates; consider raising ReportingIntervalSecs" + " and agent_down_time in Neutron / Felix config.", + hostname, + lag, + STALE_STATUS_WARN_SECS, + ) + def _on_status_del(self, response, hostname): """Called when Felix's status key expires. Implies felix is dead.""" # Notes: diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py index 898b050b7b1..a0606676584 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py @@ -213,6 +213,12 @@ def __init__(self, *args, **kwargs): def master(self): return True + def healthy(self): + return True + + def confirmed_master(self): + return True + def stop(self): pass diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py index 34762121575..a228bb01faf 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py @@ -267,3 +267,134 @@ def test_check_master_process_still_alive(self, m_exists): self.assertEqual([], client.transaction.mock_calls) client.failure = None self._wait_and_stop(client, elector) + + +class TestHealthyAndConfirmedMaster(unittest.TestCase): + """Tests for Elector.healthy() and Elector.confirmed_master(). + + These don't exercise _run / _vote / _become_master; they construct an + Elector, immediately kill its greenlet, and then drive the new methods + directly by manipulating the internal state the methods read. + """ + + def setUp(self): + super(TestHealthyAndConfirmedMaster, self).setUp() + # Prevent sys.exit() in _run from interfering if the greenlet we + # spawn hits an exception before we kill it. + self.sys_exit_p = mock.patch("sys.exit") + self.sys_exit_p.start() + etcdv3._client = mock.Mock() + + def tearDown(self): + self.sys_exit_p.stop() + etcdv3._client = None + super(TestHealthyAndConfirmedMaster, self).tearDown() + + def _make_elector(self, ttl=15): + elector = election.Elector("server-id", "/bloop", interval=5, ttl=ttl) + # Kill the spawned greenlet so we can drive state manually without + # racing against _run. Don't call wait() — it would re-raise the + # greenlet's GreenletExit into the test, which is not what we're + # testing. + elector._greenlet.kill() + return elector + + @staticmethod + def _alive_greenlet_stub(): + """Return a stub greenlet object that looks alive to healthy().""" + stub = mock.Mock() + stub.dead = False + return stub + + def test_healthy_false_when_not_master(self): + elector = self._make_elector() + self.assertFalse(elector.master()) + self.assertFalse(elector.healthy()) + + def test_healthy_false_when_stopped(self): + elector = self._make_elector() + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + elector._last_refresh = election.monotonic_time() + elector._stopped = True + self.assertFalse(elector.healthy()) + + def test_healthy_false_when_greenlet_dead(self): + elector = self._make_elector() + elector._master = True + elector._last_refresh = election.monotonic_time() + # _make_elector has already killed the greenlet; healthy() should + # notice that it is dead and return False despite _master being set. + self.assertTrue(elector._greenlet.dead) + self.assertFalse(elector.healthy()) + + def test_healthy_false_when_lease_stale(self): + elector = self._make_elector(ttl=15) + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + # Pretend the last refresh was much longer ago than the ttl. + elector._last_refresh = election.monotonic_time() - 100 + self.assertFalse(elector.healthy()) + + def test_healthy_true_when_master_and_fresh(self): + elector = self._make_elector(ttl=15) + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + elector._last_refresh = election.monotonic_time() + self.assertTrue(elector.healthy()) + + def test_confirmed_master_false_when_unhealthy(self): + elector = self._make_elector() + self.assertFalse(elector.confirmed_master()) + + def test_confirmed_master_true_when_etcd_agrees(self): + elector = self._make_elector() + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + elector._last_refresh = election.monotonic_time() + expected = elector.id_string + with mock.patch( + "networking_calico.etcdv3.get", + return_value=(expected, 123), + ): + self.assertTrue(elector.confirmed_master()) + + def test_confirmed_master_clears_flag_when_etcd_has_different_value(self): + elector = self._make_elector() + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + elector._last_refresh = election.monotonic_time() + with mock.patch( + "networking_calico.etcdv3.get", + return_value=("someone-else:99", 123), + ): + self.assertFalse(elector.confirmed_master()) + # The mismatch should have cleared our local _master flag so future + # healthy() calls return False without needing another etcd read. + self.assertFalse(elector._master) + + def test_confirmed_master_clears_flag_when_etcd_has_no_key(self): + elector = self._make_elector() + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + elector._last_refresh = election.monotonic_time() + with mock.patch( + "networking_calico.etcdv3.get", + side_effect=etcdv3.KeyNotFound(), + ): + self.assertFalse(elector.confirmed_master()) + self.assertFalse(elector._master) + + def test_confirmed_master_false_on_etcd_error_but_keeps_flag(self): + elector = self._make_elector() + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + elector._last_refresh = election.monotonic_time() + with mock.patch( + "networking_calico.etcdv3.get", + side_effect=e3e.ConnectionFailedError(), + ): + # Transient etcd error: skip this cycle but don't forcibly + # demote ourselves - the next healthy() check will decide. + self.assertFalse(elector.confirmed_master()) + self.assertTrue(elector._master) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_monitor_thread.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_monitor_thread.py index 0ff731f0194..cce28c92a88 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_monitor_thread.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_monitor_thread.py @@ -66,6 +66,8 @@ def increment_epoch(actual_sleep_time): def test_monitor_does_nothing_when_not_master(self): """Test that a driver that is not master does not monitor.""" self.driver.elector.master.return_value = False + self.driver.elector.healthy.return_value = False + self.driver.elector.confirmed_master.return_value = False self.mock_sleep.side_effect = self.simulate_epoch_progression() self.driver.resync_monitor_thread(INITIAL_EPOCH) @@ -77,6 +79,8 @@ def test_monitor_logs_error_when_over_max(self): """Test that an error is logged when interval surpasses maximum.""" lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True fake_resync_time = datetime.now() - timedelta(seconds=TEST_MAX_INTERVAL + 1) self.driver.last_resync_time = fake_resync_time self.mock_sleep.side_effect = self.simulate_epoch_progression() @@ -93,6 +97,8 @@ def test_monitor_no_error_if_interval_under_max(self): """If interval is below max, no error should be logged.""" lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True self.mock_sleep.side_effect = self.simulate_epoch_progression() self.driver.resync_monitor_thread(INITIAL_EPOCH) @@ -102,9 +108,13 @@ def test_monitor_no_error_if_interval_under_max(self): def test_monitor_exception_stops_elector(self): """On unexpected exception, elector.stop() must be called.""" self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True with mock.patch.object(self.driver, "elector") as mock_elector: - mock_elector.master.side_effect = Exception("Test exception") + # resync_monitor_thread calls elector.healthy(); make it raise + # to exercise the exception-handling path. + mock_elector.healthy.side_effect = Exception("Test exception") with self.assertRaises(Exception): self.driver.resync_monitor_thread(INITIAL_EPOCH) @@ -115,6 +125,8 @@ def test_resync_resets_time(self): """Test that resync resets current interval duration to below max.""" lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True fake_resync_time = datetime.now() - timedelta(seconds=TEST_MAX_INTERVAL + 1) self.driver.last_resync_time = fake_resync_time @@ -140,6 +152,8 @@ def test_errors_continue_to_log(self): """Test that errors continue logging if resync does not occur.""" lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True fake_resync_time = datetime.now() - timedelta(seconds=TEST_MAX_INTERVAL + 1) self.driver.last_resync_time = fake_resync_time @@ -166,6 +180,8 @@ def test_sleep_time_logic_before_deadline(self, mock_datetime): """Test that we sleep until deadline if there is time left.""" lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True curr_time = datetime.now() self.driver.last_resync_time = curr_time @@ -181,6 +197,8 @@ def test_sleep_time_logic_after_deadline(self): """Test that we poll if the deadline has passed.""" lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True fake_resync_time = datetime.now() - timedelta(seconds=TEST_MAX_INTERVAL + 1) self.driver.last_resync_time = fake_resync_time diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py index 2635c381fcc..7e7df715328 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py @@ -1579,6 +1579,8 @@ def test_not_master_does_not_resync(self): with mock.patch.object(self.driver, "elector") as m_elector: m_elector.master.return_value = False + m_elector.healthy.return_value = False + m_elector.confirmed_master.return_value = False # Allow the etcd transport's resync thread to run. Nothing will # happen. @@ -2266,7 +2268,10 @@ def test_status_thread_mainline(self, m_StatusWatcher): count = [0] with mock.patch.object(self.driver, "elector") as m_elector: + # _status_updating_thread checks elector.healthy() rather than + # master() - configure both for safety. m_elector.master.return_value = True + m_elector.healthy.return_value = True def maybe_end_loop(*args, **kwargs): if count[0] == 2: @@ -2275,6 +2280,7 @@ def maybe_end_loop(*args, **kwargs): if count[0] == 4: # After a few loops, stop being the master... m_elector.master.return_value = False + m_elector.healthy.return_value = False if count[0] > 6: # Then terminate the loop after a few more... self.driver._epoch += 1 diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py new file mode 100644 index 00000000000..d0a0a02555b --- /dev/null +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2026 Tigera, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Targeted unit tests for StatusWatcher helpers that do not need the full +plugin-etcd test harness. See test_plugin_etcd.py for end-to-end watcher +tests. +""" +from datetime import datetime, timedelta, timezone +import unittest + +import mock + +from networking_calico.plugins.ml2.drivers.calico import status + + +class TestCheckForStaleStatus(unittest.TestCase): + """Exercise StatusWatcher._check_for_stale_status in isolation. + + The real __init__ pulls in config and an EtcdWatcher; we skip it via + __new__ and set only the attributes the method reads. + """ + + def setUp(self): + super(TestCheckForStaleStatus, self).setUp() + self.watcher = status.StatusWatcher.__new__(status.StatusWatcher) + self.watcher._last_stale_warn = 0.0 + self.watcher.processing_snapshot = False + + def _fmt(self, dt): + return dt.strftime("%Y-%m-%dT%H:%M:%SZ") + + def test_fresh_update_does_not_warn(self): + fresh = self._fmt(datetime.now(tz=timezone.utc)) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": fresh}) + m_warn.assert_not_called() + self.assertEqual(0.0, self.watcher._last_stale_warn) + + def test_stale_update_warns(self): + stale = self._fmt(datetime.now(tz=timezone.utc) - timedelta(hours=1)) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": stale}) + m_warn.assert_called_once() + # First positional arg of the single call is the log format string. + self.assertIn("stale Felix status update", m_warn.call_args.args[0]) + self.assertGreater(self.watcher._last_stale_warn, 0.0) + + def test_stale_update_is_rate_limited(self): + stale = self._fmt(datetime.now(tz=timezone.utc) - timedelta(hours=1)) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": stale}) + self.watcher._check_for_stale_status("host2", {"time": stale}) + self.watcher._check_for_stale_status("host3", {"time": stale}) + # Only one warning across several stale updates within the + # rate-limit window. + self.assertEqual(1, m_warn.call_count) + + def test_snapshot_processing_skips_check(self): + self.watcher.processing_snapshot = True + stale = self._fmt(datetime.now(tz=timezone.utc) - timedelta(hours=1)) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": stale}) + m_warn.assert_not_called() + + def test_missing_time_field_is_silent(self): + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status( + "host1", {"uptime": 10, "first_update": False} + ) + m_warn.assert_not_called() + + def test_unparseable_time_logs_separate_warning(self): + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": "not a date"}) + m_warn.assert_called_once() + self.assertIn("Could not parse status time", m_warn.call_args.args[0]) + # An unparseable time does not count as a stale-status warning for + # rate-limiting purposes. + self.assertEqual(0.0, self.watcher._last_stale_warn) From 33cd9c1793757cb9f7e64d44f97e6793af0cbe35 Mon Sep 17 00:00:00 2001 From: Nell Jerram Date: Tue, 14 Apr 2026 11:27:26 +0100 Subject: [PATCH 2/6] networking-calico: add mutual watchdog for long-running driver greenlets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The silent-greenlet-death issue (CI-1892) can theoretically affect any of the driver's long-running greenlets, not just the elector. If periodic_resync_thread dies silently, Neutron DB / etcd drift goes undetected; if _status_updating_thread dies, Felix agent status stops reaching Neutron; and so on. Track all spawned worker greenlets in self._greenlets as (name, greenlet) pairs. Add _check_greenlets_alive(), which iterates the list and calls sys.exit(1) if any greenlet has .dead == True — the same recovery strategy the elector already uses in its BaseException handler. The process manager (systemd) restarts neutron-server, which is the safest recovery from an unknown-state failure. The check is called from _status_updating_thread (every 5s) and resync_monitor_thread (every resync_max_interval_secs / 5), giving mutual watchdogging: either can detect the other's death, and both can detect death of the remaining greenlets (periodic_resync, periodic_compaction, port_status workers). A greenlet checking itself always sees .dead == False, so no self-check special-casing is needed. The elector greenlet is not included — it has its own healthy() watchdog mechanism. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../plugins/ml2/drivers/calico/mech_calico.py | 63 +++++++++++++++++-- 1 file changed, 57 insertions(+), 6 deletions(-) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index e31768d80d3..cae7e0ff35a 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -28,6 +28,7 @@ from datetime import datetime, timedelta import os import re +import sys import threading import uuid from functools import wraps @@ -341,6 +342,11 @@ def __init__(self): # Last resync completion time self.last_resync_time = datetime.now() + # List of (name, greenlet) for the long-running worker greenlets + # spawned by _post_fork_init. Used by _check_greenlets_alive() to + # detect silent greenlet death. + self._greenlets = [] + # Tell the monkeypatch where we are. global mech_driver assert mech_driver is None @@ -462,13 +468,33 @@ def _post_fork_init(self, voting=False): # We deliberately do this last, to ensure that all of the setup # above is complete before we start running. self._epoch += 1 - eventlet.spawn(self.resync_monitor_thread, self._epoch) - eventlet.spawn(self.periodic_resync_thread, self._epoch) + self._greenlets = [] + self._greenlets.append(( + "resync_monitor", + eventlet.spawn(self.resync_monitor_thread, self._epoch), + )) + self._greenlets.append(( + "periodic_resync", + eventlet.spawn(self.periodic_resync_thread, self._epoch), + )) if cfg.CONF.calico.etcd_compaction_period_mins > 0: - eventlet.spawn(self.periodic_compaction_thread, self._epoch) - eventlet.spawn(self._status_updating_thread, self._epoch) - for _ in range(cfg.CONF.calico.num_port_status_threads): - eventlet.spawn(self._loop_writing_port_statuses, self._epoch) + self._greenlets.append(( + "periodic_compaction", + eventlet.spawn( + self.periodic_compaction_thread, self._epoch + ), + )) + self._greenlets.append(( + "status_updating", + eventlet.spawn(self._status_updating_thread, self._epoch), + )) + for i in range(cfg.CONF.calico.num_port_status_threads): + self._greenlets.append(( + "port_status_%d" % i, + eventlet.spawn( + self._loop_writing_port_statuses, self._epoch + ), + )) else: LOG.info( "PID %s: Not a voting participant; " @@ -482,6 +508,29 @@ def _post_fork_init(self, voting=False): "Calico mechanism driver initialisation done in process %s", current_pid ) + def _check_greenlets_alive(self): + """Detect if any long-running worker greenlet has silently died. + + Under eventlet, a greenlet can occasionally die without unwinding + its Python frames (e.g. due to a hub-level error), leaving no + traceback in the log and no state cleanup. This method provides + mutual watchdogging: each of the driver's long-running loops + calls it periodically to verify that the others are still alive. + + If a dead greenlet is found, we log an error and exit. The + process manager (systemd) will restart neutron-server, which is + the safest recovery — the same approach the elector already uses + for its own unhandled-exception path. See CI-1892. + """ + for name, gt in self._greenlets: + if gt.dead: + LOG.error( + "Worker greenlet %r has unexpectedly died; exiting so " + "that the process manager can restart neutron-server.", + name, + ) + sys.exit(1) + @logging_exceptions(LOG) def _status_updating_thread(self, expected_epoch): """_status_updating_thread @@ -493,6 +542,7 @@ def _status_updating_thread(self, expected_epoch): TrackTask("STATUS_UPDATING") LOG.info("Status updating thread started.") while self._epoch == expected_epoch: + self._check_greenlets_alive() # Only handle updates if we are healthily the master node. See # Elector.healthy() for why we use healthy() rather than master(). if self.elector.healthy(): @@ -1233,6 +1283,7 @@ def resync_monitor_thread(self, launch_epoch): LOG.info("Resync monitor thread started") while self._epoch == launch_epoch: + self._check_greenlets_alive() # Only monitor the resync if we are healthily the master node. if self.elector.healthy(): LOG.info("I am master: monitoring periodic resync") From 56bf5253c24bacb645442c8498642fd3aa01edf5 Mon Sep 17 00:00:00 2001 From: Nell Jerram Date: Tue, 14 Apr 2026 11:58:09 +0100 Subject: [PATCH 3/6] networking-calico: remove CI-1892 reference from code comment CI-1892 is a private Jira ticket; non-Tigera readers cannot access it. The code comment is self-contained without the reference. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../networking_calico/plugins/ml2/drivers/calico/mech_calico.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index cae7e0ff35a..441e1d9a730 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -520,7 +520,7 @@ def _check_greenlets_alive(self): If a dead greenlet is found, we log an error and exit. The process manager (systemd) will restart neutron-server, which is the safest recovery — the same approach the elector already uses - for its own unhandled-exception path. See CI-1892. + for its own unhandled-exception path. """ for name, gt in self._greenlets: if gt.dead: From f34c2b3026bcccc0ed5229d789c783c4a11f99d6 Mon Sep 17 00:00:00 2001 From: Nell Jerram Date: Tue, 14 Apr 2026 12:04:15 +0100 Subject: [PATCH 4/6] networking-calico: handle naive (timezone-less) timestamps in stale status check datetime.fromisoformat() returns a naive datetime when the input has no timezone suffix (e.g. "2015-08-14T10:37:54" with no trailing "Z"). Subtracting a naive datetime from datetime.now(tz=UTC) raises TypeError. Treat naive timestamps as UTC so the lag computation succeeds. Add a test for this case. Addresses Copilot review feedback on PR #12456. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../plugins/ml2/drivers/calico/status.py | 4 ++++ .../plugins/ml2/drivers/calico/test/test_status.py | 10 ++++++++++ 2 files changed, 14 insertions(+) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py index 0adf755886d..10c3fe64167 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py @@ -188,6 +188,10 @@ def _check_for_stale_status(self, hostname, value): hostname, ) return + if status_time.tzinfo is None: + # Treat naive timestamps (no timezone info) as UTC so that + # the subtraction below does not raise TypeError. + status_time = status_time.replace(tzinfo=timezone.utc) lag = (datetime.now(tz=timezone.utc) - status_time).total_seconds() if lag <= STALE_STATUS_WARN_SECS: return diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py index d0a0a02555b..756b6544557 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py @@ -81,6 +81,16 @@ def test_missing_time_field_is_silent(self): ) m_warn.assert_not_called() + def test_stale_naive_timestamp_warns(self): + """A timezone-less timestamp (no trailing Z) should not crash.""" + stale = (datetime.now(tz=timezone.utc) - timedelta(hours=1)).strftime( + "%Y-%m-%dT%H:%M:%S" + ) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": stale}) + m_warn.assert_called_once() + self.assertIn("stale Felix status update", m_warn.call_args.args[0]) + def test_unparseable_time_logs_separate_warning(self): with mock.patch.object(status.LOG, "warning") as m_warn: self.watcher._check_for_stale_status("host1", {"time": "not a date"}) From ac8e9a38ac258cbfd49bb88b7e82145f8a7e6022 Mon Sep 17 00:00:00 2001 From: Nell Jerram Date: Tue, 14 Apr 2026 12:10:00 +0100 Subject: [PATCH 5/6] make -C networking-calico fmtpy --- .../plugins/ml2/drivers/calico/mech_calico.py | 58 +++++++++++-------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index 441e1d9a730..19b9ea1ed5d 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -469,32 +469,42 @@ def _post_fork_init(self, voting=False): # above is complete before we start running. self._epoch += 1 self._greenlets = [] - self._greenlets.append(( - "resync_monitor", - eventlet.spawn(self.resync_monitor_thread, self._epoch), - )) - self._greenlets.append(( - "periodic_resync", - eventlet.spawn(self.periodic_resync_thread, self._epoch), - )) + self._greenlets.append( + ( + "resync_monitor", + eventlet.spawn(self.resync_monitor_thread, self._epoch), + ) + ) + self._greenlets.append( + ( + "periodic_resync", + eventlet.spawn(self.periodic_resync_thread, self._epoch), + ) + ) if cfg.CONF.calico.etcd_compaction_period_mins > 0: - self._greenlets.append(( - "periodic_compaction", - eventlet.spawn( - self.periodic_compaction_thread, self._epoch - ), - )) - self._greenlets.append(( - "status_updating", - eventlet.spawn(self._status_updating_thread, self._epoch), - )) + self._greenlets.append( + ( + "periodic_compaction", + eventlet.spawn( + self.periodic_compaction_thread, self._epoch + ), + ) + ) + self._greenlets.append( + ( + "status_updating", + eventlet.spawn(self._status_updating_thread, self._epoch), + ) + ) for i in range(cfg.CONF.calico.num_port_status_threads): - self._greenlets.append(( - "port_status_%d" % i, - eventlet.spawn( - self._loop_writing_port_statuses, self._epoch - ), - )) + self._greenlets.append( + ( + "port_status_%d" % i, + eventlet.spawn( + self._loop_writing_port_statuses, self._epoch + ), + ) + ) else: LOG.info( "PID %s: Not a voting participant; " From b4a1a4ed0d2bd20f9356167d7a5cc2bf677be773 Mon Sep 17 00:00:00 2001 From: Nell Jerram Date: Tue, 14 Apr 2026 12:32:08 +0100 Subject: [PATCH 6/6] networking-calico: don't watchdog periodic_resync when resync_interval_secs == 0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When resync_interval_secs is 0, periodic_resync_thread does a single resync cycle and then exits intentionally. The greenlet becomes .dead, which the mutual watchdog treats as an unexpected death and calls sys.exit(1) — a false positive that was triggered immediately in our DevStack CI (which uses resync_interval_secs = 0 by default). Only register the periodic_resync greenlet in the watchdog list when resync_interval_secs > 0, i.e. when the thread is expected to keep running indefinitely. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../plugins/ml2/drivers/calico/mech_calico.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index 19b9ea1ed5d..f264f0f06d0 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -475,12 +475,13 @@ def _post_fork_init(self, voting=False): eventlet.spawn(self.resync_monitor_thread, self._epoch), ) ) - self._greenlets.append( - ( - "periodic_resync", - eventlet.spawn(self.periodic_resync_thread, self._epoch), - ) - ) + resync_gt = eventlet.spawn(self.periodic_resync_thread, self._epoch) + if cfg.CONF.calico.resync_interval_secs > 0: + # Only watchdog the resync thread if it is expected to + # keep running. When resync_interval_secs == 0 the + # thread does a single resync and then exits + # intentionally. + self._greenlets.append(("periodic_resync", resync_gt)) if cfg.CONF.calico.etcd_compaction_period_mins > 0: self._greenlets.append( (