Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/66282.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed the master's publish port (4505) becoming unresponsive under load. The TCP `PubServer` now writes to all subscribers concurrently, so a single slow subscriber no longer stalls the event publisher loop, and the ZeroMQ master PUB socket now enables ZMTP heartbeats, so dead subscribers are reaped within seconds instead of waiting for the kernel TCP keepalive to expire.
23 changes: 16 additions & 7 deletions salt/transport/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -903,27 +903,36 @@ def publish_payload(self, package, topic_list=None):
log.trace("TCP PubServer sending payload: %s \n\n %r", package, topic_list)
payload = salt.transport.frame.frame_msg(package)
to_remove = []
# Start writes to every targeted client concurrently so a single
# slow subscriber can't stall delivery to the rest of the fleet.
# See https://github.com/saltstack/salt/issues/66282 — sequential
# ``yield client.stream.write(...)`` was clogging the event
# publisher loop, growing per-client write buffers and eventually
# wedging the master.
write_futures = []
if topic_list:
for topic in topic_list:
sent = False
for client in self.clients:
for client in list(self.clients):
if topic == client.id_:
try:
# Write the packed str
yield client.stream.write(payload)
write_futures.append((client, client.stream.write(payload)))
sent = True
# self.io_loop.add_future(f, lambda f: True)
except salt.ext.tornado.iostream.StreamClosedError:
to_remove.append(client)
if not sent:
log.debug("Publish target %s not connected %r", topic, self.clients)
else:
for client in self.clients:
for client in list(self.clients):
try:
# Write the packed str
yield client.stream.write(payload)
write_futures.append((client, client.stream.write(payload)))
except salt.ext.tornado.iostream.StreamClosedError:
to_remove.append(client)
for client, future in write_futures:
try:
yield future
except salt.ext.tornado.iostream.StreamClosedError:
to_remove.append(client)
for client in to_remove:
log.debug(
"Subscriber at %s has disconnected from publisher", client.address
Expand Down
37 changes: 37 additions & 0 deletions salt/transport/zeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,42 @@ def _set_tcp_keepalive(zmq_socket, opts):
zmq_socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, opts["tcp_keepalive_intvl"])


# Default ZMTP heartbeat settings, in milliseconds. ``_set_zmq_heartbeat``
# falls back to these when the matching opt is absent.
#
# Defaults are intentionally generous: small enough to reap dead SUB peers
# within seconds (rather than the ~2h15m kernel TCP keepalive default), but
# large enough not to disrupt a healthy fleet of thousands of minions on a
# laggy network. Operators can tune via ``zmq_heartbeat_ivl`` /
# ``zmq_heartbeat_timeout`` in milliseconds.
_DEFAULT_ZMQ_HEARTBEAT_IVL = 10000 # 10s between heartbeats
_DEFAULT_ZMQ_HEARTBEAT_TIMEOUT = 30000 # 30s with no response -> peer dead


def _set_zmq_heartbeat(zmq_socket, opts):
    """
    Enable ZMTP heartbeats on a ZeroMQ socket.

    Without ``ZMQ_HEARTBEAT_IVL`` / ``ZMQ_HEARTBEAT_TIMEOUT`` configured,
    ZMQ relies on the kernel TCP keepalive to notice a peer that vanished
    without sending FIN (host reboot, kernel panic, dropped firewall rule).
    On Linux that's ~2h15m by default, during which the master's PUB
    socket keeps buffering for the dead peer and ``netstat`` accumulates
    ``CLOSE_WAIT`` entries on port 4505 — eventually the master stops
    accepting new connections. See
    https://github.com/saltstack/salt/issues/66282.

    :param zmq_socket: The ZeroMQ socket to configure. Only its
        ``setsockopt`` method is used.
    :param dict opts: Salt opts. ``zmq_heartbeat_ivl`` and
        ``zmq_heartbeat_timeout`` are read, both in milliseconds. A missing
        key, or a key explicitly set to ``None`` (for example an empty
        value in the YAML config), falls back to
        ``_DEFAULT_ZMQ_HEARTBEAT_IVL`` / ``_DEFAULT_ZMQ_HEARTBEAT_TIMEOUT``.
        ``opts`` itself may be ``None`` or empty.
    :raises ValueError: If a configured value cannot be converted with
        ``int()``.

    A value of ``0`` is passed to ZMQ unchanged; for the interval that is
    ZMQ's own default, i.e. heartbeats stay off.
    """
    if not opts:
        opts = {}

    def _millis(key, default):
        # ``opts.get(key, default)`` would hand back ``None`` for a key
        # that is present but unset, and ``int(None)`` raises TypeError.
        # Treat ``None`` the same as a missing key instead.
        value = opts.get(key)
        return default if value is None else int(value)

    ivl = _millis("zmq_heartbeat_ivl", _DEFAULT_ZMQ_HEARTBEAT_IVL)
    timeout = _millis("zmq_heartbeat_timeout", _DEFAULT_ZMQ_HEARTBEAT_TIMEOUT)

    # Guard each option separately: the zmq bindings in use may not
    # expose these constants.
    if hasattr(zmq, "HEARTBEAT_IVL"):
        zmq_socket.setsockopt(zmq.HEARTBEAT_IVL, ivl)
    if hasattr(zmq, "HEARTBEAT_TIMEOUT"):
        zmq_socket.setsockopt(zmq.HEARTBEAT_TIMEOUT, timeout)


# TODO: unit tests!
class AsyncReqMessageClient:
"""
Expand Down Expand Up @@ -1122,6 +1158,7 @@ def publish_daemon(
monitor = ZeroMQSocketMonitor(pub_sock)
monitor.start_io_loop(ioloop)
_set_tcp_keepalive(pub_sock, self.opts)
_set_zmq_heartbeat(pub_sock, self.opts)
self.dpub_sock = pub_sock = zmq.eventloop.zmqstream.ZMQStream(pub_sock)
# if 2.1 >= zmq < 3.0, we only have one HWM setting
try:
Expand Down
113 changes: 113 additions & 0 deletions tests/pytests/functional/transport/tcp/test_pub_server_stability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""
Regression tests for salt-master 4505 publish-port stability.

See https://github.com/saltstack/salt/issues/66282 — under prolonged load a
single slow IPC subscriber stalls the entire TCP ``PubServer`` broadcast
loop because ``publish_payload`` ``yield``s each ``client.stream.write()``
serially. Bytes accumulate in the tornado write buffer of the slow
client, the publisher coroutine can't service the next event from the
pull socket, and the master eventually appears wedged.
"""

import logging
import time

import pytest

import salt.ext.tornado.gen
import salt.ext.tornado.ioloop
import salt.transport.tcp

log = logging.getLogger(__name__)


class _FakeStream:
    """
    Minimal stand-in for the parts of ``IOStream`` that ``PubServer`` uses.

    ``write_delay`` is the number of seconds each ``write`` sleeps after
    recording the payload; ``0`` makes the write complete immediately.
    """

    def __init__(self, write_delay=0.0, name=""):
        self.name = name
        self.write_delay = write_delay
        # Payloads handed to ``write``, in call order.
        self.writes = []
        # ``time.monotonic()`` of the most recent ``write`` call, if any.
        self.received_at = None
        self._closed = False

    def close(self):
        self._closed = True

    def closed(self):
        return self._closed

    @salt.ext.tornado.gen.coroutine
    def write(self, payload):
        # Stamp the arrival time before any sleeping: the test cares about
        # when the broadcast loop got to this client, not when the
        # simulated write finished.
        self.received_at = time.monotonic()
        self.writes.append(payload)
        if not self.write_delay:
            return
        yield salt.ext.tornado.gen.sleep(self.write_delay)


def _make_subscriber(stream, name):
    """Build a ``Subscriber`` around *stream* whose ``id_`` is *name*."""
    subscriber = salt.transport.tcp.Subscriber(stream, name)
    subscriber.id_ = name
    return subscriber


@pytest.mark.timeout(60)
async def test_slow_subscriber_does_not_block_fast_subscriber(master_opts):
    """
    One slow subscriber must not hold up delivery to the others.

    With a serial ``for client in self.clients: yield ...write()`` loop in
    ``PubServer.publish_payload``, a slow client that is iterated first
    keeps the fast client waiting until the slow write completes (~3 s
    here). This test requires the fast client to see its payload within
    1 s of ``publish_payload`` being called.
    """
    gen = salt.ext.tornado.gen

    master_opts["transport"] = "tcp"
    server = salt.transport.tcp.PubServer(
        master_opts, io_loop=salt.ext.tornado.ioloop.IOLoop.current()
    )
    try:
        slow_stream = _FakeStream(write_delay=3.0, name="slow")
        fast_stream = _FakeStream(write_delay=0.0, name="fast")
        # Production uses a ``set`` for ``self.clients``, so iteration order
        # is undefined there. A list in [slow, fast] order makes the serial
        # behaviour reproducible.
        server.clients = [
            _make_subscriber(slow_stream, "slow"),
            _make_subscriber(fast_stream, "fast"),
        ]

        started = time.monotonic()
        # Start the broadcast without awaiting it so the fast client can be
        # observed while the slow write is still in flight.
        # ``publish_payload`` is a tornado coroutine; ``convert_yielded``
        # turns what it returns into something this ``async def`` can await.
        pending_broadcast = gen.convert_yielded(
            server.publish_payload({"jid": "abc"})
        )

        # Give the fast client at most 1 s to receive the payload.
        cutoff = started + 1.0
        while fast_stream.received_at is None and time.monotonic() < cutoff:
            await gen.convert_yielded(gen.sleep(0.05))

        if fast_stream.received_at is None:
            elapsed = None
        else:
            elapsed = fast_stream.received_at - started

        # Drain the broadcast so the slow coroutine is not leaked.
        await pending_broadcast
    finally:
        server.close()

    assert elapsed is not None, (
        "fast subscriber did not receive the payload within 1s of publish; "
        "PubServer.publish_payload is broadcasting serially (#66282)"
    )
    assert elapsed < 1.0, (
        f"fast subscriber waited {elapsed:.2f}s behind slow subscriber; "
        "PubServer.publish_payload is broadcasting serially (#66282)"
    )
90 changes: 90 additions & 0 deletions tests/pytests/unit/transport/test_zeromq_pub_stability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""
Regression tests for ZMQ PUB-socket heartbeat configuration.

See https://github.com/saltstack/salt/issues/66282 — when a SUB peer dies
without sending FIN (host reboot, kernel panic, firewall block) the master's
PUB socket only reaps that peer once kernel TCP keepalive expires, which is
~2 h 15 min on Linux with default settings. ZMTP heartbeats
(``ZMQ_HEARTBEAT_IVL`` / ``ZMQ_HEARTBEAT_TIMEOUT``) reduce that to seconds,
which prevents the ``CLOSE_WAIT`` build-up and per-peer buffer growth that
eventually wedges port 4505.

These tests pin the heartbeat options on the publisher socket. The fix
adds them via a helper invoked from ``publish_daemon``; the helper is
unit-tested here without bringing up the full daemon.
"""

import logging

import pytest
import zmq

import salt.transport.zeromq

log = logging.getLogger(__name__)


pytestmark = [
pytest.mark.core_test,
]


@pytest.fixture
def zmq_context():
    """Yield a fresh ``zmq.Context`` and destroy it with ``linger=0`` afterwards."""
    context = zmq.Context()
    try:
        yield context
    finally:
        context.destroy(linger=0)


def test_pub_socket_has_zmtp_heartbeats_configured(zmq_context):
    """
    ``salt.transport.zeromq._set_zmq_heartbeat`` has to switch on ZMTP
    heartbeats for the PUB socket, so that dead subscribers are reaped in
    seconds rather than hours. Otherwise ``netstat`` accumulates
    ``CLOSE_WAIT`` entries on port 4505 and the master eventually stops
    accepting new connections (#66282).
    """
    pub = zmq_context.socket(zmq.PUB)
    try:
        # A freshly created socket has heartbeats off. Checking that first
        # proves the assertions below observe a non-default configuration.
        default_ivl = pub.getsockopt(zmq.HEARTBEAT_IVL)
        default_timeout = pub.getsockopt(zmq.HEARTBEAT_TIMEOUT)
        assert default_ivl == 0
        assert default_timeout == -1

        salt.transport.zeromq._set_zmq_heartbeat(pub, {})

        ivl = pub.getsockopt(zmq.HEARTBEAT_IVL)
        timeout = pub.getsockopt(zmq.HEARTBEAT_TIMEOUT)
        assert ivl > 0, (
            "PUB socket has ZMTP HEARTBEAT_IVL disabled; dead SUB peers "
            "will not be reaped until kernel TCP keepalive expires "
            "(~2h15m). #66282"
        )
        assert timeout > 0, (
            "PUB socket has ZMTP HEARTBEAT_TIMEOUT unset; dead SUB peers "
            "will not be reaped until kernel TCP keepalive expires. "
            "#66282"
        )
        # A timeout no longer than the interval would flag live peers as
        # dead, so the timeout must be the larger of the two.
        assert ivl < timeout
    finally:
        pub.close()


def test_pub_socket_heartbeat_respects_opts(zmq_context):
    """
    The ``zmq_heartbeat_ivl`` / ``zmq_heartbeat_timeout`` master config
    values must be applied to the socket, so operators can tune the
    heartbeat interval and timeout.
    """
    wanted_ivl, wanted_timeout = 5000, 15000
    pub = zmq_context.socket(zmq.PUB)
    try:
        salt.transport.zeromq._set_zmq_heartbeat(
            pub,
            {
                "zmq_heartbeat_ivl": wanted_ivl,
                "zmq_heartbeat_timeout": wanted_timeout,
            },
        )
        assert pub.getsockopt(zmq.HEARTBEAT_IVL) == wanted_ivl
        assert pub.getsockopt(zmq.HEARTBEAT_TIMEOUT) == wanted_timeout
    finally:
        pub.close()
Loading