From e1a129edffe86553c371363460664353b9067147 Mon Sep 17 00:00:00 2001 From: Erik the Implementer Date: Fri, 22 May 2026 17:35:00 +0200 Subject: [PATCH 1/7] feat(electric-telemetry): add process_subtype attribute Adds a new low-cardinality `process_subtype` attribute alongside the existing `process_type` on all telemetry events that today carry it (`vm.monitor.long_{gc,schedule,message_queue}`, `process.memory`, `process.bin_memory`). For the three coarse `process_type` buckets that previously hid most of the signal during overload, `process_subtype` is derived as: * `:supervisor` -> registered name, else first atom in $ancestors * `:erlang` -> registered name, else initial_call MFA string * `:logger_olp` -> registered name (handler id) For every other `process_type` value, `process_subtype` is `nil`. The existing `process_type` taxonomy is unchanged, so Honeycomb boards and alerts that group by it continue to work; `process_subtype` adds a finer-grained drill-down without exploding cardinality. Refs electric-sql/alco-agent-tasks#46. --- .changeset/process-subtype-attribute.md | 5 + .../telemetry/application_telemetry.ex | 31 ++++-- .../lib/electric/telemetry/processes.ex | 100 ++++++++++++++++- .../lib/electric/telemetry/system_monitor.ex | 46 +++++--- .../electric/telemetry/processes_test.exs | 104 ++++++++++++++++++ 5 files changed, 255 insertions(+), 31 deletions(-) create mode 100644 .changeset/process-subtype-attribute.md diff --git a/.changeset/process-subtype-attribute.md b/.changeset/process-subtype-attribute.md new file mode 100644 index 0000000000..e9c7085b04 --- /dev/null +++ b/.changeset/process-subtype-attribute.md @@ -0,0 +1,5 @@ +--- +"@core/electric-telemetry": minor +--- + +Emit a new `process_subtype` attribute alongside the existing `process_type` on `vm.monitor.long_gc`, `vm.monitor.long_schedule`, `vm.monitor.long_message_queue`, `process.memory`, and `process.bin_memory` telemetry events. 
For the three coarse `process_type` buckets that previously hid most of the signal during overload — `supervisor`, `erlang`, and `logger_olp` — `process_subtype` carries a stable, low-cardinality string identifying the specific process (registered name, falling back to `$ancestors` for unnamed supervisors or to the initial-call MFA for anonymous `:erlang` spawns). Existing `process_type` values are unchanged. diff --git a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex index 4cd5bca788..7ebc4a56a3 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex @@ -87,12 +87,18 @@ defmodule ElectricTelemetry.ApplicationTelemetry do def metrics(telemetry_opts) do [ - last_value("process.memory.total", tags: [:process_type], unit: :byte), - last_value("process.bin_memory.total", tags: [:process_type], unit: :byte), - last_value("process.bin_memory.max_bin_count", tags: [:process_type]), - last_value("process.bin_memory.avg_bin_count", tags: [:process_type]), - last_value("process.bin_memory.max_ref_count", tags: [:process_type]), - last_value("process.bin_memory.avg_ref_count", tags: [:process_type]), + last_value("process.memory.total", + tags: [:process_type, :process_subtype], + unit: :byte + ), + last_value("process.bin_memory.total", + tags: [:process_type, :process_subtype], + unit: :byte + ), + last_value("process.bin_memory.max_bin_count", tags: [:process_type, :process_subtype]), + last_value("process.bin_memory.avg_bin_count", tags: [:process_type, :process_subtype]), + last_value("process.bin_memory.max_ref_count", tags: [:process_type, :process_subtype]), + last_value("process.bin_memory.avg_ref_count", tags: [:process_type, :process_subtype]), last_value("ets.memory.total", tags: [:table_type], unit: :byte), last_value("system.cpu.core_count"), 
last_value("system.cpu.utilization.total"), @@ -113,12 +119,15 @@ defmodule ElectricTelemetry.ApplicationTelemetry do last_value("vm.memory.processes_used", unit: :byte), last_value("vm.memory.system", unit: :byte), last_value("vm.memory.total", unit: :byte), - sum("vm.monitor.long_message_queue.length", tags: [:process_type]), + sum("vm.monitor.long_message_queue.length", tags: [:process_type, :process_subtype]), distribution("vm.monitor.long_schedule.timeout", - tags: [:process_type], + tags: [:process_type, :process_subtype], + unit: :millisecond + ), + distribution("vm.monitor.long_gc.timeout", + tags: [:process_type, :process_subtype], unit: :millisecond ), - distribution("vm.monitor.long_gc.timeout", tags: [:process_type], unit: :millisecond), last_value("vm.persistent_term.count"), last_value("vm.persistent_term.memory", unit: :byte), last_value("vm.reductions.total"), @@ -182,7 +191,7 @@ defmodule ElectricTelemetry.ApplicationTelemetry do :telemetry.execute( [:process, :memory], %{total: map.proc_mem}, - %{process_type: to_string(map.type)} + %{process_type: to_string(map.type), process_subtype: map.subtype} ) end end @@ -198,7 +207,7 @@ defmodule ElectricTelemetry.ApplicationTelemetry do max_ref_count: map.max_ref_count, avg_ref_count: map.avg_ref_count }, - %{process_type: to_string(map.type)} + %{process_type: to_string(map.type), process_subtype: map.subtype} ) end end diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index ed07365dad..244b401bf7 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -8,6 +8,11 @@ defmodule ElectricTelemetry.Processes do # Minimum memory threshold for a process group when using :mem_percent mode. @min_group_memory 1024 * 1024 + # Coarse `process_type` values for which we additionally derive a `process_subtype`. 
+ # Membership here is matched against the value returned by `proc_type/2` (an atom + # like `:supervisor`, `:erlang`, `:logger_olp`). + @subtyped_types [:supervisor, :erlang, :logger_olp] + defguardp is_valid_mem_percent(percent) when is_integer(percent) and percent >= 1 and percent <= 100 @@ -21,6 +26,40 @@ defmodule ElectricTelemetry.Processes do def proc_type(pid), do: proc_type(pid, info(pid)) + @doc """ + Compute both the coarse `process_type` value and a finer-grained `process_subtype` + for a process in a single `Process.info/2` round-trip. + + The subtype is a stable, low-cardinality string (or `nil`) intended to be emitted + as a companion attribute on telemetry events. See `proc_subtype/1` for the + per-bucket rules. + """ + @spec proc_type_and_subtype(pid()) :: {term(), binary() | nil} + def proc_type_and_subtype(pid) do + info = info(pid) + type = proc_type(pid, info) + {type, proc_subtype(type, info)} + end + + @doc """ + Returns a low-cardinality string identifying the specific process behind a coarse + `process_type` bucket, or `nil` when no useful subtype can be derived. + + Currently populated for the three coarse buckets that hide the most signal during + overload: + + * `:supervisor` — registered name, falling back to first atom in `$ancestors`. + * `:erlang` — registered name, falling back to `initial_call` MFA. + * `:logger_olp` — registered name (handler id). + + All other `process_type` values return `nil`. 
+ """ + @spec proc_subtype(pid()) :: binary() | nil + def proc_subtype(pid) do + info = info(pid) + proc_subtype(proc_type(pid, info), info) + end + def top_memory_by_type, do: top_by(:proc_mem) def top_memory_by_type(proc_list_or_limit), do: top_by(:proc_mem, proc_list_or_limit) def top_memory_by_type(proc_list, limit), do: top_by(:proc_mem, proc_list, limit) @@ -69,11 +108,12 @@ defmodule ElectricTelemetry.Processes do process_list |> Enum.map(&type_and_memory/1) |> Enum.reject(&(&1.type == :dead)) - |> Enum.group_by(& &1.type) - |> Enum.map(fn {type, proc_infos} -> + |> Enum.group_by(&{&1.type, &1.subtype}) + |> Enum.map(fn {{type, subtype}, proc_infos} -> proc_infos |> mem_stats_for_procs() |> Map.put(:type, type) + |> Map.put(:subtype, subtype) end) |> Enum.sort_by(&(-Map.fetch!(&1, sort_key))) end @@ -129,14 +169,17 @@ defmodule ElectricTelemetry.Processes do defp type_and_memory(pid) do info = info(pid) + type = proc_type(pid, info) + subtype = proc_subtype(type, info) info |> memory_from_info() - |> Map.put(:type, proc_type(pid, info)) + |> Map.put(:type, type) + |> Map.put(:subtype, subtype) end defp info(pid) do - Process.info(pid, [:dictionary, :initial_call, :label, :memory, :binary]) + Process.info(pid, [:dictionary, :initial_call, :label, :memory, :binary, :registered_name]) end defp proc_type(pid, info) do @@ -145,6 +188,55 @@ defmodule ElectricTelemetry.Processes do if(Process.alive?(pid), do: :unknown, else: :dead) end + defp proc_subtype(type, info) when type in @subtyped_types do + case type do + :supervisor -> + registered_name_string(info) || ancestor_atom_string(info) + + :erlang -> + registered_name_string(info) || initial_call_mfa_string(info) + + :logger_olp -> + registered_name_string(info) + end + end + + defp proc_subtype(_type, _info), do: nil + + defp registered_name_string(info) do + case info[:registered_name] do + [] -> nil + nil -> nil + name when is_atom(name) -> Atom.to_string(name) + _ -> nil + end + end + + defp 
ancestor_atom_string(info) do + case get_in(info, [:dictionary, :"$ancestors"]) do + [name | _] when is_atom(name) and not is_nil(name) -> Atom.to_string(name) + _ -> nil + end + end + + defp initial_call_mfa_string(info) do + # Prefer the dictionary-stored $initial_call (set by proc_lib for OTP processes), + # falling back to the raw initial_call reported by the VM. Returns "Module.fun/arity". + mfa = + case get_in(info, [:dictionary, :"$initial_call"]) do + {m, f, a} -> {m, f, a} + _ -> info[:initial_call] + end + + case mfa do + {m, f, a} when is_atom(m) and is_atom(f) and is_integer(a) -> + Exception.format_mfa(m, f, a) + + _ -> + nil + end + end + defp label_from_info(info) do case info[:label] do :undefined -> nil diff --git a/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex b/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex index ebb0764020..b55e5e4636 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex @@ -12,7 +12,7 @@ defmodule ElectricTelemetry.SystemMonitor do use GenServer - import ElectricTelemetry.Processes, only: [proc_type: 1] + import ElectricTelemetry.Processes, only: [proc_type_and_subtype: 1] require Logger @@ -36,13 +36,16 @@ defmodule ElectricTelemetry.SystemMonitor do end def handle_info({:monitor, gc_pid, :long_gc, info}, state) do - type = proc_type(gc_pid) + {type, subtype} = proc_type_and_subtype(gc_pid) Logger.debug( - "Long GC detected for pid #{inspect(gc_pid)} (#{inspect(type)}): took #{Keyword.fetch!(info, :timeout)}ms. #{inspect(info, limit: :infinity)}" + "Long GC detected for pid #{inspect(gc_pid)} (#{inspect(type)}/#{inspect(subtype)}): took #{Keyword.fetch!(info, :timeout)}ms. 
#{inspect(info, limit: :infinity)}" ) - :telemetry.execute(@vm_monitor_long_gc, Map.new(info), %{process_type: type}) + :telemetry.execute(@vm_monitor_long_gc, Map.new(info), %{ + process_type: type, + process_subtype: subtype + }) {:noreply, state} end @@ -52,13 +55,16 @@ defmodule ElectricTelemetry.SystemMonitor do "Long schedule detected for port #{inspect(port)}, took #{Keyword.fetch!(info, :timeout)}ms" ) - :telemetry.execute(@vm_monitor_long_schedule, Map.new(info), %{process_type: :port}) + :telemetry.execute(@vm_monitor_long_schedule, Map.new(info), %{ + process_type: :port, + process_subtype: nil + }) {:noreply, state} end def handle_info({:monitor, pid, :long_schedule, info}, state) when is_pid(pid) do - type = proc_type(pid) + {type, subtype} = proc_type_and_subtype(pid) Logger.debug(fn -> locations = @@ -76,26 +82,33 @@ defmodule ElectricTelemetry.SystemMonitor do "" end - "Long schedule detected for pid #{inspect(pid)} (#{inspect(type)}), took #{Keyword.fetch!(info, :timeout)}ms" <> + "Long schedule detected for pid #{inspect(pid)} (#{inspect(type)}/#{inspect(subtype)}), took #{Keyword.fetch!(info, :timeout)}ms" <> locs_str end) :telemetry.execute(@vm_monitor_long_schedule, %{timeout: Keyword.fetch!(info, :timeout)}, %{ - process_type: type + process_type: type, + process_subtype: subtype }) {:noreply, state} end def handle_info({:monitor, pid, :long_message_queue, true}, state) do - type = proc_type(pid) + {type, subtype} = proc_type_and_subtype(pid) - Logger.debug("Long message queue detected for pid #{inspect(pid)} (#{inspect(type)})") + Logger.debug( + "Long message queue detected for pid #{inspect(pid)} (#{inspect(type)}/#{inspect(subtype)})" + ) - log_long_message_queue_event(pid, type) + log_long_message_queue_event(pid, type, subtype) state = - %{state | long_message_queue_pids: Map.put(state.long_message_queue_pids, pid, type)} + %{ + state + | long_message_queue_pids: + Map.put(state.long_message_queue_pids, pid, {type, subtype}) + } |> 
maybe_start_long_message_queue_timer() {:noreply, state} @@ -114,17 +127,18 @@ defmodule ElectricTelemetry.SystemMonitor do end def handle_info(:recheck_message_queues, state) do - Enum.each(state.long_message_queue_pids, fn {pid, type} -> - log_long_message_queue_event(pid, type) + Enum.each(state.long_message_queue_pids, fn {pid, {type, subtype}} -> + log_long_message_queue_event(pid, type, subtype) end) {:noreply, state} end - defp log_long_message_queue_event(pid, type) do + defp log_long_message_queue_event(pid, type, subtype) do with {:message_queue_len, queue_len} <- Process.info(pid, :message_queue_len) do :telemetry.execute(@vm_monitor_long_message_queue, %{length: queue_len}, %{ - process_type: type + process_type: type, + process_subtype: subtype }) end end diff --git a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs index 22462b5b51..420d4b3f60 100644 --- a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs +++ b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs @@ -119,6 +119,110 @@ defmodule ElectricTelemetry.ProcessesTest do end end + describe "proc_subtype/1 and proc_type_and_subtype/1 for :supervisor" do + test "returns the registered name when the supervisor is named" do + name = :"sup_named_#{System.unique_integer([:positive])}" + {:ok, pid} = Supervisor.start_link([], strategy: :one_for_one, name: name) + + assert {:supervisor, subtype} = proc_type_and_subtype(pid) + assert subtype == Atom.to_string(name) + assert proc_subtype(pid) == Atom.to_string(name) + + Supervisor.stop(pid) + end + + test "falls back to $ancestors atom for an unnamed supervisor" do + parent_name = :"sup_parent_#{System.unique_integer([:positive])}" + Process.register(self(), parent_name) + + {:ok, pid} = Supervisor.start_link([], strategy: :one_for_one) + + assert {:supervisor, subtype} = proc_type_and_subtype(pid) + assert subtype == 
Atom.to_string(parent_name) + + Supervisor.stop(pid) + Process.unregister(parent_name) + end + + test "returns nil when neither registered name nor named ancestor is available" do + # Run the supervisor from a spawned, unregistered process so the entire $ancestors + # chain is pids rather than atoms. + parent = self() + + spawn_link(fn -> + {:ok, pid} = Supervisor.start_link([], strategy: :one_for_one) + send(parent, {:sup, pid}) + Process.sleep(:infinity) + end) + + assert_receive {:sup, pid}, 200 + + assert {:supervisor, nil} = proc_type_and_subtype(pid) + end + end + + describe "proc_subtype/1 and proc_type_and_subtype/1 for :erlang" do + test "falls back to initial_call MFA for an anonymous spawn_link" do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + + assert {:erlang, ":erlang.apply/2"} = proc_type_and_subtype(pid) + assert proc_subtype(pid) == ":erlang.apply/2" + end + + test "uses the registered name when an :erlang-typed process is named" do + name = :"erlang_named_#{System.unique_integer([:positive])}" + pid = spawn_link(fn -> Process.sleep(:infinity) end) + Process.register(pid, name) + + assert {:erlang, subtype} = proc_type_and_subtype(pid) + assert subtype == Atom.to_string(name) + end + end + + describe "proc_subtype/1 and proc_type_and_subtype/1 for :logger_olp" do + test "uses the registered name as the handler id" do + parent = self() + name = :"logger_olp_test_#{System.unique_integer([:positive])}" + + pid = + spawn_link(fn -> + # Mimic an OLP-spawned process: `$initial_call` MFA module is `:logger_olp`, + # which is how the real OLP processes show up via proc_lib. 
+ Process.put(:"$initial_call", {:logger_olp, :init, 1}) + Process.register(self(), name) + send(parent, :ready) + Process.sleep(:infinity) + end) + + assert_receive :ready, 200 + + assert {:logger_olp, subtype} = proc_type_and_subtype(pid) + assert subtype == Atom.to_string(name) + end + + test "returns nil subtype for an unregistered :logger_olp-typed process" do + parent = self() + + pid = + spawn_link(fn -> + Process.put(:"$initial_call", {:logger_olp, :init, 1}) + send(parent, :ready) + Process.sleep(:infinity) + end) + + assert_receive :ready, 200 + + assert {:logger_olp, nil} = proc_type_and_subtype(pid) + end + end + + describe "proc_type_and_subtype/1 for non-subtyped buckets" do + test "returns nil subtype for a labelled (non-bucketed) process" do + pid = spawn_with_label(:my_process) + assert {:my_process, nil} = proc_type_and_subtype(pid) + end + end + describe "top_memory_by_type/[1, 2]" do test "handles dead processes" do parent = self() From ab9d67286735f25080e0c0659e2766805481a6f7 Mon Sep 17 00:00:00 2001 From: Erik the Implementer Date: Fri, 22 May 2026 17:36:11 +0200 Subject: [PATCH 2/7] chore: mix format --- .../lib/electric/telemetry/system_monitor.ex | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex b/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex index b55e5e4636..bede6fa3c8 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex @@ -106,8 +106,7 @@ defmodule ElectricTelemetry.SystemMonitor do state = %{ state - | long_message_queue_pids: - Map.put(state.long_message_queue_pids, pid, {type, subtype}) + | long_message_queue_pids: Map.put(state.long_message_queue_pids, pid, {type, subtype}) } |> maybe_start_long_message_queue_timer() From 5ab7d1cc66b65668657b84da0420767bb98ddb15 Mon Sep 17 00:00:00 2001 From: Erik the Implementer Date: 
Fri, 22 May 2026 17:48:17 +0200 Subject: [PATCH 3/7] chore(electric-telemetry): tighten proc_type_and_subtype/1 @spec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit term() is correct but uninformative — in practice the type is always atom() | binary() (atoms cover :dead, :unknown, module atoms, and atom labels; binaries cover the string-label case). Helps dialyzer downstream. --- packages/electric-telemetry/lib/electric/telemetry/processes.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index 244b401bf7..3f2e17f6cb 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -34,7 +34,7 @@ defmodule ElectricTelemetry.Processes do as a companion attribute on telemetry events. See `proc_subtype/1` for the per-bucket rules. """ - @spec proc_type_and_subtype(pid()) :: {term(), binary() | nil} + @spec proc_type_and_subtype(pid()) :: {atom() | binary(), binary() | nil} def proc_type_and_subtype(pid) do info = info(pid) type = proc_type(pid, info) From aa316399095cc156c4e710bad3045e1ef0ef4ca1 Mon Sep 17 00:00:00 2001 From: Erik the Implementer Date: Fri, 22 May 2026 17:48:35 +0200 Subject: [PATCH 4/7] chore(electric-telemetry): make registered_name_string/1 explicit about nil MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit is_atom(nil) is true, so the previous clause order (`nil -> nil` before the atom guard) was load-bearing — dropping that clause would silently turn `nil` into `"nil"`. Rewrite the guard to match on `is_atom(name) and not is_nil(name)` so the clause stands on its own. 
--- .../electric-telemetry/lib/electric/telemetry/processes.ex | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index 3f2e17f6cb..9800e61e9e 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -204,10 +204,11 @@ defmodule ElectricTelemetry.Processes do defp proc_subtype(_type, _info), do: nil defp registered_name_string(info) do + # `Process.info/2` returns `[]` for unregistered processes and the atom name + # otherwise. Be explicit about excluding `nil` here so the clause order isn't + # load-bearing — `is_atom(nil)` is true and would otherwise yield `"nil"`. case info[:registered_name] do - [] -> nil - nil -> nil - name when is_atom(name) -> Atom.to_string(name) + name when is_atom(name) and not is_nil(name) -> Atom.to_string(name) _ -> nil end end From fef04de3927583825d7c61e4463e4288978d28b6 Mon Sep 17 00:00:00 2001 From: Erik the Implementer Date: Fri, 22 May 2026 17:49:00 +0200 Subject: [PATCH 5/7] test(electric-telemetry): pin dead-process contract for proc_type_and_subtype/1 Existing tests cover proc_type/1 returning :dead for an exited process, but proc_type_and_subtype/1 and proc_subtype/1 weren't exercised for that case. Implementation relies on Process.info/2 returning nil and Access on nil cascading nils through every helper; lock that contract down so a future refactor of info/1 doesn't silently change the answer. 
--- .../test/electric/telemetry/processes_test.exs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs index 420d4b3f60..ec2adaf133 100644 --- a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs +++ b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs @@ -223,6 +223,24 @@ defmodule ElectricTelemetry.ProcessesTest do end end + describe "proc_type_and_subtype/1 and proc_subtype/1 for dead processes" do + test "returns {:dead, nil} / nil for a process that has exited" do + pid = + spawn_link(fn -> + receive do + :die -> :ok + end + end) + + ref = Process.monitor(pid) + send(pid, :die) + assert_receive {:DOWN, ^ref, :process, ^pid, :normal} + + assert {:dead, nil} = proc_type_and_subtype(pid) + assert proc_subtype(pid) == nil + end + end + describe "top_memory_by_type/[1, 2]" do test "handles dead processes" do parent = self() From d780505450f69eed9e566a4c1dec868731c296e5 Mon Sep 17 00:00:00 2001 From: Erik the Implementer Date: Mon, 1 Jun 2026 16:08:56 +0200 Subject: [PATCH 6/7] Simplify the implementation of proc_subtype --- .../lib/electric/telemetry/processes.ex | 62 +++++------------- .../electric/telemetry/processes_test.exs | 63 +++++++++---------- 2 files changed, 45 insertions(+), 80 deletions(-) diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index 9800e61e9e..2f7de49934 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -11,7 +11,7 @@ defmodule ElectricTelemetry.Processes do # Coarse `process_type` values for which we additionally derive a `process_subtype`. 
# Membership here is matched against the value returned by `proc_type/2` (an atom # like `:supervisor`, `:erlang`, `:logger_olp`). - @subtyped_types [:supervisor, :erlang, :logger_olp] + @proc_types_with_subtype [:erlang, :logger_olp, :supervisor] defguardp is_valid_mem_percent(percent) when is_integer(percent) and percent >= 1 and percent <= 100 @@ -27,39 +27,18 @@ defmodule ElectricTelemetry.Processes do def proc_type(pid), do: proc_type(pid, info(pid)) @doc """ - Compute both the coarse `process_type` value and a finer-grained `process_subtype` - for a process in a single `Process.info/2` round-trip. + Compute both `process_type` and `process_subtype` using a single `Process.info/2` call. The subtype is a stable, low-cardinality string (or `nil`) intended to be emitted - as a companion attribute on telemetry events. See `proc_subtype/1` for the - per-bucket rules. + as a companion attribute on telemetry events. """ - @spec proc_type_and_subtype(pid()) :: {atom() | binary(), binary() | nil} + @spec proc_type_and_subtype(pid()) :: {atom() | binary(), atom() | binary() | nil} def proc_type_and_subtype(pid) do info = info(pid) type = proc_type(pid, info) {type, proc_subtype(type, info)} end - @doc """ - Returns a low-cardinality string identifying the specific process behind a coarse - `process_type` bucket, or `nil` when no useful subtype can be derived. - - Currently populated for the three coarse buckets that hide the most signal during - overload: - - * `:supervisor` — registered name, falling back to first atom in `$ancestors`. - * `:erlang` — registered name, falling back to `initial_call` MFA. - * `:logger_olp` — registered name (handler id). - - All other `process_type` values return `nil`. 
- """ - @spec proc_subtype(pid()) :: binary() | nil - def proc_subtype(pid) do - info = info(pid) - proc_subtype(proc_type(pid, info), info) - end - def top_memory_by_type, do: top_by(:proc_mem) def top_memory_by_type(proc_list_or_limit), do: top_by(:proc_mem, proc_list_or_limit) def top_memory_by_type(proc_list, limit), do: top_by(:proc_mem, proc_list, limit) @@ -178,7 +157,8 @@ defmodule ElectricTelemetry.Processes do |> Map.put(:subtype, subtype) end - defp info(pid) do + @doc false + def info(pid) do Process.info(pid, [:dictionary, :initial_call, :label, :memory, :binary, :registered_name]) end @@ -188,34 +168,20 @@ defmodule ElectricTelemetry.Processes do if(Process.alive?(pid), do: :unknown, else: :dead) end - defp proc_subtype(type, info) when type in @subtyped_types do - case type do - :supervisor -> - registered_name_string(info) || ancestor_atom_string(info) - - :erlang -> - registered_name_string(info) || initial_call_mfa_string(info) - - :logger_olp -> - registered_name_string(info) - end + defp proc_subtype(proc_type, info) when proc_type in @proc_types_with_subtype do + registered_name(info) || ancestor_name(info) || initial_call_mfa_string(info) end - defp proc_subtype(_type, _info), do: nil + defp proc_subtype(_, _), do: nil - defp registered_name_string(info) do - # `Process.info/2` returns `[]` for unregistered processes and the atom name - # otherwise. Be explicit about excluding `nil` here so the clause order isn't - # load-bearing — `is_atom(nil)` is true and would otherwise yield `"nil"`. 
- case info[:registered_name] do - name when is_atom(name) and not is_nil(name) -> Atom.to_string(name) - _ -> nil - end + defp registered_name(info) do + # Process.info(pid, :registered_name) returns an empty list for unregistered processes + with [] <- info[:registered_name], do: nil end - defp ancestor_atom_string(info) do + defp ancestor_name(info) do case get_in(info, [:dictionary, :"$ancestors"]) do - [name | _] when is_atom(name) and not is_nil(name) -> Atom.to_string(name) + [name | _] when is_atom(name) and not is_nil(name) -> name _ -> nil end end diff --git a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs index ec2adaf133..762ea9c3ff 100644 --- a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs +++ b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs @@ -119,32 +119,35 @@ defmodule ElectricTelemetry.ProcessesTest do end end - describe "proc_subtype/1 and proc_type_and_subtype/1 for :supervisor" do + describe "proc_type_and_subtype/1 for :supervisor" do test "returns the registered name when the supervisor is named" do name = :"sup_named_#{System.unique_integer([:positive])}" {:ok, pid} = Supervisor.start_link([], strategy: :one_for_one, name: name) - - assert {:supervisor, subtype} = proc_type_and_subtype(pid) - assert subtype == Atom.to_string(name) - assert proc_subtype(pid) == Atom.to_string(name) - - Supervisor.stop(pid) + assert {:supervisor, name} == proc_type_and_subtype(pid) end test "falls back to $ancestors atom for an unnamed supervisor" do - parent_name = :"sup_parent_#{System.unique_integer([:positive])}" - Process.register(self(), parent_name) + parent_sup_name = :"sup_parent_#{System.unique_integer([:positive])}" + child_sup_name = :"sup_child_#{System.unique_integer([:positive])}" + + child_sup = %{ + id: Supervisor, + type: :supervisor, + start: {Supervisor, :start_link, [[], [strategy: :one_for_one, 
name: child_sup_name]]} + } - {:ok, pid} = Supervisor.start_link([], strategy: :one_for_one) + {:ok, sup_pid} = + Supervisor.start_link([child_sup], strategy: :one_for_one, name: parent_sup_name) - assert {:supervisor, subtype} = proc_type_and_subtype(pid) - assert subtype == Atom.to_string(parent_name) + [{_, child_pid, _, _}] = Supervisor.which_children(sup_pid) - Supervisor.stop(pid) - Process.unregister(parent_name) + assert {:supervisor, child_sup_name} == proc_type_and_subtype(child_pid) + + true = Process.unregister(child_sup_name) + assert {:supervisor, parent_sup_name} == proc_type_and_subtype(child_pid) end - test "returns nil when neither registered name nor named ancestor is available" do + test "falls back to initial_call when neither registered name nor named ancestor is available" do # Run the supervisor from a spawned, unregistered process so the entire $ancestors # chain is pids rather than atoms. parent = self() @@ -157,16 +160,15 @@ defmodule ElectricTelemetry.ProcessesTest do assert_receive {:sup, pid}, 200 - assert {:supervisor, nil} = proc_type_and_subtype(pid) + assert {:supervisor, ":supervisor.\"Elixir.Supervisor.Default\"/1"} == + proc_type_and_subtype(pid) end end - describe "proc_subtype/1 and proc_type_and_subtype/1 for :erlang" do + describe "proc_type_and_subtype/1 for :erlang" do test "falls back to initial_call MFA for an anonymous spawn_link" do pid = spawn_link(fn -> Process.sleep(:infinity) end) - - assert {:erlang, ":erlang.apply/2"} = proc_type_and_subtype(pid) - assert proc_subtype(pid) == ":erlang.apply/2" + assert {:erlang, ":erlang.apply/2"} == proc_type_and_subtype(pid) end test "uses the registered name when an :erlang-typed process is named" do @@ -174,12 +176,11 @@ defmodule ElectricTelemetry.ProcessesTest do pid = spawn_link(fn -> Process.sleep(:infinity) end) Process.register(pid, name) - assert {:erlang, subtype} = proc_type_and_subtype(pid) - assert subtype == Atom.to_string(name) + assert {:erlang, name} == 
proc_type_and_subtype(pid) end end - describe "proc_subtype/1 and proc_type_and_subtype/1 for :logger_olp" do + describe "proc_type_and_subtype/1 for :logger_olp" do test "uses the registered name as the handler id" do parent = self() name = :"logger_olp_test_#{System.unique_integer([:positive])}" @@ -196,11 +197,10 @@ defmodule ElectricTelemetry.ProcessesTest do assert_receive :ready, 200 - assert {:logger_olp, subtype} = proc_type_and_subtype(pid) - assert subtype == Atom.to_string(name) + assert {:logger_olp, name} == proc_type_and_subtype(pid) end - test "returns nil subtype for an unregistered :logger_olp-typed process" do + test "falls back to initial call for an unregistered :logger_olp-typed process" do parent = self() pid = @@ -212,19 +212,19 @@ defmodule ElectricTelemetry.ProcessesTest do assert_receive :ready, 200 - assert {:logger_olp, nil} = proc_type_and_subtype(pid) + assert {:logger_olp, ":logger_olp.init/1"} == proc_type_and_subtype(pid) end end describe "proc_type_and_subtype/1 for non-subtyped buckets" do test "returns nil subtype for a labelled (non-bucketed) process" do pid = spawn_with_label(:my_process) - assert {:my_process, nil} = proc_type_and_subtype(pid) + assert {:my_process, nil} == proc_type_and_subtype(pid) end end - describe "proc_type_and_subtype/1 and proc_subtype/1 for dead processes" do - test "returns {:dead, nil} / nil for a process that has exited" do + describe "proc_type_and_subtype/1 for dead processes" do + test "returns {:dead, nil} for a process that has exited" do pid = spawn_link(fn -> receive do @@ -236,8 +236,7 @@ defmodule ElectricTelemetry.ProcessesTest do send(pid, :die) assert_receive {:DOWN, ^ref, :process, ^pid, :normal} - assert {:dead, nil} = proc_type_and_subtype(pid) - assert proc_subtype(pid) == nil + assert {:dead, nil} == proc_type_and_subtype(pid) end end From 7026b78a992026a86677416a5b7f612c0b424aa8 Mon Sep 17 00:00:00 2001 From: Erik the Implementer Date: Tue, 2 Jun 2026 12:31:12 +0200 Subject: 
[PATCH 7/7] Requery process type/subtype every time long message queues are rechecked --- .../lib/electric/telemetry/system_monitor.ex | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex b/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex index bede6fa3c8..02c3d0191d 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex @@ -32,7 +32,7 @@ defmodule ElectricTelemetry.SystemMonitor do {opts.long_message_queue_disable_threshold, opts.long_message_queue_enable_threshold} ) - {:ok, %{long_message_queue_pids: %{}, long_message_queue_timer: nil}} + {:ok, %{long_message_queue_pids: MapSet.new(), long_message_queue_timer: nil}} end def handle_info({:monitor, gc_pid, :long_gc, info}, state) do @@ -106,7 +106,7 @@ defmodule ElectricTelemetry.SystemMonitor do state = %{ state - | long_message_queue_pids: Map.put(state.long_message_queue_pids, pid, {type, subtype}) + | long_message_queue_pids: MapSet.put(state.long_message_queue_pids, pid) } |> maybe_start_long_message_queue_timer() @@ -116,21 +116,22 @@ defmodule ElectricTelemetry.SystemMonitor do def handle_info({:monitor, pid, :long_message_queue, false}, state) do Logger.debug("Long message queue no longer detected for pid #{inspect(pid)}") - {:noreply, %{state | long_message_queue_pids: Map.delete(state.long_message_queue_pids, pid)}} - end - - def handle_info(:recheck_message_queues, state) - when map_size(state.long_message_queue_pids) == 0 do - :timer.cancel(state.long_message_queue_timer) - {:noreply, %{state | long_message_queue_timer: nil}} + {:noreply, + %{state | long_message_queue_pids: MapSet.delete(state.long_message_queue_pids, pid)}} end def handle_info(:recheck_message_queues, state) do - Enum.each(state.long_message_queue_pids, fn {pid, {type, subtype}} -> - 
log_long_message_queue_event(pid, type, subtype) - end) - - {:noreply, state} + if MapSet.size(state.long_message_queue_pids) == 0 do + :timer.cancel(state.long_message_queue_timer) + {:noreply, %{state | long_message_queue_timer: nil}} + else + Enum.each(state.long_message_queue_pids, fn pid -> + {type, subtype} = proc_type_and_subtype(pid) + log_long_message_queue_event(pid, type, subtype) + end) + + {:noreply, state} + end end defp log_long_message_queue_event(pid, type, subtype) do