Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
728e8f6
fix(integrations): emit stack-invariant OR ranges for related_integra…
Mikaayenson May 27, 2026
6d88831
fix(integrations): emit stack-invariant OR ranges for related_integra…
Mikaayenson May 28, 2026
73c0780
fix(integrations): satisfy ruff SIM110 in _major_has_compatible_stack
Mikaayenson May 28, 2026
07cd5a0
fix(integrations): address review feedback for stack-major resolution
Mikaayenson May 28, 2026
2ac6605
Merge branch 'main' into 5601-bug-improper-prebuilt-rule-version-usag…
Mikaayenson May 29, 2026
1d9fae1
fix(integrations): tighten stack-major overlap and anchor resolution
Mikaayenson May 29, 2026
2302006
fix(integrations): annotate majors_to_check for pyright
Mikaayenson May 29, 2026
76e0369
fix(integrations): rebase #6208 onto main with #6251 schema-aware OR …
Mikaayenson Jun 4, 2026
eef18f2
style(rule): apply ruff format for CI code-checks
Mikaayenson Jun 4, 2026
6ae62b1
fix(integrations): address PR review on version range export
Mikaayenson Jun 4, 2026
adc4f10
fix(integrations): anchor RI export to shipped stack backports only
Mikaayenson Jun 4, 2026
655cffc
refactor(integrations): simplify version range export helpers
Mikaayenson Jun 4, 2026
9adb920
fix(rule): dedupe ES|QL related_integrations metadata package row
Mikaayenson Jun 5, 2026
d4cc69c
test(integrations): drop brittle ES|QL rule-file export tests
Mikaayenson Jun 5, 2026
0919416
refactor(integrations): dedupe schema floor lookup and trim comments
Mikaayenson Jun 5, 2026
6a37067
fix(integrations): walk shipped stack lines for RI anchor collection
Mikaayenson Jun 5, 2026
429ab99
fix(rule): dedupe NON_DATASET metadata RI rows when datasets cover pa…
Mikaayenson Jun 5, 2026
bdbec2a
Merge branch 'main' into 5601-bug-improper-prebuilt-rule-version-usag…
Mikaayenson Jun 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 274 additions & 32 deletions detection_rules/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import fnmatch
import gzip
import json
from collections import OrderedDict, defaultdict
from collections import defaultdict
from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -282,49 +283,290 @@ def find_latest_integration_patch_for_minor(packages: Iterable[str], major: int,
return latest_patch


def find_least_compatible_version(
package: str,
integration: str | None,
current_stack_version: str,
packages_manifest: dict[str, Any],
) -> str:
"""Finds least compatible version for specified integration based on stack version supplied."""
integration_manifests = dict(sorted(packages_manifest[package].items(), key=lambda x: Version.parse(x[0])))
stack_version = Version.parse(current_stack_version, optional_minor_and_patch=True)

# The manifest's kibana condition only tells us whether the *package* installs on the stack, not
# whether this particular integration/data stream exists yet in that package version (e.g. azure
# added aadgraphactivitylogs in 1.37.0, but 1.0.0 already installs on 8.19). The schemas record
# the data streams present per package version, so use them to skip versions that predate the
# integration. Only filter when schema data exists for a version, otherwise fall back to kibana
# compatibility alone (e.g. for synthetic manifests in tests).
# Loaded only when an integration is specified, to avoid decompressing the schemas for
# package-only lookups where the schema check is never consulted.
package_schemas: dict[str, Any] = load_integrations_schemas().get(package, {}) if integration else {}

# filter integration_manifests to only the latest major entries
def _major_has_compatible_stack(major: int, version_requirement: str) -> bool:
"""Return True iff the Kibana range overlaps some stack in [major.0.0, (major+1).0.0)."""
major_lo = Version(major, 0, 0)
major_hi = Version(major + 1, 0, 0)
return any(lo < major_hi and (hi is None or hi > major_lo) for lo, hi in _parse_kibana_range(version_requirement))


def _package_version_has_integration(
version: str,
integration: str,
package_schemas: dict[str, Any],
) -> bool:
"""Return True when schema data is absent or includes the integration/data stream."""
if version not in package_schemas:
return True
return integration in package_schemas[version]


def _stack_majors_supported_by_package(integration_manifests: dict[str, Any]) -> set[int]:
"""Collect Kibana stack majors that any manifest in the package can serve."""
stack_majors: set[int] = set()
for manifest in integration_manifests.values():
version_requirement = manifest["conditions"]["kibana"]["version"]
for lo, hi in _parse_kibana_range(version_requirement):
majors_to_check: list[int]
if hi is None:
majors_to_check = [lo.major]
Copy link
Copy Markdown
Contributor

@eric-forte-elastic eric-forte-elastic May 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this if check:

fix(integrations): tighten stack-major overlap and anchor resolution
Walk every stack major whose band intersects a bounded Kibana clause
(e.g. >=8.12.0 <9.1.0 includes major 9) and pick the earliest compatible
stack point within a major for the legacy least-compatible walk.

I don't think any of our manifests include a situation where hi is None unless one were to have a manifest with an unbounded range (e.g. ">=8.12.0"). Fine to have this check, but given this I am not sure that this functions as intended.

For the case with an unbounded range majors_to_check only will have [8] and not any other stack majors. Not sure if this accomplishes the goal.

Image

Whereas for the case in the note, it correctly has multiple stack versions to check.

Image

else:
major = lo.major
majors_to_check = []
while Version(major, 0, 0) < hi:
majors_to_check.append(major)
major += 1
for major in majors_to_check:
if _major_has_compatible_stack(major, version_requirement):
stack_majors.add(major)
return stack_majors


def _anchor_for_aligned_integration_major(
major: int,
integration_manifests: dict[str, Any],
integration: str | None = None,
package_schemas: dict[str, Any] | None = None,
) -> str | None:
"""Oldest integration version in major whose Kibana range overlaps [major, major+1)."""
major_manifests = {
version: manifest
for version, manifest in integration_manifests.items()
if Version.parse(version).major == major
}
for version, manifest in sorted(major_manifests.items(), key=lambda x: Version.parse(x[0])):
version_requirement = manifest["conditions"]["kibana"]["version"]
if not _major_has_compatible_stack(major, version_requirement):
continue
if (
integration
and package_schemas is not None
and not _package_version_has_integration(version, integration, package_schemas)
):
continue
return version
return None


def _find_least_compatible_for_stack(
stack_version: Version,
integration_manifests: dict[str, Any],
integration: str | None = None,
package_schemas: dict[str, Any] | None = None,
) -> str | None:
"""Stack-dependent least compatible integration version (pre-#5601 behavior)."""
major_versions = sorted(
{Version.parse(manifest_version).major for manifest_version in integration_manifests},
reverse=True,
)
for max_major in major_versions:
major_integration_manifests = {
k: v for k, v in integration_manifests.items() if Version.parse(k).major == max_major
version: manifest
for version, manifest in integration_manifests.items()
if Version.parse(version).major == max_major
}

# iterates through ascending integration manifests
# returns latest major version that is least compatible
for version, manifest in OrderedDict(
sorted(major_integration_manifests.items(), key=lambda x: Version.parse(x[0]))
).items():
for version, manifest in sorted(major_integration_manifests.items(), key=lambda x: Version.parse(x[0])):
version_requirement = manifest["conditions"]["kibana"]["version"]
if not _satisfies_kibana_range(stack_version, version_requirement):
continue
if integration and version in package_schemas and integration not in package_schemas[version]:
if (
integration
and package_schemas is not None
and not _package_version_has_integration(version, integration, package_schemas)
):
continue
return f"^{version}"
return version
return None

raise ValueError(f"no compatible version for integration {package}:{integration}")

def _stack_version_for_major(stack_major: int, integration_manifests: dict[str, Any]) -> Version:
"""Pick the earliest stack version within stack_major that satisfies manifest ranges."""
major_lo = Version(stack_major, 0, 0)
major_hi = Version(stack_major + 1, 0, 0)
candidates: list[Version] = []

for manifest in integration_manifests.values():
version_requirement = manifest["conditions"]["kibana"]["version"]
if not _major_has_compatible_stack(stack_major, version_requirement):
continue
for lo, hi in _parse_kibana_range(version_requirement):
if hi is not None and hi <= major_lo:
continue
if lo >= major_hi:
continue
in_major = lo if lo >= major_lo else major_lo
if _satisfies_kibana_range(in_major, version_requirement):
candidates.append(in_major)
elif _satisfies_kibana_range(major_lo, version_requirement):
candidates.append(major_lo)

return min(candidates) if candidates else major_lo


@dataclass(frozen=True)
class CompatibleVersionRange:
"""Stack-invariant related integration compatibility range."""

range: str
anchors: list[str]
forward_anchor: str


def _build_compatible_version_range(anchors: list[str]) -> CompatibleVersionRange:
"""Build a CompatibleVersionRange from manifest-backed anchor versions."""
if not anchors:
raise ValueError("anchors must not be empty")

sorted_anchors = sorted(set(anchors), key=Version.parse)
top_major = max(Version.parse(anchor).major for anchor in sorted_anchors)
forward_anchor = f"{top_major + 1}.0.0"
range_parts = [f"^{anchor}" for anchor in sorted_anchors] + [f"^{forward_anchor}"]
return CompatibleVersionRange(
range=" || ".join(range_parts),
anchors=sorted_anchors,
forward_anchor=forward_anchor,
)


def minimum_schema_package_version(
package: str,
integration: str,
integration_schemas: dict[str, Any],
) -> str | None:
"""Return the oldest package version whose schema includes integration, if any."""
package_schemas = integration_schemas.get(package)
if not package_schemas:
return None

for version in sorted(package_schemas, key=Version.parse):
if integration in package_schemas[version]:
return version
return None


def apply_schema_version_floor(
result: CompatibleVersionRange,
schema_floor: str,
) -> CompatibleVersionRange:
"""Raise anchors in the schema floor's package major when below schema_floor."""
floor_version = Version.parse(schema_floor)
floor_major = floor_version.major
bumped_anchors: list[str] = []

for anchor in result.anchors:
anchor_version = Version.parse(anchor)
if anchor_version.major == floor_major and anchor_version < floor_version:
continue
bumped_anchors.append(anchor)

if not any(Version.parse(anchor).major == floor_major for anchor in bumped_anchors):
bumped_anchors.append(schema_floor)

if bumped_anchors == result.anchors:
return result

return _build_compatible_version_range(bumped_anchors)


def _collect_compatible_anchors(
integration_manifests: dict[str, Any],
stack_majors: set[int],
integration: str | None,
package_schemas: dict[str, Any],
) -> list[str]:
"""Collect manifest anchors for each supported stack major."""
integration_majors = {Version.parse(version).major for version in integration_manifests}
aligned_by_major = {
major: anchor
for major in sorted(integration_majors)
if (
anchor := _anchor_for_aligned_integration_major(
major,
integration_manifests,
integration,
package_schemas,
)
)
is not None
}
aligned_min_major = min(aligned_by_major) if aligned_by_major else None

if aligned_min_major is not None:
effective_stack_majors = sorted(stack_major for stack_major in stack_majors if stack_major >= aligned_min_major)
else:
effective_stack_majors = sorted(
stack_major for stack_major in stack_majors if stack_major >= max(stack_majors) - 1
)

anchors: list[str] = []
for stack_major in effective_stack_majors:
if stack_major in aligned_by_major:
anchor = aligned_by_major[stack_major]
else:
anchor = _find_least_compatible_for_stack(
_stack_version_for_major(stack_major, integration_manifests),
integration_manifests,
integration,
package_schemas,
)
if anchor and anchor not in anchors:
anchors.append(anchor)
return anchors


def _schema_floor_compatible_range(
package: str,
packages_manifest: dict[str, Any],
integration: str,
package_schemas: dict[str, Any],
) -> CompatibleVersionRange | None:
"""Build a range from the package baseline when only schema data defines the floor."""
schema_floor = minimum_schema_package_version(package, integration, {package: package_schemas})
if not schema_floor:
return None
baseline = find_compatible_version_range(package, packages_manifest)
return apply_schema_version_floor(baseline, schema_floor)


def find_compatible_version_range(
package: str,
packages_manifest: dict[str, Any],
integration: str | None = None,
) -> CompatibleVersionRange:
"""Return a stack-invariant OR'd caret range for related_integrations.version."""
package_manifest = packages_manifest.get(package)
if package_manifest is None:
raise ValueError(f"Package {package} not found in manifest.")

package_schemas: dict[str, Any] = {}
if integration:
package_schemas = load_integrations_schemas().get(package, {})

integration_manifests = dict(sorted(package_manifest.items(), key=lambda x: Version.parse(x[0])))
stack_majors = _stack_majors_supported_by_package(integration_manifests)

if not stack_majors:
raise ValueError(f"no compatible version for integration package {package}")

anchors = _collect_compatible_anchors(integration_manifests, stack_majors, integration, package_schemas)

if not anchors:
schema_range = (
_schema_floor_compatible_range(package, packages_manifest, integration, package_schemas)
if integration and package_schemas
else None
)
if schema_range:
return schema_range
package_label = f"{package}:{integration}" if integration else package
raise ValueError(f"no compatible version for integration {package_label}")

result = _build_compatible_version_range(anchors)
if integration and package_schemas:
schema_floor = minimum_schema_package_version(package, integration, {package: package_schemas})
if schema_floor:
result = apply_schema_version_floor(result, schema_floor)
return result


def find_latest_compatible_version(
Expand Down
25 changes: 12 additions & 13 deletions detection_rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from .esql import get_esql_query_event_dataset_integrations
from .esql_errors import EsqlSemanticError
from .integrations import (
find_least_compatible_version,
find_compatible_version_range,
get_integration_schema_fields,
load_integrations_manifests,
load_integrations_schemas,
Expand Down Expand Up @@ -1428,7 +1428,6 @@ def _convert_add_related_integrations(self, obj: dict[str, Any]) -> None:

if not package_integrations and self.metadata.integration:
packages_manifest = load_integrations_manifests()
current_stack_version = load_current_package_version()

if self.check_restricted_field_version(field_name) and isinstance(
self.data, QueryRuleData | MachineLearningRuleData
Expand All @@ -1446,22 +1445,22 @@ def _convert_add_related_integrations(self, obj: dict[str, Any]) -> None:
return

for package in package_integrations:
package["version"] = find_least_compatible_version(
integration = package.get("integration")
integration_name = integration if integration and integration != "Unknown" else None
result = find_compatible_version_range(
package=package["package"],
integration=package["integration"],
current_stack_version=current_stack_version,
packages_manifest=packages_manifest,
integration=integration_name,
)
package["version"] = result.range

# if integration is not a policy template remove
if package["version"]:
version_data = packages_manifest.get(package["package"], {}).get(
package["version"].strip("^"), {}
)
policy_templates = version_data.get("policy_templates", [])
policy_templates: set[str] = set()
for anchor in result.anchors:
version_data = packages_manifest.get(package["package"], {}).get(anchor, {})
policy_templates.update(version_data.get("policy_templates", []))

if package["integration"] not in policy_templates:
del package["integration"]
if package["integration"] not in policy_templates:
del package["integration"]

# remove duplicate entries
package_integrations = list({json.dumps(d, sort_keys=True): d for d in package_integrations}.values())
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.6.48"
version = "1.6.49"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
Loading
Loading