From 36a0567cb2cd202fdb189e3fc074a7a153a4091e Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 29 Apr 2026 14:31:21 -0400 Subject: [PATCH 01/10] networking-calico: Separate leader jobs into different processes As clusters grow larger, it is hard for a single Python process to do resync, compaction, and status updating at the same time. To address this, separate the jobs into multiple processes. This change will introduce four new worker processes, each in charge of: * CalicoResourceSyncerWorker: Sync resources from Neutron to etcd. * CalicoManagerWorker: Do leader election and periodic compaction. * CalicoAgentStatusWatcherWorker: Watch agent status updates and report them to Neutron. * CalicoEndpointStatusWatcherWorker: Watch endpoint status updates and report them to Neutron. Signed-off-by: Zhan Zhang --- .../plugins/ml2/drivers/calico/election.py | 78 ++--- .../plugins/ml2/drivers/calico/mech_calico.py | 306 ++++++++++-------- .../plugins/ml2/drivers/calico/status.py | 45 ++- .../plugins/ml2/drivers/calico/test/lib.py | 7 +- .../ml2/drivers/calico/test/test_election.py | 160 +++++---- .../drivers/calico/test/test_mech_calico.py | 124 +++---- .../drivers/calico/test/test_plugin_etcd.py | 254 +++++++++------ .../plugins/ml2/drivers/calico/workers.py | 72 +++++ 8 files changed, 593 insertions(+), 453 deletions(-) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py index 1fd573027b4..fb76787e2ed 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py @@ -20,9 +20,9 @@ """ import os import random -import re import socket import sys +import time from etcd3gw.exceptions import Etcd3Exception @@ -65,7 +65,9 @@ class RestartElection(Exception): class Elector(object): - def __init__(self, server_id, election_key, old_key=None, interval=30, 
ttl=60): + def __init__( + self, server_id, election_key, is_master, old_key=None, interval=30, ttl=60 + ): """Class that manages elections. :param server_id: Server ID. Must be unique to this server, and should @@ -73,6 +75,9 @@ def __init__(self, server_id, election_key, old_key=None, interval=30, ttl=60): hostname) :param election_key: The etcd key used for the election - e.g. "/calico/v2/no-region/election" + :param is_master: Process-shared value, used by other processes to + determine whether the current neutron-server + instance is the master. :param old_key: A legacy key that does not determine the election, but that we write whenever we write the election_key - e.g. "/calico/v1/election" @@ -91,6 +96,7 @@ def __init__(self, server_id, election_key, old_key=None, interval=30, ttl=60): self._interval = int(interval) self._ttl = int(ttl) self._stopped = False + self._is_master = is_master if self._interval <= 0: raise ValueError("Interval %r is <= 0" % interval) @@ -99,9 +105,10 @@ def __init__(self, server_id, election_key, old_key=None, interval=30, ttl=60): raise ValueError("TTL %r is <= interval %r" % (ttl, interval)) # Is this the master? To start with, no - self._master = False + self._is_master.value = 0 + self._greenlet = None - # Keep the greenlet ID handy to ease UT. + def run(self): self._greenlet = eventlet.spawn(self._run) def _run(self): @@ -156,10 +163,6 @@ def _vote(self): return LOG.debug("ID of elected master is : %s", value) - if value: - # If we happen to be on the same server, check if the master - # process is still alive. - self._check_master_process(value) while not self._stopped: # We know another instance is the master. Wait until something @@ -207,42 +210,6 @@ def _vote(self): ) self._become_master() - def _check_master_process(self, master_id): - """_check_master_process - - If the master happens to be on our host, checks if its process is - still alive. If it is not, cleans up the now-stale election key. 
- - :param master_id: Value loaded from the election key. - """ - # Defensive. In case we ever change the master ID format, only parse - # it if it looks like what we expect. - match = re.match(r"^(?P[^:]+):(?P\d+)$", master_id) - if not match: - LOG.warning("Unable to parse master ID: %r.", master_id) - return - host = match.group("host") - pid = int(match.group("pid")) - LOG.debug("Parsed key as host = %s, PID = %s", host, pid) - if host == self._server_id: - # Check if the PID is still running. - LOG.debug("Previous master was on this server %s", host) - if os.path.exists("/proc/%s" % pid): - LOG.debug("Master still running") - else: - LOG.warning( - "Master was on this server but cannot find its " - "PID in /proc. Removing stale election key." - ) - try: - deleted = etcdv3.delete(self._key, existing_value=master_id) - except Etcd3Exception as e: - self._log_exception("remove stale key from dead master", e) - deleted = False - - if not deleted: - raise RestartElection() - def _become_master(self): """_become_master @@ -254,10 +221,11 @@ def _become_master(self): master. Any other error from etcd is not caught in this routine. """ + ok = False try: ttl_lease = etcdv3.get_lease(self._ttl) - self._master = etcdv3.put( + ok = etcdv3.put( self._key, self.id_string, lease=ttl_lease, mod_revision="0" ) except Exception as e: @@ -265,12 +233,13 @@ def _become_master(self): # of error means we should give up, and safer to have a broad # except here. Log and reconnect. 
self._log_exception("become master", e) - self._master = False + self._is_master.value = 0 - if not self._master: + if not ok: LOG.info("Race: someone else beat us to be master") raise RestartElection() + self._is_master.value = time.time() LOG.info( "Successfully become master - key %s, value %s", self._key, self.id_string ) @@ -294,6 +263,10 @@ def _become_master(self): ): LOG.warning("Key changed or deleted; restart election") raise RestartElection() + + # Successfully refreshed the role - let's update the + # timestamp. + self._is_master.value = time.time() LOG.debug("Refreshed master role, TTL now is %d", ttl) except RestartElection: raise @@ -310,7 +283,7 @@ def _become_master(self): eventlet.sleep(self._interval) finally: LOG.info("Exiting master refresh loop, no longer the master") - self._master = False + self._is_master.value = 0 raise RestartElection() def _write_old_key(self, lease): @@ -347,7 +320,7 @@ def id_string(self): return "%s:%d" % (self._server_id, os.getpid()) def _attempt_step_down(self): - self._master = False + self._is_master.value = 0 try: etcdv3.delete(self._key, existing_value=self.id_string) except Exception: @@ -355,13 +328,6 @@ def _attempt_step_down(self): # will expire anyway. LOG.exception("Failed to step down as master. Ignoring.") - def master(self): - """Am I the master? - - returns: True if this is the master. - """ - return self._master and not self._stopped - def stop(self): self._stopped = True if self._greenlet and not self._greenlet.dead: diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index 15c8b2569f3..71af57951c4 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -26,12 +26,13 @@ # It is implemented as a Neutron/ML2 mechanism driver. 
import contextlib import inspect +import multiprocessing import os import threading +import time import eventlet from eventlet.queue import PriorityQueue -from eventlet.semaphore import Semaphore from neutron import wsgi from neutron.agent import rpc as agent_rpc @@ -78,10 +79,16 @@ endpoint_name, ) from networking_calico.plugins.ml2.drivers.calico.policy import PolicySyncer -from networking_calico.plugins.ml2.drivers.calico.status import StatusWatcher +from networking_calico.plugins.ml2.drivers.calico.status import ( + AgentStatusWatcher, + EndpointStatusWatcher, +) from networking_calico.plugins.ml2.drivers.calico.subnets import SubnetSyncer from networking_calico.plugins.ml2.drivers.calico.workers import ( CalicoStartupResyncWorker, + CalicoManagerWorker, + CalicoAgentStatusWatcherWorker, + CalicoEndpointStatusWatcherWorker, ) from networking_calico.resync import scope as resync @@ -292,8 +299,6 @@ def __init__(self): {"port_filter": True, "mac_address": "00:61:fe:ed:ca:fe"}, ) qos_driver.register(self) - # Lock to prevent concurrent initialisation. - self._init_lock = Semaphore() # Generally initialize attributes to nil values. They get initialized # properly, as needed, in _post_fork_init(). self.db = None @@ -302,19 +307,11 @@ def __init__(self): self._etcd_watcher = None self._etcd_watcher_thread = None self._my_pid = None - self._epoch = 0 - # Mapping from (hostname, port-id) to Calico's status for a port. The - # hostname is included to disambiguate between multiple copies of a - # port, which may exist during a migration or a re-schedule. - self._port_status_cache = {} - # Queue used to fan out port status updates to worker threads. Notes: - # * we don't recreate the queue in _post_fork_init() so that we can't - # possibly lose updates that had already been queued. - # * the queue contains tuples (priority, ); we use a - # higher priority for events and a lower priority for snapshot - # keys, so that current data skips the queue. 
- self._port_status_queue = PriorityQueue() - self._port_status_queue_too_long = False + # Process-shared variable for checking if the current instance of + # neutron-server is elected as the leader or not. + # "d" = double. Used for storing time.time(). See + # https://docs.python.org/3/library/array.html#module-array + self._is_master = multiprocessing.Value("d", 0) # RPC client for fanning out agent state reports. self.state_report_rpc = None @@ -324,6 +321,10 @@ def __init__(self): # safe to compare this with other values returned by monotonic_time(). self._last_status_queue_log_time = monotonic_time() + # Flag for telling workers to stop. Only applicable to the + # calico worker processes. + self._stop_worker = False + LOG.info("Created Calico mechanism driver %s", self) def initialize(self): @@ -353,9 +354,16 @@ def get_workers(self): ``calico-resync`` from a CD pipeline, or by leaving ``always`` set on exactly one neutron-server in the deployment). """ - if cfg.CONF.calico.startup_resync == "never": - return [] - return [CalicoStartupResyncWorker()] + services = [ + CalicoManagerWorker(), + CalicoAgentStatusWatcherWorker(), + CalicoEndpointStatusWatcherWorker(), + ] + + if cfg.CONF.calico.startup_resync != "never": + services.append(CalicoStartupResyncWorker()) + + return services def post_fork_initialize(self, resource, event, trigger, payload=None): """Per-worker-process initialisation, fired by Neutron after fork. @@ -377,137 +385,154 @@ def post_fork_initialize(self, resource, event, trigger, payload=None): elector and master-only background threads. 
""" trigger_cls = _trigger_class(trigger) - if trigger_cls is CalicoStartupResyncWorker: - self._do_startup_resync() - elif trigger_cls is wsgi.WorkerService: - self._post_fork_init(voting=False) - else: - self._post_fork_init(voting=True) + self._post_fork_inititialize_common() - @logging_exceptions(LOG) - def _post_fork_init(self, voting=False): - """_post_fork_init + worker_mapping = { + CalicoManagerWorker: self._init_and_start_calico_manager, + CalicoStartupResyncWorker: self._init_and_start_calico_resouce_syncer, + CalicoAgentStatusWatcherWorker: self._init_and_start_agent_status_watcher, + CalicoEndpointStatusWatcherWorker: self._init_and_start_endpoint_status_watcher, + } - Creates the connection state required for talking to the Neutron DB - and to etcd. This is a no-op if it has been executed before. + if trigger_cls in worker_mapping: + self._stop_worker = False + worker_mapping[trigger_cls]() - This is split out from __init__ to allow us to defer this - initialisation until after Neutron has forked off its worker - children. If we initialise the DB and etcd connections before - the fork (as would happen in __init__()) then the workers - would share sockets incorrectly. + LOG.info( + "Calico mechanism driver initialisation done for class %s", + trigger_cls.__name__ if trigger_cls else trigger_cls, + ) + + def is_master(self): + """Check whether the current instance of neutron-server is the master. + + In order for a neutron-server to be considered as a master, it needs + to aquire the election key and actively maintain it. """ - # The self._init_lock semaphore mediates if two or more eventlet threads call - # _post_fork_init at the same time, within the same Neutron server fork. This - # shouldn't normally happen now that ``post_fork_initialize`` drives the call - # from a single AFTER_INIT event, but the lock is cheap insurance against future - # call sites. 
- with self._init_lock: - current_pid = os.getpid() - if self._my_pid == current_pid: - # We've initialised our PID and it hasn't changed since last - # time, nothing to do. - LOG.info("Calico state already initialised for PID %s", current_pid) - return - # else: either this is the first call or our PID has changed: - # (re)initialise. - TrackTask("POST_FORK_INIT") - - if self._my_pid is not None: - # This is unexpected but we can deal with it: Neutron should - # fork before we trigger the first call to _post_fork_init!(). - LOG.warning( - "PID changed from %s to %s; unexpected fork after " - "initialisation? Reinitialising Calico driver.", - self._my_pid, - current_pid, - ) - else: - LOG.info( - "Doing Calico mechanism driver initialisation in process %s", - current_pid, - ) + # We were not elected. We are not the master. + if self._is_master.value <= 0: + return False - # (Re)init the DB. - self.db = None - self._get_db() + # Else, check that the elapsed time since the last refresh is within timeout. + time_till_last_refreshed = time.time() - self._is_master.value + refreshed_in_time = time_till_last_refreshed < MASTER_TIMEOUT - # Create syncers. - self.subnet_syncer = SubnetSyncer(self.db, self._txn_from_context) - self.policy_syncer = PolicySyncer(self.db, self._txn_from_context) - self.endpoint_syncer = WorkloadEndpointSyncer( - self.db, self._txn_from_context, self.policy_syncer + # If not, there is something wrong with elector!! + if not refreshed_in_time: + LOG.warning( + "The elector hasn't refreshed the lease in " + f"{time_till_last_refreshed}s." ) - # Admin context used by (only) the thread that updates Felix agent status. - self._agent_update_context = ctx.get_admin_context() + return refreshed_in_time - # Get RPC connection for fanning out Felix state reports. - try: - state_report_topic = topics.REPORTS - except AttributeError: - # Older versions of OpenStack share the PLUGIN topic.
- state_report_topic = topics.PLUGIN - self.state_report_rpc = agent_rpc.PluginReportStateAPI(state_report_topic) - - if voting: - # Elector, for performing leader election. - self.elector = Elector( - cfg.CONF.calico.elector_name, - datamodel_v2.neutron_election_key( - calico_config.get_region_string() - ), - old_key=datamodel_v1.NEUTRON_ELECTION_KEY, - interval=MASTER_REFRESH_INTERVAL, - ttl=MASTER_TIMEOUT, - ) - LOG.info( - "PID %s: Initializing Calico Elector; " - "this process WILL participate in leader election.", - current_pid, - ) - # Start our long-running threads. Just in case we ever get two same - # threads running, use an epoch counter to tell the old thread to die. - # We deliberately do this last, to ensure that all of the setup above is - # complete before we start running. - self._epoch += 1 - if cfg.CONF.calico.etcd_compaction_period_mins > 0: - eventlet.spawn(self.periodic_compaction_thread, self._epoch) - eventlet.spawn(self._status_updating_thread, self._epoch) - for _ in range(cfg.CONF.calico.num_port_status_threads): - eventlet.spawn(self._loop_writing_port_statuses, self._epoch) - # Note, resync runs in a dedicated worker process spawned via - # ``get_workers``, not here. See ``post_fork_initialize``. + def _post_fork_inititialize_common(self): + """Common post fork initialization. + + Creates the connection state required for talking to the Neutron DB + and to etcd. + """ + # Init the DB. + self.db = None + self._get_db() + + # Create a Keystone client. + authcfg = cfg.CONF.keystone_authtoken + LOG.debug("authcfg = %r", authcfg) + for key in authcfg: + if "password" in key: + LOG.debug("authcfg[%s] = %s", key, "***") else: - LOG.info( - "PID %s: Not a voting participant; " - "skipping elector and leader threads.", - current_pid, - ) + LOG.debug("authcfg[%s] = %s", key, authcfg[key]) - self._my_pid = current_pid + # Create syncers. 
+ self.subnet_syncer = SubnetSyncer(self.db, self._txn_from_context) + self.policy_syncer = PolicySyncer(self.db, self._txn_from_context) + self.endpoint_syncer = WorkloadEndpointSyncer( + self.db, self._txn_from_context, self.policy_syncer + ) - LOG.info( - "Calico mechanism driver initialisation done in process %s", current_pid + # Admin context used by (only) the thread that updates Felix agent + # status. + self._agent_update_context = ctx.get_admin_context() + + # Get RPC connection for fanning out Felix state reports. + try: + state_report_topic = topics.REPORTS + except AttributeError: + # Older versions of OpenStack share the PLUGIN topic. + state_report_topic = topics.PLUGIN + self.state_report_rpc = agent_rpc.PluginReportStateAPI(state_report_topic) + + def _init_and_start_calico_resouce_syncer(self): + self.start_up_resync_thread = eventlet.spawn(self._do_startup_resync) + + def _init_and_start_calico_manager(self): + self.elector = Elector( + cfg.CONF.calico.elector_name, + datamodel_v2.neutron_election_key(calico_config.get_region_string()), + self._is_master, + old_key=datamodel_v1.NEUTRON_ELECTION_KEY, + interval=MASTER_REFRESH_INTERVAL, + ttl=MASTER_TIMEOUT, + ) + + self.election_thread = eventlet.spawn(self.elector.run) + if cfg.CONF.calico.etcd_compaction_period_mins > 0: + self.periodic_compaction_thread = eventlet.spawn( + self.do_periodic_compaction + ) + + def _init_and_start_agent_status_watcher(self): + self.agent_status_watch_thread = eventlet.spawn( + self.watch_status_updates, AgentStatusWatcher + ) + + def _init_and_start_endpoint_status_watcher(self): + # Mapping from (hostname, port-id) to Calico's status for a port. The + # hostname is included to disambiguate between multiple copies of a + # port, which may exist during a migration or a re-schedule. + self._port_status_cache = {} + # Queue used to fan out port status updates to worker threads. 
Notes: + # * the queue contains tuples (priority, port_status_key); we use a + # higher priority for events and a lower priority for snapshot + # keys, so that current data skips the queue. + self._port_status_queue = PriorityQueue() + self._port_status_queue_too_long = False + + self.endpoint_status_watch_thread = eventlet.spawn( + self.watch_status_updates, EndpointStatusWatcher + ) + + self.port_status_update_threads = [] + for _ in range(cfg.CONF.calico.num_port_status_threads): + self.port_status_update_threads.append( + eventlet.spawn(self._loop_writing_port_statuses) ) @logging_exceptions(LOG) - def _status_updating_thread(self, expected_epoch): - """_status_updating_thread + def watch_status_updates(self, watcher): + """watch_status_updates This method acts as a status updates handler logic for the Calico mechanism driver. Watches for felix updates in etcd and passes info to Neutron database. + + :param watcher: Watcher class to create to watch and update status. """ TrackTask("STATUS_UPDATING") - LOG.info("Status updating thread started.") - while self._epoch == expected_epoch: + LOG.info("Status updating thread started for %s.", watcher.__name__) + + while not self._stop_worker: # Only handle updates if we are the master node.
- if self.elector.master(): + if self.is_master(): if self._etcd_watcher is None: - LOG.info("Became the master, starting StatusWatcher") - self._etcd_watcher = StatusWatcher(self) + LOG.info( + "Became the master, starting %s", + watcher.__name__, + ) + self._etcd_watcher = watcher(self) def start_etcd_watcher(): TrackTask("STATUS_ETCD_WATCHER") @@ -520,21 +545,24 @@ def start_etcd_watcher(): self._etcd_watcher_thread, ) elif not self._etcd_watcher_thread: - LOG.error("StatusWatcher %s died", self._etcd_watcher) + LOG.error( + "StatusWatcher %s died: %s", + self._etcd_watcher, + watcher.__name__, + ) self._etcd_watcher.stop() self._etcd_watcher = None else: if self._etcd_watcher is not None: - LOG.warning("No longer master, stopping StatusWatcher") + LOG.warning( + "No longer master, stopping StatusWatcher: %s.", + watcher.__name__, + ) self._etcd_watcher.stop() self._etcd_watcher = None # Short sleep interval before we check if we've become # the master. eventlet.sleep(MASTER_CHECK_INTERVAL_SECS) - else: - LOG.warning( - "Unexpected: epoch changed. Handling status updates thread exiting." - ) def on_felix_alive(self, felix_hostname, new): LOG.info("Felix on host %s is alive; fanning out status report", felix_hostname) @@ -642,11 +670,11 @@ def on_port_status_changed(self, hostname, port_id, status_dict, priority="low") LOG.warning("Port status update queue back to normal: %s", qsize) @logging_exceptions(LOG) - def _loop_writing_port_statuses(self, expected_epoch): + def _loop_writing_port_statuses(self): TrackTask("PORT_STATUS_WRITE") - LOG.info("Port status write thread started epoch=%s", expected_epoch) + LOG.info("Port status write thread started") admin_context = ctx.get_admin_context() - while self._epoch == expected_epoch: + while not self._stop_worker: # Wait for work to do. _, port_status_key = self._port_status_queue.get() # Actually do the update. 
Catch all exceptions to avoid @@ -1253,7 +1281,7 @@ def _do_startup_resync(self): result.to_dict(), ) - def periodic_compaction_thread(self, launch_epoch): + def do_periodic_compaction(self): """Periodic etcd compaction logic. On a fixed interval, requests etcd compaction to prevent unbounded disk usage @@ -1262,9 +1290,9 @@ def periodic_compaction_thread(self, launch_epoch): TrackTask("COMPACTION") try: LOG.info("Periodic compaction thread started") - while self._epoch == launch_epoch: + while not self._stop_worker: # Only do the compaction if we are the master node. - if self.elector.master(): + if self.is_master(): LOG.info("I am master: doing periodic compaction") try: diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py index bdcd88a4c6e..dca6f9712fb 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/status.py @@ -63,8 +63,8 @@ class StatusWatcher(etcdutils.EtcdWatcher): def __init__(self, calico_driver): self.region_string = calico_config.get_region_string() - status_path = datamodel_v2.felix_status_dir(self.region_string) - super(StatusWatcher, self).__init__(status_path, "/round-trip-check") + self.status_path = datamodel_v2.felix_status_dir(self.region_string) + super(StatusWatcher, self).__init__(self.status_path, "/round-trip-check") self.calico_driver = calico_driver self.processing_snapshot = False @@ -79,21 +79,6 @@ def __init__(self, calico_driver): # deduplicate before passing on to the Neutron DB. self._felix_live_rev = {} - # Register for felix uptime updates. - self.register_path( - status_path + "//status", - on_set=self._on_status_set, - on_del=self._on_status_del, - ) - # Register for per-port status updates. 
- self.register_path( - status_path - + "//workload/openstack//endpoint/", - on_set=self._on_ep_set, - on_del=self._on_ep_delete, - ) - LOG.info("StatusWatcher created") - def _pre_snapshot_hook(self): # Save off current endpoint status, then reset current state, so we # will be able to identify any changes in the new snapshot. @@ -119,6 +104,19 @@ def _post_snapshot_hook(self, old_endpoints_by_host): ) self.processing_snapshot = False + +class AgentStatusWatcher(StatusWatcher): + + def __init__(self, calico_driver): + super().__init__(calico_driver) + + # Register for felix uptime updates. + self.register_path( + self.status_path + "//status", + on_set=self._on_status_set, + on_del=self._on_status_del, + ) + def _on_status_set(self, response, hostname): """Called when a felix uptime report is inserted/updated.""" try: @@ -148,6 +146,19 @@ def _on_status_del(self, response, hostname): # agent failures by timeout. LOG.error("Felix on host %s failed to check in.", hostname) + +class EndpointStatusWatcher(StatusWatcher): + + def __init__(self, calico_driver): + super().__init__(calico_driver) + + self.register_path( + self.status_path + + "//workload/openstack//endpoint/", + on_set=self._on_ep_set, + on_del=self._on_ep_delete, + ) + def _on_ep_set(self, response, hostname, workload, endpoint): """Called when the status key for a particular endpoint is updated. diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py index 721f1cc769c..85a42aa00b8 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py @@ -215,6 +215,9 @@ class GrandDukeOfSalzburg(object): def __init__(self, *args, **kwargs): pass + def run(self): + pass + def master(self): return True @@ -349,6 +352,8 @@ def setUp(self): # Create an instance of CalicoMechanismDriver. 
self.driver = mech_calico.CalicoMechanismDriver() + self.driver.is_master = mock.Mock() + self.driver.is_master.return_value = True # Hook the (mock) Neutron database. self.db = mech_calico.plugin_dir.get_plugin() @@ -589,7 +594,7 @@ def do_post_fork_actions(self, uuid_str=None): """ cm = FixedUUID(uuid_str) if uuid_str else contextlib.nullcontext() with cm: - self.driver._post_fork_init(voting=True) + self.driver._post_fork_inititialize_common() if mech_calico.cfg.CONF.calico.startup_resync == "always": self.driver._do_startup_resync() diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py index 34762121575..fe55e37d8af 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py @@ -49,6 +49,8 @@ def setUp(self): # Mock calls to sys.exit. self.sys_exit_p = mock.patch("sys.exit") self.sys_exit_p.start() + # Mock is_master variable + self.mock_is_master = mock.MagicMock() def tearDown(self): self.sys_exit_p.stop() @@ -61,14 +63,26 @@ def test_invalid(self): # Test that not elected using defaults. 
with self.assertRaises(ValueError): etcdv3._client = stub_etcd.Client() - elector = election.Elector("test_basic", "/bloop", interval=-1, ttl=15) - self.assertFalse(elector.master()) + elector = election.Elector( + "test_basic", + "/bloop", + self.mock_is_master, + interval=-1, + ttl=15, + ) + elector.run() self._wait_and_stop(etcdv3._client, elector) with self.assertRaises(ValueError): etcdv3._client = stub_etcd.Client() - elector = election.Elector("test_basic", "/bloop", interval=10, ttl=5) - self.assertFalse(elector.master()) + elector = election.Elector( + "test_basic", + "/bloop", + self.mock_is_master, + interval=10, + ttl=5, + ) + elector.run() self._wait_and_stop(etcdv3._client, elector) def _wait_and_stop(self, client, elector): @@ -87,15 +101,23 @@ def _wait_and_stop(self, client, elector): client.stop.send() # Double-check there were no failures. self.assertIsNone(client.failure, msg=client.failure) + # Make sure the value is set back to 0 + self.assertEqual(self.mock_is_master.value, 0) def test_basic_election(self): # Test that not elected using defaults. 
LOG.debug("test_basic_election") etcdv3._client = client = stub_etcd.Client() client.add_read_result(key="/bloop", value="value") - elector = election.Elector("test_basic", "/bloop", interval=5, ttl=15) + elector = election.Elector( + "test_basic", + "/bloop", + self.mock_is_master, + interval=5, + ttl=15, + ) + elector.run() self._wait_and_stop(client, elector) - self.assertFalse(elector.master()) def test_become_master_first_time(self): # Become the master after one round @@ -106,8 +128,14 @@ def test_become_master_first_time(self): client.add_write_exception(None) client.add_write_exception(None) elector = election.Elector( - "test_basic", "/bloop", old_key="/legacy", interval=5, ttl=15 + "test_basic", + "/bloop", + self.mock_is_master, + old_key="/legacy", + interval=5, + ttl=15, ) + elector.run() self._wait_and_stop(client, elector) client.assert_key_written("/legacy") @@ -118,7 +146,14 @@ def test_fail_to_maintain(self): client.add_read_exception(etcdv3.KeyNotFound()) client.add_write_exception(None) client.add_write_exception(e3e.ConnectionFailedError()) - elector = election.Elector("test_basic", "/bloop", interval=5, ttl=15) + elector = election.Elector( + "test_basic", + "/bloop", + self.mock_is_master, + interval=5, + ttl=15, + ) + elector.run() self._wait_and_stop(client, elector) def test_become_master_multiple_attempts(self): @@ -132,7 +167,14 @@ def test_become_master_multiple_attempts(self): client.add_read_result(key="/bloop", value=None, action=action) client.add_write_exception(None) client.add_write_exception(None) - elector = election.Elector("test_basic", "/bloop", interval=5, ttl=15) + elector = election.Elector( + "test_basic", + "/bloop", + self.mock_is_master, + interval=5, + ttl=15, + ) + elector.run() self._wait_and_stop(client, elector) def test_become_master_implausible(self): @@ -144,7 +186,14 @@ def test_become_master_implausible(self): client.add_read_exception(etcdv3.KeyNotFound()) client.add_write_result() client.add_write_result() 
- elector = election.Elector("test_basic", "/bloop", interval=5, ttl=15) + elector = election.Elector( + "test_basic", + "/bloop", + self.mock_is_master, + interval=5, + ttl=15, + ) + elector.run() self._wait_and_stop(client, elector) def test_initial_read_exceptions(self): @@ -155,7 +204,14 @@ def test_initial_read_exceptions(self): client.add_read_exception(e3e.InternalServerError()) client.add_read_exception(e3e.ConnectionFailedError()) client.add_read_exception(e3e.PreconditionFailedError()) - elector = election.Elector("test_basic", "/bloop", interval=5, ttl=15) + elector = election.Elector( + "test_basic", + "/bloop", + self.mock_is_master, + interval=5, + ttl=15, + ) + elector.run() self._wait_and_stop(client, elector) def test_exception_detail_logging(self): @@ -165,7 +221,14 @@ def test_exception_detail_logging(self): etcdv3._client = client = stub_etcd.Client() exc = e3e.Etcd3Exception(detail_text="Unauthorised user") client.add_read_exception(exc) - elector = election.Elector("test_basic", "/bloop", interval=5, ttl=15) + elector = election.Elector( + "test_basic", + "/bloop", + self.mock_is_master, + interval=5, + ttl=15, + ) + elector.run() self._wait_and_stop(client, elector) # Check that Etcd3Exception detail was logged. 
@@ -189,7 +252,14 @@ def test_later_exceptions(self): client.add_read_exception(e3e.ConnectionFailedError()) client.add_read_result(key="/bloop", value="value") client.add_read_exception(e3e.PreconditionFailedError()) - elector = election.Elector("test_basic", "/bloop", interval=5, ttl=15) + elector = election.Elector( + "test_basic", + "/bloop", + self.mock_is_master, + interval=5, + ttl=15, + ) + elector.run() self._wait_and_stop(client, elector) def test_master_failure(self): @@ -208,62 +278,12 @@ def test_master_failure(self): client.add_read_result(key="/bloop", value=None, action="delete") client.add_write_exception(None) client.add_write_exception(None) - elector = election.Elector("test_basic", "/bloop", interval=5, ttl=15) - self._wait_and_stop(client, elector) - - # We are no longer the master, after being told to stop. - self.assertFalse(elector.master()) - - @mock.patch("os.path.exists") - def test_check_master_process_died(self, m_exists): - m_exists.return_value = False - etcdv3._client = client = mock.Mock() - elector = election.Elector("server-id", "/bloop", interval=5, ttl=15) - # etcd3 transaction returns False because the key is no longer there. 
- client.transaction.return_value = {} - self.assertRaises( - election.RestartElection, elector._check_master_process, "server-id:1234" - ) - self.assertEqual( - [ - mock.call( - { - "compare": [ - { - "value": "c2VydmVyLWlkOjEyMzQ=", - "result": "EQUAL", - "key": "L2Jsb29w", - "target": "VALUE", - } - ], - "success": [{"request_delete_range": {"key": "L2Jsb29w"}}], - "failure": [], - } - ) - ], - client.transaction.mock_calls, + elector = election.Elector( + "test_basic", + "/bloop", + self.mock_is_master, + interval=5, + ttl=15, ) - client.failure = None - self._wait_and_stop(client, elector) - - @mock.patch("os.path.exists") - def test_check_master_process_other_server(self, m_exists): - m_exists.return_value = False - etcdv3._client = client = mock.Mock() - elector = election.Elector("server-id", "/bloop", interval=5, ttl=15) - elector._check_master_process("other-server:1234") - self.assertEqual([], client.delete.mock_calls) - self.assertEqual([], client.transaction.mock_calls) - client.failure = None - self._wait_and_stop(client, elector) - - @mock.patch("os.path.exists") - def test_check_master_process_still_alive(self, m_exists): - m_exists.return_value = True - etcdv3._client = client = mock.Mock() - elector = election.Elector("server-id", "/bloop", interval=5, ttl=15) - elector._check_master_process("server-id:1234") - self.assertEqual([], client.delete.mock_calls) - self.assertEqual([], client.transaction.mock_calls) - client.failure = None + elector.run() self._wait_and_stop(client, elector) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py index 77f7126847d..4623e04fed1 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py @@ -41,8 +41,9 @@ def setUp(self): 
lib.m_oslo_config.cfg.CONF.keystone_authtoken.auth_url = "" lib.m_oslo_config.cfg.CONF.calico.openstack_region = "no-region" - lib.m_oslo_config.cfg.CONF.calico.etcd_compaction_period_mins = 0 + lib.m_oslo_config.cfg.CONF.calico.etcd_compaction_period_mins = 10 lib.m_oslo_config.cfg.CONF.calico.project_name_cache_max = 0 + lib.m_oslo_config.cfg.CONF.calico.num_port_status_threads = 4 # Mock etcd3gw client so background threads don't touch real etcd. etcdv3._client = self.clientv3 = mock.Mock() @@ -59,100 +60,63 @@ def tearDown(self): super(TestMechanismDriverVoting, self).tearDown() - def _disable_background_threads(self, driver): - """Disable background threads that would touch etcd or do unrelated things.""" - driver._do_startup_resync = mock.Mock() - driver._status_updating_thread = mock.Mock() - - @mock.patch.object(mech_calico, "Elector") - def test_parent_creates_elector(self, m_elector): + def test_driver_init_common(self): driver = mech_calico.CalicoMechanismDriver() - self._disable_background_threads(driver) + driver._post_fork_inititialize_common() - driver._my_pid = None - driver._post_fork_init(voting=True) + self.assertIsNotNone(driver.db) + self.assertIsNotNone(driver.subnet_syncer) + self.assertIsNotNone(driver.policy_syncer) + self.assertIsNotNone(driver.endpoint_syncer) + self.assertIsNotNone(driver._agent_update_context) + self.assertIsNotNone(driver.state_report_rpc) - m_elector.assert_called_once() - self.assertIs(driver.elector, m_elector.return_value) + @mock.patch("eventlet.spawn") + def test_driver_init_calico_resource_syncer(self, m_spawn): + m_spawn.return_value = True - @mock.patch.object(mech_calico, "Elector") - def test_worker_does_not_create_elector(self, m_elector): driver = mech_calico.CalicoMechanismDriver() - self._disable_background_threads(driver) - - driver._my_pid = None - driver._post_fork_init(voting=False) + driver._init_and_start_calico_resouce_syncer() - m_elector.assert_not_called() - self.assertIsNone(driver.elector) + 
self.assertTrue(driver.start_up_resync_thread) - @mock.patch.object(mech_calico, "_trigger_class") + @mock.patch("eventlet.spawn") @mock.patch.object(mech_calico, "Elector") - def test_api_worker_does_not_become_voter(self, m_elector, m_trigger_class): - """API forks are triggered by ``neutron.wsgi.WorkerService`` at - AFTER_INIT. ``post_fork_initialize`` must dispatch them to - ``_post_fork_init(voting=False)`` so they never join the master - election -- preserving PR #11580's intent after the periodic-resync - rework moved init out of the old ``@requires_state`` decorator.""" - driver = mech_calico.CalicoMechanismDriver() - self._disable_background_threads(driver) - driver._my_pid = None - - m_trigger_class.return_value = mech_calico.wsgi.WorkerService - driver.post_fork_initialize(mock.Mock(), mock.Mock(), mock.Mock()) - - m_elector.assert_not_called() - self.assertIsNone(driver.elector) + def test_driver_init_calico_manager(self, m_elector, m_spawn): + m_spawn.return_value = True - @mock.patch.object(mech_calico, "_trigger_class") - @mock.patch.object(mech_calico, "Elector") - def test_resync_worker_runs_resync_only(self, m_elector, m_trigger_class): - """``CalicoStartupResyncWorker`` triggers run only the one-shot - resync; they don't join the elector or set up the master-only - background threads.""" driver = mech_calico.CalicoMechanismDriver() - self._disable_background_threads(driver) - driver._my_pid = None - - m_trigger_class.return_value = mech_calico.CalicoStartupResyncWorker - driver.post_fork_initialize(mock.Mock(), mock.Mock(), mock.Mock()) + driver._init_and_start_calico_manager() - driver._do_startup_resync.assert_called_once() - m_elector.assert_not_called() - self.assertIsNone(driver.elector) - - @mock.patch.object(mech_calico, "_trigger_class") - @mock.patch.object(mech_calico, "Elector") - def test_other_worker_becomes_voter(self, m_elector, m_trigger_class): - """Workers that aren't API workers or the resync worker (RPC, - state-report, etc.) 
get ``voting=True`` and join the master - election.""" - driver = mech_calico.CalicoMechanismDriver() - self._disable_background_threads(driver) - driver._my_pid = None - - # An arbitrary non-WSGI, non-resync trigger class. - class _RpcWorker: - pass - - m_trigger_class.return_value = _RpcWorker - driver.post_fork_initialize(mock.Mock(), mock.Mock(), mock.Mock()) + self.assertTrue(driver.election_thread) + self.assertTrue(driver.periodic_compaction_thread) m_elector.assert_called_once() - self.assertIs(driver.elector, m_elector.return_value) - @mock.patch.object(mech_calico, "Elector") - def test_worker_init_does_not_override_parent_elector(self, m_elector): + @mock.patch("eventlet.spawn") + def test_driver_init_calico_agent_status_watcher(self, m_spawn): + m_spawn.return_value = True + driver = mech_calico.CalicoMechanismDriver() - self._disable_background_threads(driver) + driver._init_and_start_agent_status_watcher() - driver._my_pid = None - driver._post_fork_init(voting=True) - parent_elector = driver.elector + self.assertTrue(driver.agent_status_watch_thread) - # Simulate a worker re-initializing in the same process object - driver._my_pid = 99999 - driver._post_fork_init(voting=False) + @mock.patch("eventlet.spawn") + def test_driver_init_calico_endpoint_status_watcher(self, m_spawn): + m_spawn.return_value = True - self.assertIs(driver.elector, parent_elector) - m_elector.assert_called_once() + driver = mech_calico.CalicoMechanismDriver() + driver._init_and_start_endpoint_status_watcher() + + self.assertTrue(driver.endpoint_status_watch_thread) + self.assertEqual( + len(driver.port_status_update_threads), + lib.m_oslo_config.cfg.CONF.calico.num_port_status_threads, + ) + + # We will also need to ensure that the required queues components + # are also created. 
+ self.assertIsNotNone(driver._port_status_cache) + self.assertIsNotNone(driver._port_status_queue) + self.assertIsNotNone(driver._port_status_queue_too_long) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py index 2339836d387..246f5a87ec2 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py @@ -345,8 +345,8 @@ def setUp(self): # written before that was added and they do not support the interleaved # requests from the status thread. The status-reporting thread is # tested separately. - self.driver._status_updating_thread = mock.Mock( - spec=self.driver._status_updating_thread + self.driver.watch_status_updates = mock.Mock( + spec=self.driver.watch_status_updates ) # Mock out config. @@ -437,6 +437,12 @@ def setUp(self): self.sg_default_key_v3: self.sg_default_value_v3, } + self.driver._post_fork_inititialize_common() + self.driver._init_and_start_calico_resouce_syncer() + self.driver._init_and_start_agent_status_watcher() + self.driver._init_and_start_calico_manager() + self.driver._init_and_start_endpoint_status_watcher() + def make_context(self): context = mock.MagicMock() context._plugin_context.to_dict.return_value = {} @@ -2011,6 +2017,7 @@ def _pre_migrate(self, dest_host=None): def test_pre_live_migration(self): """Pre-live-migration creates destination WEP and LiveMigration.""" + self.driver._post_fork_inititialize_common() self._do_initial_resync() self._pre_migrate() @@ -2030,6 +2037,7 @@ def test_pre_live_migration(self): def test_live_migration_succeeded(self): """After migration succeeds, source WEP deleted, dest WEP kept.""" + self.driver._post_fork_inititialize_common() self._do_initial_resync() self._pre_migrate() @@ -2063,6 +2071,7 @@ def 
test_live_migration_succeeded(self): def test_live_migration_failed(self): """After migration fails, dest WEP deleted, source WEP unchanged.""" + self.driver._post_fork_inititialize_common() self._do_initial_resync() self._pre_migrate() @@ -2094,6 +2103,7 @@ def test_live_migration_failed(self): def test_port_delete_during_migration(self): """Deleting port during migration cleans up both WEPs and LM.""" + self.driver._post_fork_inititialize_common() self._do_initial_resync() self._pre_migrate() @@ -2159,8 +2169,10 @@ def test_vif_plug_notification(self): mock_db_port ) - def test_vif_plug_no_notification_for_non_migration(self): + @mock.patch("eventlet.spawn") + def test_vif_plug_no_notification_for_non_migration(self, _m_spawn): """Felix 'up' on source host does NOT trigger Nova notification.""" + self.driver._init_and_start_endpoint_status_watcher() self._do_initial_resync() self.recent_writes = {} self.recent_deletes = set() @@ -2173,6 +2185,7 @@ def test_vif_plug_no_notification_for_non_migration(self): def test_resync_creates_missing_live_migration(self): """Resync creates LiveMigration and dest WEP for migrating port.""" + self.driver._post_fork_inititialize_common() self._do_initial_resync() # Set up the port as mid-migration in the Neutron DB (migrating_to @@ -2189,6 +2202,7 @@ def test_resync_creates_missing_live_migration(self): def test_resync_deletes_stale_live_migration(self): """Resync deletes orphaned LiveMigration with no migrating port.""" + self.driver._post_fork_inititialize_common() self._do_initial_resync() # Inject a stale LiveMigration into etcd as if a migration was in @@ -2483,38 +2497,72 @@ def test_felix_agent_state(self): mech_calico.felix_agent_state("host2", False), ) - def test_status_thread_epoch(self): - self.driver._epoch = 2 - self.driver._status_updating_thread(1) - @mock.patch( - "networking_calico.plugins.ml2.drivers.calico.mech_calico.StatusWatcher", + "networking_calico.plugins.ml2.drivers.calico.status.AgentStatusWatcher", 
autospec=True, ) - def test_status_thread_mainline(self, m_StatusWatcher): + def test_agent_status_thread_mainline(self, m_watcher): count = [0] + m_watcher.__name__ = "AgentStatusWatcher" + self.driver.is_master = mock.Mock() + self.driver.is_master.return_value = True + + def maybe_end_loop(*args, **kwargs): + if count[0] == 2: + # Thread dies, should be restarted. + self.driver._etcd_watcher_thread = False + if count[0] == 4: + # After a few loops, stop being the master... + self.driver.is_master.return_value = False + if count[0] > 6: + # Then terminate the loop after a few more... + self.driver._stop_worker = True + count[0] += 1 + + with mock.patch("eventlet.spawn") as m_spawn: + with mock.patch("eventlet.sleep") as m_sleep: + m_sleep.side_effect = maybe_end_loop + self.driver.watch_status_updates(m_watcher) + + m_watcher = m_watcher.return_value + self.assertEqual( + [ + mock.call(mock.ANY), + mock.call(mock.ANY), + ], + [c for c in m_spawn.mock_calls if c[0] == ""], + ) + self.assertEqual(2, len(m_watcher.stop.mock_calls)) + self.assertIsNone(self.driver._etcd_watcher) - with mock.patch.object(self.driver, "elector") as m_elector: - m_elector.master.return_value = True - - def maybe_end_loop(*args, **kwargs): - if count[0] == 2: - # Thread dies, should be restarted. - self.driver._etcd_watcher_thread = False - if count[0] == 4: - # After a few loops, stop being the master... - m_elector.master.return_value = False - if count[0] > 6: - # Then terminate the loop after a few more... 
- self.driver._epoch += 1 - count[0] += 1 - - with mock.patch("eventlet.spawn") as m_spawn: - with mock.patch("eventlet.sleep") as m_sleep: - m_sleep.side_effect = maybe_end_loop - self.driver._status_updating_thread(0) - - m_watcher = m_StatusWatcher.return_value + @mock.patch( + "networking_calico.plugins.ml2.drivers.calico.status.StatusWatcher", + autospec=True, + ) + def test_endpoint_status_thread_mainline(self, m_watcher): + count = [0] + m_watcher.__name__ = "StatusWatcher" + self.driver.is_master = mock.Mock() + self.driver.is_master.return_value = True + + def maybe_end_loop(*args, **kwargs): + if count[0] == 2: + # Thread dies, should be restarted. + self.driver._etcd_watcher_thread = False + if count[0] == 4: + # After a few loops, stop being the master... + self.driver.is_master.return_value = False + if count[0] > 6: + # Then terminate the loop after a few more... + self.driver._stop_worker = True + count[0] += 1 + + with mock.patch("eventlet.spawn") as m_spawn: + with mock.patch("eventlet.sleep") as m_sleep: + m_sleep.side_effect = maybe_end_loop + self.driver.watch_status_updates(m_watcher) + + m_watcher = m_watcher.return_value self.assertEqual( [ mock.call(mock.ANY), @@ -2547,7 +2595,9 @@ def test_on_felix_alive(self): m_rpc.report_state.mock_calls, ) - def test_on_port_status_changed(self): + @mock.patch("eventlet.spawn") + def test_on_port_status_changed(self, _m_spawn): + self.driver._init_and_start_endpoint_status_watcher() self.driver._last_status_queue_log_time = monotonic_time() - 100 with mock.patch.object(self.driver, "_port_status_queue") as m_queue: m_queue.qsize.return_value = 100 @@ -2600,7 +2650,10 @@ def test_on_port_status_changed(self): m_queue.put.mock_calls, ) - def test_loop_writing_port_statuses(self): + @mock.patch("eventlet.spawn") + def test_loop_writing_port_statuses(self, _m_spawn): + self.driver._init_and_start_endpoint_status_watcher() + with mock.patch.object(self.driver, "_port_status_queue") as m_queue: with 
mock.patch.object( self.driver, "_try_to_update_port_status" @@ -2609,7 +2662,6 @@ def test_loop_writing_port_statuses(self): self.assertRaises( StopIteration, self.driver._loop_writing_port_statuses, - self.driver._epoch, ) self.assertEqual( [ @@ -2618,8 +2670,10 @@ def test_loop_writing_port_statuses(self): m_try_upd.mock_calls, ) - def test_try_to_update_port_status(self): + @mock.patch("eventlet.spawn") + def test_try_to_update_port_status(self, _m_spawn): self.driver._get_db() + self.driver._init_and_start_endpoint_status_watcher() mock_calls = [] @@ -2640,8 +2694,10 @@ def m_update_port_status(context, port_id, status, host=None): ) self.assertEqual([], m_spawn.mock_calls) # No retry on success - def test_try_to_update_port_status_fail(self): + @mock.patch("eventlet.spawn") + def test_try_to_update_port_status_fail(self, _m_spawn): self.driver._get_db() + self.driver._init_and_start_endpoint_status_watcher() mock_calls = [] @@ -2695,7 +2751,8 @@ def setUp(self): super(TestStatusWatcherBase, self).setUp() self.driver = mock.Mock(spec=mech_calico.CalicoMechanismDriver) - self.watcher = status.StatusWatcher(self.driver) + self.agent_watcher = status.AgentStatusWatcher(self.driver) + self.endpoint_watcher = status.EndpointStatusWatcher(self.driver) def _add_test_endpoint(self): # Add a workload to be deleted @@ -2705,9 +2762,11 @@ def _add_test_endpoint(self): + "openstack/wlid/endpoint/ep1" ) % self.region_string m_port_status_node.value = '{"status": "up"}' - self.watcher._on_ep_set(m_port_status_node, "hostname", "wlid", "ep1") + self.endpoint_watcher._on_ep_set(m_port_status_node, "hostname", "wlid", "ep1") ep_id = datamodel_v1.WloadEndpointId("hostname", "openstack", "wlid", "ep1") - self.assertEqual({"hostname": set([ep_id])}, self.watcher._endpoints_by_host) + self.assertEqual( + {"hostname": set([ep_id])}, self.endpoint_watcher._endpoints_by_host + ) return m_port_status_node @@ -2717,16 +2776,55 @@ def test_tls(self): 
lib.m_oslo_config.cfg.CONF.calico.etcd_cert_file = "cert-file" lib.m_oslo_config.cfg.CONF.calico.etcd_ca_cert_file = "ca-cert-file" lib.m_oslo_config.cfg.CONF.calico.etcd_key_file = "key-file" - self.watcher = status.StatusWatcher(self.driver) + _watcher = status.StatusWatcher(self.driver) @mock.patch("eventlet.spawn") - def test_snapshot(self, m_spawn): - # Populate initial status tree data, for initial snapshot testing. - + def test_snapshot_agent(self, _m_spawn): felix_status_key = "/calico/felix/v2/no-region/host/hostname/status" felix_last_reported_status_key = ( "/calico/felix/v2/no-region/host/hostname/last_reported_status" ) + + self.etcd_data = { + # An agent status key to ignore. + felix_last_reported_status_key: json.dumps( + {"uptime": 10, "first_update": True} + ), + # An agent status key to take notice of. + felix_status_key: json.dumps({"uptime": 10, "first_update": True}), + } + + watch_events = [] + + def _iterator(): + for e in watch_events: + yield e + _log.info("Stop watcher now") + self.agent_watcher.stop() + yield None + + def _cancel(): + pass + + self.clientv3.watch_prefix.return_value = _iterator(), _cancel + + # Start the watcher. It will do initial snapshot processing, then stop + # when it tries to watch for further changes. + self.agent_watcher.start() + + self.driver.on_felix_alive.assert_called_once_with("hostname", new=True) + + # Start the watcher again, with the same etcd data. We should not see the + # felix alive gets send again, as we already updated. + self.driver.on_felix_alive.reset_mock() + self.clientv3.watch_prefix.return_value = _iterator(), _cancel + self.agent_watcher.start() + self.driver.on_felix_alive.assert_not_called() + + @mock.patch("eventlet.spawn") + def test_snapshot_endpoint(self, _m_spawn): + # Populate initial status tree data, for initial snapshot testing. 
+ ep_on_that_host_key = ( "/calico/felix/v2/no-region/host/hostname/workload/" + "openstack/wlid/endpoint/ep1" @@ -2737,12 +2835,6 @@ def test_snapshot(self, m_spawn): ) self.etcd_data = { - # An agent status key to ignore. - felix_last_reported_status_key: json.dumps( - {"uptime": 10, "first_update": True} - ), - # An agent status key to take notice of. - felix_status_key: json.dumps({"uptime": 10, "first_update": True}), # A port status key to take notice of. ep_on_that_host_key: '{"status": "up"}', # A port status key to ignore. @@ -2755,7 +2847,7 @@ def _iterator(): for e in watch_events: yield e _log.info("Stop watcher now") - self.watcher.stop() + self.endpoint_watcher.stop() yield None def _cancel(): @@ -2765,9 +2857,8 @@ def _cancel(): # Start the watcher. It will do initial snapshot processing, then stop # when it tries to watch for further changes. - self.watcher.start() + self.endpoint_watcher.start() - self.driver.on_felix_alive.assert_called_once_with("hostname", new=True) self.driver.on_port_status_changed.assert_has_calls( [ mock.call("unknown", "ep2", {"status": "up"}, priority="low"), @@ -2778,11 +2869,9 @@ def _cancel(): # Start the watcher again, with the same etcd data. We should see the # same status callbacks. - self.driver.on_felix_alive.reset_mock() self.driver.on_port_status_changed.reset_mock() self.clientv3.watch_prefix.return_value = _iterator(), _cancel - self.watcher.start() - self.driver.on_felix_alive.assert_not_called() + self.endpoint_watcher.start() self.driver.on_port_status_changed.assert_has_calls( [ mock.call("unknown", "ep2", {"status": "up"}, priority="low"), @@ -2794,11 +2883,9 @@ def _cancel(): # Resync after deleting the unknown host endpoint. We should see that # endpoint reported with status None. 
del self.etcd_data[ep_on_unknown_host_key] - self.driver.on_felix_alive.reset_mock() self.driver.on_port_status_changed.reset_mock() self.clientv3.watch_prefix.return_value = _iterator(), _cancel - self.watcher.start() - self.driver.on_felix_alive.assert_not_called() + self.endpoint_watcher.start() self.driver.on_port_status_changed.assert_has_calls( [ mock.call("unknown", "ep2", None, priority="low"), @@ -2807,21 +2894,6 @@ def _cancel(): any_order=True, ) - # Resync after deleting the Felix status. This does not affect the - # status of ep1. - del self.etcd_data[felix_status_key] - self.driver.on_felix_alive.reset_mock() - self.driver.on_port_status_changed.reset_mock() - self.clientv3.watch_prefix.return_value = _iterator(), _cancel - self.watcher.start() - self.driver.on_felix_alive.assert_not_called() - self.driver.on_port_status_changed.assert_has_calls( - [ - mock.call("hostname", "ep1", {"status": "up"}, priority="low"), - ], - any_order=True, - ) - # Resync with some follow-on events; checks that the priority goes # back to high after the snapshot. 
watch_events = [ @@ -2836,11 +2908,9 @@ def _cancel(): "type": "SET", } ] - self.driver.on_felix_alive.reset_mock() self.driver.on_port_status_changed.reset_mock() self.clientv3.watch_prefix.return_value = _iterator(), _cancel - self.watcher.start() - self.driver.on_felix_alive.assert_not_called() + self.endpoint_watcher.start() self.driver.on_port_status_changed.assert_has_calls( [ mock.call("hostname", "ep1", {"status": "up"}, priority="high"), @@ -2851,7 +2921,9 @@ def _cancel(): def test_endpoint_status_add_delete(self): m_port_status_node = self._add_test_endpoint() m_port_status_node.action = "delete" - self.watcher._on_ep_delete(m_port_status_node, "hostname", "wlid", "ep1") + self.endpoint_watcher._on_ep_delete( + m_port_status_node, "hostname", "wlid", "ep1" + ) self.assertEqual( [ @@ -2860,7 +2932,7 @@ def test_endpoint_status_add_delete(self): ], self.driver.on_port_status_changed.mock_calls, ) - self.assertEqual({}, self.watcher._endpoints_by_host) + self.assertEqual({}, self.endpoint_watcher._endpoints_by_host) def test_endpoint_status_add_bad_json(self): m_port_status_node = mock.Mock() @@ -2869,7 +2941,7 @@ def test_endpoint_status_add_bad_json(self): "openstack/wlid/endpoint/ep1" ) m_port_status_node.value = '{"status": "up"' - self.watcher._on_ep_set(m_port_status_node, "hostname", "wlid", "ep1") + self.endpoint_watcher._on_ep_set(m_port_status_node, "hostname", "wlid", "ep1") self.assertEqual( [ @@ -2877,23 +2949,23 @@ def test_endpoint_status_add_bad_json(self): ], self.driver.on_port_status_changed.mock_calls, ) - self.assertEqual({}, self.watcher._endpoints_by_host) + self.assertEqual({}, self.endpoint_watcher._endpoints_by_host) def test_endpoint_status_add_bad_id(self): m_port_status_node = mock.Mock() m_port_status_node.key = ( "/calico/felix/v2/no-region/host/hostname/workload/openstack/wlid/endpoint" ) - self.watcher._on_ep_set(m_port_status_node, "hostname", "wlid", "ep1") + self.endpoint_watcher._on_ep_set(m_port_status_node, "hostname", 
"wlid", "ep1") self.assertEqual([], self.driver.on_port_status_changed.mock_calls) - self.assertEqual({}, self.watcher._endpoints_by_host) + self.assertEqual({}, self.endpoint_watcher._endpoints_by_host) def test_status_bad_json(self): for value in ["{", 10, "foo"]: m_response = mock.Mock() m_response.key = "/calico/felix/v2/no-region/host/hostname/status" m_response.value = value - self.watcher._on_status_set(m_response, "foo") + self.agent_watcher._on_status_set(m_response, "foo") self.assertFalse(self.driver.on_felix_alive.called) def test_felix_status_expiry(self): @@ -2904,12 +2976,12 @@ def test_felix_status_expiry(self): "openstack/wlid/endpoint/epid" ) m_response.value = '{"status": "up"}' - self.watcher._on_ep_set(m_response, "hostname", "wlid", "epid") + self.endpoint_watcher._on_ep_set(m_response, "hostname", "wlid", "epid") # Then note that felix is down. m_response = mock.Mock() m_response.key = "/calico/felix/v2/no-region/host/hostname/status" - self.watcher._on_status_del(m_response, "hostname") + self.agent_watcher._on_status_del(m_response, "hostname") # Check that nothing happens to the port. (Previously, we used to mark # the port as in ERROR but that behaviour was removed due to its @@ -2931,7 +3003,9 @@ def setUp_region(self): def test_endpoint_status_add_delete(self): m_port_status_node = self._add_test_endpoint() m_port_status_node.action = "delete" - self.watcher._on_ep_delete(m_port_status_node, "hostname", "wlid", "ep1") + self.endpoint_watcher._on_ep_delete( + m_port_status_node, "hostname", "wlid", "ep1" + ) self.assertEqual( [ @@ -2940,7 +3014,7 @@ def test_endpoint_status_add_delete(self): ], self.driver.on_port_status_changed.mock_calls, ) - self.assertEqual({}, self.watcher._endpoints_by_host) + self.assertEqual({}, self.endpoint_watcher._endpoints_by_host) def test_handle_port_this_region(self): # Simulate status update for a workload in this region. 
@@ -2952,7 +3026,7 @@ def test_handle_port_this_region(self): ) m_port_status_node.value = '{"status": "up"}' m_port_status_node.action = "set" - self.watcher.dispatcher.handle_event(m_port_status_node) + self.endpoint_watcher.dispatcher.handle_event(m_port_status_node) self.assertEqual( [ mock.call("hostname", "ep1", {"status": "up"}, priority="high"), @@ -2969,7 +3043,7 @@ def test_ignore_port_other_region(self): ) m_port_status_node.value = '{"status": "up"}' m_port_status_node.action = "set" - self.watcher.dispatcher.handle_event(m_port_status_node) + self.endpoint_watcher.dispatcher.handle_event(m_port_status_node) self.assertEqual([], self.driver.on_port_status_changed.mock_calls) def test_handle_felix_this_region(self): @@ -2985,7 +3059,7 @@ def test_handle_felix_this_region(self): "first_update": True, } ) - self.watcher.dispatcher.handle_event(m_response) + self.agent_watcher.dispatcher.handle_event(m_response) self.assertTrue(self.driver.on_felix_alive.called) def test_ignore_felix_other_region(self): @@ -2999,7 +3073,7 @@ def test_ignore_felix_other_region(self): "first_update": True, } ) - self.watcher.dispatcher.handle_event(m_response) + self.agent_watcher.dispatcher.handle_event(m_response) self.assertFalse(self.driver.on_felix_alive.called) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py index 1986e0acaec..16504aed82d 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py @@ -90,5 +90,77 @@ def stop(self): # threading.Event.set() is idempotent. self._stop_event.set() + +class CalicoManagerWorker(worker.BaseWorker): + """Service for doing election and compaction. + + The super class will trigger the post_fork_initialize in the mech driver. 
+ """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def start(self, name="calico-manager", desc=None): + """Start service.""" + super(CalicoManagerWorker, self).start(name, desc) + + def stop(self): + """Stop service.""" + super(CalicoManagerWorker, self).stop() + + def wait(self): + """Wait for service to complete.""" + super(CalicoManagerWorker, self).wait() + + def reset(self): + config.reset_service() + + +class CalicoAgentStatusWatcherWorker(worker.BaseWorker): + """Service for watching and updating calico-felix agent health. + + The super class will trigger the post_fork_initialize in the mech driver. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def start(self, name="calico-agent-status-watcher", desc=None): + """Start service.""" + super(CalicoAgentStatusWatcherWorker, self).start(name, desc) + + def stop(self): + """Stop service.""" + super(CalicoAgentStatusWatcherWorker, self).stop() + + def wait(self): + """Wait for service to complete.""" + super(CalicoAgentStatusWatcherWorker, self).wait() + + def reset(self): + config.reset_service() + + +class CalicoEndpointStatusWatcherWorker(worker.BaseWorker): + """Service for watching and updating endpoint status. + + The super class will trigger the post_fork_initialize in the mech driver. 
+ """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def start(self, name="calico-endpoint-status-watcher", desc=None): + """Start service.""" + super(CalicoEndpointStatusWatcherWorker, self).start(name, desc) + + def stop(self): + """Stop service.""" + super(CalicoEndpointStatusWatcherWorker, self).stop() + + def wait(self): + """Wait for service to complete.""" + super(CalicoEndpointStatusWatcherWorker, self).wait() + def reset(self): config.reset_service() From 3111aa333ba6dee8b55acbae9fbe56a3d0cc5757 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Thu, 30 Apr 2026 18:03:44 -0400 Subject: [PATCH 02/10] misc: Minor test text fix Signed-off-by: Zhan Zhang --- .../plugins/ml2/drivers/calico/test/test_plugin_etcd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py index 246f5a87ec2..bffa48804c6 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py @@ -2541,7 +2541,7 @@ def maybe_end_loop(*args, **kwargs): ) def test_endpoint_status_thread_mainline(self, m_watcher): count = [0] - m_watcher.__name__ = "StatusWatcher" + m_watcher.__name__ = "EndpointStatusWatcher" self.driver.is_master = mock.Mock() self.driver.is_master.return_value = True From 6f4e54e133df7f43125d970dcc970c84e1d3e831 Mon Sep 17 00:00:00 2001 From: Nell Jerram Date: Fri, 1 May 2026 14:17:15 +0100 Subject: [PATCH 03/10] make -C networking-calico flake8 fmtpy --- .../plugins/ml2/drivers/calico/mech_calico.py | 4 +++- .../plugins/ml2/drivers/calico/test/test_plugin_etcd.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py 
b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index 71af57951c4..6ec3fa4fd13 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -391,7 +391,9 @@ def post_fork_initialize(self, resource, event, trigger, payload=None): CalicoManagerWorker: self._init_and_start_calico_manager, CalicoStartupResyncWorker: self._init_and_start_calico_resouce_syncer, CalicoAgentStatusWatcherWorker: self._init_and_start_agent_status_watcher, - CalicoEndpointStatusWatcherWorker: self._init_and_start_endpoint_status_watcher, + CalicoEndpointStatusWatcherWorker: ( + self._init_and_start_endpoint_status_watcher + ), } if trigger_cls in worker_mapping: diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py index bffa48804c6..ac3d519011a 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py @@ -2776,7 +2776,7 @@ def test_tls(self): lib.m_oslo_config.cfg.CONF.calico.etcd_cert_file = "cert-file" lib.m_oslo_config.cfg.CONF.calico.etcd_ca_cert_file = "ca-cert-file" lib.m_oslo_config.cfg.CONF.calico.etcd_key_file = "key-file" - _watcher = status.StatusWatcher(self.driver) + _ = status.StatusWatcher(self.driver) @mock.patch("eventlet.spawn") def test_snapshot_agent(self, _m_spawn): From a2a0cdedf31b95764339bc1d2d14778cc51579d2 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 20 May 2026 15:48:04 +0000 Subject: [PATCH 04/10] fix: Misc comment updates and remove unnecessary code Signed-off-by: Zhan Zhang --- .../plugins/ml2/drivers/calico/mech_calico.py | 4 +- .../plugins/ml2/drivers/calico/test/lib.py | 8 +-- .../drivers/calico/test/test_mech_calico.py | 57 
+++++++------------ .../drivers/calico/test/test_plugin_etcd.py | 13 +---- 4 files changed, 29 insertions(+), 53 deletions(-) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index 6ec3fa4fd13..db0edbeb1d4 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -34,7 +34,6 @@ import eventlet from eventlet.queue import PriorityQueue -from neutron import wsgi from neutron.agent import rpc as agent_rpc from neutron.conf.agent import common as config from neutron.objects import ports as ports_object @@ -300,7 +299,7 @@ def __init__(self): ) qos_driver.register(self) # Generally initialize attributes to nil values. They get initialized - # properly, as needed, in _post_fork_init(). + # properly, as needed, in post_fork_initialize(). self.db = None self.elector = None self._agent_update_context = None @@ -428,7 +427,6 @@ def is_master(self): return refreshed_in_time - def _post_fork_inititialize_common(self): """Common post fork initialization. diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py index 85a42aa00b8..b32fb558f38 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py @@ -579,12 +579,12 @@ def do_post_fork_actions(self, uuid_str=None): the one UT process, which means: - Do the same startup preparations - DB connection etc. - that the API worker - process would do. These are all coded in ``_post_fork_init()``. This allows - tests to later call driver entrypoints like ``update_port_postcommit()``, - similarly as production Neutron would. + process would do. 
These are all coded in ``_post_fork_initialize_common()``. + This allows tests to later call driver entrypoints like + ``update_port_postcommit()``, similarly as production Neutron would. - Spawn the threads for "other work" (as above) as the RPC worker process would - do. This is achieved by calling ``_post_fork_init()`` with ``voting=True``. + do. This is achieved by calling ``_post_fork_inititialize_common()``. - Do the startup resync that the Calico resync process would do. This is coded in ``_do_startup_resync()``. diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py index 4623e04fed1..f97f2f7427d 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py @@ -13,17 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -Tests for CalicoMechanismDriver initialization and voting behavior. - -These tests validate: - - Only ``_post_fork_init(voting=True)`` creates an Elector. - - ``_post_fork_init(voting=False)`` never creates an Elector. - - ``post_fork_initialize`` (the AFTER_INIT callback) dispatches correctly: - API workers (``neutron.wsgi.WorkerService``) get ``voting=False`` so they - never become master (per PR #11580); ``CalicoStartupResyncWorker`` runs - only the one-shot resync; any other worker (RPC etc.) gets - ``voting=True``. - - A worker initialization does not override the parent elector. +Tests for CalicoMechanismDriver initialization. 
""" import unittest @@ -61,35 +51,32 @@ def tearDown(self): super(TestMechanismDriverVoting, self).tearDown() def test_driver_init_common(self): - driver = mech_calico.CalicoMechanismDriver() - driver._post_fork_inititialize_common() + self.driver._post_fork_inititialize_common() - self.assertIsNotNone(driver.db) - self.assertIsNotNone(driver.subnet_syncer) - self.assertIsNotNone(driver.policy_syncer) - self.assertIsNotNone(driver.endpoint_syncer) - self.assertIsNotNone(driver._agent_update_context) - self.assertIsNotNone(driver.state_report_rpc) + self.assertIsNotNone(self.driver.db) + self.assertIsNotNone(self.driver.subnet_syncer) + self.assertIsNotNone(self.driver.policy_syncer) + self.assertIsNotNone(self.driver.endpoint_syncer) + self.assertIsNotNone(self.driver._agent_update_context) + self.assertIsNotNone(self.driver.state_report_rpc) @mock.patch("eventlet.spawn") def test_driver_init_calico_resource_syncer(self, m_spawn): m_spawn.return_value = True - driver = mech_calico.CalicoMechanismDriver() - driver._init_and_start_calico_resouce_syncer() + self.driver._init_and_start_calico_resouce_syncer() - self.assertTrue(driver.start_up_resync_thread) + self.assertTrue(self.driver.start_up_resync_thread) @mock.patch("eventlet.spawn") @mock.patch.object(mech_calico, "Elector") def test_driver_init_calico_manager(self, m_elector, m_spawn): m_spawn.return_value = True - driver = mech_calico.CalicoMechanismDriver() - driver._init_and_start_calico_manager() + self.driver._init_and_start_calico_manager() - self.assertTrue(driver.election_thread) - self.assertTrue(driver.periodic_compaction_thread) + self.assertTrue(self.driver.election_thread) + self.assertTrue(self.driver.periodic_compaction_thread) m_elector.assert_called_once() @@ -97,26 +84,24 @@ def test_driver_init_calico_manager(self, m_elector, m_spawn): def test_driver_init_calico_agent_status_watcher(self, m_spawn): m_spawn.return_value = True - driver = mech_calico.CalicoMechanismDriver() - 
driver._init_and_start_agent_status_watcher() + self.driver._init_and_start_agent_status_watcher() - self.assertTrue(driver.agent_status_watch_thread) + self.assertTrue(self.driver.agent_status_watch_thread) @mock.patch("eventlet.spawn") def test_driver_init_calico_endpoint_status_watcher(self, m_spawn): m_spawn.return_value = True - driver = mech_calico.CalicoMechanismDriver() - driver._init_and_start_endpoint_status_watcher() + self.driver._init_and_start_endpoint_status_watcher() - self.assertTrue(driver.endpoint_status_watch_thread) + self.assertTrue(self.driver.endpoint_status_watch_thread) self.assertEqual( - len(driver.port_status_update_threads), + len(self.driver.port_status_update_threads), lib.m_oslo_config.cfg.CONF.calico.num_port_status_threads, ) # We will also need to ensure that the required queues components # are also created. - self.assertIsNotNone(driver._port_status_cache) - self.assertIsNotNone(driver._port_status_queue) - self.assertIsNotNone(driver._port_status_queue_too_long) + self.assertIsNotNone(self.driver._port_status_cache) + self.assertIsNotNone(self.driver._port_status_queue) + self.assertIsNotNone(self.driver._port_status_queue_too_long) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py index ac3d519011a..6852fb36190 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py @@ -461,9 +461,9 @@ def _trigger_resync(self, expect_ok=True, **scope_kwargs): assertions happen to match the no-op output. Negative tests can opt out with ``expect_ok=False``. - If the driver has been initialised (post _post_fork_init) we reuse its syncers - so the same primed project cache is in play. Otherwise we let the runner build - fresh syncers against the mocked DB and Keystone. 
+ If the driver has been initialised (post post_fork_initialize) we reuse its + syncers so the same primed project cache is in play. Otherwise we let the + runner build fresh syncers against the mocked DB and Keystone. """ result = resync.Scope( self.db, @@ -2017,7 +2017,6 @@ def _pre_migrate(self, dest_host=None): def test_pre_live_migration(self): """Pre-live-migration creates destination WEP and LiveMigration.""" - self.driver._post_fork_inititialize_common() self._do_initial_resync() self._pre_migrate() @@ -2037,7 +2036,6 @@ def test_pre_live_migration(self): def test_live_migration_succeeded(self): """After migration succeeds, source WEP deleted, dest WEP kept.""" - self.driver._post_fork_inititialize_common() self._do_initial_resync() self._pre_migrate() @@ -2071,7 +2069,6 @@ def test_live_migration_succeeded(self): def test_live_migration_failed(self): """After migration fails, dest WEP deleted, source WEP unchanged.""" - self.driver._post_fork_inititialize_common() self._do_initial_resync() self._pre_migrate() @@ -2103,7 +2100,6 @@ def test_live_migration_failed(self): def test_port_delete_during_migration(self): """Deleting port during migration cleans up both WEPs and LM.""" - self.driver._post_fork_inititialize_common() self._do_initial_resync() self._pre_migrate() @@ -2172,7 +2168,6 @@ def test_vif_plug_notification(self): @mock.patch("eventlet.spawn") def test_vif_plug_no_notification_for_non_migration(self, _m_spawn): """Felix 'up' on source host does NOT trigger Nova notification.""" - self.driver._init_and_start_endpoint_status_watcher() self._do_initial_resync() self.recent_writes = {} self.recent_deletes = set() @@ -2185,7 +2180,6 @@ def test_vif_plug_no_notification_for_non_migration(self, _m_spawn): def test_resync_creates_missing_live_migration(self): """Resync creates LiveMigration and dest WEP for migrating port.""" - self.driver._post_fork_inititialize_common() self._do_initial_resync() # Set up the port as mid-migration in the Neutron DB 
(migrating_to @@ -2202,7 +2196,6 @@ def test_resync_creates_missing_live_migration(self): def test_resync_deletes_stale_live_migration(self): """Resync deletes orphaned LiveMigration with no migrating port.""" - self.driver._post_fork_inititialize_common() self._do_initial_resync() # Inject a stale LiveMigration into etcd as if a migration was in From 8edbb00d2458e545fe485d7151a925d7be583d7d Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 20 May 2026 17:21:23 +0000 Subject: [PATCH 05/10] fix: Implement reset for CalicoStartupResyncWorker Signed-off-by: Zhan Zhang --- .../networking_calico/plugins/ml2/drivers/calico/workers.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py index 16504aed82d..e02f33bc7c0 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py @@ -90,6 +90,8 @@ def stop(self): # threading.Event.set() is idempotent. self._stop_event.set() + def reset(self): + config.reset_service() class CalicoManagerWorker(worker.BaseWorker): """Service for doing election and compaction. 
From 173da3fdf90096c9a99c6372da59f9b0f50a72ba Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 20 May 2026 17:30:42 +0000 Subject: [PATCH 06/10] fix: Fix linter Signed-off-by: Zhan Zhang --- .../networking_calico/plugins/ml2/drivers/calico/workers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py index e02f33bc7c0..93876e7bf42 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py @@ -93,6 +93,7 @@ def stop(self): def reset(self): config.reset_service() + class CalicoManagerWorker(worker.BaseWorker): """Service for doing election and compaction. From caf928e193680d782e5954e8ea58ee0742440fa6 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 20 May 2026 17:42:35 +0000 Subject: [PATCH 07/10] fix: Remove unneeded code and update comments Signed-off-by: Zhan Zhang --- .../plugins/ml2/drivers/calico/mech_calico.py | 16 +++------------- .../ml2/drivers/calico/test/test_mech_calico.py | 6 +++--- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index db0edbeb1d4..3fead092784 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -345,8 +345,7 @@ def get_workers(self): """Workers that neutron-server should fork on our behalf. Returns a list of ``neutron_lib.worker.BaseWorker`` instances, each of which - becomes one OS process. Today we ask for a single worker that runs the - one-shot resync. + becomes one OS process. 
``[calico] startup_resync = never`` suppresses the worker entirely so the operator can take responsibility for resync (typically by running @@ -373,8 +372,8 @@ def post_fork_initialize(self, resource, event, trigger, payload=None): * ``CalicoStartupResyncWorker`` -> just the one-shot resync. - * ``neutron.wsgi.WorkerService`` -> connection state only, ``voting=False``. - Per PR #11580, API workers must never be elected master, because their + * ``neutron.wsgi.WorkerService`` -> connection state and worker threads only. + Per PR #11580, API workers must never run master-only jobs, because their primary job is to serve API requests quickly: getting tied up running the master-only background threads (status watcher, port-status writers, periodic compaction) would hurt API response latency, and the resync work @@ -437,15 +436,6 @@ def _post_fork_inititialize_common(self): self.db = None self._get_db() - # Create a Keystone client. - authcfg = cfg.CONF.keystone_authtoken - LOG.debug("authcfg = %r", authcfg) - for key in authcfg: - if "password" in key: - LOG.debug("authcfg[%s] = %s", key, "***") - else: - LOG.debug("authcfg[%s] = %s", key, authcfg[key]) - # Create syncers. 
self.subnet_syncer = SubnetSyncer(self.db, self._txn_from_context) self.policy_syncer = PolicySyncer(self.db, self._txn_from_context) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py index f97f2f7427d..fc64ee07695 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py @@ -24,10 +24,10 @@ from networking_calico import etcdv3 -class TestMechanismDriverVoting(lib.Lib, unittest.TestCase): +class TestMechanismDriver(lib.Lib, unittest.TestCase): def setUp(self): - super(TestMechanismDriverVoting, self).setUp() + super(TestMechanismDriver, self).setUp() lib.m_oslo_config.cfg.CONF.keystone_authtoken.auth_url = "" lib.m_oslo_config.cfg.CONF.calico.openstack_region = "no-region" @@ -48,7 +48,7 @@ def tearDown(self): # Reset global etcd client. 
etcdv3._client = None - super(TestMechanismDriverVoting, self).tearDown() + super(TestMechanismDriver, self).tearDown() def test_driver_init_common(self): self.driver._post_fork_inititialize_common() From eb205d46ba5c2be96a1ed28aedf92e0d12fd2720 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Tue, 26 May 2026 15:59:52 +0000 Subject: [PATCH 08/10] fix: Addressing comments Signed-off-by: Zhan Zhang --- .../plugins/ml2/drivers/calico/election.py | 3 +- .../plugins/ml2/drivers/calico/mech_calico.py | 68 +++++++++++-------- .../plugins/ml2/drivers/calico/test/lib.py | 10 ++- .../ml2/drivers/calico/test/test_election.py | 22 +++--- .../drivers/calico/test/test_mech_calico.py | 39 +++++++++-- .../drivers/calico/test/test_plugin_etcd.py | 18 ++--- 6 files changed, 97 insertions(+), 63 deletions(-) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py index fb76787e2ed..e32eed758fe 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/election.py @@ -108,8 +108,9 @@ def __init__( self._is_master.value = 0 self._greenlet = None - def run(self): + def start(self): self._greenlet = eventlet.spawn(self._run) + return self._greenlet def _run(self): """Main election thread run routine. 
diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index 3fead092784..9c600367b6b 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -306,8 +306,9 @@ def __init__(self): self._etcd_watcher = None self._etcd_watcher_thread = None self._my_pid = None - # Process-shared variable for checking if the current instance of - # neutron-server is elected as the leader or not. + # Variable shared across all processes that are forked for the + # current Neutron server. Tracks whether or not this Neutron server + # is the master for its OpenStack region. # "d" = double. Used for storing time.time(). See # https://docs.python.org/3/library/array.html#module-array self._is_master = multiprocessing.Value("d", 0) @@ -321,7 +322,8 @@ def __init__(self): self._last_status_queue_log_time = monotonic_time() # Flag for telling workers to stop. Only applicable to the - # calico worker processes. + # calico worker processes and currently used for unit tests + # only. self._stop_worker = False LOG.info("Created Calico mechanism driver %s", self) @@ -372,7 +374,7 @@ def post_fork_initialize(self, resource, event, trigger, payload=None): * ``CalicoStartupResyncWorker`` -> just the one-shot resync. - * ``neutron.wsgi.WorkerService`` -> connection state and worker threads only. + * ``neutron.wsgi.WorkerService`` -> indicates an API worker process. Per PR #11580, API workers must never run master-only jobs, because their primary job is to serve API requests quickly: getting tied up running the master-only background threads (status watcher, port-status writers, @@ -383,14 +385,21 @@ def post_fork_initialize(self, resource, event, trigger, payload=None): elector and master-only background threads. 
""" trigger_cls = _trigger_class(trigger) - self._post_fork_inititialize_common() + + # ResyncWorker is special-cased because the function can be called by CLI as + # well. Thus, all necessary init will happen in the _do_startup_resync + # function. + if trigger_cls is CalicoStartupResyncWorker: + self._init_start_calico_resource_syncer() + return + + self._post_fork_init() worker_mapping = { - CalicoManagerWorker: self._init_and_start_calico_manager, - CalicoStartupResyncWorker: self._init_and_start_calico_resouce_syncer, - CalicoAgentStatusWatcherWorker: self._init_and_start_agent_status_watcher, + CalicoManagerWorker: self._init_start_calico_manager, + CalicoAgentStatusWatcherWorker: self._init_start_agent_status_watcher, CalicoEndpointStatusWatcherWorker: ( - self._init_and_start_endpoint_status_watcher + self._init_start_endpoint_status_watcher ), } @@ -409,12 +418,12 @@ def is_master(self): In order for a neutron-server to be considered as a master, it needs to aquire the election key and actively maintain it. """ - # We were not elected. We are not the master. if self._is_master.value <= 0: + # We were not elected. We are not the master. return False # Else, let's check if we refresh the time within timeout. - time_till_last_refreshed = self._is_master.value - time.time() + time_till_last_refreshed = time.time() - self._is_master.value refreshed_in_time = time_till_last_refreshed < MASTER_TIMEOUT # If not, there is something wrong with elector!! @@ -426,7 +435,7 @@ def is_master(self): return refreshed_in_time - def _post_fork_inititialize_common(self): + def _post_fork_init(self): """Common post fork initialization. Creates the connection state required for talking to the Neutron DB @@ -443,22 +452,10 @@ def _post_fork_inititialize_common(self): self.db, self._txn_from_context, self.policy_syncer ) - # Admin context used by (only) the thread that updates Felix agent - # status. 
- self._agent_update_context = ctx.get_admin_context() - - # Get RPC connection for fanning out Felix state reports. - try: - state_report_topic = topics.REPORTS - except AttributeError: - # Older versions of OpenStack share the PLUGIN topic. - state_report_topic = topics.PLUGIN - self.state_report_rpc = agent_rpc.PluginReportStateAPI(state_report_topic) - - def _init_and_start_calico_resouce_syncer(self): + def _init_start_calico_resource_syncer(self): self.start_up_resync_thread = eventlet.spawn(self._do_startup_resync) - def _init_and_start_calico_manager(self): + def _init_start_calico_manager(self): self.elector = Elector( cfg.CONF.calico.elector_name, datamodel_v2.neutron_election_key(calico_config.get_region_string()), @@ -468,18 +465,31 @@ def _init_and_start_calico_manager(self): ttl=MASTER_TIMEOUT, ) - self.election_thread = eventlet.spawn(self.elector.run) + self.election_thread = self.elector.start() + if cfg.CONF.calico.etcd_compaction_period_mins > 0: self.periodic_compaction_thread = eventlet.spawn( self.do_periodic_compaction ) - def _init_and_start_agent_status_watcher(self): + def _init_start_agent_status_watcher(self): + # Admin context used by (only) the thread that updates Felix agent + # status. + self._agent_update_context = ctx.get_admin_context() + + # Get RPC connection for fanning out Felix state reports. + try: + state_report_topic = topics.REPORTS + except AttributeError: + # Older versions of OpenStack share the PLUGIN topic. + state_report_topic = topics.PLUGIN + self.state_report_rpc = agent_rpc.PluginReportStateAPI(state_report_topic) + self.agent_status_watch_thread = eventlet.spawn( self.watch_status_updates, AgentStatusWatcher ) - def _init_and_start_endpoint_status_watcher(self): + def _init_start_endpoint_status_watcher(self): # Mapping from (hostname, port-id) to Calico's status for a port. The # hostname is included to disambiguate between multiple copies of a # port, which may exist during a migration or a re-schedule. 
diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py index b32fb558f38..6d7f0721575 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/lib.py @@ -215,7 +215,7 @@ class GrandDukeOfSalzburg(object): def __init__(self, *args, **kwargs): pass - def run(self): + def start(self): pass def master(self): @@ -352,8 +352,6 @@ def setUp(self): # Create an instance of CalicoMechanismDriver. self.driver = mech_calico.CalicoMechanismDriver() - self.driver.is_master = mock.Mock() - self.driver.is_master.return_value = True # Hook the (mock) Neutron database. self.db = mech_calico.plugin_dir.get_plugin() @@ -579,12 +577,12 @@ def do_post_fork_actions(self, uuid_str=None): the one UT process, which means: - Do the same startup preparations - DB connection etc. - that the API worker - process would do. These are all coded in ``_post_fork_initialize_common()``. + process would do. These are all coded in ``_post_fork_init()``. This allows tests to later call driver entrypoints like ``update_port_postcommit()``, similarly as production Neutron would. - Spawn the threads for "other work" (as above) as the RPC worker process would - do. This is achieved by calling ``_post_fork_inititialize_common()``. + do. This is achieved by calling ``_post_fork_init()``. - Do the startup resync that the Calico resync process would do. This is coded in ``_do_startup_resync()``. 
@@ -594,7 +592,7 @@ def do_post_fork_actions(self, uuid_str=None): """ cm = FixedUUID(uuid_str) if uuid_str else contextlib.nullcontext() with cm: - self.driver._post_fork_inititialize_common() + self.driver._post_fork_init() if mech_calico.cfg.CONF.calico.startup_resync == "always": self.driver._do_startup_resync() diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py index fe55e37d8af..c3f72adeafb 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_election.py @@ -70,7 +70,7 @@ def test_invalid(self): interval=-1, ttl=15, ) - elector.run() + elector.start() self._wait_and_stop(etcdv3._client, elector) with self.assertRaises(ValueError): @@ -82,7 +82,7 @@ def test_invalid(self): interval=10, ttl=5, ) - elector.run() + elector.start() self._wait_and_stop(etcdv3._client, elector) def _wait_and_stop(self, client, elector): @@ -116,7 +116,7 @@ def test_basic_election(self): interval=5, ttl=15, ) - elector.run() + elector.start() self._wait_and_stop(client, elector) def test_become_master_first_time(self): @@ -135,7 +135,7 @@ def test_become_master_first_time(self): interval=5, ttl=15, ) - elector.run() + elector.start() self._wait_and_stop(client, elector) client.assert_key_written("/legacy") @@ -153,7 +153,7 @@ def test_fail_to_maintain(self): interval=5, ttl=15, ) - elector.run() + elector.start() self._wait_and_stop(client, elector) def test_become_master_multiple_attempts(self): @@ -174,7 +174,7 @@ def test_become_master_multiple_attempts(self): interval=5, ttl=15, ) - elector.run() + elector.start() self._wait_and_stop(client, elector) def test_become_master_implausible(self): @@ -193,7 +193,7 @@ def test_become_master_implausible(self): interval=5, ttl=15, ) - elector.run() + elector.start() 
self._wait_and_stop(client, elector) def test_initial_read_exceptions(self): @@ -211,7 +211,7 @@ def test_initial_read_exceptions(self): interval=5, ttl=15, ) - elector.run() + elector.start() self._wait_and_stop(client, elector) def test_exception_detail_logging(self): @@ -228,7 +228,7 @@ def test_exception_detail_logging(self): interval=5, ttl=15, ) - elector.run() + elector.start() self._wait_and_stop(client, elector) # Check that Etcd3Exception detail was logged. @@ -259,7 +259,7 @@ def test_later_exceptions(self): interval=5, ttl=15, ) - elector.run() + elector.start() self._wait_and_stop(client, elector) def test_master_failure(self): @@ -285,5 +285,5 @@ def test_master_failure(self): interval=5, ttl=15, ) - elector.run() + elector.start() self._wait_and_stop(client, elector) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py index fc64ee07695..48eb56d90b0 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_mech_calico.py @@ -51,20 +51,18 @@ def tearDown(self): super(TestMechanismDriver, self).tearDown() def test_driver_init_common(self): - self.driver._post_fork_inititialize_common() + self.driver._post_fork_init() self.assertIsNotNone(self.driver.db) self.assertIsNotNone(self.driver.subnet_syncer) self.assertIsNotNone(self.driver.policy_syncer) self.assertIsNotNone(self.driver.endpoint_syncer) - self.assertIsNotNone(self.driver._agent_update_context) - self.assertIsNotNone(self.driver.state_report_rpc) @mock.patch("eventlet.spawn") def test_driver_init_calico_resource_syncer(self, m_spawn): m_spawn.return_value = True - self.driver._init_and_start_calico_resouce_syncer() + self.driver._init_start_calico_resource_syncer() self.assertTrue(self.driver.start_up_resync_thread) @@ -73,18 +71,45 @@ 
def test_driver_init_calico_resource_syncer(self, m_spawn): def test_driver_init_calico_manager(self, m_elector, m_spawn): m_spawn.return_value = True - self.driver._init_and_start_calico_manager() + self.driver._init_start_calico_manager() self.assertTrue(self.driver.election_thread) self.assertTrue(self.driver.periodic_compaction_thread) m_elector.assert_called_once() + @mock.patch("time.time") + def test_is_master(self, m_time): + m_time.return_value = 5 + + self.driver._is_master = mock.MagicMock() + self.driver._is_master.value = 1 + + self.assertTrue(self.driver.is_master()) + + @mock.patch("time.time") + def test_is_not_master(self, m_time): + m_time.return_value = 5 + + self.driver._is_master = mock.MagicMock() + self.driver._is_master.value = 0 + + self.assertFalse(self.driver.is_master()) + + @mock.patch("time.time") + def test_is_not_master_timeout(self, m_time): + m_time.return_value = mech_calico.MASTER_TIMEOUT + 100 + + self.driver._is_master = mock.MagicMock() + self.driver._is_master.value = 1 + + self.assertFalse(self.driver.is_master()) + @mock.patch("eventlet.spawn") def test_driver_init_calico_agent_status_watcher(self, m_spawn): m_spawn.return_value = True - self.driver._init_and_start_agent_status_watcher() + self.driver._init_start_agent_status_watcher() self.assertTrue(self.driver.agent_status_watch_thread) @@ -92,7 +117,7 @@ def test_driver_init_calico_agent_status_watcher(self, m_spawn): def test_driver_init_calico_endpoint_status_watcher(self, m_spawn): m_spawn.return_value = True - self.driver._init_and_start_endpoint_status_watcher() + self.driver._init_start_endpoint_status_watcher() self.assertTrue(self.driver.endpoint_status_watch_thread) self.assertEqual( diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py index 6852fb36190..b97fde7d536 100644 --- 
a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py @@ -437,11 +437,11 @@ def setUp(self): self.sg_default_key_v3: self.sg_default_value_v3, } - self.driver._post_fork_inititialize_common() - self.driver._init_and_start_calico_resouce_syncer() - self.driver._init_and_start_agent_status_watcher() - self.driver._init_and_start_calico_manager() - self.driver._init_and_start_endpoint_status_watcher() + self.driver._post_fork_init() + self.driver._init_start_calico_resource_syncer() + self.driver._init_start_agent_status_watcher() + self.driver._init_start_calico_manager() + self.driver._init_start_endpoint_status_watcher() def make_context(self): context = mock.MagicMock() @@ -2590,7 +2590,7 @@ def test_on_felix_alive(self): @mock.patch("eventlet.spawn") def test_on_port_status_changed(self, _m_spawn): - self.driver._init_and_start_endpoint_status_watcher() + self.driver._init_start_endpoint_status_watcher() self.driver._last_status_queue_log_time = monotonic_time() - 100 with mock.patch.object(self.driver, "_port_status_queue") as m_queue: m_queue.qsize.return_value = 100 @@ -2645,7 +2645,7 @@ def test_on_port_status_changed(self, _m_spawn): @mock.patch("eventlet.spawn") def test_loop_writing_port_statuses(self, _m_spawn): - self.driver._init_and_start_endpoint_status_watcher() + self.driver._init_start_endpoint_status_watcher() with mock.patch.object(self.driver, "_port_status_queue") as m_queue: with mock.patch.object( @@ -2666,7 +2666,7 @@ def test_loop_writing_port_statuses(self, _m_spawn): @mock.patch("eventlet.spawn") def test_try_to_update_port_status(self, _m_spawn): self.driver._get_db() - self.driver._init_and_start_endpoint_status_watcher() + self.driver._init_start_endpoint_status_watcher() mock_calls = [] @@ -2690,7 +2690,7 @@ def m_update_port_status(context, port_id, status, host=None): @mock.patch("eventlet.spawn") def 
test_try_to_update_port_status_fail(self, _m_spawn): self.driver._get_db() - self.driver._init_and_start_endpoint_status_watcher() + self.driver._init_start_endpoint_status_watcher() mock_calls = [] From 217220fe056ced57f40dd94553fcd6142ec688aa Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Tue, 16 Jun 2026 20:46:38 +0000 Subject: [PATCH 09/10] fix: Address comments Signed-off-by: Zhan Zhang --- .../plugins/ml2/drivers/calico/mech_calico.py | 6 ++-- .../drivers/calico/test/test_plugin_etcd.py | 3 +- .../plugins/ml2/drivers/calico/workers.py | 33 ------------------- 3 files changed, 4 insertions(+), 38 deletions(-) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py index 8c23d7fa7f0..5560568aee2 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/mech_calico.py @@ -488,14 +488,14 @@ def is_master(self): return False # Else, let's check if we refresh the time within timeout. - time_till_last_refreshed = time.time() - self._is_master.value - refreshed_in_time = time_till_last_refreshed < MASTER_TIMEOUT + time_since_last_refreshed = time.time() - self._is_master.value + refreshed_in_time = time_since_last_refreshed < MASTER_TIMEOUT # If not, there is something wrong with elector!! if not refreshed_in_time: LOG.warning( "The elector hasn't refreshed the lease in " - f"{time_till_last_refreshed}s." + f"{time_since_last_refreshed}s." 
) return refreshed_in_time diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py index afaae3ff297..81309958406 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/test/test_plugin_etcd.py @@ -2178,8 +2178,7 @@ def test_vif_plug_notification(self): mock_db_port ) - @mock.patch("eventlet.spawn") - def test_vif_plug_no_notification_for_non_migration(self, _m_spawn): + def test_vif_plug_no_notification_for_non_migration(self): """Felix 'up' on source host does NOT trigger Nova notification.""" self._do_initial_resync() self.recent_writes = {} diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py index 93876e7bf42..5ef0ad156f1 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py @@ -100,21 +100,10 @@ class CalicoManagerWorker(worker.BaseWorker): The super class will trigger the post_fork_initialize in the mech driver. """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - def start(self, name="calico-manager", desc=None): """Start service.""" super(CalicoManagerWorker, self).start(name, desc) - def stop(self): - """Stop service.""" - super(CalicoManagerWorker, self).stop() - - def wait(self): - """Wait for service to complete.""" - super(CalicoManagerWorker, self).wait() - def reset(self): config.reset_service() @@ -125,21 +114,10 @@ class CalicoAgentStatusWatcherWorker(worker.BaseWorker): The super class will trigger the post_fork_initialize in the mech driver. 
""" - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - def start(self, name="calico-agent-status-watcher", desc=None): """Start service.""" super(CalicoAgentStatusWatcherWorker, self).start(name, desc) - def stop(self): - """Stop service.""" - super(CalicoAgentStatusWatcherWorker, self).stop() - - def wait(self): - """Wait for service to complete.""" - super(CalicoAgentStatusWatcherWorker, self).wait() - def reset(self): config.reset_service() @@ -150,20 +128,9 @@ class CalicoEndpointStatusWatcherWorker(worker.BaseWorker): The super class will trigger the post_fork_initialize in the mech driver. """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - def start(self, name="calico-endpoint-status-watcher", desc=None): """Start service.""" super(CalicoEndpointStatusWatcherWorker, self).start(name, desc) - def stop(self): - """Stop service.""" - super(CalicoEndpointStatusWatcherWorker, self).stop() - - def wait(self): - """Wait for service to complete.""" - super(CalicoEndpointStatusWatcherWorker, self).wait() - def reset(self): config.reset_service() From 181f5cf381f8cd2ea8b066026b39d16022f18d03 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Tue, 16 Jun 2026 21:03:19 +0000 Subject: [PATCH 10/10] fix: Add stop and wait functions back Signed-off-by: Zhan Zhang --- .../plugins/ml2/drivers/calico/workers.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py b/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py index 5ef0ad156f1..c2d2267a37a 100644 --- a/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py +++ b/networking-calico/networking_calico/plugins/ml2/drivers/calico/workers.py @@ -104,6 +104,14 @@ def start(self, name="calico-manager", desc=None): """Start service.""" super(CalicoManagerWorker, self).start(name, desc) + def stop(self): + """Stop service.""" + 
super(CalicoManagerWorker, self).stop() + + def wait(self): + """Wait for service to complete.""" + super(CalicoManagerWorker, self).wait() + def reset(self): config.reset_service() @@ -118,6 +126,14 @@ def start(self, name="calico-agent-status-watcher", desc=None): """Start service.""" super(CalicoAgentStatusWatcherWorker, self).start(name, desc) + def stop(self): + """Stop service.""" + super(CalicoAgentStatusWatcherWorker, self).stop() + + def wait(self): + """Wait for service to complete.""" + super(CalicoAgentStatusWatcherWorker, self).wait() + def reset(self): config.reset_service() @@ -132,5 +148,13 @@ def start(self, name="calico-endpoint-status-watcher", desc=None): """Start service.""" super(CalicoEndpointStatusWatcherWorker, self).start(name, desc) + def stop(self): + """Stop service.""" + super(CalicoEndpointStatusWatcherWorker, self).stop() + + def wait(self): + """Wait for service to complete.""" + super(CalicoEndpointStatusWatcherWorker, self).wait() + def reset(self): config.reset_service()