diff --git a/.changeset/poll-wait-status-monitor.md b/.changeset/poll-wait-status-monitor.md new file mode 100644 index 0000000000..77fd39d75f --- /dev/null +++ b/.changeset/poll-wait-status-monitor.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Replace the mailbox-based `StatusMonitor.wait_until/3` not-ready path with adaptive per-process polling once the StatusMonitor's waiter set crosses a congestion threshold. Uncongested callers continue to use the existing `GenServer.call` for low-latency wakeup; congested callers switch to `Electric.PollWait.until/3` against `service_status/1`, bounding StatusMonitor mailbox growth to the threshold during cold-start bursts. diff --git a/packages/sync-service/lib/electric/poll_wait.ex b/packages/sync-service/lib/electric/poll_wait.ex new file mode 100644 index 0000000000..bc6bc72539 --- /dev/null +++ b/packages/sync-service/lib/electric/poll_wait.ex @@ -0,0 +1,70 @@ +defmodule Electric.PollWait do + @moduledoc """ + Per-process bounded polling of a cheap (ETS-backed) condition. + + `until/3` sleeps between checks with exponential backoff (doubling, capped) + and bounded jitter so concurrent waiters land on distinct ETS reads + instead of stampeding the same millisecond window. + + All defaults can be overridden per-call so the primitive can be shared + between consumers with very different latency profiles. + """ + + @default_initial_interval 25 + @default_max_interval 500 + @default_backoff 2.0 + @default_jitter 0.25 + + @type ready :: :ready | {:ready, term()} + @type check :: (-> ready | :not_ready) + + @spec until(check, timeout(), keyword()) :: ready | :timeout + def until(check_fun, timeout, opts \\ []) when is_function(check_fun, 0) do + initial = Keyword.get(opts, :initial_interval, @default_initial_interval) + max = Keyword.get(opts, :max_interval, @default_max_interval) + factor = Keyword.get(opts, :backoff, @default_backoff) + jitter = Keyword.get(opts, :jitter, @default_jitter) + + do_until(check_fun, deadline(timeout), initial, max, factor, jitter) + end + + defp deadline(:infinity), do: :infinity + + defp deadline(t) when is_integer(t) and t >= 0, + do: System.monotonic_time(:millisecond) + t + + defp do_until(check_fun, deadline, interval, max, factor, jitter) do + case check_fun.() do + :not_ready -> + case remaining(deadline) do + 0 -> + :timeout + + rem -> + sleep_for = min(jittered(interval, jitter), rem) + Process.sleep(sleep_for) + + next_interval = min(round(interval * factor), max) + do_until(check_fun, deadline, next_interval, max, factor, jitter) + end + + ready -> + ready + end + end + + # Returns interval ± jitter*interval, clamped to >= 10ms so we never busy-loop. + defp jittered(interval, jitter) when jitter <= 0.0, do: max(10, interval) + + defp jittered(interval, jitter) do + spread = max(1, round(interval * jitter)) + # :rand.uniform(N) returns 1..N. Centre around 0 by subtracting spread+1. + offset = :rand.uniform(2 * spread + 1) - spread - 1 + max(1, interval + offset) + end + + defp remaining(:infinity), do: :infinity + + defp remaining(deadline), + do: max(0, deadline - System.monotonic_time(:millisecond)) +end diff --git a/packages/sync-service/lib/electric/status_monitor.ex b/packages/sync-service/lib/electric/status_monitor.ex index 1189a413da..cf726eb34f 100644 --- a/packages/sync-service/lib/electric/status_monitor.ex +++ b/packages/sync-service/lib/electric/status_monitor.ex @@ -4,6 +4,8 @@ defmodule Electric.StatusMonitor do require Logger + alias Electric.PollWait + @type status() :: %{ conn: :waiting_on_lock | :starting | :up | :sleeping, shape: :starting | :read_only | :up @@ -23,6 +25,8 @@ defmodule Electric.StatusMonitor do @default_results for condition <- @conditions, into: %{}, do: {condition, {false, %{}}} @db_state_key :db_state + @congested_key :waiters_congested + @congested_threshold 1000 @spin_prevention_delay 10 def start_link(opts) do @@ -57,6 +61,25 @@ defmodule Electric.StatusMonitor do end end + @doc """ + Returns true once the StatusMonitor process has accumulated `congested_threshold/0` waiters + The flag is cleared once the set of waiters drains back to 0. + + Used by callers to decide between a `GenServer.call` wait (low latency + when uncontended) and a `PollWait.until/3` wait (bounded mailbox growth + under burst). + """ + @spec congested?(String.t()) :: boolean() + def congested?(stack_id) do + :ets.lookup_element(ets_table(stack_id), @congested_key, 2, false) + rescue + ArgumentError -> false + end + + @doc false + @spec congested_threshold() :: pos_integer() + def congested_threshold, do: @congested_threshold + @spec status(String.t()) :: status() def status(stack_id) do table = ets_table(stack_id) @@ -185,69 +208,82 @@ defmodule Electric.StatusMonitor do :sleeping -> if Keyword.get(opts, :block_on_conn_sleeping, false) do - do_wait_until(stack_id, level, opts) + dispatch_wait(stack_id, level, opts) else :conn_sleeping end _ -> - do_wait_until(stack_id, level, opts) + dispatch_wait(stack_id, level, opts) + end + end + + defp dispatch_wait(stack_id, level, opts) do + if congested?(stack_id) do + poll_wait(stack_id, level, opts) + else + do_wait_until(stack_id, level, opts) end end defp do_wait_until(stack_id, level, opts) do timeout = Keyword.fetch!(opts, :timeout) - try do - case stack_id |> name() |> GenServer.whereis() do - nil -> - maybe_retry_wait_until( - stack_id, - level, - opts, - timeout, - "Status monitor not found for stack ID: #{stack_id}" - ) - - pid when is_pid(pid) -> - GenServer.call(pid, {:wait_until, level, timeout}, :infinity) + check_fn = fn -> + case monitor_lookup(stack_id) do + {:ready, pid} -> + try do + {:ready, GenServer.call(pid, {:wait_until, level, timeout}, :infinity)} + catch + :exit, _reason -> :not_ready + end + + _ -> + :not_ready end - rescue - ArgumentError -> - maybe_retry_wait_until( - stack_id, - level, - opts, - timeout, - "Stack ID not recognised: #{stack_id}" - ) - catch - :exit, _reason -> - maybe_retry_wait_until( - stack_id, - level, - opts, - timeout, - "Stack #{inspect(stack_id)} has terminated" - ) end - end - defp maybe_retry_wait_until(_stack_id, _level, _opts, timeout, last_error) - when timeout <= @spin_prevention_delay do - {:error, last_error} + case PollWait.until(check_fn, timeout, + initial_interval: @spin_prevention_delay, + max_interval: @spin_prevention_delay, + jitter: 0.0 + ) do + {:ready, result} -> result + :timeout -> {:error, monitor_unavailable_reason(stack_id)} + end end - defp maybe_retry_wait_until(stack_id, level, opts, timeout, _) do - Process.sleep(@spin_prevention_delay) + defp poll_wait(stack_id, level, opts) do + timeout = Keyword.fetch!(opts, :timeout) - remaining_timeout = - case timeout do - :infinity -> :infinity - _ -> timeout - @spin_prevention_delay + check_fn = fn -> + case check_level(level, stack_id) do + {:ok, _} = ready -> {:ready, ready} + :not_ready -> :not_ready end + end + + case PollWait.until(check_fn, timeout) do + {:ready, value} -> value + :timeout -> {:error, timeout_message(stack_id)} + end + end - wait_until(stack_id, level, Keyword.put(opts, :timeout, remaining_timeout)) + defp monitor_lookup(stack_id) do + case stack_id |> name() |> GenServer.whereis() do + nil -> :monitor_not_found + pid when is_pid(pid) -> {:ready, pid} + end + rescue + ArgumentError -> :registry_not_found + end + + defp monitor_unavailable_reason(stack_id) do + case monitor_lookup(stack_id) do + :monitor_not_found -> "Status monitor not found for stack ID: #{stack_id}" + :registry_not_found -> "Stack ID not recognised: #{stack_id}" + {:ready, _} -> "Stack #{inspect(stack_id)} has terminated" + end end @doc "Convenience wrapper: wait until fully active. Returns `:ok` on success." @@ -313,6 +349,16 @@ defmodule Electric.StatusMonitor do {:noreply, state} end + # Test-only: writes the congestion flag directly. The flag is normally + # set/cleared by the GenServer in response to waiter-set transitions; this + # cast lets tests force the polling branch without first enqueuing the + # threshold's worth of real waiters. + @doc false + def handle_cast({:set_congested_flag_for_test, value}, state) when is_boolean(value) do + :ets.insert(ets_table(state.stack_id), {@congested_key, value}) + {:noreply, state} + end + def handle_call({:wait_until, level, timeout}, from, %{waiters: waiters} = state) do case check_level(level, state.stack_id) do {:ok, _} = reply -> @@ -323,7 +369,10 @@ defmodule Electric.StatusMonitor do Process.send_after(self(), {:timeout_waiter, {from, level}}, timeout) end - {:noreply, %{state | waiters: MapSet.put(waiters, {from, level})}} + new_waiters = MapSet.put(waiters, {from, level}) + maybe_set_congested(state.stack_id, MapSet.size(new_waiters)) + + {:noreply, %{state | waiters: new_waiters}} end end @@ -352,7 +401,9 @@ defmodule Electric.StatusMonitor do def handle_info({:timeout_waiter, {from, _level} = waiter}, state) do if MapSet.member?(state.waiters, waiter) do GenServer.reply(from, {:error, timeout_message(state.stack_id)}) - {:noreply, %{state | waiters: MapSet.delete(state.waiters, waiter)}} + new_waiters = MapSet.delete(state.waiters, waiter) + maybe_clear_congested(state.stack_id, MapSet.size(new_waiters)) + {:noreply, %{state | waiters: new_waiters}} else {:noreply, state} end @@ -375,9 +426,24 @@ defmodule Electric.StatusMonitor do end end) + maybe_clear_congested(state.stack_id, MapSet.size(waiters)) %{state | waiters: waiters} end + defp maybe_set_congested(stack_id, size) when size >= @congested_threshold do + :ets.insert(ets_table(stack_id), {@congested_key, true}) + :ok + end + + defp maybe_set_congested(_stack_id, _size), do: :ok + + defp maybe_clear_congested(stack_id, 0) do + :ets.insert(ets_table(stack_id), {@congested_key, false}) + :ok + end + + defp maybe_clear_congested(_stack_id, _size), do: :ok + defp db_state(table) do :ets.lookup_element(table, @db_state_key, 2, :up) rescue diff --git a/packages/sync-service/test/electric/poll_wait_test.exs b/packages/sync-service/test/electric/poll_wait_test.exs new file mode 100644 index 0000000000..93cb83a806 --- /dev/null +++ b/packages/sync-service/test/electric/poll_wait_test.exs @@ -0,0 +1,111 @@ +defmodule Electric.PollWaitTest do + use ExUnit.Case, async: true + + alias Electric.PollWait + + describe "until/3" do + test "returns :ready immediately when the predicate is ready on first check" do + assert PollWait.until(fn -> :ready end, 1_000) == :ready + end + + test "returns {:ready, value} when the predicate yields a tagged ready" do + assert PollWait.until(fn -> {:ready, :foo} end, 1_000) == {:ready, :foo} + end + + test "returns :timeout when the predicate never becomes ready" do + # Use a tiny timeout so the test is cheap. Initial interval defaults + # to 25ms so 50ms is enough to guarantee at least one sleep. + assert PollWait.until(fn -> :not_ready end, 50) == :timeout + end + + test "stops polling once the deadline elapses, even mid-sleep" do + counter = :counters.new(1, [:atomics]) + + check = fn -> + :counters.add(counter, 1, 1) + :not_ready + end + + assert PollWait.until(check, 75, initial_interval: 25, max_interval: 25, jitter: 0.0) == + :timeout + + # 0ms check, sleep 25, 25ms check, sleep 25, 50ms check, sleep 25, 75ms deadline. + # The load-bearing assertion is the upper bound (<=4): the loop must be + # bounded by the deadline. The lower bound is loosened to 2 because a + # slow CI runner can overrun a single 25ms sleep and collapse the trace. + count = :counters.get(counter, 1) + assert count in 2..4, "expected 2..4 checks, got #{count}" + end + + test "respects per-call backoff opts (no global defaults baked in)" do + timestamps = :ets.new(:ts, [:public, :ordered_set]) + + check = fn -> + :ets.insert(timestamps, {System.monotonic_time(:millisecond), :tick}) + :not_ready + end + + _ = + PollWait.until(check, 300, + initial_interval: 5, + max_interval: 20, + backoff: 2.0, + jitter: 0.0 + ) + + ts = timestamps |> :ets.tab2list() |> Enum.map(&elem(&1, 0)) |> Enum.sort() + diffs = ts |> Enum.chunk_every(2, 1, :discard) |> Enum.map(fn [a, b] -> b - a end) + + # With 0 jitter and a 2.0 factor: 5, 10, 20, 20, 20, ... ms between checks. + [d1, d2, d3 | _] = diffs + + # Each measured delta must be at least the configured sleep (minus 1ms slop + # for monotonic-time resolution). + assert d1 >= 4, "expected d1 >= 4ms (configured 5), got #{d1}" + assert d2 >= 8, "expected d2 >= 8ms (configured 10), got #{d2}" + assert d3 >= 15, "expected d3 >= 15ms (configured 20), got #{d3}" + + # And growth must be monotonic: each subsequent interval is at least the + # previous one (within scheduler slop). This proves the backoff factor is + # being applied, regardless of how slow the CI runner is. + assert d2 >= d1, "expected d2 >= d1, got d1=#{d1}, d2=#{d2}" + assert d3 >= d2, "expected d3 >= d2, got d2=#{d2}, d3=#{d3}" + :ets.delete(timestamps) + end + + test "jitter never produces negative or zero sleeps" do + # Drive 200 jittered intervals at the minimum interval (1) with max jitter + # and ensure none of them blow up or return ready spuriously. + check = fn -> :not_ready end + + assert PollWait.until(check, 5, initial_interval: 1, max_interval: 1, jitter: 1.0) == + :timeout + end + + test ":infinity timeout never returns :timeout but yields when ready" do + parent = self() + + task = + Task.async(fn -> + PollWait.until( + fn -> + receive do + :go -> :ready + after + 0 -> :not_ready + end + end, + :infinity, + initial_interval: 5, + max_interval: 5, + jitter: 0.0 + ) + |> tap(fn r -> send(parent, {:result, r}) end) + end) + + Process.sleep(20) + send(task.pid, :go) + assert_receive {:result, :ready}, 200 + end + end +end diff --git a/packages/sync-service/test/electric/status_monitor_test.exs b/packages/sync-service/test/electric/status_monitor_test.exs index d4c94a7b17..3eaf6bd217 100644 --- a/packages/sync-service/test/electric/status_monitor_test.exs +++ b/packages/sync-service/test/electric/status_monitor_test.exs @@ -573,4 +573,178 @@ defmodule Electric.StatusMonitorTest do assert_receive {{StatusMonitor, ^ref_active}, {:ok, :active}}, 100 end end + + describe "congested?/1" do + test "returns false before any waiters are enqueued", %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + assert StatusMonitor.congested?(stack_id) == false + end + + test "returns false when the table does not exist (status monitor not started)", + %{stack_id: stack_id} do + assert StatusMonitor.congested?(stack_id) == false + end + + test "flips to true once the waiter set reaches the threshold and back to false on drain", + %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + + threshold = StatusMonitor.congested_threshold() + pid = GenServer.whereis(StatusMonitor.name(stack_id)) + + # Spawn `threshold` waiters that block on :active. + waiters = + for _ <- 1..threshold do + Task.async(fn -> StatusMonitor.wait_until(stack_id, :active, timeout: 5_000) end) + end + + # Wait deterministically until all `threshold` calls have landed in the + # StatusMonitor's state — `Task.async` doesn't guarantee the spawned task + # has executed its `GenServer.call`, so a simple `wait_for_messages_to_be_processed` + # could race ahead of the tasks. + wait_until_waiters_count(pid, threshold) + + assert StatusMonitor.congested?(stack_id) == true + + # Drive readiness to drain all waiters. + Support.TestUtils.set_status_to_active(%{stack_id: stack_id}) + Enum.each(waiters, &Task.await/1) + + StatusMonitor.wait_for_messages_to_be_processed(stack_id) + assert StatusMonitor.congested?(stack_id) == false + end + + test "flips back to false when waiters drain via :timeout_waiter rather than readiness", + %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + pid = GenServer.whereis(StatusMonitor.name(stack_id)) + + # Enqueue a real waiter on the GenServer.call path (flag is still false + # here, so the call path is taken and the waiter lands in state.waiters). + task = Task.async(fn -> StatusMonitor.wait_until(stack_id, :active, timeout: 50) end) + wait_until_waiters_count(pid, 1) + + # Now force the flag on. When the waiter's deadline fires, the + # :timeout_waiter handler removes it from state.waiters (size → 0) and + # calls maybe_clear_congested/2, which is what we're testing. + force_congested(stack_id) + assert StatusMonitor.congested?(stack_id) == true + + assert {:error, _} = Task.await(task, 1_000) + StatusMonitor.wait_for_messages_to_be_processed(stack_id) + assert StatusMonitor.congested?(stack_id) == false + end + + test "does not set the flag below the threshold", %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + + _ = + Task.async(fn -> StatusMonitor.wait_until(stack_id, :active, timeout: 50) end) + + StatusMonitor.wait_for_messages_to_be_processed(stack_id) + assert StatusMonitor.congested?(stack_id) == false + end + + defp wait_until_waiters_count( + pid, + expected, + deadline \\ System.monotonic_time(:millisecond) + 2_000 + ) do + size = MapSet.size(:sys.get_state(pid).waiters) + + cond do + size >= expected -> + :ok + + System.monotonic_time(:millisecond) > deadline -> + flunk("Timed out waiting for #{expected} waiters; saw #{size}") + + true -> + Process.sleep(5) + wait_until_waiters_count(pid, expected, deadline) + end + end + end + + describe "wait_until/3 under congestion" do + test "polling path returns {:ok, :active} when readiness flips", %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + + # Force the flag on so the next caller takes the polling path. + force_congested(stack_id) + + test_process = self() + + Task.async(fn -> + result = StatusMonitor.wait_until(stack_id, :active, timeout: 1_000) + send(test_process, {:result, result}) + end) + + refute_receive {:result, _}, 50 + Support.TestUtils.set_status_to_active(%{stack_id: stack_id}) + assert_receive {:result, {:ok, :active}}, 1_000 + end + + test "polling path returns {:ok, :read_only} when only metadata becomes ready", + %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + force_congested(stack_id) + + test_process = self() + + Task.async(fn -> + result = StatusMonitor.wait_until(stack_id, :read_only, timeout: 1_000) + send(test_process, {:result, result}) + end) + + refute_receive {:result, _}, 50 + StatusMonitor.mark_shape_metadata_ready(stack_id, self()) + assert_receive {:result, {:ok, :read_only}}, 1_000 + end + + test "polling path returns {:error, _} on timeout", %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + force_congested(stack_id) + + assert {:error, message} = + StatusMonitor.wait_until(stack_id, :active, timeout: 50) + + assert message =~ "Postgres lock" + end + + test "sleeping branch short-circuits before the polling check (flag set + sleeping, not blocking)", + %{stack_id: stack_id} do + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + force_congested(stack_id) + + StatusMonitor.database_connections_going_to_sleep(stack_id) + StatusMonitor.wait_for_messages_to_be_processed(stack_id) + + # The outer case in wait_until/3 handles sleeping before consulting + # congested?/1, so :conn_sleeping is returned regardless of the flag. + assert StatusMonitor.wait_until(stack_id, :active, timeout: 50) == :conn_sleeping + end + + test "uncongested callers continue to use the GenServer.call path", %{stack_id: stack_id} do + # Indirect check: the GenServer.call path enqueues into state.waiters. We + # confirm a single uncongested caller doesn't flip the flag — proving it + # took the call path, not the polling path (polling never touches + # state.waiters). + start_link_supervised!({StatusMonitor, stack_id: stack_id}) + + task = Task.async(fn -> StatusMonitor.wait_until(stack_id, :active, timeout: 50) end) + StatusMonitor.wait_for_messages_to_be_processed(stack_id) + + assert StatusMonitor.congested?(stack_id) == false + # Drain the timeout waiter so the test exits cleanly. + assert {:error, _} = Task.await(task, 1_000) + end + + defp force_congested(stack_id) do + GenServer.cast(StatusMonitor.name(stack_id), {:set_congested_flag_for_test, true}) + # Round-trip a call to ensure the cast has been processed before the + # caller reads the flag. + StatusMonitor.wait_for_messages_to_be_processed(stack_id) + end + end end