diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py index 1fd573027b4..0c75f10c7a8 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py @@ -36,6 +36,7 @@ from networking_calico import etcdv3 from networking_calico.common import config as calico_config +from networking_calico.monotonic import monotonic_time LOG = log.getLogger(__name__) @@ -101,6 +102,13 @@ def __init__(self, server_id, election_key, old_key=None, interval=30, ttl=60): # Is this the master? To start with, no self._master = False + # Monotonic timestamp of the last successful lease refresh while + # master. Used by healthy() to detect a silently dead election + # greenlet - if _master stays True but the greenlet has stopped + # refreshing the lease, we are no longer actually the master even + # though self._master says we are. + self._last_refresh = 0.0 + # Keep the greenlet ID handy to ease UT. self._greenlet = eventlet.spawn(self._run) @@ -147,7 +155,7 @@ def _vote(self): value, mod_revision = etcdv3.get(self._key) mod_revision = int(mod_revision) except etcdv3.KeyNotFound: - LOG.debug("Try to become the master - key not found") + LOG.info("Try to become the master - key not found") self._become_master() assert False, "_become_master() should not return." except Etcd3Exception as e: @@ -155,7 +163,7 @@ def _vote(self): self._log_exception("read current master", e) return - LOG.debug("ID of elected master is : %s", value) + LOG.info("ID of elected master is : %s", value) if value: # If we happen to be on the same server, check if the master # process is still alive. @@ -186,7 +194,7 @@ def _vote(self): timeout=self._interval * 2, start_revision=mod_revision + 1, ) - LOG.debug("election event: %s", event) + LOG.info("election event: %s", event) action = event.get("type", "SET").lower() value = event["kv"].get("value") mod_revision = int(event["kv"].get("mod_revision", "0")) @@ -199,7 +207,7 @@ def _vote(self): # Something bad and unexpected. Log and reconnect. self._log_exception("wait for master change", e) return - LOG.debug("Election key action: %s; new value %s", action, value) + LOG.info("Election key action: %s; new value %s", action, value) if action in ETCD_DELETE_ACTIONS or value is None: # Deleted - try and become the master. LOG.info( @@ -223,12 +231,12 @@ def _check_master_process(self, master_id): return host = match.group("host") pid = int(match.group("pid")) - LOG.debug("Parsed key as host = %s, PID = %s", host, pid) + LOG.info("Parsed key as host = %s, PID = %s", host, pid) if host == self._server_id: # Check if the PID is still running. - LOG.debug("Previous master was on this server %s", host) + LOG.info("Previous master was on this server %s", host) if os.path.exists("/proc/%s" % pid): - LOG.debug("Master still running") + LOG.info("Master still running") else: LOG.warning( "Master was on this server but cannot find its " @@ -271,6 +279,10 @@ def _become_master(self): LOG.info("Race: someone else beat us to be master") raise RestartElection() + # We are now master; start the healthy() watchdog clock. This must + # be kept up to date by the lease-refresh loop below. + self._last_refresh = monotonic_time() + LOG.info( "Successfully become master - key %s, value %s", self._key, self.id_string ) @@ -281,7 +293,7 @@ def _become_master(self): try: while not self._stopped: try: - LOG.debug("Refreshing master role") + LOG.info("Refreshing master role") # Refresh the lease. ttl = ttl_lease.refresh() # Also rewrite the key, so that non-masters see an event on @@ -294,7 +306,10 @@ def _become_master(self): ): LOG.warning("Key changed or deleted; restart election") raise RestartElection() - LOG.debug("Refreshed master role, TTL now is %d", ttl) + LOG.info("Refreshed master role, TTL now is %d", ttl) + # Record that the refresh succeeded. healthy() uses this + # to detect if the refresh loop silently stops running. + self._last_refresh = monotonic_time() except RestartElection: raise except Exception as e: @@ -362,6 +377,90 @@ def master(self): """ return self._master and not self._stopped + def confirmed_master(self): + """Am I healthily the master AND does etcd still agree? + + Performs a healthy() check first (cheap, local). If that passes, + also re-reads the election key from etcd and confirms that its + value matches our id_string. Intended for callers that are about + to start expensive master-only work (e.g. a periodic resync) + and want an extra belt-and-braces check against an in-process + state disagreement with etcd. + + This involves a synchronous etcd GET, so do not call in a hot + loop - use healthy() for that. + + returns: True if we are confirmed master according to both our + own local state and etcd's current view. + """ + if not self.healthy(): + return False + try: + value, _mod_revision = etcdv3.get(self._key) + except etcdv3.KeyNotFound: + LOG.warning( + "Election key %s not present in etcd but _master is True; " + "treating as no longer master", + self._key, + ) + self._master = False + return False + except Etcd3Exception as e: + # Treat a transient etcd error as "don't know"; be conservative + # and skip master-only work this time. We will retry soon. + self._log_exception("confirm master", e) + return False + if value != self.id_string: + LOG.warning( + "Election key %s in etcd has value %r but we expected %r; " + "treating as no longer master", + self._key, + value, + self.id_string, + ) + self._master = False + return False + return True + + def healthy(self): + """Am I healthily the master? + + Stricter than master(). Returns True only if (a) _master is set, + (b) we have not been stopped, (c) the election greenlet is still + alive, and (d) the lease was refreshed within the last self._ttl + seconds. + + master() alone is unsafe because self._master is a local Python + flag that is set to True when we win the election and only cleared + if the greenlet exits normally via _attempt_step_down() or the + refresh loop's finally clause. If the greenlet dies silently - + e.g. due to an eventlet-level issue that drops the frame without + unwinding Python exceptions - _master stays True indefinitely. + healthy() catches that case by cross-checking the greenlet state + and the refresh timestamp. + + returns: True if this is the master and the election greenlet is + confirmed to still be working. + """ + if not self._master or self._stopped: + return False + if self._greenlet is None or self._greenlet.dead: + LOG.warning( + "Election greenlet is dead but _master is still True; " + "treating as no longer master" + ) + return False + since_refresh = monotonic_time() - self._last_refresh + if since_refresh > self._ttl: + LOG.warning( + "Election lease has not been refreshed for %.1fs (ttl %ds); " + "treating as no longer master", + since_refresh, + self._ttl, + ) + return False + return True + def stop(self): self._stopped = True if self._greenlet and not self._greenlet.dead: diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index b49c0bdc4be..f264f0f06d0 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -28,6 +28,7 @@ from datetime import datetime, timedelta import os import re +import sys import threading import uuid from functools import wraps @@ -341,6 +342,11 @@ def __init__(self): # Last resync completion time self.last_resync_time = datetime.now() + # List of (name, greenlet) for the long-running worker greenlets + # spawned by _post_fork_init. Used by _check_greenlets_alive() to + # detect silent greenlet death. + self._greenlets = [] + # Tell the monkeypatch where we are. global mech_driver assert mech_driver is None @@ -462,13 +468,44 @@ def _post_fork_init(self, voting=False): # We deliberately do this last, to ensure that all of the setup # above is complete before we start running. self._epoch += 1 - eventlet.spawn(self.resync_monitor_thread, self._epoch) - eventlet.spawn(self.periodic_resync_thread, self._epoch) + self._greenlets = [] + self._greenlets.append( + ( + "resync_monitor", + eventlet.spawn(self.resync_monitor_thread, self._epoch), + ) + ) + resync_gt = eventlet.spawn(self.periodic_resync_thread, self._epoch) + if cfg.CONF.calico.resync_interval_secs > 0: + # Only watchdog the resync thread if it is expected to + # keep running. When resync_interval_secs == 0 the + # thread does a single resync and then exits + # intentionally. + self._greenlets.append(("periodic_resync", resync_gt)) if cfg.CONF.calico.etcd_compaction_period_mins > 0: - eventlet.spawn(self.periodic_compaction_thread, self._epoch) - eventlet.spawn(self._status_updating_thread, self._epoch) - for _ in range(cfg.CONF.calico.num_port_status_threads): - eventlet.spawn(self._loop_writing_port_statuses, self._epoch) + self._greenlets.append( + ( + "periodic_compaction", + eventlet.spawn( + self.periodic_compaction_thread, self._epoch + ), + ) + ) + self._greenlets.append( + ( + "status_updating", + eventlet.spawn(self._status_updating_thread, self._epoch), + ) + ) + for i in range(cfg.CONF.calico.num_port_status_threads): + self._greenlets.append( + ( + "port_status_%d" % i, + eventlet.spawn( + self._loop_writing_port_statuses, self._epoch + ), + ) + ) else: LOG.info( "PID %s: Not a voting participant; " @@ -482,6 +519,29 @@ def _post_fork_init(self, voting=False): "Calico mechanism driver initialisation done in process %s", current_pid ) + def _check_greenlets_alive(self): + """Detect if any long-running worker greenlet has silently died. + + Under eventlet, a greenlet can occasionally die without unwinding + its Python frames (e.g. due to a hub-level error), leaving no + traceback in the log and no state cleanup. This method provides + mutual watchdogging: each of the driver's long-running loops + calls it periodically to verify that the others are still alive. + + If a dead greenlet is found, we log an error and exit. The + process manager (systemd) will restart neutron-server, which is + the safest recovery — the same approach the elector already uses + for its own unhandled-exception path. + """ + for name, gt in self._greenlets: + if gt.dead: + LOG.error( + "Worker greenlet %r has unexpectedly died; exiting so " + "that the process manager can restart neutron-server.", + name, + ) + sys.exit(1) + @logging_exceptions(LOG) def _status_updating_thread(self, expected_epoch): """_status_updating_thread @@ -493,8 +553,10 @@ def _status_updating_thread(self, expected_epoch): TrackTask("STATUS_UPDATING") LOG.info("Status updating thread started.") while self._epoch == expected_epoch: - # Only handle updates if we are the master node. - if self.elector.master(): + self._check_greenlets_alive() + # Only handle updates if we are healthily the master node. See + # Elector.healthy() for why we use healthy() rather than master(). + if self.elector.healthy(): if self._etcd_watcher is None: LOG.info("Became the master, starting StatusWatcher") self._etcd_watcher = StatusWatcher(self) @@ -1232,8 +1294,9 @@ def resync_monitor_thread(self, launch_epoch): LOG.info("Resync monitor thread started") while self._epoch == launch_epoch: - # Only monitor the resync if we are the master node. - if self.elector.master(): + self._check_greenlets_alive() + # Only monitor the resync if we are healthily the master node. + if self.elector.healthy(): LOG.info("I am master: monitoring periodic resync") curr_time = datetime.now() @@ -1276,8 +1339,13 @@ def periodic_resync_thread(self, launch_epoch): try: LOG.info("Periodic resync thread started") while self._epoch == launch_epoch: - # Only do the resync if we are the master node. - if self.elector.master(): + # Only do the resync if we are healthily the master node AND + # etcd still agrees that we are the master. The extra etcd + # read (vs. plain healthy()) defends against the in-process + # flag disagreeing with etcd's ground truth; resync is much + # more expensive than a single etcd GET, so the extra check + # is cheap by comparison. + if self.elector.confirmed_master(): LOG.info("I am master: doing periodic resync") start_time = datetime.now() @@ -1343,8 +1411,8 @@ def periodic_compaction_thread(self, launch_epoch): try: LOG.info("Periodic compaction thread started") while self._epoch == launch_epoch: - # Only do the compaction if we are the master node. - if self.elector.master(): + # Only do the compaction if we are healthily the master node. + if self.elector.healthy(): LOG.info("I am master: doing periodic compaction") try: diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py index bdcd88a4c6e..10c3fe64167 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py @@ -19,6 +19,7 @@ # Etcd-based transport for the Calico/OpenStack Plugin. import collections +from datetime import datetime, timezone import json from oslo_log import log @@ -26,6 +27,18 @@ from networking_calico import datamodel_v2 from networking_calico import etcdutils from networking_calico.common import config as calico_config +from networking_calico.monotonic import monotonic_time + + +# If a Felix status update we receive from etcd has a "time" field more than +# this many seconds in the past, we are running behind and should warn the +# operator. Felix writes status updates every 30s by default, so anything +# materially above that indicates a processing backlog. +STALE_STATUS_WARN_SECS = 300 + +# Rate-limit stale-status warnings to at most one per this many seconds, to +# avoid flooding the log when every update in a large batch is stale. +STALE_STATUS_WARN_INTERVAL_SECS = 300 LOG = log.getLogger(__name__) @@ -79,6 +92,11 @@ def __init__(self, calico_driver): # deduplicate before passing on to the Neutron DB. self._felix_live_rev = {} + # Monotonic time of the last stale-status WARNING we logged. Used to + # rate-limit the warning so we do not flood the log when the whole + # cluster is backlogged. + self._last_stale_warn = 0.0 + # Register for felix uptime updates. self.register_path( status_path + "//status", @@ -127,6 +145,7 @@ def _on_status_set(self, response, hostname): except (ValueError, TypeError): LOG.warning("Bad JSON data for key %s: %s", response.key, response.value) else: + self._check_for_stale_status(hostname, value) mod_revision = response.mod_revision if self._felix_live_rev.get(hostname) != mod_revision: self.calico_driver.on_felix_alive( @@ -135,6 +154,61 @@ def _on_status_set(self, response, hostname): ) self._felix_live_rev[hostname] = mod_revision + def _check_for_stale_status(self, hostname, value): + """Warn the operator if we are processing materially stale updates. + + If the "time" field inside the status value is significantly older + than wall-clock now, this StatusWatcher is processing events slower + than Felix is producing them, and a backlog is building up. Left + unaddressed this causes neutron to see agent up/down transitions + hours after they actually happened. Warn the operator so they can + tune ReportingIntervalSecs / agent_down_time or investigate why + processing is slow. + + Rate-limited to one warning per STALE_STATUS_WARN_INTERVAL_SECS. + """ + if self.processing_snapshot: + # During an initial-snapshot replay the "time" values will + # legitimately look old: Felix wrote them some time ago and + # we're only now reading the subtree. That is not evidence of + # a processing backlog - skip the check in this case. + return + status_time_str = value.get("time") + if not status_time_str: + return + try: + # Felix writes the time in RFC3339 with a trailing "Z"; convert + # to a +00:00 offset for datetime.fromisoformat (which has only + # accepted the bare "Z" suffix since Python 3.11). + status_time = datetime.fromisoformat(status_time_str.replace("Z", "+00:00")) + except ValueError: + LOG.warning( + "Could not parse status time %r for host %s", + status_time_str, + hostname, + ) + return + if status_time.tzinfo is None: + # Treat naive timestamps (no timezone info) as UTC so that + # the subtraction below does not raise TypeError. + status_time = status_time.replace(tzinfo=timezone.utc) + lag = (datetime.now(tz=timezone.utc) - status_time).total_seconds() + if lag <= STALE_STATUS_WARN_SECS: + return + now_mono = monotonic_time() + if now_mono - self._last_stale_warn < STALE_STATUS_WARN_INTERVAL_SECS: + return + self._last_stale_warn = now_mono + LOG.warning( + "Processing stale Felix status update for host %s: the update was" + " written %.0fs ago (threshold %ds). StatusWatcher is not keeping" + " up with the rate of updates; consider raising ReportingIntervalSecs" + " and agent_down_time in Neutron / Felix config.", + hostname, + lag, + STALE_STATUS_WARN_SECS, + ) + def _on_status_del(self, response, hostname): """Called when Felix's status key expires. Implies felix is dead.""" # Notes: diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py index 898b050b7b1..a0606676584 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py @@ -213,6 +213,12 @@ def __init__(self, *args, **kwargs): def master(self): return True + def healthy(self): + return True + + def confirmed_master(self): + return True + def stop(self): pass diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py index 34762121575..a228bb01faf 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py @@ -267,3 +267,134 @@ def test_check_master_process_still_alive(self, m_exists): self.assertEqual([], client.transaction.mock_calls) client.failure = None self._wait_and_stop(client, elector) + + +class TestHealthyAndConfirmedMaster(unittest.TestCase): + """Tests for Elector.healthy() and Elector.confirmed_master(). + + These don't exercise _run / _vote / _become_master; they construct an + Elector, immediately kill its greenlet, and then drive the new methods + directly by manipulating the internal state the methods read. + """ + + def setUp(self): + super(TestHealthyAndConfirmedMaster, self).setUp() + # Prevent sys.exit() in _run from interfering if the greenlet we + # spawn hits an exception before we kill it. + self.sys_exit_p = mock.patch("sys.exit") + self.sys_exit_p.start() + etcdv3._client = mock.Mock() + + def tearDown(self): + self.sys_exit_p.stop() + etcdv3._client = None + super(TestHealthyAndConfirmedMaster, self).tearDown() + + def _make_elector(self, ttl=15): + elector = election.Elector("server-id", "/bloop", interval=5, ttl=ttl) + # Kill the spawned greenlet so we can drive state manually without + # racing against _run. Don't call wait() — it would re-raise the + # greenlet's GreenletExit into the test, which is not what we're + # testing. + elector._greenlet.kill() + return elector + + @staticmethod + def _alive_greenlet_stub(): + """Return a stub greenlet object that looks alive to healthy().""" + stub = mock.Mock() + stub.dead = False + return stub + + def test_healthy_false_when_not_master(self): + elector = self._make_elector() + self.assertFalse(elector.master()) + self.assertFalse(elector.healthy()) + + def test_healthy_false_when_stopped(self): + elector = self._make_elector() + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + elector._last_refresh = election.monotonic_time() + elector._stopped = True + self.assertFalse(elector.healthy()) + + def test_healthy_false_when_greenlet_dead(self): + elector = self._make_elector() + elector._master = True + elector._last_refresh = election.monotonic_time() + # _make_elector has already killed the greenlet; healthy() should + # notice that it is dead and return False despite _master being set. + self.assertTrue(elector._greenlet.dead) + self.assertFalse(elector.healthy()) + + def test_healthy_false_when_lease_stale(self): + elector = self._make_elector(ttl=15) + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + # Pretend the last refresh was much longer ago than the ttl. + elector._last_refresh = election.monotonic_time() - 100 + self.assertFalse(elector.healthy()) + + def test_healthy_true_when_master_and_fresh(self): + elector = self._make_elector(ttl=15) + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + elector._last_refresh = election.monotonic_time() + self.assertTrue(elector.healthy()) + + def test_confirmed_master_false_when_unhealthy(self): + elector = self._make_elector() + self.assertFalse(elector.confirmed_master()) + + def test_confirmed_master_true_when_etcd_agrees(self): + elector = self._make_elector() + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + elector._last_refresh = election.monotonic_time() + expected = elector.id_string + with mock.patch( + "networking_calico.etcdv3.get", + return_value=(expected, 123), + ): + self.assertTrue(elector.confirmed_master()) + + def test_confirmed_master_clears_flag_when_etcd_has_different_value(self): + elector = self._make_elector() + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + elector._last_refresh = election.monotonic_time() + with mock.patch( + "networking_calico.etcdv3.get", + return_value=("someone-else:99", 123), + ): + self.assertFalse(elector.confirmed_master()) + # The mismatch should have cleared our local _master flag so future + # healthy() calls return False without needing another etcd read. + self.assertFalse(elector._master) + + def test_confirmed_master_clears_flag_when_etcd_has_no_key(self): + elector = self._make_elector() + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + elector._last_refresh = election.monotonic_time() + with mock.patch( + "networking_calico.etcdv3.get", + side_effect=etcdv3.KeyNotFound(), + ): + self.assertFalse(elector.confirmed_master()) + self.assertFalse(elector._master) + + def test_confirmed_master_false_on_etcd_error_but_keeps_flag(self): + elector = self._make_elector() + elector._greenlet = self._alive_greenlet_stub() + elector._master = True + elector._last_refresh = election.monotonic_time() + with mock.patch( + "networking_calico.etcdv3.get", + side_effect=e3e.ConnectionFailedError(), + ): + # Transient etcd error: skip this cycle but don't forcibly + # demote ourselves - the next healthy() check will decide. + self.assertFalse(elector.confirmed_master()) + self.assertTrue(elector._master) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_monitor_thread.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_monitor_thread.py index 0ff731f0194..cce28c92a88 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_monitor_thread.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_monitor_thread.py @@ -66,6 +66,8 @@ def increment_epoch(actual_sleep_time): def test_monitor_does_nothing_when_not_master(self): """Test that a driver that is not master does not monitor.""" self.driver.elector.master.return_value = False + self.driver.elector.healthy.return_value = False + self.driver.elector.confirmed_master.return_value = False self.mock_sleep.side_effect = self.simulate_epoch_progression() self.driver.resync_monitor_thread(INITIAL_EPOCH) @@ -77,6 +79,8 @@ def test_monitor_logs_error_when_over_max(self): """Test that an error is logged when interval surpasses maximum.""" lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True fake_resync_time = datetime.now() - timedelta(seconds=TEST_MAX_INTERVAL + 1) self.driver.last_resync_time = fake_resync_time self.mock_sleep.side_effect = self.simulate_epoch_progression() @@ -93,6 +97,8 @@ def test_monitor_no_error_if_interval_under_max(self): """If interval is below max, no error should be logged.""" lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True self.mock_sleep.side_effect = self.simulate_epoch_progression() self.driver.resync_monitor_thread(INITIAL_EPOCH) @@ -102,9 +108,13 @@ def test_monitor_no_error_if_interval_under_max(self): def test_monitor_exception_stops_elector(self): """On unexpected exception, elector.stop() must be called.""" self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True with mock.patch.object(self.driver, "elector") as mock_elector: - mock_elector.master.side_effect = Exception("Test exception") + # resync_monitor_thread calls elector.healthy(); make it raise + # to exercise the exception-handling path. + mock_elector.healthy.side_effect = Exception("Test exception") with self.assertRaises(Exception): self.driver.resync_monitor_thread(INITIAL_EPOCH) @@ -115,6 +125,8 @@ def test_resync_resets_time(self): """Test that resync resets current interval duration to below max.""" lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True fake_resync_time = datetime.now() - timedelta(seconds=TEST_MAX_INTERVAL + 1) self.driver.last_resync_time = fake_resync_time @@ -140,6 +152,8 @@ def test_errors_continue_to_log(self): """Test that errors continue logging if resync does not occur.""" lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True fake_resync_time = datetime.now() - timedelta(seconds=TEST_MAX_INTERVAL + 1) self.driver.last_resync_time = fake_resync_time @@ -166,6 +180,8 @@ def test_sleep_time_logic_before_deadline(self, mock_datetime): """Test that we sleep until deadline if there is time left.""" lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True curr_time = datetime.now() self.driver.last_resync_time = curr_time @@ -181,6 +197,8 @@ def test_sleep_time_logic_after_deadline(self): """Test that we poll if the deadline has passed.""" lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL self.driver.elector.master.return_value = True + self.driver.elector.healthy.return_value = True + self.driver.elector.confirmed_master.return_value = True fake_resync_time = datetime.now() - timedelta(seconds=TEST_MAX_INTERVAL + 1) self.driver.last_resync_time = fake_resync_time diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py index 2635c381fcc..7e7df715328 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py @@ -1579,6 +1579,8 @@ def test_not_master_does_not_resync(self): with mock.patch.object(self.driver, "elector") as m_elector: m_elector.master.return_value = False + m_elector.healthy.return_value = False + m_elector.confirmed_master.return_value = False # Allow the etcd transport's resync thread to run. Nothing will # happen. @@ -2266,7 +2268,10 @@ def test_status_thread_mainline(self, m_StatusWatcher): count = [0] with mock.patch.object(self.driver, "elector") as m_elector: + # _status_updating_thread checks elector.healthy() rather than + # master() - configure both for safety. m_elector.master.return_value = True + m_elector.healthy.return_value = True def maybe_end_loop(*args, **kwargs): if count[0] == 2: @@ -2275,6 +2280,7 @@ def maybe_end_loop(*args, **kwargs): if count[0] == 4: # After a few loops, stop being the master... m_elector.master.return_value = False + m_elector.healthy.return_value = False if count[0] > 6: # Then terminate the loop after a few more... self.driver._epoch += 1 diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py new file mode 100644 index 00000000000..756b6544557 --- /dev/null +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_status.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2026 Tigera, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Targeted unit tests for StatusWatcher helpers that do not need the full +plugin-etcd test harness. See test_plugin_etcd.py for end-to-end watcher +tests. +""" +from datetime import datetime, timedelta, timezone +import unittest + +import mock + +from networking_calico.plugins.ml2.drivers.calico import status + + +class TestCheckForStaleStatus(unittest.TestCase): + """Exercise StatusWatcher._check_for_stale_status in isolation. + + The real __init__ pulls in config and an EtcdWatcher; we skip it via + __new__ and set only the attributes the method reads. + """ + + def setUp(self): + super(TestCheckForStaleStatus, self).setUp() + self.watcher = status.StatusWatcher.__new__(status.StatusWatcher) + self.watcher._last_stale_warn = 0.0 + self.watcher.processing_snapshot = False + + def _fmt(self, dt): + return dt.strftime("%Y-%m-%dT%H:%M:%SZ") + + def test_fresh_update_does_not_warn(self): + fresh = self._fmt(datetime.now(tz=timezone.utc)) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": fresh}) + m_warn.assert_not_called() + self.assertEqual(0.0, self.watcher._last_stale_warn) + + def test_stale_update_warns(self): + stale = self._fmt(datetime.now(tz=timezone.utc) - timedelta(hours=1)) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": stale}) + m_warn.assert_called_once() + # First positional arg of the single call is the log format string. + self.assertIn("stale Felix status update", m_warn.call_args.args[0]) + self.assertGreater(self.watcher._last_stale_warn, 0.0) + + def test_stale_update_is_rate_limited(self): + stale = self._fmt(datetime.now(tz=timezone.utc) - timedelta(hours=1)) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": stale}) + self.watcher._check_for_stale_status("host2", {"time": stale}) + self.watcher._check_for_stale_status("host3", {"time": stale}) + # Only one warning across several stale updates within the + # rate-limit window. + self.assertEqual(1, m_warn.call_count) + + def test_snapshot_processing_skips_check(self): + self.watcher.processing_snapshot = True + stale = self._fmt(datetime.now(tz=timezone.utc) - timedelta(hours=1)) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": stale}) + m_warn.assert_not_called() + + def test_missing_time_field_is_silent(self): + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status( + "host1", {"uptime": 10, "first_update": False} + ) + m_warn.assert_not_called() + + def test_stale_naive_timestamp_warns(self): + """A timezone-less timestamp (no trailing Z) should not crash.""" + stale = (datetime.now(tz=timezone.utc) - timedelta(hours=1)).strftime( + "%Y-%m-%dT%H:%M:%S" + ) + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": stale}) + m_warn.assert_called_once() + self.assertIn("stale Felix status update", m_warn.call_args.args[0]) + + def test_unparseable_time_logs_separate_warning(self): + with mock.patch.object(status.LOG, "warning") as m_warn: + self.watcher._check_for_stale_status("host1", {"time": "not a date"}) + m_warn.assert_called_once() + self.assertIn("Could not parse status time", m_warn.call_args.args[0]) + # An unparseable time does not count as a stale-status warning for + # rate-limiting purposes. + self.assertEqual(0.0, self.watcher._last_stale_warn)