From 73c6970351b5764aee350285549ef5f22384ac33 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Wed, 17 Jun 2026 23:07:43 -0700 Subject: [PATCH 1/3] Broadcast TCP PubServer writes concurrently PubServer.publish_payload serially yielded each client.stream.write(payload), so a single slow subscriber stalled delivery to every other client. With dozens to thousands of minions connected the event publisher loop would fall behind, per-client write buffers would grow (matching reporter observations of the EventPublisher subprocess ballooning to hundreds of MB before restart), and the master would eventually appear wedged on its publish port. Schedule every write on the IOLoop first, then yield on the resulting futures in order. tornado's @gen.coroutine runs the body when called (not when awaited), so kicking off the writes up-front lets the IOLoop interleave them: fast subscribers receive their payload immediately even while a slow subscriber's write is still draining. A new regression test installs two fake subscribers with a 3 s slow write and a 0 s fast write, then asserts the fast subscriber sees its payload within 1 s of publish_payload being called. Without the fix it does not. Refs #66282 --- salt/transport/tcp.py | 23 ++-- .../tcp/test_pub_server_stability.py | 113 ++++++++++++++++++ 2 files changed, 129 insertions(+), 7 deletions(-) create mode 100644 tests/pytests/functional/transport/tcp/test_pub_server_stability.py diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 991a3d126657..b5e6e518c0f1 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -903,27 +903,36 @@ def publish_payload(self, package, topic_list=None): log.trace("TCP PubServer sending payload: %s \n\n %r", package, topic_list) payload = salt.transport.frame.frame_msg(package) to_remove = [] + # Start writes to every targeted client concurrently so a single + # slow subscriber can't stall delivery to the rest of the fleet. + # See https://github.com/saltstack/salt/issues/66282 — sequential + # ``yield client.stream.write(...)`` was clogging the event + # publisher loop, growing per-client write buffers and eventually + # wedging the master. + write_futures = [] if topic_list: for topic in topic_list: sent = False - for client in self.clients: + for client in list(self.clients): if topic == client.id_: try: - # Write the packed str - yield client.stream.write(payload) + write_futures.append((client, client.stream.write(payload))) sent = True - # self.io_loop.add_future(f, lambda f: True) except salt.ext.tornado.iostream.StreamClosedError: to_remove.append(client) if not sent: log.debug("Publish target %s not connected %r", topic, self.clients) else: - for client in self.clients: + for client in list(self.clients): try: - # Write the packed str - yield client.stream.write(payload) + write_futures.append((client, client.stream.write(payload))) except salt.ext.tornado.iostream.StreamClosedError: to_remove.append(client) + for client, future in write_futures: + try: + yield future + except salt.ext.tornado.iostream.StreamClosedError: + to_remove.append(client) for client in to_remove: log.debug( "Subscriber at %s has disconnected from publisher", client.address diff --git a/tests/pytests/functional/transport/tcp/test_pub_server_stability.py b/tests/pytests/functional/transport/tcp/test_pub_server_stability.py new file mode 100644 index 000000000000..b75e971ff27a --- /dev/null +++ b/tests/pytests/functional/transport/tcp/test_pub_server_stability.py @@ -0,0 +1,113 @@ +""" +Regression tests for salt-master 4505 publish-port stability. + +See https://github.com/saltstack/salt/issues/66282 — under prolonged load a +single slow IPC subscriber stalls the entire TCP ``PubServer`` broadcast +loop because ``publish_payload`` ``yield``s each ``client.stream.write()`` +serially. Bytes accumulate in the tornado write buffer of the slow +client, the publisher coroutine can't service the next event from the +pull socket, and the master eventually appears wedged. +""" + +import logging +import time + +import pytest + +import salt.ext.tornado.gen +import salt.ext.tornado.ioloop +import salt.transport.tcp + +log = logging.getLogger(__name__) + + +class _FakeStream: + """Mimic the bits of ``IOStream`` ``PubServer`` touches.""" + + def __init__(self, write_delay=0.0, name=""): + self.write_delay = write_delay + self.name = name + self.writes = [] + self._closed = False + self.received_at = None + + def closed(self): + return self._closed + + def close(self): + self._closed = True + + @salt.ext.tornado.gen.coroutine + def write(self, payload): + # Record receipt time *before* sleeping so the assertion measures + # the moment the broadcast loop actually reached this client, not + # the moment it finished. + self.received_at = time.monotonic() + self.writes.append(payload) + if self.write_delay: + yield salt.ext.tornado.gen.sleep(self.write_delay) + + +def _make_subscriber(stream, name): + sub = salt.transport.tcp.Subscriber(stream, name) + sub.id_ = name + return sub + + +@pytest.mark.timeout(60) +async def test_slow_subscriber_does_not_block_fast_subscriber(master_opts): + """ + A single slow subscriber must not stall delivery to other subscribers. + + Without the fix, the ``for client in self.clients: yield ...write()`` + loop in ``PubServer.publish_payload`` is serial: when the slow client + is iterated first, the fast client doesn't see its bytes until the + slow client's write completes ~3 s later. The test asserts the fast + client receives its payload within 1 s of ``publish_payload`` being + called. + """ + master_opts["transport"] = "tcp" + server = salt.transport.tcp.PubServer( + master_opts, io_loop=salt.ext.tornado.ioloop.IOLoop.current() + ) + try: + slow_stream = _FakeStream(write_delay=3.0, name="slow") + fast_stream = _FakeStream(write_delay=0.0, name="fast") + slow_sub = _make_subscriber(slow_stream, "slow") + fast_sub = _make_subscriber(fast_stream, "fast") + # ``self.clients`` is a ``set`` in production so iteration order is + # undefined. Pin the order [slow, fast] so the bug is hit + # deterministically. + server.clients = [slow_sub, fast_sub] + + start = time.monotonic() + # Don't await the broadcast — we want to observe the fast client + # *during* the slow client's write. ``publish_payload`` is a + # tornado-coroutine that returns a ``salt.ext.tornado.concurrent.Future``; + # ``convert_yielded`` makes it awaitable from an ``async def`` driven + # by the IOLoop running this test. + broadcast = salt.ext.tornado.gen.convert_yielded( + server.publish_payload({"jid": "abc"}) + ) + # Poll up to 1 s for the fast client to have received the payload. + deadline = start + 1.0 + while time.monotonic() < deadline and fast_stream.received_at is None: + await salt.ext.tornado.gen.convert_yielded(salt.ext.tornado.gen.sleep(0.05)) + elapsed = ( + (fast_stream.received_at - start) + if fast_stream.received_at is not None + else None + ) + # Let the broadcast finish so we don't leak the slow coroutine. + await broadcast + finally: + server.close() + + assert elapsed is not None, ( + "fast subscriber did not receive the payload within 1s of publish; " + "PubServer.publish_payload is broadcasting serially (#66282)" + ) + assert elapsed < 1.0, ( + f"fast subscriber waited {elapsed:.2f}s behind slow subscriber; " + "PubServer.publish_payload is broadcasting serially (#66282)" + ) From fce746e481f5b26724fc7691f688cde8fada7491 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Wed, 17 Jun 2026 23:08:02 -0700 Subject: [PATCH 2/3] Enable ZMTP heartbeats on the master PUB socket MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Without ZMQ_HEARTBEAT_IVL / ZMQ_HEARTBEAT_TIMEOUT configured, the PUB socket only notices a SUB peer that vanished without sending FIN (host reboot, kernel panic, dropped firewall rule) once kernel TCP keepalive expires. On Linux that's ~2 h 15 min by default, during which the PUB socket keeps buffering for the dead peer and the kernel accumulates CLOSE_WAIT entries on port 4505. Eventually the master stops accepting new connections — a state several users have reported in issue #66282. Add a _set_zmq_heartbeat helper, default it to 10 s interval / 30 s timeout, and call it alongside _set_tcp_keepalive when the PublishServer's PUB socket is set up. Operators can tune via zmq_heartbeat_ivl / zmq_heartbeat_timeout (milliseconds, matching the unit ZMQ uses). Refs #66282 --- salt/transport/zeromq.py | 37 ++++++++ .../transport/test_zeromq_pub_stability.py | 90 +++++++++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 tests/pytests/unit/transport/test_zeromq_pub_stability.py diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index 9cb1873e8d02..f979d9d696d1 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -562,6 +562,42 @@ def _set_tcp_keepalive(zmq_socket, opts): zmq_socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, opts["tcp_keepalive_intvl"]) +# Defaults are intentionally generous: small enough to reap dead SUB peers +# within seconds (rather than the ~2h15m kernel TCP keepalive default), but +# large enough not to disrupt a healthy fleet of thousands of minions on a +# laggy network. Operators can tune via ``zmq_heartbeat_ivl`` / +# ``zmq_heartbeat_timeout`` in milliseconds. +_DEFAULT_ZMQ_HEARTBEAT_IVL = 10000 # 10s between heartbeats +_DEFAULT_ZMQ_HEARTBEAT_TIMEOUT = 30000 # 30s with no response -> peer dead + + +def _set_zmq_heartbeat(zmq_socket, opts): + """ + Enable ZMTP heartbeats on a ZeroMQ socket. + + Without ``ZMQ_HEARTBEAT_IVL`` / ``ZMQ_HEARTBEAT_TIMEOUT`` configured, + ZMQ relies on the kernel TCP keepalive to notice a peer that vanished + without sending FIN (host reboot, kernel panic, dropped firewall rule). + On Linux that's ~2h15m by default, during which the master's PUB + socket keeps buffering for the dead peer and ``netstat`` accumulates + ``CLOSE_WAIT`` entries on port 4505 — eventually the master stops + accepting new connections. See + https://github.com/saltstack/salt/issues/66282. + + Heartbeat opts are configured in milliseconds. Setting ``ivl`` or + ``timeout`` to ``0`` disables the corresponding option (matching the + ZMQ defaults). + """ + if not opts: + opts = {} + ivl = int(opts.get("zmq_heartbeat_ivl", _DEFAULT_ZMQ_HEARTBEAT_IVL)) + timeout = int(opts.get("zmq_heartbeat_timeout", _DEFAULT_ZMQ_HEARTBEAT_TIMEOUT)) + if hasattr(zmq, "HEARTBEAT_IVL"): + zmq_socket.setsockopt(zmq.HEARTBEAT_IVL, ivl) + if hasattr(zmq, "HEARTBEAT_TIMEOUT"): + zmq_socket.setsockopt(zmq.HEARTBEAT_TIMEOUT, timeout) + + # TODO: unit tests! class AsyncReqMessageClient: """ @@ -1122,6 +1158,7 @@ def publish_daemon( monitor = ZeroMQSocketMonitor(pub_sock) monitor.start_io_loop(ioloop) _set_tcp_keepalive(pub_sock, self.opts) + _set_zmq_heartbeat(pub_sock, self.opts) self.dpub_sock = pub_sock = zmq.eventloop.zmqstream.ZMQStream(pub_sock) # if 2.1 >= zmq < 3.0, we only have one HWM setting try: diff --git a/tests/pytests/unit/transport/test_zeromq_pub_stability.py b/tests/pytests/unit/transport/test_zeromq_pub_stability.py new file mode 100644 index 000000000000..928a610bdbe2 --- /dev/null +++ b/tests/pytests/unit/transport/test_zeromq_pub_stability.py @@ -0,0 +1,90 @@ +""" +Regression tests for ZMQ PUB-socket heartbeat configuration. + +See https://github.com/saltstack/salt/issues/66282 — when a SUB peer dies +without sending FIN (host reboot, kernel panic, firewall block) the master's +PUB socket only reaps that peer once kernel TCP keepalive expires, which is +~2 h 15 min on default Linux. ZMTP heartbeats (``ZMQ_HEARTBEAT_IVL`` / +``ZMQ_HEARTBEAT_TIMEOUT``) reduce that to seconds, which prevents the +``CLOSE_WAIT`` build-up and per-peer buffer growth that eventually wedges +port 4505. + +These tests pin the heartbeat options on the publisher socket. The fix +adds them via a helper invoked from ``publish_daemon``; the helper is +unit-tested here without bringing up the full daemon. +""" + +import logging + +import pytest +import zmq + +import salt.transport.zeromq + +log = logging.getLogger(__name__) + + +pytestmark = [ + pytest.mark.core_test, +] + + +@pytest.fixture +def zmq_context(): + ctx = zmq.Context() + try: + yield ctx + finally: + ctx.destroy(linger=0) + + +def test_pub_socket_has_zmtp_heartbeats_configured(zmq_context): + """ + ``salt.transport.zeromq._set_zmq_heartbeat`` must enable ZMTP + heartbeats on the PUB socket so dead subscribers are reaped in + seconds instead of hours. Without this, ``netstat`` accumulates + ``CLOSE_WAIT`` entries on port 4505 and the master eventually stops + accepting new connections (#66282). + """ + sock = zmq_context.socket(zmq.PUB) + try: + # Default ZMQ behaviour: heartbeats disabled. Sanity check that + # the test is actually verifying a non-default configuration. + assert sock.getsockopt(zmq.HEARTBEAT_IVL) == 0 + assert sock.getsockopt(zmq.HEARTBEAT_TIMEOUT) == -1 + + opts = {} + salt.transport.zeromq._set_zmq_heartbeat(sock, opts) + + ivl = sock.getsockopt(zmq.HEARTBEAT_IVL) + timeout = sock.getsockopt(zmq.HEARTBEAT_TIMEOUT) + assert ivl > 0, ( + "PUB socket has ZMTP HEARTBEAT_IVL disabled; dead SUB peers " + "will not be reaped until kernel TCP keepalive expires " + "(~2h15m). #66282" + ) + assert timeout > 0, ( + "PUB socket has ZMTP HEARTBEAT_TIMEOUT unset; dead SUB peers " + "will not be reaped until kernel TCP keepalive expires. " + "#66282" + ) + # The timeout should exceed the interval; otherwise heartbeats + # would mark live peers as dead. + assert timeout > ivl + finally: + sock.close() + + +def test_pub_socket_heartbeat_respects_opts(zmq_context): + """ + Operators must be able to tune the heartbeat interval and timeout + via ``zmq_heartbeat_ivl`` / ``zmq_heartbeat_timeout`` master config. + """ + sock = zmq_context.socket(zmq.PUB) + try: + opts = {"zmq_heartbeat_ivl": 5000, "zmq_heartbeat_timeout": 15000} + salt.transport.zeromq._set_zmq_heartbeat(sock, opts) + assert sock.getsockopt(zmq.HEARTBEAT_IVL) == 5000 + assert sock.getsockopt(zmq.HEARTBEAT_TIMEOUT) == 15000 + finally: + sock.close() From f934a3a084283c3ead8bfa150de17d3c3e1218c3 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Wed, 17 Jun 2026 23:08:18 -0700 Subject: [PATCH 3/3] Add changelog entry for #66282 Fixes #66282 --- changelog/66282.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/66282.fixed.md diff --git a/changelog/66282.fixed.md b/changelog/66282.fixed.md new file mode 100644 index 000000000000..404beae139ae --- /dev/null +++ b/changelog/66282.fixed.md @@ -0,0 +1 @@ +Fixed master 4505 publish port becoming unresponsive under load: TCP `PubServer` now broadcasts to subscribers concurrently so a single slow subscriber no longer stalls the event publisher loop, and the ZeroMQ master PUB socket now enables ZMTP heartbeats so dead subscribers are reaped within seconds instead of waiting for the kernel TCP keepalive.