Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"""
import os
import random
import re
import socket
import sys
import time
Expand Down Expand Up @@ -164,6 +165,14 @@ def _vote(self):
return

LOG.debug("ID of elected master is : %s", value)
if value:
# If the previous master was on this host, check whether its process is
# still alive. This recovers from the case where the previous master
# was killed without running its step-down cleanup (e.g. SIGKILL, or
# the worker greenlet dying without unwinding) so the stale election
# key would otherwise have to wait out the lease TTL before any node
# could win.
self._check_master_process(value)

while not self._stopped:
# We know another instance is the master. Wait until something
Expand Down Expand Up @@ -211,6 +220,44 @@ def _vote(self):
)
self._become_master()

def _check_master_process(self, master_id):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "master" process over here is really the manager process right? If we run this function in the manager process itself (and it somehow died/malfunctioned), then presumably this function will not run either?

This was working before, IIUC, because each process can become master and they can check the master process is alive or not. Now with just one process can be master, we kinda lose this ability. Therefore, for checking malfunctioning/failures//crashes, I would think we should rely on TTL? It would probably make sense to make it configurable.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the master process is the "manager" process, because that is the one that includes the Elector.

If we run this function in the manager process itself (and it somehow died/malfunctioned), then presumably this function will not run either?

I'm afraid I don't understand your query here. The specific system test scenario here was the neutron-server being restarted in order to pick up a config change. So the manager process in the old neutron-server is killed, and then a new manager process runs in the new neutron-server.

This was working before, IIUC, because each process can become master and they can check the master process is alive or not. Now with just one process can be master, we kinda lose this ability.

No, I don't think that's right. It was working before because of the _check_master_process logic. That was removed in #12668 , and this PR now reinstates it.

Therefore, for checking malfunctioning/failures//crashes, I would think we should rely on TTL?

What TTL do you have in mind here? I'm afraid I don't understand your suggestion.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see what you are referring to here, I believe I misunderstood the intention of this function. Before Chao & I made any changes, IIUC, all neutron-server processes will run _post_fork_init and therefore run Elector. One of them will be elected as leader, and the rest will just do this _check_master_process. With this design, if the old leader process somehow died, then other processes can quickly delete the key and restart the election. When I was writing #12668, I thought this was the intention and hence removed it (because there will only be one process running Elector). I did not think about the restarts.

In terms of the TTL, I mean the lease TTL (MASTER_TIMEOUT). I think it would be helpful to make it configurable so that in case of a machine failure, where no one will delete the election key, we can shrink the time it takes for other machines to step in and become master.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it might be useful for MASTER_TIMEOUT and MASTER_REFRESH_INTERVAL to be configurable. But I think that is independent of the current PR, isn't it? 10s is already quite low for MASTER_REFRESH_INTERVAL.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah ofc, this can be a separate PR. While the refresh interval is 10s, I think it's the 60s timeout who's blocking longer for other neutron-server to step up?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but (in case it's not clear) the current PR already fixes that. When the Neutron server restarts:

  1. It immediately creates the ManagerWorker process, -> _init_start_calico_manager() -> self.elector.start()
  2. Elector calls _vote, which finds the key and calls _check_master_process
  3. _check_master_process parses successfully, finds that host matches its own, but PID no longer running, and so deletes the key.
  4. _vote now either sees the delete, or KeyNotFound, and so calls _become_master.

The important point is that that all happens immediately without waiting for any timeout or refresh interval.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is happy path where the neutron-server is restarted and I think we are on the same page here :D. I'm more referring to when there is a machine failure (i.e., the machine crashed) and thus the neutron-server is never restarted (because it can't), and it would then take this 60s for other machines running neutron-server to become master - and in that case if we want to reduce the downtime, we'll need to reduce the 60s - but this should be another PR as we discussed.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that makes sense. Would you like to prepare that PR?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, can do when I get a chance.

"""If the previous master was on this host, check whether the process is
still alive; if not, delete the stale election key (with CAS against the
observed value) and restart the election.

:param master_id: Value read from the election key.
"""
# Defensive: only parse the key if it looks like what _become_master
# writes. An unparseable value means someone else's election scheme is
# writing here, or the format has changed -- leave it alone.
match = re.match(r"^(?P<host>[^:]+):(?P<pid>\d+)$", master_id)
if not match:
LOG.warning("Unable to parse master ID: %r.", master_id)
return
host = match.group("host")
pid = int(match.group("pid"))
LOG.debug("Parsed key as host = %s, PID = %s", host, pid)
if host != self._server_id:
return
LOG.debug("Previous master was on this server %s", host)
if os.path.exists("/proc/%s" % pid):
LOG.debug("Master still running")
return
LOG.warning(
"Master was on this server but cannot find its PID in /proc. "
"Removing stale election key."
)
try:
deleted = etcdv3.delete(self._key, existing_value=master_id)
except Etcd3Exception as e:
self._log_exception("remove stale key from dead master", e)
deleted = False
if not deleted:
# Either the CAS-delete found a value that no longer matches (so
# somebody else has already moved the election on), or the etcd
# call failed. Either way, restart the election to re-read state.
raise RestartElection()

def _become_master(self):
"""_become_master

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,14 @@ def get_workers(self):
``calico-resync`` from a CD pipeline, or by leaving ``always`` set on exactly
one neutron-server in the deployment.
"""
# CalicoManagerWorker gets a back-reference to the driver so its
# stop() can reach self.elector and step down cleanly on graceful
# shutdown -- otherwise the elector greenlet is killed without
# running its finally _attempt_step_down, the election key stays
# in etcd until the lease expires, and the next neutron-server
# restart has to wait out that TTL before anyone can win.
services = [
CalicoManagerWorker(),
CalicoManagerWorker(driver=self),
CalicoAgentStatusWatcherWorker(),
CalicoEndpointStatusWatcherWorker(),
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,87 @@ def test_master_failure(self):
)
elector.start()
self._wait_and_stop(client, elector)


class TestCheckMasterProcess(unittest.TestCase):
"""Direct tests for Elector._check_master_process.

These tests instantiate the Elector but never start the election greenlet;
they call _check_master_process synchronously with mocked /proc and
etcdv3.delete.
"""

HOST = "this-host"
OTHER_HOST = "other-host"
KEY = "/calico/v2/no-region/neutron_election"

def setUp(self):
super(TestCheckMasterProcess, self).setUp()
self.elector = election.Elector(
self.HOST,
self.KEY,
mock.MagicMock(),
interval=5,
ttl=15,
)

def test_same_host_live_pid_does_nothing(self):
with mock.patch("os.path.exists", return_value=True) as m_exists, mock.patch(
"networking_calico.etcdv3.delete"
) as m_delete:
# No RestartElection, no delete call.
self.elector._check_master_process("%s:12345" % self.HOST)
m_exists.assert_called_once_with("/proc/12345")
m_delete.assert_not_called()

def test_same_host_dead_pid_deletes_then_returns(self):
# On a successful CAS-delete the method returns normally (the watch
# in _vote will then see the delete event and we will try to become
# master through the normal path).
with mock.patch("os.path.exists", return_value=False), mock.patch(
"networking_calico.etcdv3.delete", return_value=True
) as m_delete:
self.elector._check_master_process("%s:99999" % self.HOST)
m_delete.assert_called_once_with(
self.KEY, existing_value="%s:99999" % self.HOST
)

def test_same_host_dead_pid_cas_fail_raises(self):
# CAS-delete returning False means somebody else has already moved
# the election on; we restart so the next _vote sees the new state.
with mock.patch("os.path.exists", return_value=False), mock.patch(
"networking_calico.etcdv3.delete", return_value=False
):
with self.assertRaises(election.RestartElection):
self.elector._check_master_process("%s:99999" % self.HOST)

def test_same_host_dead_pid_etcd_exception_raises(self):
# etcd-side error during the cleanup delete: log and restart.
with mock.patch("os.path.exists", return_value=False), mock.patch(
"networking_calico.etcdv3.delete",
side_effect=e3e.ConnectionFailedError(),
):
with self.assertRaises(election.RestartElection):
self.elector._check_master_process("%s:99999" % self.HOST)

def test_different_host_does_nothing(self):
# Previous master was on another node -- not our problem; defer to
# the lease TTL for cleanup if that node has died.
with mock.patch("os.path.exists") as m_exists, mock.patch(
"networking_calico.etcdv3.delete"
) as m_delete:
self.elector._check_master_process("%s:12345" % self.OTHER_HOST)
m_exists.assert_not_called()
m_delete.assert_not_called()

def test_unparseable_value_does_nothing(self):
# A value that doesn't match "<host>:<pid>" is left alone; warn but
# do not attempt any cleanup.
with mock.patch("os.path.exists") as m_exists, mock.patch(
"networking_calico.etcdv3.delete"
) as m_delete, mock.patch.object(election.LOG, "warning") as m_warn:
self.elector._check_master_process("not-a-valid-id")
m_exists.assert_not_called()
m_delete.assert_not_called()
m_warn.assert_called_once()
self.assertIn("Unable to parse master ID", m_warn.call_args.args[0])
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,40 @@ class CalicoManagerWorker(worker.BaseWorker):
"""Service for doing election and compaction.

The super class will trigger the post_fork_initialize in the mech driver.

Holds a back-reference to the mech driver so stop() can ask the elector
to step down cleanly. Without that, neutron-server's SIGTERM would just
kill the elector greenlet, the ``finally: _attempt_step_down()`` block
would not run, and the election key would linger in etcd until its lease
expired -- adding ttl seconds of delay before any node could win the
next election.
"""

def __init__(self, driver=None, *args, **kwargs):
super(CalicoManagerWorker, self).__init__(*args, **kwargs)
self._driver = driver

def start(self, name="calico-manager", desc=None):
"""Start service."""
super(CalicoManagerWorker, self).start(name, desc)

def stop(self):
"""Stop service."""
"""Stop service.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense. IIUC (correct me if I'm wrong), when we do something like systemctl stop neutron-server, it will send a SIGINT to all neutron-server processes, which each process's handler will catch that and call stop?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe so. So when a shutdown is "graceful" in this way, the master elector deletes its etcd key. However, in case that doesn't happen (for any reason), we also reinstate the check_master_process logic so that a stale etcd key can be immediately identified when the neutron server starts again.


Step down from mastership before chaining to the base ``stop()``.
The elector's stop() blocks until its greenlet has exited, which is
what runs the ``finally: _attempt_step_down()`` that deletes the
election key in etcd. Guarded against the case where stop() fires
before post_fork_initialize has populated ``driver.elector``.
"""
elector = getattr(self._driver, "elector", None) if self._driver else None
if elector is not None:
try:
elector.stop()
except Exception:
# Best-effort -- we are already on the shutdown path, and
# the key will expire with its lease either way.
LOG.exception("Error stopping elector during worker shutdown")
super(CalicoManagerWorker, self).stop()
Comment thread
nelljerram marked this conversation as resolved.

def wait(self):
Expand Down
121 changes: 121 additions & 0 deletions networking-calico/networking_calico/tests/test_workers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2026 Tigera, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.

"""Unit tests for ``CalicoManagerWorker.stop()``.

The shutdown path is the regression fix in
https://github.com/projectcalico/calico/pull/13069 -- ``stop()`` must ask
the elector to step down so its ``finally: _attempt_step_down()`` clause
runs and the election key is removed from etcd promptly, rather than
lingering for the lease ttl. These tests verify all branches of that
path: elector present, elector absent, and elector raising.

Lives under ``networking_calico/tests/`` rather than
``networking_calico/plugins/ml2/drivers/calico/test/`` because the
latter's ``lib.py`` replaces ``sys.modules['neutron_lib.worker']`` with
a MagicMock at import time -- which collapses
``CalicoManagerWorker`` itself into a MagicMock and makes its real
shutdown code unreachable. Running here keeps ``neutron_lib.worker``
real (the two test directories are run in separate subunit processes
per ``.testr.conf``).
"""

import unittest

import mock

from networking_calico.plugins.ml2.drivers.calico import workers


class TestCalicoManagerWorkerStop(unittest.TestCase):
"""Verify ``CalicoManagerWorker.stop()`` shutdown semantics."""

def setUp(self):
super(TestCalicoManagerWorkerStop, self).setUp()
# Patch the parent's stop() so we can assert it gets chained to
# without dragging in the real oslo_service shutdown plumbing.
# create=True because BaseWorker doesn't define stop() itself --
# it inherits the abstract method from oslo_service.ServiceBase.
self.super_stop_p = mock.patch.object(
workers.worker.BaseWorker, "stop", create=True
)
self.super_stop = self.super_stop_p.start()
self.addCleanup(self.super_stop_p.stop)

def _make_worker(self, driver):
# set_proctitle='off' avoids touching the real process title in
# the test runner.
return workers.CalicoManagerWorker(driver=driver, set_proctitle="off")

def test_stop_steps_down_elector_then_chains_to_super(self):
# Happy path: driver.elector is set, so stop() must call
# elector.stop() exactly once, then super().stop().
driver = mock.Mock()
driver.elector = mock.Mock()
worker = self._make_worker(driver)

worker.stop()

driver.elector.stop.assert_called_once_with()
self.super_stop.assert_called_once_with()

def test_stop_tolerates_elector_attribute_missing(self):
# post_fork_initialize hasn't run yet -- driver has no `elector`
# attribute. stop() must not raise and must still chain to
# super().
driver = mock.Mock(spec=[]) # empty spec => no elector attr
worker = self._make_worker(driver)

worker.stop() # must not raise

self.super_stop.assert_called_once_with()

def test_stop_tolerates_driver_is_none(self):
# Defensive case: _driver itself is None (worker was somehow
# constructed without a driver back-reference).
worker = self._make_worker(driver=None)

worker.stop() # must not raise

self.super_stop.assert_called_once_with()

def test_stop_tolerates_elector_being_none(self):
# post_fork_initialize cleared the elector (or never set it),
# leaving driver.elector = None.
driver = mock.Mock()
driver.elector = None
worker = self._make_worker(driver)

worker.stop() # must not raise

self.super_stop.assert_called_once_with()

def test_stop_swallows_elector_stop_exception(self):
# elector.stop() can fail (e.g. etcd unreachable during shutdown).
# The exception must be logged but not propagate, and super().stop()
# must still be called so the worker process exits cleanly.
driver = mock.Mock()
driver.elector.stop.side_effect = RuntimeError("etcd unreachable")
worker = self._make_worker(driver)

with mock.patch.object(workers.LOG, "exception") as mock_log_exc:
worker.stop() # must not raise

mock_log_exc.assert_called_once()
self.super_stop.assert_called_once_with()


if __name__ == "__main__":
unittest.main()
Loading