Skip to content
5 changes: 5 additions & 0 deletions .changeset/process-subtype-attribute.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/electric-telemetry": minor
---

Emit a new `process_subtype` attribute alongside the existing `process_type` on `vm.monitor.long_gc`, `vm.monitor.long_schedule`, `vm.monitor.long_message_queue`, `process.memory`, and `process.bin_memory` telemetry events. For the three coarse `process_type` buckets that previously hid most of the signal during overload — `supervisor`, `erlang`, and `logger_olp` — `process_subtype` carries a stable, low-cardinality string identifying the specific process (registered name, falling back to `$ancestors` for unnamed supervisors or to the initial-call MFA for anonymous `:erlang` spawns). Existing `process_type` values are unchanged.
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,18 @@ defmodule ElectricTelemetry.ApplicationTelemetry do

def metrics(telemetry_opts) do
[
last_value("process.memory.total", tags: [:process_type], unit: :byte),
last_value("process.bin_memory.total", tags: [:process_type], unit: :byte),
last_value("process.bin_memory.max_bin_count", tags: [:process_type]),
last_value("process.bin_memory.avg_bin_count", tags: [:process_type]),
last_value("process.bin_memory.max_ref_count", tags: [:process_type]),
last_value("process.bin_memory.avg_ref_count", tags: [:process_type]),
last_value("process.memory.total",
tags: [:process_type, :process_subtype],
unit: :byte
),
last_value("process.bin_memory.total",
tags: [:process_type, :process_subtype],
unit: :byte
),
last_value("process.bin_memory.max_bin_count", tags: [:process_type, :process_subtype]),
last_value("process.bin_memory.avg_bin_count", tags: [:process_type, :process_subtype]),
last_value("process.bin_memory.max_ref_count", tags: [:process_type, :process_subtype]),
last_value("process.bin_memory.avg_ref_count", tags: [:process_type, :process_subtype]),
last_value("ets.memory.total", tags: [:table_type], unit: :byte),
last_value("system.cpu.core_count"),
last_value("system.cpu.utilization.total"),
Expand All @@ -113,12 +119,15 @@ defmodule ElectricTelemetry.ApplicationTelemetry do
last_value("vm.memory.processes_used", unit: :byte),
last_value("vm.memory.system", unit: :byte),
last_value("vm.memory.total", unit: :byte),
sum("vm.monitor.long_message_queue.length", tags: [:process_type]),
sum("vm.monitor.long_message_queue.length", tags: [:process_type, :process_subtype]),
distribution("vm.monitor.long_schedule.timeout",
tags: [:process_type],
tags: [:process_type, :process_subtype],
unit: :millisecond
),
distribution("vm.monitor.long_gc.timeout",
tags: [:process_type, :process_subtype],
unit: :millisecond
),
distribution("vm.monitor.long_gc.timeout", tags: [:process_type], unit: :millisecond),
last_value("vm.persistent_term.count"),
last_value("vm.persistent_term.memory", unit: :byte),
last_value("vm.reductions.total"),
Expand Down Expand Up @@ -182,7 +191,7 @@ defmodule ElectricTelemetry.ApplicationTelemetry do
:telemetry.execute(
[:process, :memory],
%{total: map.proc_mem},
%{process_type: to_string(map.type)}
%{process_type: to_string(map.type), process_subtype: map.subtype}
)
end
end
Expand All @@ -198,7 +207,7 @@ defmodule ElectricTelemetry.ApplicationTelemetry do
max_ref_count: map.max_ref_count,
avg_ref_count: map.avg_ref_count
},
%{process_type: to_string(map.type)}
%{process_type: to_string(map.type), process_subtype: map.subtype}
)
end
end
Expand Down
69 changes: 64 additions & 5 deletions packages/electric-telemetry/lib/electric/telemetry/processes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ defmodule ElectricTelemetry.Processes do
# Minimum memory threshold for a process group when using :mem_percent mode.
@min_group_memory 1024 * 1024

# Coarse `process_type` values for which we additionally derive a `process_subtype`.
# Membership here is matched against the value returned by `proc_type/2` (an atom
# like `:supervisor`, `:erlang`, `:logger_olp`).
@proc_types_with_subtype [:erlang, :logger_olp, :supervisor]

defguardp is_valid_mem_percent(percent)
when is_integer(percent) and percent >= 1 and percent <= 100

Expand All @@ -21,6 +26,19 @@ defmodule ElectricTelemetry.Processes do

def proc_type(pid), do: proc_type(pid, info(pid))

@doc """
Compute both `process_type` and `process_subtype` using a single `Process.info/2` call.

The subtype is a stable, low-cardinality string (or `nil`) intended to be emitted
as a companion attribute on telemetry events.
"""
@spec proc_type_and_subtype(pid()) :: {atom() | binary(), atom() | binary() | nil}
def proc_type_and_subtype(pid) do
info = info(pid)
type = proc_type(pid, info)
{type, proc_subtype(type, info)}
end

def top_memory_by_type, do: top_by(:proc_mem)
def top_memory_by_type(proc_list_or_limit), do: top_by(:proc_mem, proc_list_or_limit)
def top_memory_by_type(proc_list, limit), do: top_by(:proc_mem, proc_list, limit)
Expand Down Expand Up @@ -69,11 +87,12 @@ defmodule ElectricTelemetry.Processes do
process_list
|> Enum.map(&type_and_memory/1)
|> Enum.reject(&(&1.type == :dead))
|> Enum.group_by(& &1.type)
|> Enum.map(fn {type, proc_infos} ->
|> Enum.group_by(&{&1.type, &1.subtype})
|> Enum.map(fn {{type, subtype}, proc_infos} ->
proc_infos
|> mem_stats_for_procs()
|> Map.put(:type, type)
|> Map.put(:subtype, subtype)
end)
|> Enum.sort_by(&(-Map.fetch!(&1, sort_key)))
end
Expand Down Expand Up @@ -129,14 +148,18 @@ defmodule ElectricTelemetry.Processes do

defp type_and_memory(pid) do
info = info(pid)
type = proc_type(pid, info)
subtype = proc_subtype(type, info)

info
|> memory_from_info()
|> Map.put(:type, proc_type(pid, info))
|> Map.put(:type, type)
|> Map.put(:subtype, subtype)
end

defp info(pid) do
Process.info(pid, [:dictionary, :initial_call, :label, :memory, :binary])
@doc false
def info(pid) do
Process.info(pid, [:dictionary, :initial_call, :label, :memory, :binary, :registered_name])
end

defp proc_type(pid, info) do
Expand All @@ -145,6 +168,42 @@ defmodule ElectricTelemetry.Processes do
if(Process.alive?(pid), do: :unknown, else: :dead)
end

defp proc_subtype(proc_type, info) when proc_type in @proc_types_with_subtype do
registered_name(info) || ancestor_name(info) || initial_call_mfa_string(info)
end

defp proc_subtype(_, _), do: nil

defp registered_name(info) do
# Process.info(pid, :registered_name) returns an empty list for unregistered processes
with [] <- info[:registered_name], do: nil
end

defp ancestor_name(info) do
case get_in(info, [:dictionary, :"$ancestors"]) do
[name | _] when is_atom(name) and not is_nil(name) -> name
_ -> nil
end
end

defp initial_call_mfa_string(info) do
# Prefer the dictionary-stored $initial_call (set by proc_lib for OTP processes),
# falling back to the raw initial_call reported by the VM. Returns "Module.fun/arity".
mfa =
case get_in(info, [:dictionary, :"$initial_call"]) do
{m, f, a} -> {m, f, a}
_ -> info[:initial_call]
end

case mfa do
{m, f, a} when is_atom(m) and is_atom(f) and is_integer(a) ->
Exception.format_mfa(m, f, a)

_ ->
nil
end
end

defp label_from_info(info) do
case info[:label] do
:undefined -> nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule ElectricTelemetry.SystemMonitor do

use GenServer

import ElectricTelemetry.Processes, only: [proc_type: 1]
import ElectricTelemetry.Processes, only: [proc_type_and_subtype: 1]

require Logger

Expand All @@ -32,17 +32,20 @@ defmodule ElectricTelemetry.SystemMonitor do
{opts.long_message_queue_disable_threshold, opts.long_message_queue_enable_threshold}
)

{:ok, %{long_message_queue_pids: %{}, long_message_queue_timer: nil}}
{:ok, %{long_message_queue_pids: MapSet.new(), long_message_queue_timer: nil}}
end

def handle_info({:monitor, gc_pid, :long_gc, info}, state) do
type = proc_type(gc_pid)
{type, subtype} = proc_type_and_subtype(gc_pid)

Logger.debug(
"Long GC detected for pid #{inspect(gc_pid)} (#{inspect(type)}): took #{Keyword.fetch!(info, :timeout)}ms. #{inspect(info, limit: :infinity)}"
"Long GC detected for pid #{inspect(gc_pid)} (#{inspect(type)}/#{inspect(subtype)}): took #{Keyword.fetch!(info, :timeout)}ms. #{inspect(info, limit: :infinity)}"
)

:telemetry.execute(@vm_monitor_long_gc, Map.new(info), %{process_type: type})
:telemetry.execute(@vm_monitor_long_gc, Map.new(info), %{
process_type: type,
process_subtype: subtype
})

{:noreply, state}
end
Expand All @@ -52,13 +55,16 @@ defmodule ElectricTelemetry.SystemMonitor do
"Long schedule detected for port #{inspect(port)}, took #{Keyword.fetch!(info, :timeout)}ms"
)

:telemetry.execute(@vm_monitor_long_schedule, Map.new(info), %{process_type: :port})
:telemetry.execute(@vm_monitor_long_schedule, Map.new(info), %{
process_type: :port,
process_subtype: nil
})

{:noreply, state}
end

def handle_info({:monitor, pid, :long_schedule, info}, state) when is_pid(pid) do
type = proc_type(pid)
{type, subtype} = proc_type_and_subtype(pid)

Logger.debug(fn ->
locations =
Expand All @@ -76,26 +82,32 @@ defmodule ElectricTelemetry.SystemMonitor do
""
end

"Long schedule detected for pid #{inspect(pid)} (#{inspect(type)}), took #{Keyword.fetch!(info, :timeout)}ms" <>
"Long schedule detected for pid #{inspect(pid)} (#{inspect(type)}/#{inspect(subtype)}), took #{Keyword.fetch!(info, :timeout)}ms" <>
locs_str
end)

:telemetry.execute(@vm_monitor_long_schedule, %{timeout: Keyword.fetch!(info, :timeout)}, %{
process_type: type
process_type: type,
process_subtype: subtype
})

{:noreply, state}
end

def handle_info({:monitor, pid, :long_message_queue, true}, state) do
type = proc_type(pid)
{type, subtype} = proc_type_and_subtype(pid)

Logger.debug("Long message queue detected for pid #{inspect(pid)} (#{inspect(type)})")
Logger.debug(
"Long message queue detected for pid #{inspect(pid)} (#{inspect(type)}/#{inspect(subtype)})"
)

log_long_message_queue_event(pid, type)
log_long_message_queue_event(pid, type, subtype)

state =
%{state | long_message_queue_pids: Map.put(state.long_message_queue_pids, pid, type)}
%{
state
| long_message_queue_pids: MapSet.put(state.long_message_queue_pids, pid)
}
|> maybe_start_long_message_queue_timer()

{:noreply, state}
Expand All @@ -104,27 +116,29 @@ defmodule ElectricTelemetry.SystemMonitor do
def handle_info({:monitor, pid, :long_message_queue, false}, state) do
Logger.debug("Long message queue no longer detected for pid #{inspect(pid)}")

{:noreply, %{state | long_message_queue_pids: Map.delete(state.long_message_queue_pids, pid)}}
end

def handle_info(:recheck_message_queues, state)
when map_size(state.long_message_queue_pids) == 0 do
:timer.cancel(state.long_message_queue_timer)
{:noreply, %{state | long_message_queue_timer: nil}}
{:noreply,
%{state | long_message_queue_pids: MapSet.delete(state.long_message_queue_pids, pid)}}
end

def handle_info(:recheck_message_queues, state) do
Enum.each(state.long_message_queue_pids, fn {pid, type} ->
log_long_message_queue_event(pid, type)
end)

{:noreply, state}
if MapSet.size(state.long_message_queue_pids) == 0 do
:timer.cancel(state.long_message_queue_timer)
{:noreply, %{state | long_message_queue_timer: nil}}
else
Enum.each(state.long_message_queue_pids, fn pid ->
{type, subtype} = proc_type_and_subtype(pid)
log_long_message_queue_event(pid, type, subtype)
end)

{:noreply, state}
end
end

defp log_long_message_queue_event(pid, type) do
defp log_long_message_queue_event(pid, type, subtype) do
with {:message_queue_len, queue_len} <- Process.info(pid, :message_queue_len) do
:telemetry.execute(@vm_monitor_long_message_queue, %{length: queue_len}, %{
process_type: type
process_type: type,
process_subtype: subtype
})
end
end
Expand Down
Loading
Loading