Skip to content
Merged
52 changes: 48 additions & 4 deletions detection_rules/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import gzip
import json
from collections import OrderedDict, defaultdict
from collections.abc import Iterator
from collections.abc import Iterable, Iterator
from pathlib import Path
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -244,16 +244,57 @@ def _satisfies_kibana_range(stack: Version, version_requirement: str) -> bool:
return any(lo <= stack and (hi is None or stack < hi) for lo, hi in _parse_kibana_range(version_requirement))


def find_latest_integration_patch_for_minor(packages: Iterable[str], major: int, minor: int) -> int:
"""Find the latest stack patch the given integration packages need for a major.minor."""
# The stack-schema-map keys stacks at MAJOR.MINOR.0, but an integration may gate its latest
Comment thread
eric-forte-elastic marked this conversation as resolved.
Comment thread
eric-forte-elastic marked this conversation as resolved.
# package (and newly-added data streams) behind a later patch (e.g. azure ~8.19.10). Resolving
# against the literal .0 falls back to an older package that predates the stream. Return the
# latest patch a package gates on for the minor, i.e. the stack patch needed to receive the most
# up-to-date integration package on that minor. Scan each package once and track the newest
# matching package manifest.
manifests = load_integrations_manifests()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

load_integrations_manifests() is called on every invocation, and it's called in a for version in get_stack_versions() loop in rule_validators.py.

Can we think about caching this somewhere? If the function isn't memoized/cached, this is operationally heavy and repeats the same load for every version. Caching or otherwise optimising the manifest loads would be a good idea.

The reason this calls for optimisation: we have seen the manifest grow large as the number of integration versions grows, especially for AWS and Azure.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, no, we cannot directly cache them, given that each rule's requirements (fields, versions, etc.) are potentially different per rule. Given this, we would need to evaluate them at a per-rule level.

Granted, we could build a hash map as an optimization, so that if a rule passes the exact same integration info we do not need to compute it again, but the goal of this PR was to go for a less complex approach first, de-duplicate with #6208, and then polish as needed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ Agreed. We should revisit these optimisations in the future; they will greatly improve execution times.

latest_patch = 0
for package in packages:
latest_package_version: Version | None = None
latest_package_patch = 0
for package_version, manifest in manifests.get(package, {}).items():
version_requirement = manifest.get("conditions", {}).get("kibana", {}).get("version")
if not version_requirement:
continue
try:
clauses = _parse_kibana_range(version_requirement)
except ValueError:
# Skip manifests whose kibana condition uses tokens we cannot parse.
continue
floors = [lo.patch for lo, _ in clauses if lo.major == major and lo.minor == minor]
if not floors:
continue
parsed_package_version = Version.parse(package_version)
if latest_package_version is None or parsed_package_version > latest_package_version:
latest_package_version = parsed_package_version
latest_package_patch = max(floors)
latest_patch = max(latest_patch, latest_package_patch)
return latest_patch


def find_least_compatible_version(
package: str,
integration: str,
integration: str | None,
current_stack_version: str,
packages_manifest: dict[str, Any],
) -> str:
"""Finds least compatible version for specified integration based on stack version supplied."""
integration_manifests = dict(sorted(packages_manifest[package].items(), key=lambda x: Version.parse(x[0])))
stack_version = Version.parse(current_stack_version, optional_minor_and_patch=True)

# The manifest's kibana condition only tells us whether the *package* installs on the stack, not
# whether this particular integration/data stream exists yet in that package version (e.g. azure
# added aadgraphactivitylogs in 1.37.0, but 1.0.0 already installs on 8.19). The schemas record
# the data streams present per package version, so use them to skip versions that predate the
# integration. Only filter when schema data exists for a version, otherwise fall back to kibana
# compatibility alone (e.g. for synthetic manifests in tests).
package_schemas = load_integrations_schemas().get(package, {})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR #6208 removed find_least_compatible_version from the public API and replaced it with find_compatible_version_range. This PR modifies find_least_compatible_version. If #6208 merges before this one, this fix needs to be ported to the new function structure.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we are tracking this 👍

Copy link
Copy Markdown
Contributor

@Mikaayenson Mikaayenson Jun 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ I'm going to update the other PR after this lands.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you.

Comment thread
eric-forte-elastic marked this conversation as resolved.
Outdated

# filter integration_manifests to only the latest major entries
major_versions = sorted(
{Version.parse(manifest_version).major for manifest_version in integration_manifests},
Expand All @@ -270,8 +311,11 @@ def find_least_compatible_version(
sorted(major_integration_manifests.items(), key=lambda x: Version.parse(x[0]))
).items():
version_requirement = manifest["conditions"]["kibana"]["version"]
if _satisfies_kibana_range(stack_version, version_requirement):
return f"^{version}"
if not _satisfies_kibana_range(stack_version, version_requirement):
continue
if integration and version in package_schemas and integration not in package_schemas[version]:
continue
return f"^{version}"
Comment thread
eric-forte-elastic marked this conversation as resolved.

raise ValueError(f"no compatible version for integration {package}:{integration}")

Expand Down
15 changes: 13 additions & 2 deletions detection_rules/rule_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
prepare_mappings,
)
from .integrations import (
find_latest_integration_patch_for_minor,
get_integration_schema_data,
load_integrations_manifests,
parse_datasets,
Expand Down Expand Up @@ -924,8 +925,18 @@ def remote_validate_rule( # noqa: PLR0913
# mismatch error, as the EsqlSchemaError and EsqlSyntaxError errors from the stack
# will not be impacted by the difference in schema type mapping.
mappings_lookup: dict[str, dict[str, Any]] = {stack_version: combined_mappings}
versions = get_stack_versions()
for version in versions:

# The schema-map keys stacks at MAJOR.MINOR.0, but an integration may gate its data stream
# behind a later patch (e.g. azure ~8.19.10). Validating at the literal .0 resolves an older
# package that predates the stream, so for each minor use the latest patch the rule's own
# integrations gate on. Only the rule's packages are inspected, not the full manifest.
rule_packages = set(get_rule_integrations(metadata))
rule_packages.update(integration.package for integration in event_dataset_integrations)

for version in get_stack_versions():
parsed = Version.parse(version)
inferred_patch = find_latest_integration_patch_for_minor(rule_packages, parsed.major, parsed.minor)
version = str(parsed.replace(patch=max(parsed.patch, inferred_patch))) # noqa: PLW2901
if version in mappings_lookup:
continue
_, _, combined_mappings = prepare_mappings(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.6.47"
version = "1.6.48"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
41 changes: 41 additions & 0 deletions tests/test_integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""Test integration version resolution against EPR manifest ranges."""

import unittest
import unittest.mock

from semver import Version

Expand Down Expand Up @@ -257,3 +258,43 @@ def test_or_clause(self):
"""OR'd clauses are honored by the least-compatible search."""
manifests = {"pkg": {"1.0.0": _manifest("^8.12.0 || ^9.0.0")}}
self.assertEqual(find_least_compatible_version("pkg", "pkg", "9.1.0", manifests), "^1.0.0")

def test_skips_versions_missing_integration(self):
    """Package versions that install on the stack but lack the integration are passed over."""
    # Models a data stream introduced in a later package release (e.g. azure aadgraphactivitylogs
    # in 1.37.0): all three versions carry the same kibana condition, so only the schema contents
    # tell them apart.
    package_manifests = {"pkg": {release: _manifest("^8.12.0") for release in ("1.0.0", "1.5.0", "1.9.0")}}
    package_schemas = {
        "pkg": {
            "1.0.0": {"existing_ds": {}},
            "1.5.0": {"existing_ds": {}},
            "1.9.0": {"existing_ds": {}, "new_ds": {}},
        }
    }
    schema_loader = unittest.mock.patch(
        "detection_rules.integrations.load_integrations_schemas",
        return_value=package_schemas,
    )
    with schema_loader:
        # new_ds only appears in the 1.9.0 schema, so the two older kibana-compatible versions
        # must be skipped.
        resolved_new = find_least_compatible_version("pkg", "new_ds", "8.12.0", package_manifests)
        self.assertEqual(resolved_new, "^1.9.0")
        # existing_ds is in every schema, so the oldest version is still the answer.
        resolved_existing = find_least_compatible_version("pkg", "existing_ds", "8.12.0", package_manifests)
        self.assertEqual(resolved_existing, "^1.0.0")

def test_no_schema_data_falls_back_to_kibana_only(self):
    """With no schema data available, kibana compatibility is the only filter applied."""
    package_manifests = {"pkg": {release: _manifest("^8.12.0") for release in ("1.0.0", "1.5.0")}}
    # The schema loader returns an empty mapping, so there is no per-version data stream
    # information to filter on and the oldest kibana-compatible version is expected.
    empty_schema_loader = unittest.mock.patch(
        "detection_rules.integrations.load_integrations_schemas",
        return_value={},
    )
    with empty_schema_loader:
        resolved = find_least_compatible_version("pkg", "new_ds", "8.12.0", package_manifests)
        self.assertEqual(resolved, "^1.0.0")

def test_all_compatible_versions_missing_integration_raises(self):
    """A ValueError is raised when no kibana-compatible version's schema has the integration."""
    releases = ("1.0.0", "1.5.0")
    package_manifests = {"pkg": {release: _manifest("^8.12.0") for release in releases}}
    # Every version has schema data, and none of it mentions new_ds.
    package_schemas = {"pkg": {release: {"existing_ds": {}} for release in releases}}
    schema_loader = unittest.mock.patch(
        "detection_rules.integrations.load_integrations_schemas",
        return_value=package_schemas,
    )
    with schema_loader, self.assertRaises(ValueError):
        find_least_compatible_version("pkg", "new_ds", "8.12.0", package_manifests)
9 changes: 9 additions & 0 deletions tests/test_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,15 @@ def test_stack_schema_map(self):
class TestESQLValidation(unittest.TestCase):
"""Test ESQL rule validation"""

def setUp(self):
    """Pin the tests in this class to local validation."""
    # These cases target local AST/semantic validation (KEEP/METADATA checks). They could be
    # routed through remote validation, but exercising the local path is the explicit goal, so
    # the environment variable is blanked for each test regardless of other settings.
    local_only_env = unittest.mock.patch.dict(os.environ, {"DR_REMOTE_ESQL_VALIDATION": ""})
    local_only_env.start()
    # Restore the previous environment once the test finishes.
    self.addCleanup(local_only_env.stop)

def test_esql_data_validation(self):
"""Test ESQL rule data validation"""

Expand Down
Loading