diff --git a/detection_rules/integrations.py b/detection_rules/integrations.py index e4a5bad9482..a04be8de242 100644 --- a/detection_rules/integrations.py +++ b/detection_rules/integrations.py @@ -8,8 +8,9 @@ import fnmatch import gzip import json -from collections import OrderedDict, defaultdict +from collections import defaultdict from collections.abc import Iterable, Iterator +from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING, Any @@ -22,7 +23,7 @@ from . import ecs from .beats import flatten_ecs_schema from .config import load_current_package_version -from .schemas import definitions +from .schemas import definitions, get_stack_versions from .utils import cached, get_etc_path, read_gzip, unzip if TYPE_CHECKING: @@ -245,18 +246,17 @@ def _satisfies_kibana_range(stack: Version, version_requirement: str) -> bool: def find_latest_integration_patch_for_minor(packages: Iterable[str], major: int, minor: int) -> int: - """Find the latest stack patch the given integration packages need for a major.minor.""" - # The stack-schema-map keys stacks at MAJOR.MINOR.0, but an integration may gate its latest - # package (and newly-added data streams) behind a later patch (e.g. azure ~8.19.10). Resolving - # against the literal .0 falls back to an older package that predates the stream. Return the - # latest patch a package gates on for the minor, i.e. the stack patch needed to receive the most - # up-to-date integration package on that minor. + """Find the latest stack patch integration packages need for a major.minor.""" + # stack-schema-map keys stacks at MAJOR.MINOR.0, but an integration may gate its latest + # package (and newly-added data streams) behind a later patch (e.g. azure ~8.19.10). + # Resolving against the literal .0 falls back to an older package that predates the + # stream. Return the latest patch a package gates on for the minor. # - # Track the *newest* package version's floor (not the max floor across all versions): Fleet always - # installs the latest compatible package, so that floor is the patch a stack actually needs. A - # newer package occasionally lowers its floor (e.g. apm 7.16.1 gates ^7.16.1 but the newer 7.16.2 - # gates ^7.16.0); honoring the newest version matches what Fleet installs rather than an older, - # higher floor that would never be installed on that stack. + # Track the *newest* package version's floor (not the max floor across all versions): + # Fleet always installs the latest compatible package, so that floor is the patch a + # stack actually needs. A newer package occasionally lowers its floor (e.g. apm 7.16.1 + # gates ^7.16.1 but the newer 7.16.2 gates ^7.16.0); honoring the newest version + # matches what Fleet installs rather than an older, higher floor. manifests = load_integrations_manifests() latest_patch = 0 for package in packages: @@ -282,49 +282,242 @@ def find_latest_integration_patch_for_minor(packages: Iterable[str], major: int, return latest_patch -def find_least_compatible_version( - package: str, - integration: str | None, - current_stack_version: str, - packages_manifest: dict[str, Any], -) -> str: - """Finds least compatible version for specified integration based on stack version supplied.""" - integration_manifests = dict(sorted(packages_manifest[package].items(), key=lambda x: Version.parse(x[0]))) - stack_version = Version.parse(current_stack_version, optional_minor_and_patch=True) - - # The manifest's kibana condition only tells us whether the *package* installs on the stack, not - # whether this particular integration/data stream exists yet in that package version (e.g. azure - # added aadgraphactivitylogs in 1.37.0, but 1.0.0 already installs on 8.19). The schemas record - # the data streams present per package version, so use them to skip versions that predate the - # integration. Only filter when schema data exists for a version, otherwise fall back to kibana - # compatibility alone (e.g. for synthetic manifests in tests). - # Loaded only when an integration is specified, to avoid decompressing the schemas for - # package-only lookups where the schema check is never consulted. - package_schemas: dict[str, Any] = load_integrations_schemas().get(package, {}) if integration else {} - - # filter integration_manifests to only the latest major entries +# Sentinel written by ``parse_datasets`` when a rule indexes a package but not a data stream. +UNKNOWN_PACKAGE_INTEGRATION = "Unknown" + +# Cap majors walked for unbounded Kibana clauses (``>=X.Y.Z``). Intersection with +# ``_shipped_stack_majors()`` keeps only backport lines we ship rules to. +_MAX_UNBOUNDED_STACK_MAJOR_SPAN = 10 + + +def _major_has_compatible_stack(major: int, version_requirement: str) -> bool: + """Return True iff the Kibana range overlaps some stack in [major.0.0, (major+1).0.0).""" + major_lo = Version(major, 0, 0) + major_hi = Version(major + 1, 0, 0) + return any(lo < major_hi and (hi is None or hi > major_lo) for lo, hi in _parse_kibana_range(version_requirement)) + + +def _package_version_has_integration( + version: str, + integration: str, + package_schemas: dict[str, Any], +) -> bool: + """Return True when schema data is absent or includes the integration/data stream.""" + if version not in package_schemas: + return True + return integration in package_schemas[version] + + +def _majors_overlapping_kibana_clause( + lo: Version, + hi: Version | None, + version_requirement: str, +) -> list[int]: + """Return stack majors whose [M.0.0, (M+1).0.0) band intersects the parsed clause bounds.""" + if hi is not None: + majors_to_check: list[int] = [] + major = lo.major + while Version(major, 0, 0) < hi: + majors_to_check.append(major) + major += 1 + return majors_to_check + + # Unbounded upper (``>=``, ``>``): walk forward while the major still overlaps. + majors_to_check: list[int] = [] + major = lo.major + while major <= lo.major + _MAX_UNBOUNDED_STACK_MAJOR_SPAN and _major_has_compatible_stack( + major, version_requirement + ): + majors_to_check.append(major) + major += 1 + return majors_to_check + + +def _stack_majors_supported_by_package(integration_manifests: dict[str, Any]) -> set[int]: + """Collect Kibana stack majors that any manifest in the package can serve.""" + stack_majors: set[int] = set() + for manifest in integration_manifests.values(): + version_requirement = manifest["conditions"]["kibana"]["version"] + for lo, hi in _parse_kibana_range(version_requirement): + for major in _majors_overlapping_kibana_clause(lo, hi, version_requirement): + stack_majors.add(major) + return stack_majors + + +def _find_least_compatible_for_stack( + stack_version: Version, + integration_manifests: dict[str, Any], + integration: str | None = None, + package_schemas: dict[str, Any] | None = None, +) -> str | None: + """Stack-dependent least compatible integration version (pre-#5601 behavior).""" major_versions = sorted( {Version.parse(manifest_version).major for manifest_version in integration_manifests}, reverse=True, ) for max_major in major_versions: major_integration_manifests = { - k: v for k, v in integration_manifests.items() if Version.parse(k).major == max_major + version: manifest + for version, manifest in integration_manifests.items() + if Version.parse(version).major == max_major } - - # iterates through ascending integration manifests - # returns latest major version that is least compatible - for version, manifest in OrderedDict( - sorted(major_integration_manifests.items(), key=lambda x: Version.parse(x[0])) - ).items(): + for version, manifest in sorted(major_integration_manifests.items(), key=lambda x: Version.parse(x[0])): version_requirement = manifest["conditions"]["kibana"]["version"] if not _satisfies_kibana_range(stack_version, version_requirement): continue - if integration and version in package_schemas and integration not in package_schemas[version]: + if ( + integration + and package_schemas is not None + and not _package_version_has_integration(version, integration, package_schemas) + ): continue - return f"^{version}" + return version + return None - raise ValueError(f"no compatible version for integration {package}:{integration}") + +@dataclass(frozen=True) +class CompatibleVersionRange: + """Stack-invariant related integration compatibility range.""" + + range: str + anchors: tuple[str, ...] + forward_anchor: str + + +def _build_compatible_version_range(anchors: list[str]) -> CompatibleVersionRange: + """Build a CompatibleVersionRange from manifest-backed anchor versions.""" + if not anchors: + raise ValueError("anchors must not be empty") + + sorted_anchors = tuple(sorted(set(anchors), key=Version.parse)) + top_major = max(Version.parse(anchor).major for anchor in sorted_anchors) + # Forward sentinel for the next integration major (no manifest entry yet). + forward_anchor = f"{top_major + 1}.0.0" + range_parts = [f"^{anchor}" for anchor in sorted_anchors] + [f"^{forward_anchor}"] + return CompatibleVersionRange( + range=" || ".join(range_parts), + anchors=sorted_anchors, + forward_anchor=forward_anchor, + ) + + +@cached +def _shipped_stack_majors() -> set[int]: + """Stack majors we ship prebuilt rules to (from the stack-schema-map backport lines).""" + return {Version.parse(version).major for version in get_stack_versions()} + + +def minimum_schema_package_version( + package: str, + integration: str, + integration_schemas: dict[str, Any], +) -> str | None: + """Return the oldest package version whose schema includes integration, if any.""" + package_schemas = integration_schemas.get(package) + if not package_schemas: + return None + + for version in sorted(package_schemas, key=Version.parse): + if integration in package_schemas[version]: + return version + return None + + +def apply_schema_version_floor( + result: CompatibleVersionRange, + schema_floor: str, +) -> CompatibleVersionRange: + """Raise anchors in the schema floor's package major when below schema_floor.""" + floor_version = Version.parse(schema_floor) + floor_major = floor_version.major + bumped_anchors: list[str] = [] + + for anchor in result.anchors: + anchor_version = Version.parse(anchor) + if anchor_version.major == floor_major and anchor_version < floor_version: + continue + bumped_anchors.append(anchor) + + if not any(Version.parse(anchor).major == floor_major for anchor in bumped_anchors): + bumped_anchors.append(schema_floor) + + bumped_tuple = tuple(sorted(bumped_anchors, key=Version.parse)) + if bumped_tuple == result.anchors: + return result + + return _build_compatible_version_range(list(bumped_tuple)) + + +def _collect_compatible_anchors( + integration_manifests: dict[str, Any], + stack_majors: set[int], + integration: str | None, + package_schemas: dict[str, Any], +) -> list[str]: + """Oldest compatible integration version per shipped stack version line.""" + anchors: list[str] = [] + for stack_version_str in get_stack_versions(): + stack_version = Version.parse(stack_version_str) + if stack_version.major not in stack_majors: + continue + anchor = _find_least_compatible_for_stack( + stack_version, + integration_manifests, + integration, + package_schemas, + ) + if anchor and anchor not in anchors: + anchors.append(anchor) + return anchors + + +def _integration_schema_floor( + package: str, + integration: str | None, + package_schemas: dict[str, Any], +) -> str | None: + """Oldest package version whose schema includes integration, when schemas are loaded.""" + if not integration or not package_schemas: + return None + return minimum_schema_package_version(package, integration, {package: package_schemas}) + + +def find_compatible_version_range( + package: str, + packages_manifest: dict[str, Any], + integration: str | None = None, +) -> CompatibleVersionRange: + """Return a stack-invariant OR'd caret range for related_integrations.version.""" + # One anchor per shipped stack version line (no build-time stack), OR'd carets, forward sentinel. + # With integration set, filter by integration-schemas when present (data-stream floor). + package_manifest = packages_manifest.get(package) + if package_manifest is None: + raise ValueError(f"Package {package} not found in manifest.") + + package_schemas: dict[str, Any] = {} + if integration: + package_schemas = load_integrations_schemas().get(package, {}) + schema_floor = _integration_schema_floor(package, integration, package_schemas) + + integration_manifests = dict(sorted(package_manifest.items(), key=lambda x: Version.parse(x[0]))) + stack_majors = _stack_majors_supported_by_package(integration_manifests) & _shipped_stack_majors() + + if not stack_majors: + raise ValueError(f"no compatible version for integration package {package}") + + anchors = _collect_compatible_anchors(integration_manifests, stack_majors, integration, package_schemas) + + if not anchors: + if schema_floor: + baseline = find_compatible_version_range(package, packages_manifest) + return apply_schema_version_floor(baseline, schema_floor) + package_label = f"{package}:{integration}" if integration else package + raise ValueError(f"no compatible version for integration {package_label}") + + result = _build_compatible_version_range(anchors) + if schema_floor: + result = apply_schema_version_floor(result, schema_floor) + return result def find_latest_compatible_version( @@ -537,7 +730,7 @@ def parse_datasets(datasets: list[str], package_manifest: dict[str, Any]) -> lis # cleanup extra quotes pulled from ast field value = _value.strip('"') - integration = "Unknown" + integration = UNKNOWN_PACKAGE_INTEGRATION if "." in value: package, integration = value.split(".", 1) # Handle cases where endpoint event datasource needs to be parsed uniquely (e.g endpoint.events.network) diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 28de81b9bb1..866bc45fb62 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -32,7 +32,8 @@ from .esql import get_esql_query_event_dataset_integrations from .esql_errors import EsqlSemanticError from .integrations import ( - find_least_compatible_version, + UNKNOWN_PACKAGE_INTEGRATION, + find_compatible_version_range, get_integration_schema_fields, load_integrations_manifests, load_integrations_schemas, @@ -1428,7 +1429,6 @@ def _convert_add_related_integrations(self, obj: dict[str, Any]) -> None: if not package_integrations and self.metadata.integration: packages_manifest = load_integrations_manifests() - current_stack_version = load_current_package_version() if self.check_restricted_field_version(field_name) and isinstance( self.data, QueryRuleData | MachineLearningRuleData @@ -1446,22 +1446,26 @@ def _convert_add_related_integrations(self, obj: dict[str, Any]) -> None: return for package in package_integrations: - package["version"] = find_least_compatible_version( + integration = package.get("integration") + integration_name = ( + integration if integration and integration != UNKNOWN_PACKAGE_INTEGRATION else None + ) + result = find_compatible_version_range( package=package["package"], - integration=package["integration"], - current_stack_version=current_stack_version, packages_manifest=packages_manifest, + integration=integration_name, ) + package["version"] = result.range - # if integration is not a policy template remove - if package["version"]: - version_data = packages_manifest.get(package["package"], {}).get( - package["version"].strip("^"), {} - ) - policy_templates = version_data.get("policy_templates", []) + # Union policy templates across manifest-backed anchors only. + # forward_anchor has no manifest entry and is excluded by design. + policy_templates: set[str] = set() + for anchor in result.anchors: + version_data = packages_manifest.get(package["package"], {}).get(anchor, {}) + policy_templates.update(version_data.get("policy_templates", [])) - if package["integration"] not in policy_templates: - del package["integration"] + if package["integration"] not in policy_templates: + del package["integration"] # remove duplicate entries package_integrations = list({json.dumps(d, sort_keys=True): d for d in package_integrations}.values()) @@ -1579,14 +1583,14 @@ def get_packaged_integrations( if isinstance(rule_integrations, str): rule_integrations = [rule_integrations] for integration in rule_integrations: - ineligible_integrations = [ - *definitions.NON_DATASET_PACKAGES, - *map(str.lower, definitions.MACHINE_LEARNING_PACKAGES), - ] - if ( - integration in ineligible_integrations - or isinstance(data, MachineLearningRuleData) - or (isinstance(data, ESQLRuleData) and integration not in datasets) + ml_packages_lower = set(map(str.lower, definitions.MACHINE_LEARNING_PACKAGES)) + if isinstance(data, MachineLearningRuleData): + packaged_integrations.append({"package": integration, "integration": None}) + elif integration in definitions.NON_DATASET_PACKAGES: + if _metadata_package_row_needed(integration, datasets): + packaged_integrations.append({"package": integration, "integration": None}) + elif integration.lower() in ml_packages_lower or ( + isinstance(data, ESQLRuleData) and _metadata_package_row_needed(integration, datasets) ): packaged_integrations.append({"package": integration, "integration": None}) @@ -1890,6 +1894,19 @@ def get_unique_query_fields(rule: TOMLRule) -> list[str] | None: return sorted({str(f) for f in parsed if isinstance(f, (eql.ast.Field | kql.ast.Field))}) # type: ignore[reportUnknownVariableType] +def _metadata_package_row_needed(integration: str, datasets: set[str]) -> bool: + """Return True when a metadata-only package row is still required.""" + # Metadata tags the package name; query datasets use package.stream (e.g. endpoint.events.api). + if integration in datasets: + return False + prefix = f"{integration}." + return not any(dataset.startswith(prefix) for dataset in datasets) + + +# Backward-compatible alias for ES|QL export tests and callers. +_esql_metadata_package_row_needed = _metadata_package_row_needed + + def parse_datasets(datasets: list[str], package_manifest: dict[str, Any]) -> list[dict[str, Any]]: """Parses datasets into packaged integrations from rule data.""" packaged_integrations: list[dict[str, Any]] = [] @@ -1897,7 +1914,7 @@ def parse_datasets(datasets: list[str], package_manifest: dict[str, Any]) -> lis # cleanup extra quotes pulled from ast field value = _value.strip('"') - integration = "Unknown" + integration = UNKNOWN_PACKAGE_INTEGRATION if "." in value: package, integration = value.split(".", 1) # Handle cases where endpoint event datasource needs to be parsed uniquely (e.g endpoint.events.network) diff --git a/pyproject.toml b/pyproject.toml index aa052f021a6..847c3622565 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.6.48" +version = "1.6.49" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12" diff --git a/tests/test_integrations.py b/tests/test_integrations.py index 80eb1135275..a8f8cda098e 100644 --- a/tests/test_integrations.py +++ b/tests/test_integrations.py @@ -11,12 +11,17 @@ from semver import Version from detection_rules.integrations import ( + _MAX_UNBOUNDED_STACK_MAJOR_SPAN, + _find_least_compatible_for_stack, + _majors_overlapping_kibana_clause, _parse_clause, _parse_kibana_range, _satisfies_kibana_range, + _stack_majors_supported_by_package, + find_compatible_version_range, find_latest_compatible_version, - find_least_compatible_version, ) +from detection_rules.schemas import get_stack_versions def _manifest(kibana_version: str) -> dict: @@ -216,53 +221,187 @@ def test_unknown_package_raises(self): find_latest_compatible_version("missing", "missing", Version(9, 1, 0), {}) -class TestFindLeastCompatibleVersion(unittest.TestCase): - """Behavior coverage for ``find_least_compatible_version``.""" +class TestFindCompatibleVersionRange(unittest.TestCase): + """Behavior coverage for ``find_compatible_version_range``.""" - def test_picks_oldest_compatible_in_latest_major(self): - """Returns the oldest manifest in the latest major whose range admits the stack.""" + def test_emits_or_range_across_majors(self): + """Emits oldest anchor per shipped stack major plus a forward-looking next-major anchor.""" manifests = { "pkg": { - "1.0.0": _manifest("^8.12.0"), - "1.5.0": _manifest("^8.12.0"), + "1.0.0": _manifest("^8.0.0"), + "1.5.0": _manifest("^8.0.0"), "2.0.0": _manifest("^9.0.0"), - "2.1.0": _manifest("^9.1.0"), "2.5.0": _manifest("^9.1.0"), } } - # 2.0.0 (^9.0.0) is the oldest 9.x manifest that admits a 9.1.0 stack. - self.assertEqual(find_least_compatible_version("pkg", "pkg", "9.1.0", manifests), "^2.0.0") + result = find_compatible_version_range("pkg", manifests) + self.assertEqual(result.range, "^1.0.0 || ^2.0.0 || ^3.0.0") + self.assertEqual(result.anchors, ("1.0.0", "2.0.0")) + self.assertEqual(result.forward_anchor, "3.0.0") - def test_no_compatible_in_any_major_raises(self): - """When neither the latest nor any prior major admits the stack, raise.""" + def test_stack_invariance(self): + """Range result does not depend on build stack version.""" manifests = { "pkg": { - "1.0.0": _manifest("^8.12.0"), - "2.0.0": _manifest("^9.4.0"), + "1.0.0": _manifest("^8.0.0"), + "2.0.0": _manifest("^9.0.0"), + } + } + first = find_compatible_version_range("pkg", manifests) + second = find_compatible_version_range("pkg", manifests) + self.assertEqual(first, second) + + def test_single_major_appends_forward_anchor(self): + """A single integration major still appends the forward-looking anchor.""" + manifests = {"pkg": {"9.0.0": _manifest("^9.0.0")}} + result = find_compatible_version_range("pkg", manifests) + self.assertEqual(result.range, "^9.0.0 || ^10.0.0") + self.assertEqual(result.anchors, ("9.0.0",)) + self.assertEqual(result.forward_anchor, "10.0.0") + + def test_three_majors_endpoint_shape(self): + """Synthetic endpoint-like majors on shipped stack lines (8.x and 9.x).""" + manifests = { + "endpoint": { + "7.17.0": _manifest("^7.17.0"), + "8.2.0": _manifest("^8.2.0"), + "9.0.0": _manifest("^9.0.0"), + } + } + result = find_compatible_version_range("endpoint", manifests) + self.assertEqual(result.range, "^8.2.0 || ^9.0.0 || ^10.0.0") + self.assertEqual(result.anchors, ("8.2.0", "9.0.0")) + self.assertEqual(result.forward_anchor, "10.0.0") + + def test_skips_majors_with_no_overlap(self): + """Majors without stack overlap are omitted from anchors.""" + manifests = { + "pkg": { + "7.10.0": _manifest("^7.10.0"), + "9.4.0": _manifest("=9.4.0"), + } + } + result = find_compatible_version_range("pkg", manifests) + self.assertEqual(result.range, "^9.4.0 || ^10.0.0") + self.assertEqual(result.anchors, ("9.4.0",)) + + def test_raises_when_no_compatible_major(self): + """When no stack line can be resolved, raise.""" + manifests = { + "pkg": { + "1.0.0": _manifest(">=99.0.0 <99.0.0"), } } with self.assertRaises(ValueError): - find_least_compatible_version("pkg", "pkg", "9.1.0", manifests) + find_compatible_version_range("pkg", manifests) + + def test_returns_anchor_list_for_policy_template_lookup(self): + """Anchors and forward anchor are exposed for policy template union.""" + manifests = { + "pkg": { + "1.0.0": _manifest("^8.0.0"), + "2.0.0": _manifest("^9.0.0"), + } + } + result = find_compatible_version_range("pkg", manifests) + self.assertEqual(result.anchors, ("1.0.0", "2.0.0")) + self.assertEqual(result.forward_anchor, "3.0.0") + + def test_unbounded_kibana_range_collects_multiple_stack_majors(self): + """``>=8.12.0`` (unbounded upper) must collect every overlapping stack major.""" + manifests = {"pkg": {"1.0.0": _manifest(">=8.12.0")}} + stack_majors = _stack_majors_supported_by_package(manifests["pkg"]) + lo_major = 8 + expected = set(range(lo_major, lo_major + _MAX_UNBOUNDED_STACK_MAJOR_SPAN + 1)) + self.assertEqual(stack_majors, expected) + + def test_bounded_kibana_range_includes_upper_major(self): + """``>=8.12.0 <9.1.0`` overlaps stack major 9 (9.0.x) and must include it.""" + majors = _majors_overlapping_kibana_clause( + Version(8, 12, 0), + Version(9, 1, 0), + ">=8.12.0 <9.1.0", + ) + self.assertIn(8, majors) + self.assertIn(9, majors) + self.assertNotIn(10, majors) - def test_cross_major_fallback(self): - """Falls back to an earlier major when the latest major is incompatible.""" + def test_non_aligned_package_covers_shipped_stack_majors(self): + """Non-aligned packages emit one anchor per shipped backport stack major.""" manifests = { "pkg": { "1.0.0": _manifest("^8.12.0"), - "2.0.0": _manifest("^9.4.0"), + "1.1.0": _manifest("^9.0.0"), + "1.2.0": _manifest("^10.0.0"), } } - self.assertEqual(find_least_compatible_version("pkg", "pkg", "8.12.0", manifests), "^1.0.0") + result = find_compatible_version_range("pkg", manifests) + # Stack 10 is not a shipped backport line; only 8.x and 9.x majors from stack-schema-map. + self.assertEqual(result.anchors, ("1.0.0", "1.1.0")) + self.assertEqual(result.range, "^1.0.0 || ^1.1.0 || ^2.0.0") - def test_or_clause(self): - """OR'd clauses are honored by the least-compatible search.""" - manifests = {"pkg": {"1.0.0": _manifest("^8.12.0 || ^9.0.0")}} - self.assertEqual(find_least_compatible_version("pkg", "pkg", "9.1.0", manifests), "^1.0.0") + def test_excludes_unshipped_stack_majors(self): + """Manifest stack lines outside shipped backports (e.g. Kibana 7.x) are not walked.""" + manifests = { + "pkg": { + "0.0.2": _manifest("^7.9.0"), + "1.0.0": _manifest("^8.0.0"), + "1.22.0": _manifest("^9.0.0"), + } + } + result = find_compatible_version_range("pkg", manifests) + self.assertEqual(result.anchors, ("1.0.0", "1.22.0")) + self.assertNotIn("0.0.2", result.anchors) + self.assertEqual(result.range, "^1.0.0 || ^1.22.0 || ^2.0.0") + + def test_keeps_zero_major_when_only_stable_option_missing(self): + """Keep 0.x anchors when no major >= 1 anchor exists.""" + manifests = {"pkg": {"0.5.0": _manifest("^8.0.0")}} + result = find_compatible_version_range("pkg", manifests) + self.assertEqual(result.anchors, ("0.5.0",)) + + def test_anchors_cover_each_shipped_stack_export(self): + """Each per-stack least-compatible anchor must appear in the OR range (Kibana semver.satisfies).""" + manifests = { + "pkg": { + "1.0.0": _manifest("^8.0.0"), + "2.0.0": _manifest("^9.2.0"), + "3.0.0": _manifest("^9.4.0"), + } + } + result = find_compatible_version_range("pkg", manifests) + for stack_version_str in get_stack_versions(): + stack_version = Version.parse(stack_version_str) + expected = _find_least_compatible_for_stack(stack_version, manifests["pkg"]) + if expected is None: + continue + self.assertIn( + expected, + result.anchors, + f"stack {stack_version_str} exported ^{expected} but anchors are {result.anchors}", + ) + + def test_aws_range_includes_late_stack_anchors(self): + """AWS 5.x/6.x require Kibana ^9.2+; walking 9.0.0 per major missed them.""" + from detection_rules.integrations import load_integrations_manifests + + manifests = load_integrations_manifests() + result = find_compatible_version_range("aws", manifests) + self.assertIn("5.0.0", result.anchors) + self.assertIn("6.0.0", result.anchors) + self.assertNotIn("1.5.0", result.anchors) + for stack_version_str in get_stack_versions(): + stack_version = Version.parse(stack_version_str) + expected = _find_least_compatible_for_stack(stack_version, manifests["aws"]) + self.assertIsNotNone(expected) + self.assertIn(expected, result.anchors, stack_version_str) + + +class TestFindCompatibleVersionRangeSchemaAware(unittest.TestCase): + """Schema-aware data stream filtering ported from #6251 into OR-range export.""" def test_skips_versions_missing_integration(self): """Kibana-compatible versions whose schema lacks the integration are skipped for a later one.""" - # Mirrors a data stream added in a later package (e.g. azure aadgraphactivitylogs in 1.37.0): - # older packages still install on the stack but predate the data stream. manifests = { "pkg": { "1.0.0": _manifest("^8.12.0"), @@ -278,16 +417,20 @@ def test_skips_versions_missing_integration(self): } } with unittest.mock.patch("detection_rules.integrations.load_integrations_schemas", return_value=schemas): - # 1.0.0/1.5.0 are kibana-compatible but lack new_ds; 1.9.0 is the oldest that has it. - self.assertEqual(find_least_compatible_version("pkg", "new_ds", "8.12.0", manifests), "^1.9.0") - # An integration present in every version is unaffected and still resolves to the oldest. - self.assertEqual(find_least_compatible_version("pkg", "existing_ds", "8.12.0", manifests), "^1.0.0") + new_ds = find_compatible_version_range("pkg", manifests, integration="new_ds") + self.assertIn("1.9.0", new_ds.anchors) + self.assertNotIn("1.0.0", new_ds.anchors) + self.assertNotIn("1.5.0", new_ds.anchors) + + existing_ds = find_compatible_version_range("pkg", manifests, integration="existing_ds") + self.assertEqual(existing_ds.anchors, ("1.0.0",)) def test_no_schema_data_falls_back_to_kibana_only(self): """Versions without schema data are not filtered; kibana compatibility alone decides.""" manifests = {"pkg": {"1.0.0": _manifest("^8.12.0"), "1.5.0": _manifest("^8.12.0")}} with unittest.mock.patch("detection_rules.integrations.load_integrations_schemas", return_value={}): - self.assertEqual(find_least_compatible_version("pkg", "new_ds", "8.12.0", manifests), "^1.0.0") + result = find_compatible_version_range("pkg", manifests, integration="new_ds") + self.assertEqual(result.anchors, ("1.0.0",)) def test_all_compatible_versions_missing_integration_raises(self): """Raise when every kibana-compatible version's schema lacks the requested integration.""" @@ -297,4 +440,93 @@ def test_all_compatible_versions_missing_integration_raises(self): unittest.mock.patch("detection_rules.integrations.load_integrations_schemas", return_value=schemas), self.assertRaises(ValueError), ): - find_least_compatible_version("pkg", "new_ds", "8.12.0", manifests) + find_compatible_version_range("pkg", manifests, integration="new_ds") + + def test_schema_floor_excludes_legacy_zero_major(self): + """Schema-floor fallback must not retain 0.x anchors from the package baseline.""" + manifests = { + "pkg": { + "0.0.2": _manifest("^7.9.0"), + "1.0.0": _manifest("^8.0.0"), + "1.37.0": _manifest("^9.0.0"), + } + } + schemas = { + "pkg": { + "0.0.2": {"other_ds": {}}, + "1.0.0": {"other_ds": {}}, + "1.37.0": {"aadgraphactivitylogs": {}}, + } + } + with unittest.mock.patch("detection_rules.integrations.load_integrations_schemas", return_value=schemas): + result = find_compatible_version_range("pkg", manifests, integration="aadgraphactivitylogs") + self.assertEqual(result.anchors, ("1.37.0",)) + self.assertEqual(result.range, "^1.37.0 || ^2.0.0") + + def test_azure_aadgraphactivitylogs_schema_floor(self): + """aadgraphactivitylogs floor is azure 1.37.0 (bundled integration-schemas.json.gz).""" + from detection_rules.integrations import load_integrations_manifests, load_integrations_schemas + + schemas = load_integrations_schemas() + manifests = load_integrations_manifests() + result = find_compatible_version_range("azure", manifests, integration="aadgraphactivitylogs") + self.assertIn("1.37.0", result.anchors) + self.assertNotIn("1.0.0", result.anchors) + self.assertNotIn("0.0.2", result.anchors) + self.assertIn("^1.37.0", result.range) + self.assertEqual(result.range, "^1.37.0 || ^2.0.0") + floor_versions = [ + version + for version in sorted(schemas["azure"], key=Version.parse) + if "aadgraphactivitylogs" in schemas["azure"][version] + ] + self.assertEqual(floor_versions[0], "1.37.0") + + +class TestMetadataPackageRowDedupe(unittest.TestCase): + """Skip redundant metadata package rows when query datasets already cover the package.""" + + def test_metadata_package_row_needed_helper(self): + from detection_rules.rule import _metadata_package_row_needed + + self.assertFalse(_metadata_package_row_needed("azure", {"azure.signinlogs"})) + self.assertFalse(_metadata_package_row_needed("aws", {"aws.cloudtrail", "aws.billing"})) + self.assertFalse(_metadata_package_row_needed("endpoint", {"endpoint.events.api"})) + self.assertFalse(_metadata_package_row_needed("windows", {"windows.sysmon_operational"})) + self.assertTrue(_metadata_package_row_needed("azure", set())) + self.assertTrue(_metadata_package_row_needed("aws_bedrock", set())) + self.assertTrue(_metadata_package_row_needed("endpoint", set())) + + def test_non_dataset_package_skips_metadata_row_when_query_has_datasets(self): + from pathlib import Path + + from detection_rules.integrations import load_integrations_manifests + from detection_rules.rule import TOMLRuleContents + from detection_rules.rule_loader import RuleCollection + + manifests = load_integrations_manifests() + rule = RuleCollection().load_file(Path("rules/windows/persistence_sysmon_wmi_event_subscription.toml")) + packaged = TOMLRuleContents.get_packaged_integrations(rule.contents.data, rule.contents.metadata, manifests) + packages = [entry["package"] for entry in packaged] + self.assertEqual(packages.count("endpoint"), 1) + self.assertEqual(packages.count("windows"), 1) + + api = rule.contents.to_api_format() + endpoint_rows = [row for row in api["related_integrations"] if row["package"] == "endpoint"] + windows_rows = [row for row in api["related_integrations"] if row["package"] == "windows"] + self.assertEqual(len(endpoint_rows), 1) + self.assertEqual(len(windows_rows), 1) + self.assertEqual(endpoint_rows[0]["version"], "^8.7.0 || ^9.0.0 || ^10.0.0") + self.assertEqual(windows_rows[0]["version"], "^1.0.0 || ^3.0.0 || ^4.0.0") + + +class TestEsqlPackagedIntegrations(unittest.TestCase): + """ES|QL must not emit a redundant metadata package row when datasets cover the package.""" + + def test_metadata_package_row_needed_helper(self): + from detection_rules.rule import _esql_metadata_package_row_needed + + self.assertFalse(_esql_metadata_package_row_needed("azure", {"azure.signinlogs"})) + self.assertFalse(_esql_metadata_package_row_needed("aws", {"aws.cloudtrail", "aws.billing"})) + self.assertTrue(_esql_metadata_package_row_needed("azure", set())) + self.assertTrue(_esql_metadata_package_row_needed("aws_bedrock", set()))