diff --git a/.changeset/process-subtype-attribute.md b/.changeset/process-subtype-attribute.md new file mode 100644 index 0000000000..e9c7085b04 --- /dev/null +++ b/.changeset/process-subtype-attribute.md @@ -0,0 +1,5 @@ +--- +"@core/electric-telemetry": minor +--- + +Emit a new `process_subtype` attribute alongside the existing `process_type` on `vm.monitor.long_gc`, `vm.monitor.long_schedule`, `vm.monitor.long_message_queue`, `process.memory`, and `process.bin_memory` telemetry events. For the three coarse `process_type` buckets that previously hid most of the signal during overload — `supervisor`, `erlang`, and `logger_olp` — `process_subtype` carries a stable, low-cardinality identifier for the specific process: its registered name, falling back to the named parent recorded in `$ancestors` (e.g. for unnamed supervisors) and then to the initial-call MFA formatted as `Module.fun/arity` (e.g. for anonymous `:erlang` spawns). For every other `process_type` the attribute is `nil`. Existing `process_type` values are unchanged. diff --git a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex index 4cd5bca788..7ebc4a56a3 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex @@ -87,12 +87,18 @@ defmodule ElectricTelemetry.ApplicationTelemetry do def metrics(telemetry_opts) do [ - last_value("process.memory.total", tags: [:process_type], unit: :byte), - last_value("process.bin_memory.total", tags: [:process_type], unit: :byte), - last_value("process.bin_memory.max_bin_count", tags: [:process_type]), - last_value("process.bin_memory.avg_bin_count", tags: [:process_type]), - last_value("process.bin_memory.max_ref_count", tags: [:process_type]), - last_value("process.bin_memory.avg_ref_count", tags: [:process_type]), + last_value("process.memory.total", + tags: [:process_type, :process_subtype], + unit: :byte + ), + last_value("process.bin_memory.total", + tags: [:process_type, 
:process_subtype], + unit: :byte + ), + last_value("process.bin_memory.max_bin_count", tags: [:process_type, :process_subtype]), + last_value("process.bin_memory.avg_bin_count", tags: [:process_type, :process_subtype]), + last_value("process.bin_memory.max_ref_count", tags: [:process_type, :process_subtype]), + last_value("process.bin_memory.avg_ref_count", tags: [:process_type, :process_subtype]), last_value("ets.memory.total", tags: [:table_type], unit: :byte), last_value("system.cpu.core_count"), last_value("system.cpu.utilization.total"), @@ -113,12 +119,15 @@ defmodule ElectricTelemetry.ApplicationTelemetry do last_value("vm.memory.processes_used", unit: :byte), last_value("vm.memory.system", unit: :byte), last_value("vm.memory.total", unit: :byte), - sum("vm.monitor.long_message_queue.length", tags: [:process_type]), + sum("vm.monitor.long_message_queue.length", tags: [:process_type, :process_subtype]), distribution("vm.monitor.long_schedule.timeout", - tags: [:process_type], + tags: [:process_type, :process_subtype], + unit: :millisecond + ), + distribution("vm.monitor.long_gc.timeout", + tags: [:process_type, :process_subtype], unit: :millisecond ), - distribution("vm.monitor.long_gc.timeout", tags: [:process_type], unit: :millisecond), last_value("vm.persistent_term.count"), last_value("vm.persistent_term.memory", unit: :byte), last_value("vm.reductions.total"), @@ -182,7 +191,7 @@ defmodule ElectricTelemetry.ApplicationTelemetry do :telemetry.execute( [:process, :memory], %{total: map.proc_mem}, - %{process_type: to_string(map.type)} + %{process_type: to_string(map.type), process_subtype: map.subtype} ) end end @@ -198,7 +207,7 @@ defmodule ElectricTelemetry.ApplicationTelemetry do max_ref_count: map.max_ref_count, avg_ref_count: map.avg_ref_count }, - %{process_type: to_string(map.type)} + %{process_type: to_string(map.type), process_subtype: map.subtype} ) end end diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex 
b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index ed07365dad..2f7de49934 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -8,6 +8,11 @@ defmodule ElectricTelemetry.Processes do # Minimum memory threshold for a process group when using :mem_percent mode. @min_group_memory 1024 * 1024 + # Coarse `process_type` values for which we additionally derive a `process_subtype`. + # Membership here is matched against the value returned by `proc_type/2` (an atom + # like `:supervisor`, `:erlang`, `:logger_olp`). + @proc_types_with_subtype [:erlang, :logger_olp, :supervisor] + defguardp is_valid_mem_percent(percent) when is_integer(percent) and percent >= 1 and percent <= 100 @@ -21,6 +26,19 @@ defmodule ElectricTelemetry.Processes do def proc_type(pid), do: proc_type(pid, info(pid)) + @doc """ + Compute both `process_type` and `process_subtype` using a single `Process.info/2` call. + + The subtype is a stable, low-cardinality string (or `nil`) intended to be emitted + as a companion attribute on telemetry events. 
+ """ + @spec proc_type_and_subtype(pid()) :: {atom() | binary(), atom() | binary() | nil} + def proc_type_and_subtype(pid) do + info = info(pid) + type = proc_type(pid, info) + {type, proc_subtype(type, info)} + end + def top_memory_by_type, do: top_by(:proc_mem) def top_memory_by_type(proc_list_or_limit), do: top_by(:proc_mem, proc_list_or_limit) def top_memory_by_type(proc_list, limit), do: top_by(:proc_mem, proc_list, limit) @@ -69,11 +87,12 @@ defmodule ElectricTelemetry.Processes do process_list |> Enum.map(&type_and_memory/1) |> Enum.reject(&(&1.type == :dead)) - |> Enum.group_by(& &1.type) - |> Enum.map(fn {type, proc_infos} -> + |> Enum.group_by(&{&1.type, &1.subtype}) + |> Enum.map(fn {{type, subtype}, proc_infos} -> proc_infos |> mem_stats_for_procs() |> Map.put(:type, type) + |> Map.put(:subtype, subtype) end) |> Enum.sort_by(&(-Map.fetch!(&1, sort_key))) end @@ -129,14 +148,18 @@ defmodule ElectricTelemetry.Processes do defp type_and_memory(pid) do info = info(pid) + type = proc_type(pid, info) + subtype = proc_subtype(type, info) info |> memory_from_info() - |> Map.put(:type, proc_type(pid, info)) + |> Map.put(:type, type) + |> Map.put(:subtype, subtype) end - defp info(pid) do - Process.info(pid, [:dictionary, :initial_call, :label, :memory, :binary]) + @doc false + def info(pid) do + Process.info(pid, [:dictionary, :initial_call, :label, :memory, :binary, :registered_name]) end defp proc_type(pid, info) do @@ -145,6 +168,42 @@ defmodule ElectricTelemetry.Processes do if(Process.alive?(pid), do: :unknown, else: :dead) end + defp proc_subtype(proc_type, info) when proc_type in @proc_types_with_subtype do + registered_name(info) || ancestor_name(info) || initial_call_mfa_string(info) + end + + defp proc_subtype(_, _), do: nil + + defp registered_name(info) do + # Process.info(pid, :registered_name) returns an empty list for unregistered processes + with [] <- info[:registered_name], do: nil + end + + defp ancestor_name(info) do + case get_in(info, 
[:dictionary, :"$ancestors"]) do + [name | _] when is_atom(name) and not is_nil(name) -> name + _ -> nil + end + end + + defp initial_call_mfa_string(info) do + # Prefer the dictionary-stored $initial_call (set by proc_lib for OTP processes), + # falling back to the raw initial_call reported by the VM. Returns "Module.fun/arity". + mfa = + case get_in(info, [:dictionary, :"$initial_call"]) do + {m, f, a} -> {m, f, a} + _ -> info[:initial_call] + end + + case mfa do + {m, f, a} when is_atom(m) and is_atom(f) and is_integer(a) -> + Exception.format_mfa(m, f, a) + + _ -> + nil + end + end + defp label_from_info(info) do case info[:label] do :undefined -> nil diff --git a/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex b/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex index ebb0764020..02c3d0191d 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/system_monitor.ex @@ -12,7 +12,7 @@ defmodule ElectricTelemetry.SystemMonitor do use GenServer - import ElectricTelemetry.Processes, only: [proc_type: 1] + import ElectricTelemetry.Processes, only: [proc_type_and_subtype: 1] require Logger @@ -32,17 +32,20 @@ defmodule ElectricTelemetry.SystemMonitor do {opts.long_message_queue_disable_threshold, opts.long_message_queue_enable_threshold} ) - {:ok, %{long_message_queue_pids: %{}, long_message_queue_timer: nil}} + {:ok, %{long_message_queue_pids: MapSet.new(), long_message_queue_timer: nil}} end def handle_info({:monitor, gc_pid, :long_gc, info}, state) do - type = proc_type(gc_pid) + {type, subtype} = proc_type_and_subtype(gc_pid) Logger.debug( - "Long GC detected for pid #{inspect(gc_pid)} (#{inspect(type)}): took #{Keyword.fetch!(info, :timeout)}ms. #{inspect(info, limit: :infinity)}" + "Long GC detected for pid #{inspect(gc_pid)} (#{inspect(type)}/#{inspect(subtype)}): took #{Keyword.fetch!(info, :timeout)}ms. 
#{inspect(info, limit: :infinity)}" ) - :telemetry.execute(@vm_monitor_long_gc, Map.new(info), %{process_type: type}) + :telemetry.execute(@vm_monitor_long_gc, Map.new(info), %{ + process_type: type, + process_subtype: subtype + }) {:noreply, state} end @@ -52,13 +55,16 @@ defmodule ElectricTelemetry.SystemMonitor do "Long schedule detected for port #{inspect(port)}, took #{Keyword.fetch!(info, :timeout)}ms" ) - :telemetry.execute(@vm_monitor_long_schedule, Map.new(info), %{process_type: :port}) + :telemetry.execute(@vm_monitor_long_schedule, Map.new(info), %{ + process_type: :port, + process_subtype: nil + }) {:noreply, state} end def handle_info({:monitor, pid, :long_schedule, info}, state) when is_pid(pid) do - type = proc_type(pid) + {type, subtype} = proc_type_and_subtype(pid) Logger.debug(fn -> locations = @@ -76,26 +82,32 @@ defmodule ElectricTelemetry.SystemMonitor do "" end - "Long schedule detected for pid #{inspect(pid)} (#{inspect(type)}), took #{Keyword.fetch!(info, :timeout)}ms" <> + "Long schedule detected for pid #{inspect(pid)} (#{inspect(type)}/#{inspect(subtype)}), took #{Keyword.fetch!(info, :timeout)}ms" <> locs_str end) :telemetry.execute(@vm_monitor_long_schedule, %{timeout: Keyword.fetch!(info, :timeout)}, %{ - process_type: type + process_type: type, + process_subtype: subtype }) {:noreply, state} end def handle_info({:monitor, pid, :long_message_queue, true}, state) do - type = proc_type(pid) + {type, subtype} = proc_type_and_subtype(pid) - Logger.debug("Long message queue detected for pid #{inspect(pid)} (#{inspect(type)})") + Logger.debug( + "Long message queue detected for pid #{inspect(pid)} (#{inspect(type)}/#{inspect(subtype)})" + ) - log_long_message_queue_event(pid, type) + log_long_message_queue_event(pid, type, subtype) state = - %{state | long_message_queue_pids: Map.put(state.long_message_queue_pids, pid, type)} + %{ + state + | long_message_queue_pids: MapSet.put(state.long_message_queue_pids, pid) + } |> 
maybe_start_long_message_queue_timer() {:noreply, state} @@ -104,27 +116,29 @@ defmodule ElectricTelemetry.SystemMonitor do def handle_info({:monitor, pid, :long_message_queue, false}, state) do Logger.debug("Long message queue no longer detected for pid #{inspect(pid)}") - {:noreply, %{state | long_message_queue_pids: Map.delete(state.long_message_queue_pids, pid)}} - end - - def handle_info(:recheck_message_queues, state) - when map_size(state.long_message_queue_pids) == 0 do - :timer.cancel(state.long_message_queue_timer) - {:noreply, %{state | long_message_queue_timer: nil}} + {:noreply, + %{state | long_message_queue_pids: MapSet.delete(state.long_message_queue_pids, pid)}} end def handle_info(:recheck_message_queues, state) do - Enum.each(state.long_message_queue_pids, fn {pid, type} -> - log_long_message_queue_event(pid, type) - end) - - {:noreply, state} + if MapSet.size(state.long_message_queue_pids) == 0 do + :timer.cancel(state.long_message_queue_timer) + {:noreply, %{state | long_message_queue_timer: nil}} + else + Enum.each(state.long_message_queue_pids, fn pid -> + {type, subtype} = proc_type_and_subtype(pid) + log_long_message_queue_event(pid, type, subtype) + end) + + {:noreply, state} + end end - defp log_long_message_queue_event(pid, type) do + defp log_long_message_queue_event(pid, type, subtype) do with {:message_queue_len, queue_len} <- Process.info(pid, :message_queue_len) do :telemetry.execute(@vm_monitor_long_message_queue, %{length: queue_len}, %{ - process_type: type + process_type: type, + process_subtype: subtype }) end end diff --git a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs index 22462b5b51..762ea9c3ff 100644 --- a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs +++ b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs @@ -119,6 +119,127 @@ defmodule ElectricTelemetry.ProcessesTest do end end 
+ describe "proc_type_and_subtype/1 for :supervisor" do + test "returns the registered name when the supervisor is named" do + name = :"sup_named_#{System.unique_integer([:positive])}" + {:ok, pid} = Supervisor.start_link([], strategy: :one_for_one, name: name) + assert {:supervisor, name} == proc_type_and_subtype(pid) + end + + test "falls back to $ancestors atom for an unnamed supervisor" do + parent_sup_name = :"sup_parent_#{System.unique_integer([:positive])}" + child_sup_name = :"sup_child_#{System.unique_integer([:positive])}" + + child_sup = %{ + id: Supervisor, + type: :supervisor, + start: {Supervisor, :start_link, [[], [strategy: :one_for_one, name: child_sup_name]]} + } + + {:ok, sup_pid} = + Supervisor.start_link([child_sup], strategy: :one_for_one, name: parent_sup_name) + + [{_, child_pid, _, _}] = Supervisor.which_children(sup_pid) + + assert {:supervisor, child_sup_name} == proc_type_and_subtype(child_pid) + + true = Process.unregister(child_sup_name) + assert {:supervisor, parent_sup_name} == proc_type_and_subtype(child_pid) + end + + test "falls back to initial_call when neither registered name nor named ancestor is available" do + # Run the supervisor from a spawned, unregistered process so the entire $ancestors + # chain is pids rather than atoms. 
+ parent = self() + + spawn_link(fn -> + {:ok, pid} = Supervisor.start_link([], strategy: :one_for_one) + send(parent, {:sup, pid}) + Process.sleep(:infinity) + end) + + assert_receive {:sup, pid}, 200 + + assert {:supervisor, ":supervisor.\"Elixir.Supervisor.Default\"/1"} == + proc_type_and_subtype(pid) + end + end + + describe "proc_type_and_subtype/1 for :erlang" do + test "falls back to initial_call MFA for an anonymous spawn_link" do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + assert {:erlang, ":erlang.apply/2"} == proc_type_and_subtype(pid) + end + + test "uses the registered name when an :erlang-typed process is named" do + name = :"erlang_named_#{System.unique_integer([:positive])}" + pid = spawn_link(fn -> Process.sleep(:infinity) end) + Process.register(pid, name) + + assert {:erlang, name} == proc_type_and_subtype(pid) + end + end + + describe "proc_type_and_subtype/1 for :logger_olp" do + test "uses the registered name as the handler id" do + parent = self() + name = :"logger_olp_test_#{System.unique_integer([:positive])}" + + pid = + spawn_link(fn -> + # Mimic an OLP-spawned process: `$initial_call` MFA module is `:logger_olp`, + # which is how the real OLP processes show up via proc_lib. 
+ Process.put(:"$initial_call", {:logger_olp, :init, 1}) + Process.register(self(), name) + send(parent, :ready) + Process.sleep(:infinity) + end) + + assert_receive :ready, 200 + + assert {:logger_olp, name} == proc_type_and_subtype(pid) + end + + test "falls back to initial call for an unregistered :logger_olp-typed process" do + parent = self() + + pid = + spawn_link(fn -> + Process.put(:"$initial_call", {:logger_olp, :init, 1}) + send(parent, :ready) + Process.sleep(:infinity) + end) + + assert_receive :ready, 200 + + assert {:logger_olp, ":logger_olp.init/1"} == proc_type_and_subtype(pid) + end + end + + describe "proc_type_and_subtype/1 for non-subtyped buckets" do + test "returns nil subtype for a labelled (non-bucketed) process" do + pid = spawn_with_label(:my_process) + assert {:my_process, nil} == proc_type_and_subtype(pid) + end + end + + describe "proc_type_and_subtype/1 for dead processes" do + test "returns {:dead, nil} for a process that has exited" do + pid = + spawn_link(fn -> + receive do + :die -> :ok + end + end) + + ref = Process.monitor(pid) + send(pid, :die) + assert_receive {:DOWN, ^ref, :process, ^pid, :normal} + + assert {:dead, nil} == proc_type_and_subtype(pid) + end + end + describe "top_memory_by_type/[1, 2]" do test "handles dead processes" do parent = self()