Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
728e8f6
fix(integrations): emit stack-invariant OR ranges for related_integra…
Mikaayenson May 27, 2026
6d88831
fix(integrations): emit stack-invariant OR ranges for related_integra…
Mikaayenson May 28, 2026
73c0780
fix(integrations): satisfy ruff SIM110 in _major_has_compatible_stack
Mikaayenson May 28, 2026
07cd5a0
fix(integrations): address review feedback for stack-major resolution
Mikaayenson May 28, 2026
2ac6605
Merge branch 'main' into 5601-bug-improper-prebuilt-rule-version-usag…
Mikaayenson May 29, 2026
1d9fae1
fix(integrations): tighten stack-major overlap and anchor resolution
Mikaayenson May 29, 2026
2302006
fix(integrations): annotate majors_to_check for pyright
Mikaayenson May 29, 2026
76e0369
fix(integrations): rebase #6208 onto main with #6251 schema-aware OR …
Mikaayenson Jun 4, 2026
eef18f2
style(rule): apply ruff format for CI code-checks
Mikaayenson Jun 4, 2026
6ae62b1
fix(integrations): address PR review on version range export
Mikaayenson Jun 4, 2026
adc4f10
fix(integrations): anchor RI export to shipped stack backports only
Mikaayenson Jun 4, 2026
655cffc
refactor(integrations): simplify version range export helpers
Mikaayenson Jun 4, 2026
9adb920
fix(rule): dedupe ES|QL related_integrations metadata package row
Mikaayenson Jun 5, 2026
d4cc69c
test(integrations): drop brittle ES|QL rule-file export tests
Mikaayenson Jun 5, 2026
0919416
refactor(integrations): dedupe schema floor lookup and trim comments
Mikaayenson Jun 5, 2026
6a37067
fix(integrations): walk shipped stack lines for RI anchor collection
Mikaayenson Jun 5, 2026
429ab99
fix(rule): dedupe NON_DATASET metadata RI rows when datasets cover pa…
Mikaayenson Jun 5, 2026
bdbec2a
Merge branch 'main' into 5601-bug-improper-prebuilt-rule-version-usag…
Mikaayenson Jun 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 238 additions & 45 deletions detection_rules/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import fnmatch
import gzip
import json
from collections import OrderedDict, defaultdict
from collections import defaultdict
from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any

Expand All @@ -22,7 +23,7 @@
from . import ecs
from .beats import flatten_ecs_schema
from .config import load_current_package_version
from .schemas import definitions
from .schemas import definitions, get_stack_versions
from .utils import cached, get_etc_path, read_gzip, unzip

if TYPE_CHECKING:
Expand Down Expand Up @@ -245,18 +246,17 @@ def _satisfies_kibana_range(stack: Version, version_requirement: str) -> bool:


def find_latest_integration_patch_for_minor(packages: Iterable[str], major: int, minor: int) -> int:
"""Find the latest stack patch the given integration packages need for a major.minor."""
# The stack-schema-map keys stacks at MAJOR.MINOR.0, but an integration may gate its latest
# package (and newly-added data streams) behind a later patch (e.g. azure ~8.19.10). Resolving
# against the literal .0 falls back to an older package that predates the stream. Return the
# latest patch a package gates on for the minor, i.e. the stack patch needed to receive the most
# up-to-date integration package on that minor.
"""Find the latest stack patch integration packages need for a major.minor."""
# stack-schema-map keys stacks at MAJOR.MINOR.0, but an integration may gate its latest
# package (and newly-added data streams) behind a later patch (e.g. azure ~8.19.10).
# Resolving against the literal .0 falls back to an older package that predates the
# stream. Return the latest patch a package gates on for the minor.
#
# Track the *newest* package version's floor (not the max floor across all versions): Fleet always
# installs the latest compatible package, so that floor is the patch a stack actually needs. A
# newer package occasionally lowers its floor (e.g. apm 7.16.1 gates ^7.16.1 but the newer 7.16.2
# gates ^7.16.0); honoring the newest version matches what Fleet installs rather than an older,
# higher floor that would never be installed on that stack.
# Track the *newest* package version's floor (not the max floor across all versions):
# Fleet always installs the latest compatible package, so that floor is the patch a
# stack actually needs. A newer package occasionally lowers its floor (e.g. apm 7.16.1
# gates ^7.16.1 but the newer 7.16.2 gates ^7.16.0); honoring the newest version
# matches what Fleet installs rather than an older, higher floor.
manifests = load_integrations_manifests()
latest_patch = 0
for package in packages:
Expand All @@ -282,49 +282,242 @@ def find_latest_integration_patch_for_minor(packages: Iterable[str], major: int,
return latest_patch


def find_least_compatible_version(
package: str,
integration: str | None,
current_stack_version: str,
packages_manifest: dict[str, Any],
) -> str:
"""Finds least compatible version for specified integration based on stack version supplied."""
integration_manifests = dict(sorted(packages_manifest[package].items(), key=lambda x: Version.parse(x[0])))
stack_version = Version.parse(current_stack_version, optional_minor_and_patch=True)

# The manifest's kibana condition only tells us whether the *package* installs on the stack, not
# whether this particular integration/data stream exists yet in that package version (e.g. azure
# added aadgraphactivitylogs in 1.37.0, but 1.0.0 already installs on 8.19). The schemas record
# the data streams present per package version, so use them to skip versions that predate the
# integration. Only filter when schema data exists for a version, otherwise fall back to kibana
# compatibility alone (e.g. for synthetic manifests in tests).
# Loaded only when an integration is specified, to avoid decompressing the schemas for
# package-only lookups where the schema check is never consulted.
package_schemas: dict[str, Any] = load_integrations_schemas().get(package, {}) if integration else {}

# filter integration_manifests to only the latest major entries
# Sentinel written by ``parse_datasets`` when a rule indexes a package but not a data stream.
UNKNOWN_PACKAGE_INTEGRATION = "Unknown"

# Cap majors walked for unbounded Kibana clauses (``>=X.Y.Z``). Intersection with
# ``_shipped_stack_majors()`` keeps only backport lines we ship rules to.
_MAX_UNBOUNDED_STACK_MAJOR_SPAN = 10


def _major_has_compatible_stack(major: int, version_requirement: str) -> bool:
"""Return True iff the Kibana range overlaps some stack in [major.0.0, (major+1).0.0)."""
major_lo = Version(major, 0, 0)
major_hi = Version(major + 1, 0, 0)
return any(lo < major_hi and (hi is None or hi > major_lo) for lo, hi in _parse_kibana_range(version_requirement))


def _package_version_has_integration(
version: str,
integration: str,
package_schemas: dict[str, Any],
) -> bool:
"""Return True when schema data is absent or includes the integration/data stream."""
if version not in package_schemas:
return True
return integration in package_schemas[version]


def _majors_overlapping_kibana_clause(
lo: Version,
hi: Version | None,
version_requirement: str,
) -> list[int]:
"""Return stack majors whose [M.0.0, (M+1).0.0) band intersects the parsed clause bounds."""
if hi is not None:
majors_to_check: list[int] = []
major = lo.major
while Version(major, 0, 0) < hi:
majors_to_check.append(major)
major += 1
return majors_to_check

# Unbounded upper (``>=``, ``>``): walk forward while the major still overlaps.
majors_to_check: list[int] = []
major = lo.major
while major <= lo.major + _MAX_UNBOUNDED_STACK_MAJOR_SPAN and _major_has_compatible_stack(
major, version_requirement
):
majors_to_check.append(major)
major += 1
return majors_to_check


def _stack_majors_supported_by_package(integration_manifests: dict[str, Any]) -> set[int]:
"""Collect Kibana stack majors that any manifest in the package can serve."""
stack_majors: set[int] = set()
for manifest in integration_manifests.values():
version_requirement = manifest["conditions"]["kibana"]["version"]
for lo, hi in _parse_kibana_range(version_requirement):
for major in _majors_overlapping_kibana_clause(lo, hi, version_requirement):
stack_majors.add(major)
return stack_majors


def _find_least_compatible_for_stack(
stack_version: Version,
integration_manifests: dict[str, Any],
integration: str | None = None,
package_schemas: dict[str, Any] | None = None,
) -> str | None:
"""Stack-dependent least compatible integration version (pre-#5601 behavior)."""
major_versions = sorted(
{Version.parse(manifest_version).major for manifest_version in integration_manifests},
reverse=True,
)
for max_major in major_versions:
major_integration_manifests = {
k: v for k, v in integration_manifests.items() if Version.parse(k).major == max_major
version: manifest
for version, manifest in integration_manifests.items()
if Version.parse(version).major == max_major
}

# iterates through ascending integration manifests
# returns latest major version that is least compatible
for version, manifest in OrderedDict(
sorted(major_integration_manifests.items(), key=lambda x: Version.parse(x[0]))
).items():
for version, manifest in sorted(major_integration_manifests.items(), key=lambda x: Version.parse(x[0])):
version_requirement = manifest["conditions"]["kibana"]["version"]
if not _satisfies_kibana_range(stack_version, version_requirement):
continue
if integration and version in package_schemas and integration not in package_schemas[version]:
if (
integration
and package_schemas is not None
and not _package_version_has_integration(version, integration, package_schemas)
):
continue
return f"^{version}"
return version
return None

raise ValueError(f"no compatible version for integration {package}:{integration}")

@dataclass(frozen=True)
class CompatibleVersionRange:
"""Stack-invariant related integration compatibility range."""

range: str
anchors: tuple[str, ...]
forward_anchor: str


def _build_compatible_version_range(anchors: list[str]) -> CompatibleVersionRange:
"""Build a CompatibleVersionRange from manifest-backed anchor versions."""
if not anchors:
raise ValueError("anchors must not be empty")

sorted_anchors = tuple(sorted(set(anchors), key=Version.parse))
top_major = max(Version.parse(anchor).major for anchor in sorted_anchors)
# Forward sentinel for the next integration major (no manifest entry yet).
forward_anchor = f"{top_major + 1}.0.0"
range_parts = [f"^{anchor}" for anchor in sorted_anchors] + [f"^{forward_anchor}"]
return CompatibleVersionRange(
range=" || ".join(range_parts),
anchors=sorted_anchors,
forward_anchor=forward_anchor,
)


@cached
def _shipped_stack_majors() -> set[int]:
"""Stack majors we ship prebuilt rules to (from the stack-schema-map backport lines)."""
return {Version.parse(version).major for version in get_stack_versions()}


def minimum_schema_package_version(
package: str,
integration: str,
integration_schemas: dict[str, Any],
) -> str | None:
"""Return the oldest package version whose schema includes integration, if any."""
package_schemas = integration_schemas.get(package)
if not package_schemas:
return None

for version in sorted(package_schemas, key=Version.parse):
if integration in package_schemas[version]:
return version
return None


def apply_schema_version_floor(
result: CompatibleVersionRange,
schema_floor: str,
) -> CompatibleVersionRange:
"""Raise anchors in the schema floor's package major when below schema_floor."""
floor_version = Version.parse(schema_floor)
floor_major = floor_version.major
bumped_anchors: list[str] = []

for anchor in result.anchors:
anchor_version = Version.parse(anchor)
if anchor_version.major == floor_major and anchor_version < floor_version:
continue
bumped_anchors.append(anchor)

if not any(Version.parse(anchor).major == floor_major for anchor in bumped_anchors):
bumped_anchors.append(schema_floor)

bumped_tuple = tuple(sorted(bumped_anchors, key=Version.parse))
if bumped_tuple == result.anchors:
return result

return _build_compatible_version_range(list(bumped_tuple))


def _collect_compatible_anchors(
integration_manifests: dict[str, Any],
stack_majors: set[int],
integration: str | None,
package_schemas: dict[str, Any],
) -> list[str]:
"""Oldest compatible integration version per shipped stack version line."""
anchors: list[str] = []
for stack_version_str in get_stack_versions():
stack_version = Version.parse(stack_version_str)
if stack_version.major not in stack_majors:
continue
anchor = _find_least_compatible_for_stack(
stack_version,
integration_manifests,
integration,
package_schemas,
)
if anchor and anchor not in anchors:
anchors.append(anchor)
return anchors


def _integration_schema_floor(
package: str,
integration: str | None,
package_schemas: dict[str, Any],
) -> str | None:
"""Oldest package version whose schema includes integration, when schemas are loaded."""
if not integration or not package_schemas:
return None
return minimum_schema_package_version(package, integration, {package: package_schemas})


def find_compatible_version_range(
package: str,
packages_manifest: dict[str, Any],
integration: str | None = None,
) -> CompatibleVersionRange:
"""Return a stack-invariant OR'd caret range for related_integrations.version."""
# One anchor per shipped stack version line (no build-time stack), OR'd carets, forward sentinel.
# With integration set, filter by integration-schemas when present (data-stream floor).
package_manifest = packages_manifest.get(package)
if package_manifest is None:
raise ValueError(f"Package {package} not found in manifest.")

package_schemas: dict[str, Any] = {}
if integration:
package_schemas = load_integrations_schemas().get(package, {})
schema_floor = _integration_schema_floor(package, integration, package_schemas)

integration_manifests = dict(sorted(package_manifest.items(), key=lambda x: Version.parse(x[0])))
stack_majors = _stack_majors_supported_by_package(integration_manifests) & _shipped_stack_majors()

if not stack_majors:
raise ValueError(f"no compatible version for integration package {package}")

anchors = _collect_compatible_anchors(integration_manifests, stack_majors, integration, package_schemas)

if not anchors:
if schema_floor:
baseline = find_compatible_version_range(package, packages_manifest)
return apply_schema_version_floor(baseline, schema_floor)
package_label = f"{package}:{integration}" if integration else package
raise ValueError(f"no compatible version for integration {package_label}")

result = _build_compatible_version_range(anchors)
if schema_floor:
result = apply_schema_version_floor(result, schema_floor)
return result


def find_latest_compatible_version(
Expand Down Expand Up @@ -537,7 +730,7 @@ def parse_datasets(datasets: list[str], package_manifest: dict[str, Any]) -> lis
# cleanup extra quotes pulled from ast field
value = _value.strip('"')

integration = "Unknown"
integration = UNKNOWN_PACKAGE_INTEGRATION
if "." in value:
package, integration = value.split(".", 1)
# Handle cases where endpoint event datasource needs to be parsed uniquely (e.g endpoint.events.network)
Expand Down
Loading
Loading