Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/poll-wait-status-monitor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Replace the mailbox-based `StatusMonitor.wait_until/3` not-ready path with adaptive per-process polling once the StatusMonitor's waiter set crosses a congestion threshold. Uncongested callers continue to use the existing `GenServer.call` for low-latency wakeup; congested callers switch to `Electric.PollWait.until/3` against `service_status/1`, bounding StatusMonitor mailbox growth to the threshold during cold-start bursts.
70 changes: 70 additions & 0 deletions packages/sync-service/lib/electric/poll_wait.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
defmodule Electric.PollWait do
  @moduledoc """
  Per-process bounded polling of a cheap (ETS-backed) condition.

  `until/3` sleeps between checks with exponential backoff (doubling, capped)
  and bounded jitter so concurrent waiters land on distinct ETS reads
  instead of stampeding the same millisecond window.

  All defaults can be overridden per-call so the primitive can be shared
  between consumers with very different latency profiles.
  """

  @default_initial_interval 25
  @default_max_interval 500
  @default_backoff 2.0
  @default_jitter 0.25

  # Lower bound (ms) applied to every computed sleep, with or without jitter,
  # so that a tiny interval can never turn the loop into a busy-wait.
  @min_sleep 10

  @type ready :: :ready | {:ready, term()}
  @type check :: (-> ready | :not_ready)

  @doc """
  Repeatedly invokes `check_fun` until it returns something other than
  `:not_ready`, or until `timeout` elapses.

  `check_fun` is always called at least once, even when `timeout` is `0`.
  Any value other than `:not_ready` is returned to the caller unchanged.
  `:timeout` is returned when a check reports `:not_ready` and no time is left.

  `timeout` is a non-negative integer number of milliseconds or `:infinity`.

  ## Options

    * `:initial_interval` - first sleep in ms (default `#{@default_initial_interval}`)
    * `:max_interval` - cap for the backed-off interval in ms (default `#{@default_max_interval}`)
    * `:backoff` - multiplier applied to the interval after each sleep (default `#{@default_backoff}`)
    * `:jitter` - fraction of the interval used as the +/- random spread;
      `0.0` or less disables jitter (default `#{@default_jitter}`)

  Every sleep is at least #{@min_sleep}ms, except the last one before the
  deadline, which is truncated to the time remaining.
  """
  @spec until(check, timeout(), keyword()) :: ready | :timeout
  def until(check_fun, timeout, opts \\ []) when is_function(check_fun, 0) do
    initial = Keyword.get(opts, :initial_interval, @default_initial_interval)
    max = Keyword.get(opts, :max_interval, @default_max_interval)
    factor = Keyword.get(opts, :backoff, @default_backoff)
    jitter = Keyword.get(opts, :jitter, @default_jitter)

    do_until(check_fun, deadline(timeout), initial, max, factor, jitter)
  end

  # Converts a relative timeout into an absolute monotonic deadline (ms).
  defp deadline(:infinity), do: :infinity

  defp deadline(t) when is_integer(t) and t >= 0,
    do: System.monotonic_time(:millisecond) + t

  # Check first, then sleep: a condition that is already satisfied returns
  # without sleeping, and a zero timeout still performs exactly one check.
  defp do_until(check_fun, deadline, interval, max, factor, jitter) do
    case check_fun.() do
      :not_ready ->
        case remaining(deadline) do
          0 ->
            :timeout

          rem ->
            interval
            |> jittered(jitter)
            |> cap_to_remaining(rem)
            |> Process.sleep()

            next_interval = min(round(interval * factor), max)
            do_until(check_fun, deadline, next_interval, max, factor, jitter)
        end

      ready ->
        ready
    end
  end

  # Never sleep past the deadline; with no deadline the sleep is left as-is.
  defp cap_to_remaining(sleep_for, :infinity), do: sleep_for
  defp cap_to_remaining(sleep_for, rem), do: min(sleep_for, rem)

  # Returns interval ± jitter*interval, clamped to >= @min_sleep ms so we
  # never busy-loop, whether or not jitter is enabled.
  defp jittered(interval, jitter) when jitter <= 0.0, do: max(@min_sleep, interval)

  defp jittered(interval, jitter) do
    spread = max(1, round(interval * jitter))
    # :rand.uniform(N) returns 1..N. Centre around 0 by subtracting spread+1,
    # giving an offset in -spread..spread.
    offset = :rand.uniform(2 * spread + 1) - spread - 1
    max(@min_sleep, interval + offset)
  end

  # Milliseconds left until the deadline, never negative.
  defp remaining(:infinity), do: :infinity

  defp remaining(deadline),
    do: max(0, deadline - System.monotonic_time(:millisecond))
end
158 changes: 112 additions & 46 deletions packages/sync-service/lib/electric/status_monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule Electric.StatusMonitor do

require Logger

alias Electric.PollWait

@type status() :: %{
conn: :waiting_on_lock | :starting | :up | :sleeping,
shape: :starting | :read_only | :up
Expand All @@ -23,6 +25,8 @@ defmodule Electric.StatusMonitor do
@default_results for condition <- @conditions, into: %{}, do: {condition, {false, %{}}}

@db_state_key :db_state
@congested_key :waiters_congested
@congested_threshold 1000
@spin_prevention_delay 10

def start_link(opts) do
Expand Down Expand Up @@ -57,6 +61,25 @@ defmodule Electric.StatusMonitor do
end
end

@doc """
Reports whether the StatusMonitor for `stack_id` is currently flagged as congested.

The flag is raised once the StatusMonitor process has accumulated
`congested_threshold/0` waiters, and is cleared once the set of waiters
drains back to 0. Returns `false` when the flag has never been written or
when the ETS lookup raises `ArgumentError`.

Callers use the result to decide between a `GenServer.call` wait (low
latency when uncontended) and a `PollWait.until/3` wait (bounded mailbox
growth under burst).
"""
@spec congested?(String.t()) :: boolean()
def congested?(stack_id) do
  try do
    stack_id
    |> ets_table()
    |> :ets.lookup_element(@congested_key, 2, false)
  rescue
    ArgumentError -> false
  end
end

# Exposes the waiter count at which the congestion flag is raised.
@doc false
@spec congested_threshold() :: pos_integer()
def congested_threshold do
  @congested_threshold
end

@spec status(String.t()) :: status()
def status(stack_id) do
table = ets_table(stack_id)
Expand Down Expand Up @@ -185,69 +208,82 @@ defmodule Electric.StatusMonitor do

:sleeping ->
if Keyword.get(opts, :block_on_conn_sleeping, false) do
do_wait_until(stack_id, level, opts)
dispatch_wait(stack_id, level, opts)
else
:conn_sleeping
end

_ ->
do_wait_until(stack_id, level, opts)
dispatch_wait(stack_id, level, opts)
end
end

# Picks the wait strategy for a not-yet-ready stack: per-process polling while
# the monitor is flagged as congested, the GenServer-call path otherwise.
defp dispatch_wait(stack_id, level, opts) do
  wait_fun = if congested?(stack_id), do: &poll_wait/3, else: &do_wait_until/3
  wait_fun.(stack_id, level, opts)
end

defp do_wait_until(stack_id, level, opts) do
timeout = Keyword.fetch!(opts, :timeout)

try do
case stack_id |> name() |> GenServer.whereis() do
nil ->
maybe_retry_wait_until(
stack_id,
level,
opts,
timeout,
"Status monitor not found for stack ID: #{stack_id}"
)

pid when is_pid(pid) ->
GenServer.call(pid, {:wait_until, level, timeout}, :infinity)
check_fn = fn ->
case monitor_lookup(stack_id) do
{:ready, pid} ->
try do
{:ready, GenServer.call(pid, {:wait_until, level, timeout}, :infinity)}
catch
:exit, _reason -> :not_ready
end

_ ->
:not_ready
end
rescue
ArgumentError ->
maybe_retry_wait_until(
stack_id,
level,
opts,
timeout,
"Stack ID not recognised: #{stack_id}"
)
catch
:exit, _reason ->
maybe_retry_wait_until(
stack_id,
level,
opts,
timeout,
"Stack #{inspect(stack_id)} has terminated"
)
end
end

defp maybe_retry_wait_until(_stack_id, _level, _opts, timeout, last_error)
when timeout <= @spin_prevention_delay do
{:error, last_error}
case PollWait.until(check_fn, timeout,
initial_interval: @spin_prevention_delay,
max_interval: @spin_prevention_delay,
jitter: 0.0
) do
{:ready, result} -> result
:timeout -> {:error, monitor_unavailable_reason(stack_id)}
end
end

defp maybe_retry_wait_until(stack_id, level, opts, timeout, _) do
Process.sleep(@spin_prevention_delay)
defp poll_wait(stack_id, level, opts) do
timeout = Keyword.fetch!(opts, :timeout)

remaining_timeout =
case timeout do
:infinity -> :infinity
_ -> timeout - @spin_prevention_delay
check_fn = fn ->
case check_level(level, stack_id) do
{:ok, _} = ready -> {:ready, ready}
:not_ready -> :not_ready
end
end

case PollWait.until(check_fn, timeout) do
{:ready, value} -> value
:timeout -> {:error, timeout_message(stack_id)}
end
end

wait_until(stack_id, level, Keyword.put(opts, :timeout, remaining_timeout))
# Resolves the StatusMonitor process for `stack_id`.
#
# Returns `{:ready, pid}` when the process is found, `:monitor_not_found` when
# the name resolves to nothing, and `:registry_not_found` when resolving the
# name raises `ArgumentError`.
defp monitor_lookup(stack_id) do
  try do
    registered_name = name(stack_id)

    case GenServer.whereis(registered_name) do
      pid when is_pid(pid) -> {:ready, pid}
      nil -> :monitor_not_found
    end
  rescue
    ArgumentError -> :registry_not_found
  end
end

# Builds the error message for a wait that gave up, based on a fresh lookup of
# the monitor process at the time of the call.
defp monitor_unavailable_reason(stack_id) do
  lookup_result = monitor_lookup(stack_id)

  case lookup_result do
    {:ready, _pid} -> "Stack #{inspect(stack_id)} has terminated"
    :registry_not_found -> "Stack ID not recognised: #{stack_id}"
    :monitor_not_found -> "Status monitor not found for stack ID: #{stack_id}"
  end
end

@doc "Convenience wrapper: wait until fully active. Returns `:ok` on success."
Expand Down Expand Up @@ -313,6 +349,16 @@ defmodule Electric.StatusMonitor do
{:noreply, state}
end

# Test-only hook: stores the given boolean as the congestion flag in ETS.
# In normal operation the GenServer sets and clears the flag itself as the
# waiter set grows and drains; this cast lets a test force the polling
# branch without enqueuing a threshold's worth of real waiters first.
@doc false
def handle_cast({:set_congested_flag_for_test, value}, state) when is_boolean(value) do
  state.stack_id
  |> ets_table()
  |> :ets.insert({@congested_key, value})

  {:noreply, state}
end

def handle_call({:wait_until, level, timeout}, from, %{waiters: waiters} = state) do
case check_level(level, state.stack_id) do
{:ok, _} = reply ->
Expand All @@ -323,7 +369,10 @@ defmodule Electric.StatusMonitor do
Process.send_after(self(), {:timeout_waiter, {from, level}}, timeout)
end

{:noreply, %{state | waiters: MapSet.put(waiters, {from, level})}}
new_waiters = MapSet.put(waiters, {from, level})
maybe_set_congested(state.stack_id, MapSet.size(new_waiters))

{:noreply, %{state | waiters: new_waiters}}
end
end

Expand Down Expand Up @@ -352,7 +401,9 @@ defmodule Electric.StatusMonitor do
def handle_info({:timeout_waiter, {from, _level} = waiter}, state) do
if MapSet.member?(state.waiters, waiter) do
GenServer.reply(from, {:error, timeout_message(state.stack_id)})
{:noreply, %{state | waiters: MapSet.delete(state.waiters, waiter)}}
new_waiters = MapSet.delete(state.waiters, waiter)
maybe_clear_congested(state.stack_id, MapSet.size(new_waiters))
{:noreply, %{state | waiters: new_waiters}}
else
{:noreply, state}
end
Expand All @@ -375,9 +426,24 @@ defmodule Electric.StatusMonitor do
end
end)

maybe_clear_congested(state.stack_id, MapSet.size(waiters))
%{state | waiters: waiters}
end

# Raises the congestion flag in ETS once the waiter count has reached the
# threshold; below the threshold nothing is written. Always returns :ok.
defp maybe_set_congested(stack_id, size) do
  if size >= @congested_threshold do
    stack_id
    |> ets_table()
    |> :ets.insert({@congested_key, true})
  end

  :ok
end

# Lowers the congestion flag in ETS only when the waiter set is completely
# empty; any other size leaves the flag untouched. Always returns :ok.
defp maybe_clear_congested(stack_id, size) do
  if size === 0 do
    stack_id
    |> ets_table()
    |> :ets.insert({@congested_key, false})
  end

  :ok
end

defp db_state(table) do
:ets.lookup_element(table, @db_state_key, 2, :up)
rescue
Expand Down
Loading
Loading