Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 124 additions & 21 deletions detection_rules/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import fnmatch
import gzip
import json
from collections import OrderedDict, defaultdict
from collections import defaultdict
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -244,36 +245,138 @@ def _satisfies_kibana_range(stack: Version, version_requirement: str) -> bool:
return any(lo <= stack and (hi is None or stack < hi) for lo, hi in _parse_kibana_range(version_requirement))


def find_least_compatible_version(
package: str,
integration: str,
current_stack_version: str,
packages_manifest: dict[str, Any],
) -> str:
"""Finds least compatible version for specified integration based on stack version supplied."""
integration_manifests = dict(sorted(packages_manifest[package].items(), key=lambda x: Version.parse(x[0])))
stack_version = Version.parse(current_stack_version, optional_minor_and_patch=True)

# filter integration_manifests to only the latest major entries
def _major_has_compatible_stack(major: int, version_requirement: str) -> bool:
"""Return True iff the Kibana range overlaps some stack in ``[major.0.0, (major+1).0.0)``."""
major_lo = Version(major, 0, 0)
major_hi = Version(major + 1, 0, 0)
return any(lo < major_hi and (hi is None or hi > major_lo) for lo, hi in _parse_kibana_range(version_requirement))


def _stack_majors_supported_by_package(integration_manifests: dict[str, Any]) -> set[int]:
"""Collect Kibana stack majors that any manifest in the package can serve."""
stack_majors: set[int] = set()
for manifest in integration_manifests.values():
version_requirement = manifest["conditions"]["kibana"]["version"]
for lo, _hi in _parse_kibana_range(version_requirement):
if _major_has_compatible_stack(lo.major, version_requirement):
stack_majors.add(lo.major)
Comment thread
Mikaayenson marked this conversation as resolved.
Outdated
return stack_majors


def _anchor_for_aligned_integration_major(
major: int,
integration_manifests: dict[str, Any],
) -> str | None:
"""Oldest integration version in ``major`` whose Kibana range overlaps ``[major, major+1)``."""
major_manifests = {
version: manifest
for version, manifest in integration_manifests.items()
if Version.parse(version).major == major
}
for version, manifest in sorted(major_manifests.items(), key=lambda x: Version.parse(x[0])):
version_requirement = manifest["conditions"]["kibana"]["version"]
if _major_has_compatible_stack(major, version_requirement):
return version
return None


def _find_least_compatible_for_stack(
stack_version: Version,
integration_manifests: dict[str, Any],
) -> str | None:
"""Stack-dependent least compatible integration version (pre-#5601 behavior)."""
major_versions = sorted(
{Version.parse(manifest_version).major for manifest_version in integration_manifests},
reverse=True,
)
for max_major in major_versions:
major_integration_manifests = {
k: v for k, v in integration_manifests.items() if Version.parse(k).major == max_major
version: manifest
for version, manifest in integration_manifests.items()
if Version.parse(version).major == max_major
}

# iterates through ascending integration manifests
# returns latest major version that is least compatible
for version, manifest in OrderedDict(
sorted(major_integration_manifests.items(), key=lambda x: Version.parse(x[0]))
).items():
for version, manifest in sorted(major_integration_manifests.items(), key=lambda x: Version.parse(x[0])):
version_requirement = manifest["conditions"]["kibana"]["version"]
if _satisfies_kibana_range(stack_version, version_requirement):
return f"^{version}"
return version
return None

raise ValueError(f"no compatible version for integration {package}:{integration}")

def _representative_stack_version(stack_major: int) -> Version:
"""Representative stack version used to resolve unaligned integration majors."""
return Version(stack_major, 19, 0)


@dataclass(frozen=True)
class CompatibleVersionRange:
"""Stack-invariant related integration compatibility range."""

range: str
anchors: list[str]
forward_anchor: str


def find_compatible_version_range(
package: str,
packages_manifest: dict[str, Any],
) -> CompatibleVersionRange:
"""Return a stack-invariant OR'd caret range for ``related_integrations.version``.

Emits one ``^X.Y.Z`` anchor per stack line the integration package supports, plus a
forward-looking ``^(top_major + 1).0.0`` anchor. Integration majors aligned with Kibana
stack majors (e.g. endpoint 8.x / 9.x) use manifest overlap on ``[M, M+1)``; other
packages resolve additional stack lines via the legacy stack walk.
"""
package_manifest = packages_manifest.get(package)
if package_manifest is None:
raise ValueError(f"Package {package} not found in manifest.")

integration_manifests = dict(sorted(package_manifest.items(), key=lambda x: Version.parse(x[0])))
integration_majors = {Version.parse(version).major for version in integration_manifests}
stack_majors = _stack_majors_supported_by_package(integration_manifests)

if not stack_majors:
raise ValueError(f"no compatible version for integration package {package}")

aligned_by_major = {
major: anchor
for major in sorted(integration_majors)
if (anchor := _anchor_for_aligned_integration_major(major, integration_manifests)) is not None
}
aligned_min_major = min(aligned_by_major) if aligned_by_major else None

if aligned_min_major is not None:
effective_stack_majors = sorted(stack_major for stack_major in stack_majors if stack_major >= aligned_min_major)
else:
effective_stack_majors = sorted(
stack_major for stack_major in stack_majors if stack_major >= max(stack_majors) - 1
)

anchors: list[str] = []
for stack_major in effective_stack_majors:
if stack_major in aligned_by_major:
anchor = aligned_by_major[stack_major]
elif stack_major in integration_majors:
anchor = _anchor_for_aligned_integration_major(stack_major, integration_manifests)
else:
anchor = _find_least_compatible_for_stack(
_representative_stack_version(stack_major),
integration_manifests,
Comment thread
Mikaayenson marked this conversation as resolved.
Outdated
)
if anchor and anchor not in anchors:
anchors.append(anchor)

if not anchors:
raise ValueError(f"no compatible version for integration package {package}")

top_major = max(Version.parse(anchor).major for anchor in anchors)
forward_anchor = f"{top_major + 1}.0.0"
range_parts = [f"^{anchor}" for anchor in anchors] + [f"^{forward_anchor}"]
return CompatibleVersionRange(
range=" || ".join(range_parts),
anchors=anchors,
forward_anchor=forward_anchor,
)


def find_latest_compatible_version(
Expand Down
22 changes: 9 additions & 13 deletions detection_rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from .esql import get_esql_query_event_dataset_integrations
from .esql_errors import EsqlSemanticError
from .integrations import (
find_least_compatible_version,
find_compatible_version_range,
get_integration_schema_fields,
load_integrations_manifests,
load_integrations_schemas,
Expand Down Expand Up @@ -1428,7 +1428,6 @@ def _convert_add_related_integrations(self, obj: dict[str, Any]) -> None:

if not package_integrations and self.metadata.integration:
packages_manifest = load_integrations_manifests()
current_stack_version = load_current_package_version()

if self.check_restricted_field_version(field_name) and isinstance(
self.data, QueryRuleData | MachineLearningRuleData
Expand All @@ -1446,22 +1445,19 @@ def _convert_add_related_integrations(self, obj: dict[str, Any]) -> None:
return

for package in package_integrations:
package["version"] = find_least_compatible_version(
result = find_compatible_version_range(
package=package["package"],
integration=package["integration"],
current_stack_version=current_stack_version,
packages_manifest=packages_manifest,
)
package["version"] = result.range

# if integration is not a policy template remove
if package["version"]:
version_data = packages_manifest.get(package["package"], {}).get(
package["version"].strip("^"), {}
)
policy_templates = version_data.get("policy_templates", [])
policy_templates: set[str] = set()
for anchor in result.anchors:
version_data = packages_manifest.get(package["package"], {}).get(anchor, {})
policy_templates.update(version_data.get("policy_templates", []))

if package["integration"] not in policy_templates:
del package["integration"]
if package["integration"] not in policy_templates:
del package["integration"]

# remove duplicate entries
package_integrations = list({json.dumps(d, sort_keys=True): d for d in package_integrations}.values())
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.6.43"
version = "1.6.44"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
97 changes: 70 additions & 27 deletions tests/test_integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
_parse_clause,
_parse_kibana_range,
_satisfies_kibana_range,
find_compatible_version_range,
find_latest_compatible_version,
find_least_compatible_version,
)


Expand Down Expand Up @@ -215,45 +215,88 @@ def test_unknown_package_raises(self):
find_latest_compatible_version("missing", "missing", Version(9, 1, 0), {})


class TestFindLeastCompatibleVersion(unittest.TestCase):
"""Behavior coverage for ``find_least_compatible_version``."""
class TestFindCompatibleVersionRange(unittest.TestCase):
"""Behavior coverage for ``find_compatible_version_range``."""

def test_picks_oldest_compatible_in_latest_major(self):
"""Returns the oldest manifest in the latest major whose range admits the stack."""
def test_emits_or_range_across_majors(self):
"""Emits oldest anchor per major plus a forward-looking next-major anchor."""
manifests = {
"pkg": {
"1.0.0": _manifest("^8.12.0"),
"1.5.0": _manifest("^8.12.0"),
"2.0.0": _manifest("^9.0.0"),
"2.1.0": _manifest("^9.1.0"),
"2.5.0": _manifest("^9.1.0"),
"1.0.0": _manifest("^1.0.0"),
"1.5.0": _manifest("^1.5.0"),
"2.0.0": _manifest("^2.0.0"),
"2.5.0": _manifest("^2.1.0"),
}
}
# 2.0.0 (^9.0.0) is the oldest 9.x manifest that admits a 9.1.0 stack.
self.assertEqual(find_least_compatible_version("pkg", "pkg", "9.1.0", manifests), "^2.0.0")
result = find_compatible_version_range("pkg", manifests)
self.assertEqual(result.range, "^1.0.0 || ^2.0.0 || ^3.0.0")
self.assertEqual(result.anchors, ["1.0.0", "2.0.0"])
self.assertEqual(result.forward_anchor, "3.0.0")

def test_no_compatible_in_any_major_raises(self):
"""When neither the latest nor any prior major admits the stack, raise."""
def test_stack_invariance(self):
"""Range result does not depend on build stack version."""
manifests = {
"pkg": {
"1.0.0": _manifest("^8.12.0"),
"2.0.0": _manifest("^9.4.0"),
"1.0.0": _manifest("^1.0.0"),
"2.0.0": _manifest("^2.0.0"),
}
}
with self.assertRaises(ValueError):
find_least_compatible_version("pkg", "pkg", "9.1.0", manifests)
first = find_compatible_version_range("pkg", manifests)
second = find_compatible_version_range("pkg", manifests)
self.assertEqual(first, second)

def test_single_major_appends_forward_anchor(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Akin to https://github.com/elastic/detection-rules/pull/6208/changes#r3325352838, if the goal is to support unbounded upper ranges like ">=8.12.0" should we include a test for that? The hi is none branch?

"""A single integration major still appends the forward-looking anchor."""
manifests = {"pkg": {"9.0.0": _manifest("^9.0.0")}}
result = find_compatible_version_range("pkg", manifests)
self.assertEqual(result.range, "^9.0.0 || ^10.0.0")
self.assertEqual(result.anchors, ["9.0.0"])
self.assertEqual(result.forward_anchor, "10.0.0")

def test_three_majors_endpoint_shape(self):
"""Synthetic endpoint-like majors mirror the #5601 reproducer shape."""
manifests = {
"endpoint": {
"7.17.0": _manifest("^7.17.0"),
"8.2.0": _manifest("^8.2.0"),
"9.0.0": _manifest("^9.0.0"),
}
}
result = find_compatible_version_range("endpoint", manifests)
self.assertEqual(result.range, "^7.17.0 || ^8.2.0 || ^9.0.0 || ^10.0.0")
self.assertEqual(result.anchors, ["7.17.0", "8.2.0", "9.0.0"])
self.assertEqual(result.forward_anchor, "10.0.0")

def test_cross_major_fallback(self):
"""Falls back to an earlier major when the latest major is incompatible."""
def test_skips_majors_with_no_overlap(self):
"""Majors without stack overlap are omitted from anchors."""
manifests = {
"pkg": {
"1.0.0": _manifest("^8.12.0"),
"2.0.0": _manifest("^9.4.0"),
"7.10.0": _manifest("^7.10.0"),
"9.4.0": _manifest("=9.4.0"),
}
}
self.assertEqual(find_least_compatible_version("pkg", "pkg", "8.12.0", manifests), "^1.0.0")
result = find_compatible_version_range("pkg", manifests)
self.assertEqual(result.range, "^7.10.0 || ^9.4.0 || ^10.0.0")
self.assertEqual(result.anchors, ["7.10.0", "9.4.0"])

def test_or_clause(self):
"""OR'd clauses are honored by the least-compatible search."""
manifests = {"pkg": {"1.0.0": _manifest("^8.12.0 || ^9.0.0")}}
self.assertEqual(find_least_compatible_version("pkg", "pkg", "9.1.0", manifests), "^1.0.0")
def test_raises_when_no_compatible_major(self):
"""When no stack line can be resolved, raise."""
manifests = {
"pkg": {
"1.0.0": _manifest(">=99.0.0 <99.0.0"),
}
}
with self.assertRaises(ValueError):
find_compatible_version_range("pkg", manifests)

def test_returns_anchor_list_for_policy_template_lookup(self):
"""Anchors and forward anchor are exposed for policy template union."""
manifests = {
"pkg": {
"1.0.0": _manifest("^1.0.0"),
"2.0.0": _manifest("^2.0.0"),
}
}
result = find_compatible_version_range("pkg", manifests)
self.assertEqual(result.anchors, ["1.0.0", "2.0.0"])
self.assertEqual(result.forward_anchor, "3.0.0")
Loading