Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
"""
import os
import random
import re
import socket
import sys
import time

from etcd3gw.exceptions import Etcd3Exception

Expand Down Expand Up @@ -65,14 +65,19 @@ class RestartElection(Exception):


class Elector(object):
def __init__(self, server_id, election_key, old_key=None, interval=30, ttl=60):
def __init__(
self, server_id, election_key, is_master, old_key=None, interval=30, ttl=60
):
"""Class that manages elections.

:param server_id: Server ID. Must be unique to this server, and should
take a value that is meaningful in logs (e.g.
hostname)
:param election_key: The etcd key used for the election - e.g.
"/calico/v2/no-region/election"
:param is_master: Process-shared value, used by other processes to
determine whether the current neutron-server
instance is the master.
:param old_key: A legacy key that does not determine the election, but
that we write whenever we write the election_key - e.g.
"/calico/v1/election"
Expand All @@ -91,6 +96,7 @@ def __init__(self, server_id, election_key, old_key=None, interval=30, ttl=60):
self._interval = int(interval)
self._ttl = int(ttl)
self._stopped = False
self._is_master = is_master

if self._interval <= 0:
raise ValueError("Interval %r is <= 0" % interval)
Expand All @@ -99,10 +105,12 @@ def __init__(self, server_id, election_key, old_key=None, interval=30, ttl=60):
raise ValueError("TTL %r is <= interval %r" % (ttl, interval))

# Is this the master? To start with, no
self._master = False
self._is_master.value = 0
self._greenlet = None

# Keep the greenlet ID handy to ease UT.
def start(self):
self._greenlet = eventlet.spawn(self._run)
return self._greenlet

def _run(self):
"""Main election thread run routine.
Expand Down Expand Up @@ -156,10 +164,6 @@ def _vote(self):
return

LOG.debug("ID of elected master is : %s", value)
if value:
# If we happen to be on the same server, check if the master
# process is still alive.
self._check_master_process(value)

while not self._stopped:
# We know another instance is the master. Wait until something
Expand Down Expand Up @@ -207,42 +211,6 @@ def _vote(self):
)
self._become_master()

def _check_master_process(self, master_id):
"""_check_master_process

If the master happens to be on our host, checks if its process is
still alive. If it is not, cleans up the now-stale election key.

:param master_id: Value loaded from the election key.
"""
# Defensive. In case we ever change the master ID format, only parse
# it if it looks like what we expect.
match = re.match(r"^(?P<host>[^:]+):(?P<pid>\d+)$", master_id)
if not match:
LOG.warning("Unable to parse master ID: %r.", master_id)
return
host = match.group("host")
pid = int(match.group("pid"))
LOG.debug("Parsed key as host = %s, PID = %s", host, pid)
if host == self._server_id:
# Check if the PID is still running.
LOG.debug("Previous master was on this server %s", host)
if os.path.exists("/proc/%s" % pid):
LOG.debug("Master still running")
else:
LOG.warning(
"Master was on this server but cannot find its "
"PID in /proc. Removing stale election key."
)
try:
deleted = etcdv3.delete(self._key, existing_value=master_id)
except Etcd3Exception as e:
self._log_exception("remove stale key from dead master", e)
deleted = False

if not deleted:
raise RestartElection()

def _become_master(self):
"""_become_master

Expand All @@ -254,23 +222,25 @@ def _become_master(self):
master.
Any other error from etcd is not caught in this routine.
"""
ok = False

try:
ttl_lease = etcdv3.get_lease(self._ttl)
self._master = etcdv3.put(
ok = etcdv3.put(
self._key, self.id_string, lease=ttl_lease, mod_revision="0"
)
except Exception as e:
# We could be smarter about what exceptions we allow, but any kind
# of error means we should give up, and safer to have a broad
# except here. Log and reconnect.
self._log_exception("become master", e)
self._master = False
self._is_master.value = 0

if not self._master:
if not ok:
LOG.info("Race: someone else beat us to be master")
raise RestartElection()

self._is_master.value = time.time()
LOG.info(
"Successfully become master - key %s, value %s", self._key, self.id_string
)
Expand All @@ -294,6 +264,10 @@ def _become_master(self):
):
LOG.warning("Key changed or deleted; restart election")
raise RestartElection()

# Successfully refreshed the role - let's update the
# timestamp.
self._is_master.value = time.time()
LOG.debug("Refreshed master role, TTL now is %d", ttl)
except RestartElection:
raise
Expand All @@ -310,7 +284,7 @@ def _become_master(self):
eventlet.sleep(self._interval)
finally:
LOG.info("Exiting master refresh loop, no longer the master")
self._master = False
self._is_master.value = 0
raise RestartElection()

def _write_old_key(self, lease):
Expand Down Expand Up @@ -347,21 +321,14 @@ def id_string(self):
return "%s:%d" % (self._server_id, os.getpid())

def _attempt_step_down(self):
self._master = False
self._is_master.value = 0
try:
etcdv3.delete(self._key, existing_value=self.id_string)
except Exception:
# Broad except because we're already on an error path. The key
# will expire anyway.
LOG.exception("Failed to step down as master. Ignoring.")

def master(self):
"""Am I the master?

returns: True if this is the master.
"""
return self._master and not self._stopped

def stop(self):
self._stopped = True
if self._greenlet and not self._greenlet.dead:
Expand Down
Loading
Loading