Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions livekit-agents/livekit/agents/ipc/supervised_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def __init__(
self._memory_baseline_mb: float | None = None
self._last_memory_warn_time: float = 0.0
self._last_memory_warn_mb: float = 0.0
self._psutil_process: psutil.Process | None = None

self._start_atask: asyncio.Task[None] | None = None
self._supervise_atask: asyncio.Task[None] | None = None
Expand Down Expand Up @@ -547,11 +548,13 @@ def _should_emit_memory_warning(self, memory_mb: float, *, now: float) -> bool:
return True
return False

def _memory_logging_extra(self, memory_mb: float) -> dict[str, Any]:
def _memory_logging_extra(self, memory_mb: float, memory_metric: str) -> dict[str, Any]:
Comment thread
Bobronium marked this conversation as resolved.
"""Diagnostic context for the memory logs: tells a process that started
heavy from one that grew over time (uptime, baseline RSS, growth)."""
heavy from one that grew over time (uptime, baseline, growth). memory_metric
records which footprint metric the number reflects (pss/uss/rss)."""
extra: dict[str, Any] = {
"memory_usage_mb": round(memory_mb, 1),
"memory_metric": memory_metric,
"memory_warn_mb": self._opts.memory_warn_mb,
"memory_limit_mb": self._opts.memory_limit_mb,
"uptime": round(self.uptime, 1),
Expand All @@ -564,6 +567,36 @@ def _memory_logging_extra(self, memory_mb: float) -> dict[str, Any]:
extra.update(self.logging_extra())
return extra

def _sample_memory_mb(self) -> tuple[float, str]:
    """Sample the real footprint of the process, in MB, and name the metric used.

    Prefers metrics that don't double-count shared pages. RSS counts
    copy-on-write pages inherited from the forkserver and file-backed library
    pages at full weight, so it overstates what the process actually consumes
    and oversums across processes. PSS (proportional, Linux only) divides
    shared pages among their sharers and sums correctly to physical use; USS
    (private only) is the cross-platform fallback since psutil exposes no PSS
    on macOS/Windows. RSS is a last resort if the extended metrics are
    unavailable.

    Synchronous and blocking: the PSS/USS path walks the target process's
    page tables in the kernel, which scales with its mapping count and is
    ~ms for a large child. Run via run_in_executor so that walk overlaps
    with the event loop rather than stalling it (the read syscall releases
    the GIL, so the offload pays off even on a single core).

    Returns:
        ``(memory_mb, metric)`` where ``metric`` is the first of ``"pss"``,
        ``"uss"``, ``"rss"`` that is present and nonzero on the sampled
        info object, or ``(0.0, "unknown")`` when none of them is.

    Raises:
        Errors from psutil other than the ``AccessDenied`` /
        ``NotImplementedError`` raised by ``memory_full_info()`` are not
        handled here and propagate to the caller.
    """
    # Reuse a single psutil.Process handle across samples instead of
    # re-creating it on every call.
    process = self._psutil_process
    if process is None:
        process = self._psutil_process = psutil.Process(self._pid)

    try:
        memory_info: Any = process.memory_full_info()
    except (psutil.AccessDenied, NotImplementedError):
        # Extended metrics unavailable: fall back to the basic info, which
        # carries rss but neither pss nor uss.
        memory_info = process.memory_info()

    bytes_per_mb = 1024 * 1024
    for metric in ("pss", "uss", "rss"):
        # A missing or zero field is treated as "not reported"; try the
        # next metric in priority order.
        if memory_bytes := getattr(memory_info, metric, 0):
            return memory_bytes / bytes_per_mb, metric

    # Keep the float contract of the annotation (the original returned int 0).
    return 0.0, "unknown"

@log_exceptions(logger=logger)
async def _memory_monitor_task(self) -> None:
"""Monitor memory usage and kill the process if it exceeds the limit."""
Expand All @@ -573,10 +606,9 @@ async def _memory_monitor_task(self) -> None:
await asyncio.sleep(_MEMORY_MONITOR_INTERVAL)
continue

# get process memory info
process = psutil.Process(self._pid)
memory_info = process.memory_info()
memory_mb = memory_info.rss / (1024 * 1024) # Convert to MB
memory_mb, memory_metric = await self._loop.run_in_executor(
None, self._sample_memory_mb
)

# the first sample (taken shortly after initialization) is treated as the
# post-prewarm baseline, so later samples can report growth since startup
Expand All @@ -586,7 +618,7 @@ async def _memory_monitor_task(self) -> None:
if self._opts.memory_limit_mb > 0 and memory_mb > self._opts.memory_limit_mb:
logger.error(
f"{self.process_kind} process exceeded memory limit, killing it",
extra=self._memory_logging_extra(memory_mb),
extra=self._memory_logging_extra(memory_mb, memory_metric),
)
await self._send_dump_signal()
await self._send_kill_signal()
Expand All @@ -606,7 +638,7 @@ async def _memory_monitor_task(self) -> None:
if advisory
else ""
),
extra=self._memory_logging_extra(memory_mb),
extra=self._memory_logging_extra(memory_mb, memory_metric),
)

except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
Expand Down
4 changes: 2 additions & 2 deletions livekit-agents/livekit/agents/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class ServerOptions:
Defaults to 0.7 on "production" mode, and is disabled in "development" mode.
"""

job_memory_warn_mb: float = 1000
job_memory_warn_mb: float = 500
Comment thread
Bobronium marked this conversation as resolved.
"""Memory warning threshold in MB. If the job process exceeds this limit, a warning will be logged.""" # noqa: E501
job_memory_limit_mb: float = 0
"""Maximum memory usage for a job in MB, the job process will be killed if it exceeds this limit.
Expand Down Expand Up @@ -304,7 +304,7 @@ def __init__(
*,
job_executor_type: JobExecutorType = _default_job_executor_type,
load_threshold: float | ServerEnvOption[float] = _default_load_threshold,
job_memory_warn_mb: float = 1000,
job_memory_warn_mb: float = 500,
job_memory_limit_mb: float = 0,
drain_timeout: int = 1800,
num_idle_processes: int | ServerEnvOption[int] = _default_num_idle_processes,
Expand Down
67 changes: 65 additions & 2 deletions tests/test_supervised_proc_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import asyncio
import multiprocessing as mp
import os
import socket
import sys

import psutil
import pytest

from livekit.agents.ipc.supervised_proc import (
Expand Down Expand Up @@ -74,7 +77,7 @@ async def test_memory_logging_extra_reports_baseline_and_growth() -> None:
proc = _make_proc()

# before a baseline is captured, only the basic fields are present
extra = proc._memory_logging_extra(520.0)
extra = proc._memory_logging_extra(520.0, "pss")
assert extra["memory_usage_mb"] == 520.0
assert extra["memory_warn_mb"] == 500
assert extra["has_running_job"] is False
Expand All @@ -83,7 +86,7 @@ async def test_memory_logging_extra_reports_baseline_and_growth() -> None:

# once a baseline is set, growth-since-startup is reported
proc._memory_baseline_mb = 300.0
extra = proc._memory_logging_extra(520.0)
extra = proc._memory_logging_extra(520.0, "pss")
assert extra["baseline_memory_mb"] == 300.0
assert extra["growth_memory_mb"] == 220.0

Expand Down Expand Up @@ -169,3 +172,63 @@ async def _main_task(self, ipc_ch): # pragma: no cover
mp_ctx=mp.get_context("spawn"),
loop=asyncio.get_event_loop(),
)


def test_sample_memory_picks_metric_psutil_actually_exposes() -> None:
    """Sampling our own pid yields the first nonzero metric psutil reports."""
    supervised = _make_proc()
    supervised._pid = os.getpid()

    sampled_mb, sampled_metric = supervised._sample_memory_mb()

    assert sampled_metric in ("pss", "uss", "rss")
    assert sampled_mb > 0

    # Independently read the full memory info for this process and find the
    # highest-priority field that is nonzero on this host; the sampler must
    # have chosen that same field.
    full_info = psutil.Process(os.getpid()).memory_full_info()
    expected_metric = next(
        name for name in ("pss", "uss", "rss") if getattr(full_info, name, 0)
    )
    assert sampled_metric == expected_metric


@pytest.mark.skipif(sys.platform != "linux", reason="PSS is only exposed on Linux")
def test_sample_memory_reports_pss_on_linux() -> None:
    """On Linux, sampling our own pid must report the PSS metric."""
    supervised = _make_proc()
    supervised._pid = os.getpid()

    sample = supervised._sample_memory_mb()
    sampled_mb, sampled_metric = sample

    assert sampled_metric == "pss"
    assert sampled_mb > 0


@pytest.mark.skipif(
    sys.platform not in ("darwin", "win32"),
    reason="USS-without-PSS path applies to macOS and Windows",
)
def test_sample_memory_reports_uss_on_macos_and_windows() -> None:
    """On macOS and Windows, sampling our own pid must report the USS metric."""
    supervised = _make_proc()
    supervised._pid = os.getpid()

    sample = supervised._sample_memory_mb()
    sampled_mb, sampled_metric = sample

    assert sampled_metric == "uss"
    assert sampled_mb > 0


def test_sample_memory_falls_back_to_rss_when_full_info_unavailable() -> None:
    """When memory_full_info is unavailable, the sampler reports plain RSS."""
    # AccessDenied/NotImplementedError cannot be provoked on one's own process,
    # so only the trigger is faked: memory_full_info is shadowed on a real
    # Process instance (an instance attribute, so no global/class state leaks
    # across the concurrent suite) while memory_info().rss remains a genuine
    # psutil read.
    def _raise_not_implemented():
        raise NotImplementedError

    own_process = psutil.Process(os.getpid())
    own_process.memory_full_info = _raise_not_implemented  # type: ignore[method-assign]

    supervised = _make_proc()
    # Pre-seed the cached handle so the sampler uses the patched instance.
    supervised._psutil_process = own_process

    sampled_mb, sampled_metric = supervised._sample_memory_mb()

    assert sampled_metric == "rss"
    assert sampled_mb > 0
Loading