Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

from networking_calico import etcdv3
from networking_calico.common import config as calico_config
from networking_calico.monotonic import monotonic_time


LOG = log.getLogger(__name__)
Expand Down Expand Up @@ -101,6 +102,13 @@ def __init__(self, server_id, election_key, old_key=None, interval=30, ttl=60):
# Is this the master? To start with, no
self._master = False

# Monotonic timestamp of the last successful lease refresh while
# master. Used by healthy() to detect a silently dead election
# greenlet - if _master stays True but the greenlet has stopped
# refreshing the lease, we are no longer actually the master even
# though self._master says we are.
self._last_refresh = 0.0

# Keep the greenlet ID handy to ease UT.
self._greenlet = eventlet.spawn(self._run)

Expand Down Expand Up @@ -147,15 +155,15 @@ def _vote(self):
value, mod_revision = etcdv3.get(self._key)
mod_revision = int(mod_revision)
except etcdv3.KeyNotFound:
LOG.debug("Try to become the master - key not found")
LOG.info("Try to become the master - key not found")
self._become_master()
assert False, "_become_master() should not return."
except Etcd3Exception as e:
# Something bad and unexpected. Log and reconnect.
self._log_exception("read current master", e)
return

LOG.debug("ID of elected master is : %s", value)
LOG.info("ID of elected master is : %s", value)
if value:
# If we happen to be on the same server, check if the master
# process is still alive.
Expand Down Expand Up @@ -186,7 +194,7 @@ def _vote(self):
timeout=self._interval * 2,
start_revision=mod_revision + 1,
)
LOG.debug("election event: %s", event)
LOG.info("election event: %s", event)
action = event.get("type", "SET").lower()
value = event["kv"].get("value")
mod_revision = int(event["kv"].get("mod_revision", "0"))
Expand All @@ -199,7 +207,7 @@ def _vote(self):
# Something bad and unexpected. Log and reconnect.
self._log_exception("wait for master change", e)
return
LOG.debug("Election key action: %s; new value %s", action, value)
LOG.info("Election key action: %s; new value %s", action, value)
if action in ETCD_DELETE_ACTIONS or value is None:
# Deleted - try and become the master.
LOG.info(
Expand All @@ -223,12 +231,12 @@ def _check_master_process(self, master_id):
return
host = match.group("host")
pid = int(match.group("pid"))
LOG.debug("Parsed key as host = %s, PID = %s", host, pid)
LOG.info("Parsed key as host = %s, PID = %s", host, pid)
if host == self._server_id:
# Check if the PID is still running.
LOG.debug("Previous master was on this server %s", host)
LOG.info("Previous master was on this server %s", host)
if os.path.exists("/proc/%s" % pid):
LOG.debug("Master still running")
LOG.info("Master still running")
else:
LOG.warning(
"Master was on this server but cannot find its "
Expand Down Expand Up @@ -271,6 +279,10 @@ def _become_master(self):
LOG.info("Race: someone else beat us to be master")
raise RestartElection()

# We are now master; start the healthy() watchdog clock. This must
# be kept up to date by the lease-refresh loop below.
self._last_refresh = monotonic_time()

LOG.info(
"Successfully become master - key %s, value %s", self._key, self.id_string
)
Expand All @@ -281,7 +293,7 @@ def _become_master(self):
try:
while not self._stopped:
try:
LOG.debug("Refreshing master role")
LOG.info("Refreshing master role")
# Refresh the lease.
ttl = ttl_lease.refresh()
# Also rewrite the key, so that non-masters see an event on
Expand All @@ -294,7 +306,10 @@ def _become_master(self):
):
LOG.warning("Key changed or deleted; restart election")
raise RestartElection()
LOG.debug("Refreshed master role, TTL now is %d", ttl)
LOG.info("Refreshed master role, TTL now is %d", ttl)
# Record that the refresh succeeded. healthy() uses this
# to detect if the refresh loop silently stops running.
self._last_refresh = monotonic_time()
except RestartElection:
raise
except Exception as e:
Expand Down Expand Up @@ -362,6 +377,90 @@ def master(self):
"""
return self._master and not self._stopped

def confirmed_master(self):
    """Report whether both our local state and etcd say we are master.

    The local healthy() check runs first. Only if it passes is the
    election key re-read from etcd and its value compared with our
    id_string. This is meant for callers about to start expensive
    master-only work (e.g. a periodic resync) that want a second
    opinion in case the in-process state disagrees with etcd.

    Each call that gets past healthy() performs an etcd read of the
    election key, so do not call this in a hot loop - use healthy()
    there instead.

    Side effect: if the key is missing, or holds a value other than
    our id_string, self._master is cleared.

    returns: True only when healthy() passes and the election key in
             etcd currently holds our id_string. False otherwise,
             including when the etcd read fails.
    """
    # Local check first; skip the etcd round trip when it already fails.
    if not self.healthy():
        return False

    try:
        current, _ = etcdv3.get(self._key)
    except etcdv3.KeyNotFound:
        # The key is gone, so we cannot be the master any more.
        LOG.warning(
            "Election key %s not present in etcd but _master is True; "
            "treating as no longer master",
            self._key,
        )
        self._master = False
        return False
    except Etcd3Exception as err:
        # An etcd error means "don't know": leave _master untouched but
        # answer False so the caller skips master-only work this time.
        self._log_exception("confirm master", err)
        return False

    if current == self.id_string:
        return True

    # Some other value is stored under the election key.
    LOG.warning(
        "Election key %s in etcd has value %r but we expected %r; "
        "treating as no longer master",
        self._key,
        current,
        self.id_string,
    )
    self._master = False
    return False

def healthy(self):
    """Report whether we are master and the election greenlet is working.

    This is a stricter check than master(). All of the following must
    hold for it to return True:

    - self._master is set;
    - we have not been stopped;
    - the election greenlet exists and is not dead;
    - the last recorded lease refresh is no more than self._ttl
      seconds old.

    Why master() alone is not enough: self._master is a plain local
    flag, set when we win the election and cleared only when the
    greenlet exits normally via _attempt_step_down() or the refresh
    loop's finally clause. If the greenlet dies silently - e.g. an
    eventlet-level issue that drops the frame without unwinding Python
    exceptions - the flag stays True indefinitely. Cross-checking the
    greenlet state and the refresh timestamp catches that case.

    returns: True if this is the master and the election greenlet is
             confirmed to still be working.
    """
    claims_master = self._master and not self._stopped
    if not claims_master:
        return False

    worker = self._greenlet
    if worker is None or worker.dead:
        LOG.warning(
            "Election greenlet is dead but _master is still True; "
            "treating as no longer master"
        )
        return False

    # Age of the last recorded lease refresh, on the monotonic clock.
    lease_age = monotonic_time() - self._last_refresh
    if lease_age > self._ttl:
        LOG.warning(
            "Election lease has not been refreshed for %.1fs (ttl %ds); "
            "treating as no longer master",
            lease_age,
            self._ttl,
        )
        return False

    return True

def stop(self):
self._stopped = True
if self._greenlet and not self._greenlet.dead:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from datetime import datetime, timedelta
import os
import re
import sys
import threading
import uuid
from functools import wraps
Expand Down Expand Up @@ -341,6 +342,11 @@ def __init__(self):
# Last resync completion time
self.last_resync_time = datetime.now()

# List of (name, greenlet) for the long-running worker greenlets
# spawned by _post_fork_init. Used by _check_greenlets_alive() to
# detect silent greenlet death.
self._greenlets = []

# Tell the monkeypatch where we are.
global mech_driver
assert mech_driver is None
Expand Down Expand Up @@ -462,13 +468,44 @@ def _post_fork_init(self, voting=False):
# We deliberately do this last, to ensure that all of the setup
# above is complete before we start running.
self._epoch += 1
eventlet.spawn(self.resync_monitor_thread, self._epoch)
eventlet.spawn(self.periodic_resync_thread, self._epoch)
self._greenlets = []
self._greenlets.append(
(
"resync_monitor",
eventlet.spawn(self.resync_monitor_thread, self._epoch),
)
)
resync_gt = eventlet.spawn(self.periodic_resync_thread, self._epoch)
if cfg.CONF.calico.resync_interval_secs > 0:
# Only watchdog the resync thread if it is expected to
# keep running. When resync_interval_secs == 0 the
# thread does a single resync and then exits
# intentionally.
self._greenlets.append(("periodic_resync", resync_gt))
if cfg.CONF.calico.etcd_compaction_period_mins > 0:
eventlet.spawn(self.periodic_compaction_thread, self._epoch)
eventlet.spawn(self._status_updating_thread, self._epoch)
for _ in range(cfg.CONF.calico.num_port_status_threads):
eventlet.spawn(self._loop_writing_port_statuses, self._epoch)
self._greenlets.append(
(
"periodic_compaction",
eventlet.spawn(
self.periodic_compaction_thread, self._epoch
),
)
)
self._greenlets.append(
(
"status_updating",
eventlet.spawn(self._status_updating_thread, self._epoch),
)
)
for i in range(cfg.CONF.calico.num_port_status_threads):
self._greenlets.append(
(
"port_status_%d" % i,
eventlet.spawn(
self._loop_writing_port_statuses, self._epoch
),
)
)
else:
LOG.info(
"PID %s: Not a voting participant; "
Expand All @@ -482,6 +519,29 @@ def _post_fork_init(self, voting=False):
"Calico mechanism driver initialisation done in process %s", current_pid
)

def _check_greenlets_alive(self):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the current code (without this change), even though we might have multiple masters doing the same job, the cluster is actually still functional (e.g., it's ok to have two masters writing agent/status updates). My main worry here (with this change) is what if the elector threads on all neutron-servers die at roughly the same time? In this case, everyone will stop doing the job until we restart it here. This may take a LONG time, as this function is called only when we start to watch statuses or in resync monitor (which will sleep based on the configured time).

It's unfortunate that these threads are not reliable, and none of the solutions off the top of my head handles this well (e.g., having yet another thread to monitor this, or altering the existing monitor thread to be more general-purpose), but let me know what you think.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhanz1 Thanks for looking at this. I entirely agree that this is a theoretical problem:

what if the elector threads on all neutron-servers die at roughly the same time?

But do you think there is anything in this PR that has made that more likely? I don't think the PR does that, but perhaps I have missed something.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I didn't realize that the status-update and resync-monitor threads are constantly (every 5 secs) checking whether the threads are alive, even if it's not the leader - my bad. I was wondering whether there could be a big gap between all three electors dying and at least one of the neutron-servers restarting.

No, from my reading it doesn't make it more likely that the elector threads on all neutron-servers die at roughly the same time. I was simply thinking about edge cases, because we don't have enough information on why exactly the threads die.

"""Detect if any long-running worker greenlet has silently died.

Under eventlet, a greenlet can occasionally die without unwinding
its Python frames (e.g. due to a hub-level error), leaving no
traceback in the log and no state cleanup. This method provides
mutual watchdogging: each of the driver's long-running loops
calls it periodically to verify that the others are still alive.

If a dead greenlet is found, we log an error and exit. The
process manager (systemd) will restart neutron-server, which is
the safest recovery — the same approach the elector already uses
for its own unhandled-exception path.
"""
for name, gt in self._greenlets:

@zhanz1 zhanz1 Apr 15, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be more aggressive - not trusting eventlet at all.

If an elector is not doing its job, do a restart. For the master, "not doing its job" means not updating self._last_refresh; for a non-master, it means not having recently watched the vote key for changes (so something like self._last_checked).

If the port status update worker thread is not consuming events from the queue, do a restart.

if gt.dead:
LOG.error(
"Worker greenlet %r has unexpectedly died; exiting so "
"that the process manager can restart neutron-server.",
name,
)
sys.exit(1)

@logging_exceptions(LOG)
def _status_updating_thread(self, expected_epoch):
"""_status_updating_thread
Expand All @@ -493,8 +553,10 @@ def _status_updating_thread(self, expected_epoch):
TrackTask("STATUS_UPDATING")
LOG.info("Status updating thread started.")
while self._epoch == expected_epoch:
# Only handle updates if we are the master node.
if self.elector.master():
self._check_greenlets_alive()
# Only handle updates if we are healthily the master node. See
# Elector.healthy() for why we use healthy() rather than master().
if self.elector.healthy():
if self._etcd_watcher is None:
LOG.info("Became the master, starting StatusWatcher")
self._etcd_watcher = StatusWatcher(self)
Expand Down Expand Up @@ -1232,8 +1294,9 @@ def resync_monitor_thread(self, launch_epoch):
LOG.info("Resync monitor thread started")

while self._epoch == launch_epoch:
# Only monitor the resync if we are the master node.
if self.elector.master():
self._check_greenlets_alive()
# Only monitor the resync if we are healthily the master node.
if self.elector.healthy():
LOG.info("I am master: monitoring periodic resync")

curr_time = datetime.now()
Expand Down Expand Up @@ -1276,8 +1339,13 @@ def periodic_resync_thread(self, launch_epoch):
try:
LOG.info("Periodic resync thread started")
while self._epoch == launch_epoch:
# Only do the resync if we are the master node.
if self.elector.master():
# Only do the resync if we are healthily the master node AND
# etcd still agrees that we are the master. The extra etcd
# read (vs. plain healthy()) defends against the in-process
# flag disagreeing with etcd's ground truth; resync is much
# more expensive than a single etcd GET, so the extra check
# is cheap by comparison.
if self.elector.confirmed_master():
LOG.info("I am master: doing periodic resync")
start_time = datetime.now()

Expand Down Expand Up @@ -1343,8 +1411,8 @@ def periodic_compaction_thread(self, launch_epoch):
try:
LOG.info("Periodic compaction thread started")
while self._epoch == launch_epoch:
# Only do the compaction if we are the master node.
if self.elector.master():
# Only do the compaction if we are healthily the master node.
if self.elector.healthy():
LOG.info("I am master: doing periodic compaction")

try:
Expand Down
Loading
Loading