diff --git a/changelog/66282.fixed.md b/changelog/66282.fixed.md
new file mode 100644
index 000000000000..404beae139ae
--- /dev/null
+++ b/changelog/66282.fixed.md
@@ -0,0 +1 @@
+Fixed master 4505 publish port becoming unresponsive under load: TCP `PubServer` now broadcasts to subscribers concurrently so a single slow subscriber no longer stalls the event publisher loop, and the ZeroMQ master PUB socket now enables ZMTP heartbeats so dead subscribers are reaped within seconds instead of waiting for the kernel TCP keepalive.
diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py
index 991a3d126657..b5e6e518c0f1 100644
--- a/salt/transport/tcp.py
+++ b/salt/transport/tcp.py
@@ -903,27 +903,36 @@ def publish_payload(self, package, topic_list=None):
         log.trace("TCP PubServer sending payload: %s \n\n %r", package, topic_list)
         payload = salt.transport.frame.frame_msg(package)
         to_remove = []
+        # Start writes to every targeted client concurrently so a single
+        # slow subscriber can't stall delivery to the rest of the fleet.
+        # See https://github.com/saltstack/salt/issues/66282 — sequential
+        # ``yield client.stream.write(...)`` was clogging the event
+        # publisher loop, growing per-client write buffers and eventually
+        # wedging the master.
+        write_futures = []
         if topic_list:
             for topic in topic_list:
                 sent = False
-                for client in self.clients:
+                for client in list(self.clients):
                     if topic == client.id_:
                         try:
-                            # Write the packed str
-                            yield client.stream.write(payload)
+                            write_futures.append((client, client.stream.write(payload)))
                             sent = True
-                            # self.io_loop.add_future(f, lambda f: True)
                         except salt.ext.tornado.iostream.StreamClosedError:
                             to_remove.append(client)
                 if not sent:
                     log.debug("Publish target %s not connected %r", topic, self.clients)
         else:
-            for client in self.clients:
+            for client in list(self.clients):
                 try:
-                    # Write the packed str
-                    yield client.stream.write(payload)
+                    write_futures.append((client, client.stream.write(payload)))
                 except salt.ext.tornado.iostream.StreamClosedError:
                     to_remove.append(client)
+        for client, future in write_futures:
+            try:
+                yield future
+            except salt.ext.tornado.iostream.StreamClosedError:
+                to_remove.append(client)
         for client in to_remove:
             log.debug(
                 "Subscriber at %s has disconnected from publisher", client.address
diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py
index 9cb1873e8d02..f979d9d696d1 100644
--- a/salt/transport/zeromq.py
+++ b/salt/transport/zeromq.py
@@ -562,6 +562,49 @@ def _set_tcp_keepalive(zmq_socket, opts):
             zmq_socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, opts["tcp_keepalive_intvl"])
 
 
+# Defaults are intentionally generous: small enough to reap dead SUB peers
+# within seconds (rather than the ~2h15m kernel TCP keepalive default), but
+# large enough not to disrupt a healthy fleet of thousands of minions on a
+# laggy network. Operators can tune via ``zmq_heartbeat_ivl`` /
+# ``zmq_heartbeat_timeout`` in milliseconds.
+_DEFAULT_ZMQ_HEARTBEAT_IVL = 10000  # 10s between heartbeats
+_DEFAULT_ZMQ_HEARTBEAT_TIMEOUT = 30000  # 30s with no response -> peer dead
+
+
+def _set_zmq_heartbeat(zmq_socket, opts):
+    """
+    Enable ZMTP heartbeats on a ZeroMQ socket.
+
+    Without ``ZMQ_HEARTBEAT_IVL`` / ``ZMQ_HEARTBEAT_TIMEOUT`` configured,
+    ZMQ relies on the kernel TCP keepalive to notice a peer that vanished
+    without sending FIN (host reboot, kernel panic, dropped firewall rule).
+    On Linux that's ~2h15m by default, during which the master's PUB
+    socket keeps buffering for the dead peer and ``netstat`` accumulates
+    ``CLOSE_WAIT`` entries on port 4505 — eventually the master stops
+    accepting new connections. See
+    https://github.com/saltstack/salt/issues/66282.
+
+    Heartbeat opts are configured in milliseconds. A
+    ``zmq_heartbeat_ivl`` of ``0`` disables heartbeats (the ZMQ
+    default); ``zmq_heartbeat_timeout`` only takes effect while the
+    interval is greater than ``0``. An option that is missing or set to
+    ``None`` (e.g. an empty YAML value) falls back to the module default.
+
+    :param zmq_socket: socket to configure; only ``setsockopt`` is called.
+    :param opts: master opts mapping; may be ``None`` or empty.
+    """
+    if not opts:
+        opts = {}
+    ivl = opts.get("zmq_heartbeat_ivl")
+    if ivl is None:
+        ivl = _DEFAULT_ZMQ_HEARTBEAT_IVL
+    timeout = opts.get("zmq_heartbeat_timeout")
+    if timeout is None:
+        timeout = _DEFAULT_ZMQ_HEARTBEAT_TIMEOUT
+    if hasattr(zmq, "HEARTBEAT_IVL"):
+        zmq_socket.setsockopt(zmq.HEARTBEAT_IVL, int(ivl))
+    if hasattr(zmq, "HEARTBEAT_TIMEOUT"):
+        zmq_socket.setsockopt(zmq.HEARTBEAT_TIMEOUT, int(timeout))
+
+
 # TODO: unit tests!
 class AsyncReqMessageClient:
     """
@@ -1122,6 +1165,7 @@ def publish_daemon(
         monitor = ZeroMQSocketMonitor(pub_sock)
         monitor.start_io_loop(ioloop)
         _set_tcp_keepalive(pub_sock, self.opts)
+        _set_zmq_heartbeat(pub_sock, self.opts)
         self.dpub_sock = pub_sock = zmq.eventloop.zmqstream.ZMQStream(pub_sock)
         # if 2.1 >= zmq < 3.0, we only have one HWM setting
         try:
diff --git a/tests/pytests/functional/transport/tcp/test_pub_server_stability.py b/tests/pytests/functional/transport/tcp/test_pub_server_stability.py
new file mode 100644
index 000000000000..b75e971ff27a
--- /dev/null
+++ b/tests/pytests/functional/transport/tcp/test_pub_server_stability.py
@@ -0,0 +1,113 @@
+"""
+Regression tests for salt-master 4505 publish-port stability.
+
+See https://github.com/saltstack/salt/issues/66282 — under prolonged load a
+single slow IPC subscriber stalls the entire TCP ``PubServer`` broadcast
+loop because ``publish_payload`` ``yield``s each ``client.stream.write()``
+serially. Bytes accumulate in the tornado write buffer of the slow
+client, the publisher coroutine can't service the next event from the
+pull socket, and the master eventually appears wedged.
+"""
+
+import logging
+import time
+
+import pytest
+
+import salt.ext.tornado.gen
+import salt.ext.tornado.ioloop
+import salt.transport.tcp
+
+log = logging.getLogger(__name__)
+
+
+class _FakeStream:
+    """Mimic the bits of ``IOStream`` ``PubServer`` touches."""
+
+    def __init__(self, write_delay=0.0, name=""):
+        self.write_delay = write_delay
+        self.name = name
+        self.writes = []
+        self._closed = False
+        self.received_at = None
+
+    def closed(self):
+        return self._closed
+
+    def close(self):
+        self._closed = True
+
+    @salt.ext.tornado.gen.coroutine
+    def write(self, payload):
+        # Record receipt time *before* sleeping so the assertion measures
+        # the moment the broadcast loop actually reached this client, not
+        # the moment it finished.
+        self.received_at = time.monotonic()
+        self.writes.append(payload)
+        if self.write_delay:
+            yield salt.ext.tornado.gen.sleep(self.write_delay)
+
+
+def _make_subscriber(stream, name):
+    sub = salt.transport.tcp.Subscriber(stream, name)
+    sub.id_ = name
+    return sub
+
+
+@pytest.mark.timeout(60)
+async def test_slow_subscriber_does_not_block_fast_subscriber(master_opts):
+    """
+    A single slow subscriber must not stall delivery to other subscribers.
+
+    Without the fix, the ``for client in self.clients: yield ...write()``
+    loop in ``PubServer.publish_payload`` is serial: when the slow client
+    is iterated first, the fast client doesn't see its bytes until the
+    slow client's write completes ~3 s later. The test asserts the fast
+    client receives its payload within 1 s of ``publish_payload`` being
+    called.
+    """
+    master_opts["transport"] = "tcp"
+    server = salt.transport.tcp.PubServer(
+        master_opts, io_loop=salt.ext.tornado.ioloop.IOLoop.current()
+    )
+    try:
+        slow_stream = _FakeStream(write_delay=3.0, name="slow")
+        fast_stream = _FakeStream(write_delay=0.0, name="fast")
+        slow_sub = _make_subscriber(slow_stream, "slow")
+        fast_sub = _make_subscriber(fast_stream, "fast")
+        # ``self.clients`` is a ``set`` in production so iteration order is
+        # undefined. Pin the order [slow, fast] so the bug is hit
+        # deterministically.
+        server.clients = [slow_sub, fast_sub]
+
+        start = time.monotonic()
+        # Don't await the broadcast — we want to observe the fast client
+        # *during* the slow client's write. ``publish_payload`` is a
+        # tornado-coroutine that returns a ``salt.ext.tornado.concurrent.Future``;
+        # ``convert_yielded`` makes it awaitable from an ``async def`` driven
+        # by the IOLoop running this test.
+        broadcast = salt.ext.tornado.gen.convert_yielded(
+            server.publish_payload({"jid": "abc"})
+        )
+        # Poll up to 1 s for the fast client to have received the payload.
+        deadline = start + 1.0
+        while time.monotonic() < deadline and fast_stream.received_at is None:
+            await salt.ext.tornado.gen.convert_yielded(salt.ext.tornado.gen.sleep(0.05))
+        elapsed = (
+            (fast_stream.received_at - start)
+            if fast_stream.received_at is not None
+            else None
+        )
+        # Let the broadcast finish so we don't leak the slow coroutine.
+        await broadcast
+    finally:
+        server.close()
+
+    assert elapsed is not None, (
+        "fast subscriber did not receive the payload within 1s of publish; "
+        "PubServer.publish_payload is broadcasting serially (#66282)"
+    )
+    assert elapsed < 1.0, (
+        f"fast subscriber waited {elapsed:.2f}s behind slow subscriber; "
+        "PubServer.publish_payload is broadcasting serially (#66282)"
+    )
diff --git a/tests/pytests/unit/transport/test_zeromq_pub_stability.py b/tests/pytests/unit/transport/test_zeromq_pub_stability.py
new file mode 100644
index 000000000000..928a610bdbe2
--- /dev/null
+++ b/tests/pytests/unit/transport/test_zeromq_pub_stability.py
@@ -0,0 +1,111 @@
+"""
+Regression tests for ZMQ PUB-socket heartbeat configuration.
+
+See https://github.com/saltstack/salt/issues/66282 — when a SUB peer dies
+without sending FIN (host reboot, kernel panic, firewall block) the master's
+PUB socket only reaps that peer once kernel TCP keepalive expires, which is
+~2 h 15 min on default Linux. ZMTP heartbeats (``ZMQ_HEARTBEAT_IVL`` /
+``ZMQ_HEARTBEAT_TIMEOUT``) reduce that to seconds, which prevents the
+``CLOSE_WAIT`` build-up and per-peer buffer growth that eventually wedges
+port 4505.
+
+These tests pin the heartbeat options on the publisher socket. The fix
+adds them via a helper invoked from ``publish_daemon``; the helper is
+unit-tested here without bringing up the full daemon.
+"""
+
+import logging
+
+import pytest
+import zmq
+
+import salt.transport.zeromq
+
+log = logging.getLogger(__name__)
+
+
+pytestmark = [
+    pytest.mark.core_test,
+]
+
+
+@pytest.fixture
+def zmq_context():
+    ctx = zmq.Context()
+    try:
+        yield ctx
+    finally:
+        ctx.destroy(linger=0)
+
+
+def test_pub_socket_has_zmtp_heartbeats_configured(zmq_context):
+    """
+    ``salt.transport.zeromq._set_zmq_heartbeat`` must enable ZMTP
+    heartbeats on the PUB socket so dead subscribers are reaped in
+    seconds instead of hours. Without this, ``netstat`` accumulates
+    ``CLOSE_WAIT`` entries on port 4505 and the master eventually stops
+    accepting new connections (#66282).
+    """
+    sock = zmq_context.socket(zmq.PUB)
+    try:
+        # Default ZMQ behaviour: heartbeats disabled. Sanity check that
+        # the test is actually verifying a non-default configuration.
+        assert sock.getsockopt(zmq.HEARTBEAT_IVL) == 0
+        assert sock.getsockopt(zmq.HEARTBEAT_TIMEOUT) == -1
+
+        opts = {}
+        salt.transport.zeromq._set_zmq_heartbeat(sock, opts)
+
+        ivl = sock.getsockopt(zmq.HEARTBEAT_IVL)
+        timeout = sock.getsockopt(zmq.HEARTBEAT_TIMEOUT)
+        assert ivl > 0, (
+            "PUB socket has ZMTP HEARTBEAT_IVL disabled; dead SUB peers "
+            "will not be reaped until kernel TCP keepalive expires "
+            "(~2h15m). #66282"
+        )
+        assert timeout > 0, (
+            "PUB socket has ZMTP HEARTBEAT_TIMEOUT unset; dead SUB peers "
+            "will not be reaped until kernel TCP keepalive expires. "
+            "#66282"
+        )
+        # The timeout should exceed the interval; otherwise heartbeats
+        # would mark live peers as dead.
+        assert timeout > ivl
+    finally:
+        sock.close()
+
+
+def test_pub_socket_heartbeat_respects_opts(zmq_context):
+    """
+    Operators must be able to tune the heartbeat interval and timeout
+    via ``zmq_heartbeat_ivl`` / ``zmq_heartbeat_timeout`` master config.
+    """
+    sock = zmq_context.socket(zmq.PUB)
+    try:
+        opts = {"zmq_heartbeat_ivl": 5000, "zmq_heartbeat_timeout": 15000}
+        salt.transport.zeromq._set_zmq_heartbeat(sock, opts)
+        assert sock.getsockopt(zmq.HEARTBEAT_IVL) == 5000
+        assert sock.getsockopt(zmq.HEARTBEAT_TIMEOUT) == 15000
+    finally:
+        sock.close()
+
+
+def test_pub_socket_heartbeat_none_opts_use_defaults(zmq_context):
+    """
+    A heartbeat option that is present but ``None`` (for example an empty
+    YAML value) must fall back to the default instead of raising.
+    """
+    sock = zmq_context.socket(zmq.PUB)
+    try:
+        opts = {"zmq_heartbeat_ivl": None, "zmq_heartbeat_timeout": None}
+        salt.transport.zeromq._set_zmq_heartbeat(sock, opts)
+        assert (
+            sock.getsockopt(zmq.HEARTBEAT_IVL)
+            == salt.transport.zeromq._DEFAULT_ZMQ_HEARTBEAT_IVL
+        )
+        assert (
+            sock.getsockopt(zmq.HEARTBEAT_TIMEOUT)
+            == salt.transport.zeromq._DEFAULT_ZMQ_HEARTBEAT_TIMEOUT
+        )
+    finally:
+        sock.close()