diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py index e32eed758fe..08cf10b7258 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py @@ -20,6 +20,7 @@ """ import os import random +import re import socket import sys import time @@ -164,6 +165,14 @@ def _vote(self): return LOG.debug("ID of elected master is : %s", value) + if value: + # If the previous master was on this host, check whether its process is + # still alive. This recovers from the case where the previous master + # was killed without running its step-down cleanup (e.g. SIGKILL, or + # the worker greenlet dying without unwinding) so the stale election + # key would otherwise have to wait out the lease TTL before any node + # could win. + self._check_master_process(value) while not self._stopped: # We know another instance is the master. Wait until something @@ -211,6 +220,44 @@ def _vote(self): ) self._become_master() + def _check_master_process(self, master_id): + """If the previous master was on this host, check whether the process is + still alive; if not, delete the stale election key (with CAS against the + observed value) and restart the election. + + :param master_id: Value read from the election key. + """ + # Defensive: only parse the key if it looks like what _become_master + # writes. An unparseable value means someone else's election scheme is + # writing here, or the format has changed -- leave it alone. + match = re.match(r"^(?P[^:]+):(?P\d+)$", master_id) + if not match: + LOG.warning("Unable to parse master ID: %r.", master_id) + return + host = match.group("host") + pid = int(match.group("pid")) + LOG.debug("Parsed key as host = %s, PID = %s", host, pid) + if host != self._server_id: + return + LOG.debug("Previous master was on this server %s", host) + if os.path.exists("/proc/%s" % pid): + LOG.debug("Master still running") + return + LOG.warning( + "Master was on this server but cannot find its PID in /proc. " + "Removing stale election key." + ) + try: + deleted = etcdv3.delete(self._key, existing_value=master_id) + except Etcd3Exception as e: + self._log_exception("remove stale key from dead master", e) + deleted = False + if not deleted: + # Either the CAS-delete found a value that no longer matches (so + # somebody else has already moved the election on), or the etcd + # call failed. Either way, restart the election to re-read state. + raise RestartElection() + def _become_master(self): """_become_master diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index 01f6ddb7277..52bc11e68e3 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -479,8 +479,14 @@ def get_workers(self): ``calico-resync`` from a CD pipeline, or by leaving ``always`` set on exactly one neutron-server in the deployment. """ + # CalicoManagerWorker gets a back-reference to the driver so its + # stop() can reach self.elector and step down cleanly on graceful + # shutdown -- otherwise the elector greenlet is killed without + # running its finally _attempt_step_down, the election key stays + # in etcd until the lease expires, and the next neutron-server + # restart has to wait out that TTL before anyone can win. services = [ - CalicoManagerWorker(), + CalicoManagerWorker(driver=self), CalicoAgentStatusWatcherWorker(), CalicoEndpointStatusWatcherWorker(), ] diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py index c3f72adeafb..57147bebbd6 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py @@ -287,3 +287,87 @@ def test_master_failure(self): ) elector.start() self._wait_and_stop(client, elector) + + +class TestCheckMasterProcess(unittest.TestCase): + """Direct tests for Elector._check_master_process. + + These tests instantiate the Elector but never start the election greenlet; + they call _check_master_process synchronously with mocked /proc and + etcdv3.delete. + """ + + HOST = "this-host" + OTHER_HOST = "other-host" + KEY = "/calico/v2/no-region/neutron_election" + + def setUp(self): + super(TestCheckMasterProcess, self).setUp() + self.elector = election.Elector( + self.HOST, + self.KEY, + mock.MagicMock(), + interval=5, + ttl=15, + ) + + def test_same_host_live_pid_does_nothing(self): + with mock.patch("os.path.exists", return_value=True) as m_exists, mock.patch( + "networking_calico.etcdv3.delete" + ) as m_delete: + # No RestartElection, no delete call. + self.elector._check_master_process("%s:12345" % self.HOST) + m_exists.assert_called_once_with("/proc/12345") + m_delete.assert_not_called() + + def test_same_host_dead_pid_deletes_then_returns(self): + # On a successful CAS-delete the method returns normally (the watch + # in _vote will then see the delete event and we will try to become + # master through the normal path). + with mock.patch("os.path.exists", return_value=False), mock.patch( + "networking_calico.etcdv3.delete", return_value=True + ) as m_delete: + self.elector._check_master_process("%s:99999" % self.HOST) + m_delete.assert_called_once_with( + self.KEY, existing_value="%s:99999" % self.HOST + ) + + def test_same_host_dead_pid_cas_fail_raises(self): + # CAS-delete returning False means somebody else has already moved + # the election on; we restart so the next _vote sees the new state. + with mock.patch("os.path.exists", return_value=False), mock.patch( + "networking_calico.etcdv3.delete", return_value=False + ): + with self.assertRaises(election.RestartElection): + self.elector._check_master_process("%s:99999" % self.HOST) + + def test_same_host_dead_pid_etcd_exception_raises(self): + # etcd-side error during the cleanup delete: log and restart. + with mock.patch("os.path.exists", return_value=False), mock.patch( + "networking_calico.etcdv3.delete", + side_effect=e3e.ConnectionFailedError(), + ): + with self.assertRaises(election.RestartElection): + self.elector._check_master_process("%s:99999" % self.HOST) + + def test_different_host_does_nothing(self): + # Previous master was on another node -- not our problem; defer to + # the lease TTL for cleanup if that node has died. + with mock.patch("os.path.exists") as m_exists, mock.patch( + "networking_calico.etcdv3.delete" + ) as m_delete: + self.elector._check_master_process("%s:12345" % self.OTHER_HOST) + m_exists.assert_not_called() + m_delete.assert_not_called() + + def test_unparseable_value_does_nothing(self): + # A value that doesn't match ":" is left alone; warn but + # do not attempt any cleanup. + with mock.patch("os.path.exists") as m_exists, mock.patch( + "networking_calico.etcdv3.delete" + ) as m_delete, mock.patch.object(election.LOG, "warning") as m_warn: + self.elector._check_master_process("not-a-valid-id") + m_exists.assert_not_called() + m_delete.assert_not_called() + m_warn.assert_called_once() + self.assertIn("Unable to parse master ID", m_warn.call_args.args[0]) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py index c2d2267a37a..53ef2a83dbd 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py @@ -98,14 +98,40 @@ class CalicoManagerWorker(worker.BaseWorker): """Service for doing election and compaction. The super class will trigger the post_fork_initialize in the mech driver. + + Holds a back-reference to the mech driver so stop() can ask the elector + to step down cleanly. Without that, neutron-server's SIGTERM would just + kill the elector greenlet, the ``finally: _attempt_step_down()`` block + would not run, and the election key would linger in etcd until its lease + expired -- adding ttl seconds of delay before any node could win the + next election. """ + def __init__(self, driver=None, *args, **kwargs): + super(CalicoManagerWorker, self).__init__(*args, **kwargs) + self._driver = driver + def start(self, name="calico-manager", desc=None): """Start service.""" super(CalicoManagerWorker, self).start(name, desc) def stop(self): - """Stop service.""" + """Stop service. + + Step down from mastership before chaining to the base ``stop()``. + The elector's stop() blocks until its greenlet has exited, which is + what runs the ``finally: _attempt_step_down()`` that deletes the + election key in etcd. Guarded against the case where stop() fires + before post_fork_initialize has populated ``driver.elector``. + """ + elector = getattr(self._driver, "elector", None) if self._driver else None + if elector is not None: + try: + elector.stop() + except Exception: + # Best-effort -- we are already on the shutdown path, and + # the key will expire with its lease either way. + LOG.exception("Error stopping elector during worker shutdown") super(CalicoManagerWorker, self).stop() def wait(self): diff --git a/networking-calico/networking_calico/tests/test_workers.py b/networking-calico/networking_calico/tests/test_workers.py new file mode 100644 index 00000000000..92cbf27f8f7 --- /dev/null +++ b/networking-calico/networking_calico/tests/test_workers.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2026 Tigera, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + +"""Unit tests for ``CalicoManagerWorker.stop()``. + +The shutdown path is the regression fix in +https://github.com/projectcalico/calico/pull/13069 -- ``stop()`` must ask +the elector to step down so its ``finally: _attempt_step_down()`` clause +runs and the election key is removed from etcd promptly, rather than +lingering for the lease ttl. These tests verify all branches of that +path: elector present, elector absent, and elector raising. + +Lives under ``networking_calico/tests/`` rather than +``networking_calico/plugins/ml2/drivers/calico/test/`` because the +latter's ``lib.py`` replaces ``sys.modules['neutron_lib.worker']`` with +a MagicMock at import time -- which collapses +``CalicoManagerWorker`` itself into a MagicMock and makes its real +shutdown code unreachable. Running here keeps ``neutron_lib.worker`` +real (the two test directories are run in separate subunit processes +per ``.testr.conf``). +""" + +import unittest + +import mock + +from networking_calico.plugins.ml2.drivers.calico import workers + + +class TestCalicoManagerWorkerStop(unittest.TestCase): + """Verify ``CalicoManagerWorker.stop()`` shutdown semantics.""" + + def setUp(self): + super(TestCalicoManagerWorkerStop, self).setUp() + # Patch the parent's stop() so we can assert it gets chained to + # without dragging in the real oslo_service shutdown plumbing. + # create=True because BaseWorker doesn't define stop() itself -- + # it inherits the abstract method from oslo_service.ServiceBase. + self.super_stop_p = mock.patch.object( + workers.worker.BaseWorker, "stop", create=True + ) + self.super_stop = self.super_stop_p.start() + self.addCleanup(self.super_stop_p.stop) + + def _make_worker(self, driver): + # set_proctitle='off' avoids touching the real process title in + # the test runner. + return workers.CalicoManagerWorker(driver=driver, set_proctitle="off") + + def test_stop_steps_down_elector_then_chains_to_super(self): + # Happy path: driver.elector is set, so stop() must call + # elector.stop() exactly once, then super().stop(). + driver = mock.Mock() + driver.elector = mock.Mock() + worker = self._make_worker(driver) + + worker.stop() + + driver.elector.stop.assert_called_once_with() + self.super_stop.assert_called_once_with() + + def test_stop_tolerates_elector_attribute_missing(self): + # post_fork_initialize hasn't run yet -- driver has no `elector` + # attribute. stop() must not raise and must still chain to + # super(). + driver = mock.Mock(spec=[]) # empty spec => no elector attr + worker = self._make_worker(driver) + + worker.stop() # must not raise + + self.super_stop.assert_called_once_with() + + def test_stop_tolerates_driver_is_none(self): + # Defensive case: _driver itself is None (worker was somehow + # constructed without a driver back-reference). + worker = self._make_worker(driver=None) + + worker.stop() # must not raise + + self.super_stop.assert_called_once_with() + + def test_stop_tolerates_elector_being_none(self): + # post_fork_initialize cleared the elector (or never set it), + # leaving driver.elector = None. + driver = mock.Mock() + driver.elector = None + worker = self._make_worker(driver) + + worker.stop() # must not raise + + self.super_stop.assert_called_once_with() + + def test_stop_swallows_elector_stop_exception(self): + # elector.stop() can fail (e.g. etcd unreachable during shutdown). + # The exception must be logged but not propagate, and super().stop() + # must still be called so the worker process exits cleanly. + driver = mock.Mock() + driver.elector.stop.side_effect = RuntimeError("etcd unreachable") + worker = self._make_worker(driver) + + with mock.patch.object(workers.LOG, "exception") as mock_log_exc: + worker.stop() # must not raise + + mock_log_exc.assert_called_once() + self.super_stop.assert_called_once_with() + + +if __name__ == "__main__": + unittest.main()