From 04134c64db408dbceb398d76f5742f7a6968e237 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Tue, 26 May 2026 14:19:01 -0400 Subject: [PATCH 01/12] init java esql implementation --- detection_rules/esql_parser/__init__.py | 14 + detection_rules/esql_parser/validator.py | 297 ++++++++++++++++ lib/esql-validator/.gitignore | 1 + lib/esql-validator/README.md | 107 ++++++ lib/esql-validator/build.sh | 109 ++++++ .../esqlvalidator/AnalyzerFactory.java | 147 ++++++++ .../detectionrules/esqlvalidator/Main.java | 329 ++++++++++++++++++ .../esqlvalidator/MappingLoader.java | 184 ++++++++++ tests/test_esql_parser.py | 96 +++++ 9 files changed, 1284 insertions(+) create mode 100644 detection_rules/esql_parser/__init__.py create mode 100644 detection_rules/esql_parser/validator.py create mode 100644 lib/esql-validator/.gitignore create mode 100644 lib/esql-validator/README.md create mode 100755 lib/esql-validator/build.sh create mode 100644 lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java create mode 100644 lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/Main.java create mode 100644 lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/MappingLoader.java create mode 100644 tests/test_esql_parser.py diff --git a/detection_rules/esql_parser/__init__.py b/detection_rules/esql_parser/__init__.py new file mode 100644 index 00000000000..51228040b64 --- /dev/null +++ b/detection_rules/esql_parser/__init__.py @@ -0,0 +1,14 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License +# 2.0; you may not use this file except in compliance with the Elastic License +# 2.0. + +"""Python wrapper around the Elasticsearch Java ES|QL parser & verifier. + +Spawns the JVM-based daemon in ``lib/esql-validator`` and exchanges +line-delimited JSON over stdin/stdout to validate arbitrary ES|QL queries. 
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.

"""ES|QL validator: spawns the Java daemon and exchanges JSON over stdio."""

from __future__ import annotations

import itertools
import json
import os
import subprocess
import sys
import threading
from dataclasses import dataclass, field
from pathlib import Path
from typing import IO, Any

_MODULE_PATH = Path(__file__).resolve()
# The repository root sits two directories above this package. Fall back to the
# module's own directory instead of raising IndexError at import time when the
# file lives in a shallower location.
DEFAULT_REPO_ROOT = _MODULE_PATH.parents[2] if len(_MODULE_PATH.parents) > 2 else _MODULE_PATH.parent
DEFAULT_VALIDATOR_DIR = DEFAULT_REPO_ROOT / "lib" / "esql-validator"
DEFAULT_ES_HOME = Path("/tmp/elasticsearch")

# How many daemon stderr lines are retained, and how many are quoted in errors.
_STDERR_KEEP = 200
_STDERR_TAIL = 20


def _subprocess_env(overrides: dict[str, str] | None = None) -> dict[str, str]:
    """Return a copy of the environment for child processes.

    ``RUNTIME_JAVA_HOME`` is mirrored from ``JAVA_HOME`` only when it is not
    already present *and* ``JAVA_HOME`` has a value; an empty
    ``RUNTIME_JAVA_HOME`` is never exported.
    """
    env = os.environ.copy()
    if overrides:
        env.update(overrides)
    java_home = env.get("JAVA_HOME")
    if java_home and "RUNTIME_JAVA_HOME" not in env:
        env["RUNTIME_JAVA_HOME"] = java_home
    return env


class ValidationError(Exception):
    """The daemon itself failed (couldn't start, bad request, crashed)."""


@dataclass
class _DiagnosticEntry:
    """One line/column diagnostic in a validation result."""

    type: str
    message: str
    line: int | None = None
    column: int | None = None

    @classmethod
    def from_json(cls, payload: dict[str, Any]) -> "_DiagnosticEntry":
        """Build an entry from one element of the daemon's ``errors`` array."""
        return cls(
            type=payload.get("type", "Unknown"),
            message=payload.get("message", ""),
            line=payload.get("line"),
            column=payload.get("column"),
        )


@dataclass
class ValidationResult:
    """Outcome of validating a single ES|QL query."""

    status: str  # 'ok' | 'parse_error' | 'verify_error' | 'request_error'
    plan: str | None = None
    errors: list[_DiagnosticEntry] = field(default_factory=list)
    raw: dict[str, Any] = field(default_factory=dict)

    @property
    def ok(self) -> bool:
        """True when the daemon reported ``status == "ok"``."""
        return self.status == "ok"

    def raise_for_status(self) -> None:
        """Raise :class:`ValidationError` describing the first diagnostic, if any.

        Does nothing when the result is ``ok``. The location suffix is only
        added for the parts that are actually known, so a diagnostic without a
        column no longer renders as ``line N:None``.
        """
        if self.ok:
            return
        if not self.errors:
            raise ValidationError(f"ES|QL {self.status}: status={self.status}")
        first = self.errors[0]
        loc = ""
        if first.line is not None:
            loc = f" at line {first.line}"
            if first.column is not None:
                loc += f":{first.column}"
        raise ValidationError(f"ES|QL {self.status}{loc}: {first.message}")


class EsqlValidator:
    """Long-running JVM daemon that parses and verifies ES|QL queries.

    Usage:
        with EsqlValidator() as v:
            result = v.validate("FROM logs | LIMIT 5",
                                indices={"logs": {"properties": {"foo": {"type": "integer"}}}})
            if not result.ok:
                ...
    """

    def __init__(
        self,
        validator_dir: Path | None = None,
        es_home: Path | None = None,
        java_bin: str = "java",
        startup_timeout: float = 60.0,
        request_timeout: float = 30.0,
        build_if_missing: bool = True,
    ) -> None:
        self.validator_dir = Path(validator_dir or DEFAULT_VALIDATOR_DIR)
        self.es_home = Path(es_home or os.environ.get("ES_HOME") or DEFAULT_ES_HOME)
        self.java_bin = java_bin
        self.startup_timeout = startup_timeout
        self.request_timeout = request_timeout
        self.build_if_missing = build_if_missing

        self._proc: subprocess.Popen[bytes] | None = None
        self._lock = threading.Lock()
        self._counter = itertools.count(1)
        self._stderr_thread: threading.Thread | None = None
        self._stderr_lines: list[str] = []

    # --- public API ---------------------------------------------------------

    def __enter__(self) -> "EsqlValidator":
        self.start()
        return self

    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
        self.stop()

    def start(self) -> None:
        """Build (if needed) and launch the daemon, then wait for the ready handshake.

        Raises:
            ValidationError: the JAR is missing and may not be built, the build
                failed, or the daemon did not answer with ``{"status": "ready"}``.
                On any handshake failure the child process is killed, reaped
                and forgotten, so ``start()`` can be retried.
        """
        if self._proc is not None:
            return

        build_dir = self.validator_dir / "build"
        jar = build_dir / "esql-validator.jar"
        classpath_file = build_dir / "classpath.txt"
        if not (jar.exists() and classpath_file.exists()):
            if not self.build_if_missing:
                raise ValidationError(
                    f"Validator JAR not found at {jar}; run lib/esql-validator/build.sh or "
                    f"pass build_if_missing=True."
                )
            self._build()
            if not (jar.exists() and classpath_file.exists()):
                raise ValidationError(f"Build finished but {jar} or {classpath_file} is still missing")

        # build.sh writes classpath.txt as a colon-separated list.
        classpath = classpath_file.read_text(encoding="utf-8").strip() + ":" + str(jar)

        del self._stderr_lines[:]  # don't quote a previous daemon's stderr
        # Default (buffered) pipes: with bufsize=0 a single write() may be short
        # and readline() degrades to one system call per byte.
        self._proc = subprocess.Popen(
            [self.java_bin, "-cp", classpath, "co.elastic.detectionrules.esqlvalidator.Main"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=_subprocess_env(),
            bufsize=-1,
        )
        self._stderr_thread = threading.Thread(
            target=self._drain_stderr, args=(self._proc.stderr,), daemon=True
        )
        self._stderr_thread.start()

        try:
            ready = self._read_line(self.startup_timeout)
            try:
                ready_payload = json.loads(ready)
            except json.JSONDecodeError as e:
                raise ValidationError(f"Daemon emitted non-JSON on startup: {ready!r}") from e
            if not isinstance(ready_payload, dict) or ready_payload.get("status") != "ready":
                raise ValidationError(f"Daemon startup handshake failed: {ready_payload}")
        except BaseException:
            # Includes KeyboardInterrupt during the (long) startup wait: never
            # leave a half-started JVM behind or keep a dead handle around.
            self._release()
            raise

    def stop(self) -> None:
        """Send a graceful shutdown, then reap the JVM and close its pipes."""
        proc = self._proc
        if proc is None:
            return
        try:
            self._send({"id": "stop", "shutdown": True})
        except (OSError, ValidationError):
            pass  # already dead or pipe closed: nothing left to shut down politely
        try:
            proc.wait(timeout=5.0)
        except subprocess.TimeoutExpired:
            pass  # _release() below kills it
        finally:
            self._release()

    def validate(
        self,
        query: str,
        *,
        indices: dict[str, dict[str, Any]] | None = None,
        lookup_indices: dict[str, dict[str, Any]] | None = None,
        enrich_policies: list[dict[str, Any]] | None = None,
        params: list[Any] | None = None,
    ) -> ValidationResult:
        """Parse and verify an ES|QL query.

        Args:
            query: the ES|QL query text.
            indices: ``{index_pattern: es_mapping}`` for FROM targets.
                ``es_mapping`` is the standard ES mapping JSON, e.g.
                ``{"properties": {"foo": {"type": "integer"}}}``.
            lookup_indices: same shape as ``indices``, for LOOKUP JOIN targets.
            enrich_policies: list of ``{name, policy_type, match_field, index, mapping}``.
            params: ES|QL positional query params (``?``).

        Returns:
            The daemon's verdict. A top-level ``message`` on a non-ok response
            (``request_error``) is surfaced as a single entry in ``errors``.

        Raises:
            ValidationError: the daemon is not running, died, timed out,
                rejected the request before reading its id, or answered with a
                different id.
        """
        request_id = str(next(self._counter))
        payload: dict[str, Any] = {"id": request_id, "query": query}
        optional = {
            "indices": indices,
            "lookup_indices": lookup_indices,
            "enrich_policies": enrich_policies,
            "params": params,
        }
        payload.update({key: value for key, value in optional.items() if value})

        response = self._roundtrip(payload)
        status = response.get("status", "unknown")
        response_id = response.get("id")
        if response_id != request_id:
            # The daemon answers without an id when it could not parse the
            # request or serialize its reply; report what it actually said.
            if response_id is None and status in ("request_error", "internal_error"):
                raise ValidationError(f"Daemon rejected request ({status}): {response.get('message', '')}")
            raise ValidationError(
                f"Out-of-order response: expected id={request_id}, got {response_id}"
            )

        errors = [_DiagnosticEntry.from_json(e) for e in response.get("errors") or []]
        message = response.get("message")
        if not errors and message and status != "ok":
            errors.append(_DiagnosticEntry(type=status, message=str(message)))
        return ValidationResult(
            status=status,
            plan=response.get("plan"),
            errors=errors,
            raw=response,
        )

    # --- internals ----------------------------------------------------------

    def _build(self) -> None:
        """Run ``build.sh`` in the validator directory with ``ES_HOME`` exported."""
        script = self.validator_dir / "build.sh"
        if not script.exists():
            raise ValidationError(f"Build script missing: {script}")
        env = _subprocess_env({"ES_HOME": str(self.es_home)})
        print(
            f"[esql-validator] Building daemon JAR (ES_HOME={self.es_home}). "
            f"First run may take several minutes...",
            file=sys.stderr,
        )
        result = subprocess.run(
            ["bash", str(script)],
            cwd=str(self.validator_dir),
            env=env,
            check=False,
        )
        if result.returncode != 0:
            raise ValidationError(
                f"Build failed (exit {result.returncode}). See {self.validator_dir}/build/gradle-classpath.err"
            )

    def _roundtrip(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Send one request and return the decoded JSON object that answers it."""
        with self._lock:  # one request/response pair on the pipe at a time
            self._send(payload)
            line = self._read_line(self.request_timeout)
        try:
            response = json.loads(line)
        except json.JSONDecodeError as e:
            raise ValidationError(f"Daemon emitted non-JSON: {line!r}") from e
        if not isinstance(response, dict):
            raise ValidationError(f"Daemon emitted a non-object response: {line!r}")
        return response

    def _send(self, payload: dict[str, Any]) -> None:
        """Write one JSON line to the daemon's stdin."""
        proc = self._proc
        if proc is None or proc.stdin is None:
            raise ValidationError("Daemon not started")
        if proc.poll() is not None:
            raise ValidationError(f"Daemon died (exit {proc.returncode}). stderr:\n{self._stderr_tail()}")
        data = (json.dumps(payload) + "\n").encode("utf-8")
        try:
            proc.stdin.write(data)
            proc.stdin.flush()
        except OSError as e:
            # The daemon went away between poll() and the write.
            raise ValidationError(
                f"Daemon pipe closed while sending request. stderr:\n{self._stderr_tail()}"
            ) from e

    def _read_line(self, timeout: float) -> str:
        """Read one line from the daemon's stdout, killing it on timeout."""
        proc = self._proc
        if proc is None or proc.stdout is None:
            raise ValidationError("Daemon not started")
        stdout = proc.stdout
        # A helper thread provides the timeout (select() is not available for
        # pipes on Windows).
        outcome: list[bytes | BaseException] = []

        def _read() -> None:
            try:
                outcome.append(stdout.readline())
            except Exception as exc:  # noqa: BLE001 - re-raised in the caller
                outcome.append(exc)

        reader = threading.Thread(target=_read, daemon=True)
        reader.start()
        reader.join(timeout)
        if reader.is_alive():
            self._kill()  # unblocks the reader with EOF
            raise ValidationError(f"Daemon read timed out after {timeout}s")
        if not outcome:
            raise ValidationError("Daemon EOF without response")
        item = outcome[0]
        if isinstance(item, BaseException):
            raise ValidationError(f"Daemon read failed: {item}") from item
        if not item:
            raise ValidationError(f"Daemon closed stdout. stderr:\n{self._stderr_tail()}")
        try:
            return item.decode("utf-8").rstrip("\r\n")
        except UnicodeDecodeError as e:
            raise ValidationError(f"Daemon emitted non-UTF-8 output: {item!r}") from e

    def _drain_stderr(self, stream: IO[bytes]) -> None:
        """Collect the daemon's stderr, keeping only the most recent lines."""
        try:
            for raw in iter(stream.readline, b""):
                self._stderr_lines.append(raw.decode("utf-8", errors="replace").rstrip("\n"))
                # Bound memory; keep the most recent.
                if len(self._stderr_lines) > _STDERR_KEEP:
                    del self._stderr_lines[:-_STDERR_KEEP]
        except (OSError, ValueError):
            pass  # stream closed underneath us during shutdown

    def _stderr_tail(self) -> str:
        """Return the last few stderr lines, giving the drain thread a moment to catch up."""
        thread = self._stderr_thread
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=0.5)
        return "\n".join(self._stderr_lines[-_STDERR_TAIL:])

    def _kill(self) -> None:
        """Kill the daemon if it is still running and reap it."""
        proc = self._proc
        if proc is None:
            return
        if proc.poll() is None:
            proc.kill()
        try:
            proc.wait(timeout=5.0)
        except subprocess.TimeoutExpired:
            pass

    def _release(self) -> None:
        """Kill and reap the daemon, close its pipes, and forget the handle."""
        proc = self._proc
        if proc is None:
            return
        self._kill()
        thread = self._stderr_thread
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=1.0)
        for stream in (proc.stdin, proc.stdout, proc.stderr):
            if stream is None:
                continue
            try:
                stream.close()
            except (OSError, ValueError):
                pass  # e.g. flushing into an already-broken pipe
        self._stderr_thread = None
        self._proc = None
**Verifies** the resulting `LogicalPlan` with the same `Analyzer` + + `Verifier` used by [`VerifierTests`](https://github.com/elastic/elasticsearch/blob/main/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java), + wired up with the index mappings, lookup mappings, and enrich policies + supplied in the request. + +Returns either the analyzed plan as text, or structured `parse_error` / +`verify_error` diagnostics with line and column numbers. + +## Requirements + +- JDK 21 on `PATH` (matches what Elasticsearch is built against). +- A local checkout of [`elastic/elasticsearch`](https://github.com/elastic/elasticsearch). + Default location is `/tmp/elasticsearch`; override with `ES_HOME=…`. + +## Build + +```sh +ES_HOME=/path/to/elasticsearch ./build.sh +``` + +This: + +1. Invokes Elasticsearch's own gradle to compile `:x-pack:plugin:esql` and + resolve its full compile/runtime classpath. +2. Writes `build/classpath.txt` (colon-separated jar paths). +3. Compiles the daemon sources against that classpath. +4. Packages them as `build/esql-validator.jar` (manifest sets the main class). + +First build is slow because gradle has to compile the ES plugins; subsequent +builds reuse the gradle build cache and finish in seconds. + +## Run manually + +```sh +java -cp "$(cat build/classpath.txt):build/esql-validator.jar" \ + co.elastic.detectionrules.esqlvalidator.Main +``` + +Then send one JSON request per line on stdin: + +```json +{"id":"1","query":"FROM logs | WHERE foo == 1","indices":{"logs":{"properties":{"foo":{"type":"integer"}}}}} +``` + +You'll get one response per line on stdout. See `Main.java` for the request +and response shape. 
+ +## Wire protocol + +**Request** + +| Field | Type | Notes | +|-------------------|-----------------------------------|----------------------------------------------------------------------------------------| +| `id` | string | echoed back in the response | +| `query` | string | the ES\|QL query | +| `indices` | `{pattern: es_mapping}` | mappings for `FROM` targets, e.g. `{"logs": {"properties": {"foo": {"type": "long"}}}}` | +| `lookup_indices` | `{name: es_mapping}` | mappings for `LOOKUP JOIN` targets (loaded in `LOOKUP` index mode) | +| `enrich_policies` | list of policy descriptors | `{name, policy_type, match_field, index, mapping}` | +| `params` | list | values for positional `?` parameters | +| `shutdown` | boolean | if true, daemon exits after responding | +| `ping` | boolean | if true, daemon responds with `{"status":"pong"}` | + +**Response** + +| `status` | Other fields | +|------------------|-------------------------------------------------------------| +| `ok` | `plan` — analyzed logical plan as text | +| `parse_error` | `errors[]` — `{type, message, line, column}` | +| `verify_error` | `errors[]` — one entry per Verifier diagnostic | +| `request_error` | `message` — malformed JSON or missing required field | +| `internal_error` | `message` — uncaught exception while serializing a response | + +## Python interface + +```python +from detection_rules.esql_parser import EsqlValidator + +with EsqlValidator() as v: + result = v.validate( + "FROM logs | WHERE foo == 1 | LIMIT 5", + indices={"logs": {"properties": {"foo": {"type": "integer"}}}}, + ) + if not result.ok: + for err in result.errors: + print(f"{err.type} at {err.line}:{err.column}: {err.message}") +``` + +The Python class spawns the daemon once and reuses it across calls, so JVM +startup cost is paid once, when the `with` block is entered (or `start()` is +called), rather than on every `validate(...)`. If the JAR is missing +and `build_if_missing=True` (the default), it'll invoke `build.sh` for you. 
diff --git a/lib/esql-validator/build.sh b/lib/esql-validator/build.sh new file mode 100755 index 00000000000..bea2fd1bce9 --- /dev/null +++ b/lib/esql-validator/build.sh @@ -0,0 +1,109 @@ +#!/usr/bin/env bash +# Build the esql-validator daemon JAR by reusing the elasticsearch x-pack-esql +# compiled classpath. Requires: +# - An elasticsearch checkout at $ES_HOME (default: /tmp/elasticsearch). +# - A JDK 21+ on $PATH (and $RUNTIME_JAVA_HOME exported, per ES convention). +# +# Output (paths relative to this script): +# build/esql-validator.jar — our compiled daemon, manifest Main-Class set. +# build/classpath.txt — colon-separated classpath of all ES jars needed +# to run the daemon. Pass with `java -cp`. +# +# The whole build is cached: re-running with the same ES checkout and unchanged +# sources is fast (gradle hits its build cache, javac re-runs only if .java +# files changed). + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ES_HOME="${ES_HOME:-/tmp/elasticsearch}" +BUILD_DIR="$SCRIPT_DIR/build" + +if [[ ! -x "$ES_HOME/gradlew" ]]; then + echo "error: ES_HOME=$ES_HOME does not contain a gradlew script." >&2 + exit 1 +fi +if ! command -v javac >/dev/null 2>&1; then + echo "error: javac not found on PATH. Install JDK 21 and re-run." >&2 + exit 1 +fi + +mkdir -p "$BUILD_DIR" + +# Init script: registers a one-shot task `drPrintRuntimeClasspath` on +# :x-pack:plugin:esql that prints the full runtime classpath in a +# greppable form. +INIT_SCRIPT="$BUILD_DIR/print-classpath.init.gradle" +cat > "$INIT_SCRIPT" <<'GRADLE' +allprojects { + afterEvaluate { proj -> + if (proj.path == ':x-pack:plugin:esql') { + proj.tasks.register('drPrintRuntimeClasspath') { + // We use compileClasspath rather than runtimeClasspath here because + // xpack-core, lang-painless and friends are declared `compileOnly` in + // ES (they're loaded as plugins at runtime). For a standalone JVM we + // need them on the classpath ourselves. 
+ dependsOn proj.tasks.named('jar') + dependsOn ':x-pack:plugin:core:jar' + dependsOn ':modules:lang-painless:jar' + dependsOn ':x-pack:plugin:ml:jar' + doLast { + def files = [] as Set + files.addAll(proj.configurations.compileClasspath.files) + files.addAll(proj.configurations.runtimeClasspath.files) + files.add(proj.tasks.named('jar').get().archiveFile.get().asFile) + println 'DR_CLASSPATH_BEGIN' + files.collect { it.absolutePath }.toSet().sort().each { println 'DR_CP::' + it } + println 'DR_CLASSPATH_END' + } + } + } + } +} +GRADLE + +echo ">> Building x-pack-esql plugin in $ES_HOME (first build can be slow)…" >&2 +( + cd "$ES_HOME" + ./gradlew --console=plain --warning-mode=none -q \ + :x-pack:plugin:esql:jar \ + :x-pack:plugin:esql:drPrintRuntimeClasspath \ + --init-script "$INIT_SCRIPT" +) > "$BUILD_DIR/gradle-classpath.out" 2> "$BUILD_DIR/gradle-classpath.err" || { + echo "error: gradle invocation failed. See:" >&2 + echo " $BUILD_DIR/gradle-classpath.err" >&2 + tail -n 40 "$BUILD_DIR/gradle-classpath.err" >&2 || true + exit 1 +} + +# Extract jar paths between the markers and join with ':'. +CP=$(awk '/^DR_CLASSPATH_BEGIN$/{flag=1; next} /^DR_CLASSPATH_END$/{flag=0} flag && /^DR_CP::/{sub("^DR_CP::",""); print}' \ + "$BUILD_DIR/gradle-classpath.out" | paste -sd: -) +if [[ -z "$CP" ]]; then + echo "error: no classpath captured from gradle output. See $BUILD_DIR/gradle-classpath.out" >&2 + exit 1 +fi + +echo "$CP" > "$BUILD_DIR/classpath.txt" +echo ">> Resolved $(echo "$CP" | tr ':' '\n' | wc -l) classpath entries → build/classpath.txt" >&2 + +# Compile daemon sources. +CLASSES_DIR="$BUILD_DIR/classes" +rm -rf "$CLASSES_DIR" +mkdir -p "$CLASSES_DIR" + +echo ">> Compiling daemon sources…" >&2 +find "$SCRIPT_DIR/src/main/java" -name "*.java" > "$BUILD_DIR/sources.txt" +javac --release 21 -cp "$CP" -d "$CLASSES_DIR" @"$BUILD_DIR/sources.txt" + +# Build the jar. 
+MANIFEST="$BUILD_DIR/manifest.mf" +{ + echo "Manifest-Version: 1.0" + echo "Main-Class: co.elastic.detectionrules.esqlvalidator.Main" +} > "$MANIFEST" + +JAR_FILE="$BUILD_DIR/esql-validator.jar" +(cd "$CLASSES_DIR" && jar cfm "$JAR_FILE" "$MANIFEST" .) +echo ">> Built $JAR_FILE" >&2 +echo ">> Run with: java -cp \"\$(cat $BUILD_DIR/classpath.txt):$JAR_FILE\" co.elastic.detectionrules.esqlvalidator.Main" >&2 diff --git a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java new file mode 100644 index 00000000000..94d09729627 --- /dev/null +++ b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +package co.elastic.detectionrules.esqlvalidator; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.esql.analysis.Analyzer; +import org.elasticsearch.xpack.esql.analysis.AnalyzerContext; +import org.elasticsearch.xpack.esql.analysis.EnrichResolution; +import org.elasticsearch.xpack.esql.analysis.UnmappedResolution; +import org.elasticsearch.xpack.esql.analysis.Verifier; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy; +import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; +import org.elasticsearch.xpack.esql.expression.promql.function.PromqlFunctionRegistry; +import org.elasticsearch.xpack.esql.index.EsIndex; +import org.elasticsearch.xpack.esql.index.IndexResolution; +import org.elasticsearch.xpack.esql.inference.InferenceResolution; +import org.elasticsearch.xpack.esql.plan.IndexPattern; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.telemetry.Metrics; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.elasticsearch.xpack.esql.plan.QuerySettings; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * Assembles an {@link Analyzer} from a request payload's resolutions + * (index mappings, lookup mappings, enrich policies, etc.). + * + *

The function registry, license state, and verifier are created once at + * construction and reused across requests. Per-request resolutions are passed + * into {@link #build(ResolutionInputs)}. + */ +final class AnalyzerFactory { + + private final EsqlFunctionRegistry functionRegistry = new EsqlFunctionRegistry(); + private final PromqlFunctionRegistry promqlFunctionRegistry = new PromqlFunctionRegistry(); + private final XPackLicenseState licenseState = new XPackLicenseState(() -> System.currentTimeMillis()); + private final Verifier verifier = new Verifier( + new Metrics(functionRegistry, /*isSnapshot*/ true, /*isServerless*/ true), + licenseState + ); + + EsqlFunctionRegistry functionRegistry() { + return functionRegistry; + } + + record ResolutionInputs( + Map> indices, + Map> lookupIndices, + List enrichPolicies + ) {} + + record EnrichPolicyInput( + String name, + String policyType, + String matchField, + String index, + Map mapping + ) {} + + Analyzer build(ResolutionInputs inputs) { + Map indexResolutions = new HashMap<>(); + if (inputs.indices() != null) { + for (Map.Entry> e : inputs.indices().entrySet()) { + indexResolutions.put(new IndexPattern(Source.EMPTY, e.getKey()), indexResolution(e.getKey(), e.getValue(), IndexMode.STANDARD)); + } + } + + Map lookupResolutions = new HashMap<>(); + if (inputs.lookupIndices() != null) { + for (Map.Entry> e : inputs.lookupIndices().entrySet()) { + lookupResolutions.put(e.getKey(), indexResolution(e.getKey(), e.getValue(), IndexMode.LOOKUP)); + } + } + + EnrichResolution enrichResolution = new EnrichResolution(); + if (inputs.enrichPolicies() != null) { + for (EnrichPolicyInput p : inputs.enrichPolicies()) { + IndexResolution ir = indexResolution(p.index(), p.mapping(), IndexMode.STANDARD); + List enrichFields = new ArrayList<>(ir.get().mapping().keySet()); + enrichFields.remove(p.matchField()); + enrichResolution.addResolvedPolicy( + p.name(), + Enrich.Mode.ANY, + new ResolvedEnrichPolicy(p.matchField(), 
p.policyType(), enrichFields, Map.of("", p.index()), ir.get().mapping()) + ); + } + } + + Configuration cfg = buildConfiguration(); + AnalyzerContext ctx = new AnalyzerContext( + cfg, + functionRegistry, + promqlFunctionRegistry, + indexResolutions, + lookupResolutions, + enrichResolution, + InferenceResolution.builder().build(), + TransportVersion.current(), + QuerySettings.UNMAPPED_FIELDS.defaultValue() + ); + return new Analyzer(ctx, verifier); + } + + private static IndexResolution indexResolution(String name, Map mapping, IndexMode mode) { + Map fields = MappingLoader.fromProperties(mapping); + return IndexResolution.valid(new EsIndex(name, fields, Map.of(name, mode), Map.of(), Map.of())); + } + + private static Configuration buildConfiguration() { + return new Configuration( + java.time.ZoneOffset.UTC, + Instant.now(), + Locale.US, + null, + null, + new QueryPragmas(Settings.EMPTY), + 1000, + 1000, + null, + false, + Map.of(), + System.nanoTime(), + false, + 1000, + 1000, + null, + null, + Map.of() + ); + } +} diff --git a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/Main.java b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/Main.java new file mode 100644 index 00000000000..591d50fe582 --- /dev/null +++ b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/Main.java @@ -0,0 +1,329 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +package co.elastic.detectionrules.esqlvalidator; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.VerificationException; +import org.elasticsearch.xpack.esql.analysis.Analyzer; +import org.elasticsearch.xpack.esql.inference.InferenceSettings; +import org.elasticsearch.xpack.esql.parser.EsqlConfig; +import org.elasticsearch.xpack.esql.parser.EsqlParser; +import org.elasticsearch.xpack.esql.parser.ParsingException; +import org.elasticsearch.xpack.esql.parser.QueryParam; +import org.elasticsearch.xpack.esql.parser.QueryParams; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.core.type.DataType; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Long-running daemon that validates ES|QL queries via line-delimited JSON over + * stdin/stdout. One request per stdin line, one response per stdout line. + * + *

Request shape: + *

{@code
+ *   {"id": "1",
+ *    "query": "FROM logs | WHERE foo == 1",
+ *    "indices": {"logs": {"properties": {"foo": {"type": "integer"}}}},
+ *    "lookup_indices": {...},
+ *    "enrich_policies": [{"name": "p1", "policy_type": "match", "match_field": "ip",
+ *                         "index": "idx", "mapping": {...}}],
+ *    "params": [1, "x"]}
+ * }
+ * + *

Response shape on success: + *

{@code {"id": "1", "status": "ok", "plan": "..."}}
+ * + *

Response shape on failure: + *

{@code {"id": "1", "status": "parse_error", "errors": [{"message": "...", "line": 1, "column": 5}]}}
+ * + *

To shut down: send {@code {"shutdown": true}} or close stdin.
+ */
+public final class Main {
+
+    /**
+     * Daemon entry point: emits a {@code ready} handshake, then answers one JSON request per
+     * stdin line with one JSON response per stdout line until shutdown or EOF.
+     */
+    public static void main(String[] args) throws Exception {
+        // Replace stdout so we can keep it for protocol output. Anything written by
+        // ES internals or our own logging must go to stderr.
+        // Requests are decoded as UTF-8 below, so responses are encoded as UTF-8 too instead
+        // of relying on the platform's stdout encoding (bytes pass through the wrapped stream).
+        PrintStream protocolOut = new PrintStream(System.out, true, StandardCharsets.UTF_8);
+        System.setOut(System.err);
+
+        // ES code paths require the logging SPI to be wired up before any
+        // class with a static `LogManager.getLogger(...)` field is loaded.
+        org.elasticsearch.common.logging.LogConfigurator.configureESLogging();
+
+        AnalyzerFactory analyzerFactory = new AnalyzerFactory();
+        EsqlParser parser = new EsqlParser(new EsqlConfig(analyzerFactory.functionRegistry()));
+        InferenceSettings inference = new InferenceSettings(Settings.EMPTY);
+
+        // Handshake — Python wrapper waits for this before sending requests.
+        protocolOut.println("{\"status\":\"ready\"}");
+        protocolOut.flush();
+
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
+            String line;
+            while ((line = in.readLine()) != null) {
+                if (line.isBlank()) {
+                    continue;
+                }
+                String response = handle(line, parser, inference, analyzerFactory);
+                protocolOut.println(response);
+                protocolOut.flush();
+                // Quotes inside JSON string values are escaped, so this only matches the
+                // top-level status field written for a shutdown request.
+                if (response.contains("\"status\":\"bye\"")) {
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Handle one request line and return the serialized JSON response.
+     * Never throws: every failure is reported as a status in the response.
+     */
+    private static String handle(String line, EsqlParser parser, InferenceSettings inference, AnalyzerFactory analyzerFactory) {
+        Request req;
+        try {
+            req = Request.parse(line);
+        } catch (Exception e) {
+            return write(b -> {
+                b.field("status", "request_error");
+                b.field("message", e.getClass().getSimpleName() + ": " + e.getMessage());
+            });
+        }
+
+        if (req.shutdown) {
+            return write(b -> b.field("id", req.id).field("status", "bye"));
+        }
+        if (req.ping) {
+            return write(b -> b.field("id", req.id).field("status", "pong"));
+        }
+        if (req.query == null) {
+            return write(b -> b.field("id", req.id).field("status", "request_error")
+                .field("message", "missing 'query' field"));
+        }
+
+        // Stage 1: parse.
+        LogicalPlan parsed;
+        try {
+            parsed = parser.parseQuery(req.query, buildParams(req.params), inference);
+        } catch (ParsingException pe) {
+            return errorResponse(req.id, "parse_error", "ParsingException",
+                pe.getErrorMessage(), pe.getLineNumber(), pe.getColumnNumber());
+        } catch (Exception e) {
+            return errorResponse(req.id, "parse_error", e.getClass().getSimpleName(),
+                e.getMessage(), -1, -1);
+        }
+
+        // Stage 2: verify (analyze).
+        try {
+            Analyzer analyzer = analyzerFactory.build(new AnalyzerFactory.ResolutionInputs(
+                req.indices, req.lookupIndices, req.enrichPolicies));
+            LogicalPlan analyzed = analyzer.analyze(parsed);
+            String planText = analyzed.toString();
+            return write(b -> {
+                b.field("id", req.id);
+                b.field("status", "ok");
+                b.field("plan", planText);
+            });
+        } catch (VerificationException ve) {
+            return verifyErrorResponse(req.id, ve.getMessage());
+        } catch (ParsingException pe) {
+            return errorResponse(req.id, "verify_error", "ParsingException",
+                pe.getErrorMessage(), pe.getLineNumber(), pe.getColumnNumber());
+        } catch (Exception e) {
+            return errorResponse(req.id, "verify_error", e.getClass().getSimpleName(),
+                e.getMessage() == null ? e.toString() : e.getMessage(), -1, -1);
+        }
+    }
+
+    /** Convert positional request params into ES|QL query params; untyped values fall back to KEYWORD. */
+    private static QueryParams buildParams(List<Object> params) {
+        if (params == null || params.isEmpty()) {
+            return new QueryParams();
+        }
+        List<QueryParam> out = new ArrayList<>(params.size());
+        for (Object v : params) {
+            DataType type = DataType.fromJava(v);
+            if (type == null) {
+                type = DataType.KEYWORD;
+            }
+            out.add(new QueryParam(null, v, type, org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassification.VALUE));
+        }
+        return new QueryParams(out);
+    }
+
+    /** Build a single-entry error response; negative line/column values are omitted. */
+    private static String errorResponse(String id, String status, String type, String message, int line, int column) {
+        return write(b -> {
+            b.field("id", id);
+            b.field("status", status);
+            b.startArray("errors");
+            b.startObject();
+            b.field("type", type);
+            b.field("message", message);
+            if (line >= 0) {
+                b.field("line", line);
+            }
+            if (column >= 0) {
+                b.field("column", column);
+            }
+            b.endObject();
+            b.endArray();
+        });
+    }
+
+    /** Split a VerificationException's multi-error message into one error entry per line. */
+    private static String verifyErrorResponse(String id, String message) {
+        return write(b -> {
+            b.field("id", id);
+            b.field("status", "verify_error");
+            b.startArray("errors");
+            for (String entry : splitVerifyMessage(message)) {
+                b.startObject();
+                b.field("type", "VerificationException");
+                // Try to peel "line L:C: ..." into structured fields.
+                int line = -1, col = -1;
+                String text = entry;
+                if (entry.startsWith("line ")) {
+                    int colon1 = entry.indexOf(':', 5);
+                    int colon2 = colon1 > 0 ? entry.indexOf(':', colon1 + 1) : -1;
+                    if (colon1 > 0 && colon2 > 0) {
+                        try {
+                            // Parse both numbers before assigning so a half-valid prefix
+                            // never yields a line without a column.
+                            int parsedLine = Integer.parseInt(entry.substring(5, colon1));
+                            int parsedCol = Integer.parseInt(entry.substring(colon1 + 1, colon2));
+                            line = parsedLine;
+                            col = parsedCol;
+                            text = entry.substring(colon2 + 1).trim();
+                        } catch (NumberFormatException ignore) {
+                            // Not a positional prefix after all; report the entry verbatim.
+                        }
+                    }
+                }
+                b.field("message", text);
+                if (line >= 0) {
+                    b.field("line", line);
+                }
+                if (col >= 0) {
+                    b.field("column", col);
+                }
+                b.endObject();
+            }
+            b.endArray();
+        });
+    }
+
+    /**
+     * Split a verifier message into individual diagnostics.
+     *
+     * @param message raw exception message; may be null
+     * @return one entry per diagnostic, each re-prefixed with "line " when it carries an
+     *         "L:C:" position; a message without positional entries is returned as a single entry
+     */
+    private static List<String> splitVerifyMessage(String message) {
+        // Verifier messages look like: "Found N problem(s)\nline 1:5: foo\nline 1:10: bar"
+        // We split on every "\nline " (or leading "line ") and trim.
+        List<String> out = new ArrayList<>();
+        if (message == null) {
+            out.add("");
+            return out;
+        }
+        int start;
+        if (message.startsWith("line ")) {
+            // No preamble: keep the first diagnostic instead of discarding it.
+            start = 0;
+        } else {
+            int idx = message.indexOf("\nline ");
+            if (idx < 0) {
+                out.add(message);
+                return out;
+            }
+            // Drop the "Found N problem(s)" preamble.
+            start = idx + 1;
+        }
+        for (String part : message.substring(start).split("\\nline ")) {
+            String trimmed = part.startsWith("line ") ? part.substring(5).trim() : part.trim();
+            // Re-add "line " prefix if it isn't there, so downstream parser can find it.
+            // (?s) lets '.' span newlines so multi-line diagnostics keep their position.
+            out.add(trimmed.matches("(?s)^\\d+:\\d+:.*") ? "line " + trimmed : trimmed);
+        }
+        return out;
+    }
+
+    @FunctionalInterface
+    private interface Writer {
+        void write(XContentBuilder b) throws Exception;
+    }
+
+    /** Serialize one JSON object built by {@code w}; falls back to a minimal error object on failure. */
+    private static String write(Writer w) {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            try (XContentBuilder b = new XContentBuilder(JsonXContent.jsonXContent, baos)) {
+                b.startObject();
+                w.write(b);
+                b.endObject();
+            }
+            return baos.toString(StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            // Fall back to a hand-rolled error response we know is valid. The message is
+            // escaped so backslashes or newlines cannot break the line-delimited protocol.
+            return "{\"status\":\"internal_error\",\"message\":\""
+                + jsonEscape(e.getClass().getSimpleName() + ": " + (e.getMessage() == null ? "" : e.getMessage()))
+                + "\"}";
+        }
+    }
+
+    /** Escape a string for embedding inside a JSON string literal (RFC 8259 section 7). */
+    private static String jsonEscape(String s) {
+        StringBuilder sb = new StringBuilder(s.length() + 16);
+        for (int i = 0; i < s.length(); i++) {
+            char c = s.charAt(i);
+            switch (c) {
+                case '"' -> sb.append("\\\"");
+                case '\\' -> sb.append("\\\\");
+                case '\n' -> sb.append("\\n");
+                case '\r' -> sb.append("\\r");
+                case '\t' -> sb.append("\\t");
+                default -> {
+                    if (c < 0x20) {
+                        sb.append(String.format("\\u%04x", (int) c));
+                    } else {
+                        sb.append(c);
+                    }
+                }
+            }
+        }
+        return sb.toString();
+    }
+
+    /** Parsed request fields. */
+    static final class Request {
+        String id;
+        String query;
+        Map<String, Map<String, Object>> indices;
+        Map<String, Map<String, Object>> lookupIndices;
+        List<AnalyzerFactory.EnrichPolicyInput> enrichPolicies;
+        List<Object> params;
+        boolean shutdown;
+        boolean ping;
+
+        /** Parse one request line; throws if the line is not a JSON object or a field has the wrong type. */
+        @SuppressWarnings("unchecked")
+        static Request parse(String line) throws Exception {
+            byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
+            try (XContentParser p = JsonXContent.jsonXContent.createParser(
+                    XContentParserConfiguration.EMPTY.withRegistry(NamedXContentRegistry.EMPTY),
+                    new BytesArray(bytes).streamInput())) {
+                Map<String, Object> raw = p.map();
+                Request r = new Request();
+                Object idObj = raw.get("id");
+                r.id = idObj == null ? null : String.valueOf(idObj);
+                r.query = (String) raw.get("query");
+                Object shutdownObj = raw.get("shutdown");
+                r.shutdown = shutdownObj instanceof Boolean && (Boolean) shutdownObj;
+                Object pingObj = raw.get("ping");
+                r.ping = pingObj instanceof Boolean && (Boolean) pingObj;
+                r.indices = castStringMapOfMap(raw.get("indices"));
+                r.lookupIndices = castStringMapOfMap(raw.get("lookup_indices"));
+                Object enrich = raw.get("enrich_policies");
+                if (enrich instanceof List<?> list) {
+                    r.enrichPolicies = new ArrayList<>();
+                    for (Object o : list) {
+                        if (o instanceof Map<?, ?> m) {
+                            Map<String, Object> em = (Map<String, Object>) m;
+                            r.enrichPolicies.add(new AnalyzerFactory.EnrichPolicyInput(
+                                (String) em.get("name"),
+                                (String) em.getOrDefault("policy_type", "match"),
+                                (String) em.get("match_field"),
+                                (String) em.get("index"),
+                                em.get("mapping") instanceof Map<?, ?> mm ? (Map<String, Object>) mm : Map.of()
+                            ));
+                        }
+                    }
+                }
+                Object params = raw.get("params");
+                if (params instanceof List<?> pl) {
+                    r.params = new ArrayList<>(pl);
+                }
+                return r;
+            }
+        }
+
+        /** Keep only entries whose value is itself a JSON object; returns null when {@code o} is not a map. */
+        @SuppressWarnings("unchecked")
+        private static Map<String, Map<String, Object>> castStringMapOfMap(Object o) {
+            if (o instanceof Map<?, ?> m) {
+                Map<String, Map<String, Object>> out = new LinkedHashMap<>();
+                for (Map.Entry<?, ?> e : m.entrySet()) {
+                    if (e.getValue() instanceof Map<?, ?> v) {
+                        out.put(String.valueOf(e.getKey()), (Map<String, Object>) v);
+                    }
+                }
+                return out;
+            }
+            return null;
+        }
+    }
+}
diff --git a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/MappingLoader.java b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/MappingLoader.java
new file mode 100644
index 00000000000..ecec0f8b0c8
--- /dev/null
+++ b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/MappingLoader.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+package co.elastic.detectionrules.esqlvalidator;
+
+import org.elasticsearch.index.mapper.TimeSeriesParams;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.core.type.DateEsField;
+import org.elasticsearch.xpack.esql.core.type.EsField;
+import org.elasticsearch.xpack.esql.core.type.KeywordEsField;
+import org.elasticsearch.xpack.esql.core.type.TextEsField;
+import org.elasticsearch.xpack.esql.core.type.UnsupportedEsField;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypeRegistry;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME;
+import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD;
+import static org.elasticsearch.xpack.esql.core.type.DataType.OBJECT;
+import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT;
+import static org.elasticsearch.xpack.esql.core.type.DataType.UNSUPPORTED;
+
+/**
+ * Converts a parsed ES index mapping (in standard `{"properties": {...}}` form)
+ * into the internal {@link EsField} map used by the ES|QL analyzer.
+ *
+ * <p>This is a copy of the logic in {@code LoadMapping} from the ES test
+ * fixtures, but operates on a pre-parsed {@code Map<String, Object>} so we don't
+ * have to depend on the test-fixtures source set (which pulls in JUnit).
+ */
+final class MappingLoader {
+
+    private MappingLoader() {}
+
+    /**
+     * Convert a mapping object into fields keyed by field name.
+     *
+     * @param mapping mapping with a {@code properties} wrapper, or a bare field map in which
+     *                at least one value declares a {@code type}; may be null
+     * @return the converted fields, or an empty map when nothing usable is present
+     */
+    @SuppressWarnings("unchecked")
+    static Map<String, EsField> fromProperties(Map<String, Object> mapping) {
+        if (mapping == null || mapping.isEmpty()) {
+            return emptyMap();
+        }
+        Object props = mapping.get("properties");
+        if (props == null && mapping.values().stream().anyMatch(v -> v instanceof Map && ((Map<?, ?>) v).containsKey("type"))) {
+            // Allow the caller to omit the wrapper "properties" key.
+            props = mapping;
+        }
+        if (props instanceof Map<?, ?> raw) {
+            return startWalking((Map<String, Object>) raw);
+        }
+        return emptyMap();
+    }
+
+    /** Walk every entry of a field map, preserving declaration order. */
+    private static Map<String, EsField> startWalking(Map<String, Object> mapping) {
+        if (mapping == null) {
+            return emptyMap();
+        }
+        Map<String, EsField> types = new LinkedHashMap<>();
+        for (Map.Entry<String, Object> entry : mapping.entrySet()) {
+            walk(entry.getKey(), entry.getValue(), types);
+        }
+        return types;
+    }
+
+    /**
+     * Convert one field definition (recursing into sub-fields) and store it in {@code out}.
+     *
+     * @throws IllegalArgumentException if {@code value} is not a JSON object
+     * @throws IllegalStateException if the field is flagged as both dimension and metric
+     */
+    @SuppressWarnings("unchecked")
+    private static void walk(String name, Object value, Map<String, EsField> out) {
+        if ((value instanceof Map) == false) {
+            throw new IllegalArgumentException("Unrecognized mapping for [" + name + "]: " + value);
+        }
+        Map<String, Object> content = (Map<String, Object>) value;
+
+        if ("nested".equals(content.get("type"))) {
+            // IndexResolver strips nested fields entirely; mirror that.
+            return;
+        }
+
+        DataType dataType = resolveType(content);
+
+        final Map<String, EsField> properties;
+        if (dataType == OBJECT) {
+            properties = fromProperties(content);
+        } else if (content.containsKey("fields")) {
+            Object fields = content.get("fields");
+            properties = (fields instanceof Map) ? startWalking((Map<String, Object>) fields) : Collections.emptyMap();
+        } else {
+            properties = fromProperties(content);
+        }
+
+        boolean docValues = boolSetting(content.get("doc_values"), dataType.hasDocValues());
+        boolean isDimension = boolSetting(content.get("time_series_dimension"), false);
+        boolean isMetric = content.containsKey("time_series_metric");
+        if (isDimension && isMetric) {
+            throw new IllegalStateException("Field [" + name + "] is both dimension and metric");
+        }
+        EsField.TimeSeriesFieldType tsType = EsField.TimeSeriesFieldType.NONE;
+        if (isDimension) {
+            tsType = EsField.TimeSeriesFieldType.DIMENSION;
+        }
+        if (isMetric) {
+            tsType = EsField.TimeSeriesFieldType.METRIC;
+        }
+
+        final EsField field;
+        if (dataType == TEXT) {
+            field = new TextEsField(name, properties, docValues, false, tsType);
+        } else if (dataType == KEYWORD) {
+            int length = intSetting(content.get("ignore_above"), Short.MAX_VALUE);
+            boolean normalized = content.get("normalizer") != null
+                && content.get("normalizer").toString().isBlank() == false;
+            field = new KeywordEsField(name, properties, docValues, length, normalized, false, tsType);
+        } else if (dataType == DATETIME) {
+            field = DateEsField.dateEsField(name, properties, docValues, tsType);
+        } else if (dataType == UNSUPPORTED) {
+            String type = String.valueOf(content.get("type"));
+            field = new UnsupportedEsField(name, List.of(type), null, properties);
+            propagateUnsupported(name, type, properties);
+        } else {
+            field = new EsField(name, dataType, properties, docValues, tsType);
+        }
+        out.put(name, field);
+    }
+
+    /**
+     * Resolve the ES|QL data type of one field definition.
+     * A definition without {@code type} is OBJECT when it has {@code properties}; unknown
+     * type names resolve to UNSUPPORTED.
+     */
+    private static DataType resolveType(Map<String, Object> content) {
+        if (content.containsKey("type")) {
+            String typeName = content.get("type").toString();
+            if ("constant_keyword".equals(typeName) || "wildcard".equals(typeName)) {
+                return KEYWORD;
+            }
+            Object metricsTypeParameter = content.get(TimeSeriesParams.TIME_SERIES_METRIC_PARAM);
+            TimeSeriesParams.MetricType metricType = null;
+            if (metricsTypeParameter instanceof String s) {
+                metricType = TimeSeriesParams.MetricType.fromString(s);
+            } else if (metricsTypeParameter instanceof TimeSeriesParams.MetricType mt) {
+                metricType = mt;
+            } else if (metricsTypeParameter != null) {
+                // Previously a blind cast: fail with a message that names the bad value instead.
+                throw new IllegalArgumentException("Unrecognized time_series_metric value: " + metricsTypeParameter);
+            }
+            try {
+                return EsqlDataTypeRegistry.INSTANCE.fromEs(typeName, metricType);
+            } catch (IllegalArgumentException ignore) {
+                return UNSUPPORTED;
+            }
+        }
+        if (content.containsKey("properties")) {
+            return OBJECT;
+        }
+        return UNSUPPORTED;
+    }
+
+    /** Read a lenient boolean setting; unrecognized values yield {@code defaultValue}. */
+    private static boolean boolSetting(Object value, boolean defaultValue) {
+        if (value == null) {
+            return defaultValue;
+        }
+        // Locale.ROOT keeps the comparison independent of the JVM's default locale.
+        String s = value.toString().toLowerCase(Locale.ROOT);
+        return switch (s) {
+            case "true", "1", "yes", "on" -> true;
+            case "false", "0", "no", "off" -> false;
+            default -> defaultValue;
+        };
+    }
+
+    /** Read an integer setting; numeric JSON values are accepted as well as numeric strings. */
+    private static int intSetting(Object value, int defaultValue) {
+        if (value == null) {
+            return defaultValue;
+        }
+        if (value instanceof Number n) {
+            // A JSON number may arrive as Long or Double (e.g. 256.0); parseInt would reject it.
+            return n.intValue();
+        }
+        return Integer.parseInt(value.toString().trim());
+    }
+
+    /** Recursively replace every sub-field with an unsupported field that records the inherited parent. */
+    private static void propagateUnsupported(String inherited, String originalType, Map<String, EsField> properties) {
+        if (properties == null || properties.isEmpty()) {
+            return;
+        }
+        for (Map.Entry<String, EsField> entry : properties.entrySet()) {
+            EsField field = entry.getValue();
+            UnsupportedEsField u;
+            if (field instanceof UnsupportedEsField unsupported) {
+                u = new UnsupportedEsField(unsupported.getName(), List.of(originalType), inherited, unsupported.getProperties());
+            } else {
+                u = new UnsupportedEsField(field.getName(), List.of(originalType), inherited, field.getProperties());
+            }
+            entry.setValue(u);
+            propagateUnsupported(inherited, originalType, u.getProperties());
+        }
+    }
+}
diff --git a/tests/test_esql_parser.py b/tests/test_esql_parser.py
new file mode 100644
index 00000000000..6b336b66848
--- /dev/null
+++ b/tests/test_esql_parser.py
@@ -0,0 +1,96 @@
+# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+# or more contributor license agreements. Licensed under the Elastic License
+# 2.0; you may not use this file except in compliance with the Elastic License
+# 2.0.
+
+"""Tests for the Java-backed ES|QL validator wrapper."""
+
+import os
+import unittest
+from pathlib import Path
+
+from detection_rules.esql_parser import EsqlValidator, ValidationError
+
+VALIDATOR_DIR = Path(__file__).resolve().parents[1] / "lib" / "esql-validator"
+JAR_PATH = VALIDATOR_DIR / "build" / "esql-validator.jar"
+CLASSPATH_FILE = VALIDATOR_DIR / "build" / "classpath.txt"
+
+# The Java daemon is heavy to build (requires an Elasticsearch checkout). We skip
+# this test suite unless the JAR is already present, or DR_ESQL_BUILD=1 is set.
+SHOULD_RUN = (JAR_PATH.exists() and CLASSPATH_FILE.exists()) or os.environ.get("DR_ESQL_BUILD") == "1"
+
+
+@unittest.skipUnless(
+    SHOULD_RUN,
+    "esql-validator JAR not built. Run lib/esql-validator/build.sh or set DR_ESQL_BUILD=1.",
+)
+class TestEsqlValidator(unittest.TestCase):
+    """End-to-end checks of the JVM daemon wrapper."""
+
+    # One daemon is started for the whole class and shared by every test below.
+    @classmethod
+    def setUpClass(cls) -> None:
+        cls.validator = EsqlValidator(build_if_missing=os.environ.get("DR_ESQL_BUILD") == "1")
+        cls.validator.start()
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        cls.validator.stop()
+
+    def test_ping(self) -> None:
+        # NOTE: despite the name, no ping request is sent here; a trivial query is
+        # validated as a smoke test of the JSON round-trip path.
+        result = self.validator.validate("FROM idx", indices={"idx": {"properties": {"a": {"type": "long"}}}})
+        self.assertEqual(result.status, "ok")
+
+    def test_valid_query_returns_plan(self) -> None:
+        result = self.validator.validate(
+            "FROM logs | WHERE foo == 1 | LIMIT 5",
+            indices={"logs": {"properties": {"foo": {"type": "integer"}}}},
+        )
+        self.assertTrue(result.ok, msg=result.raw)
+        self.assertIsNotNone(result.plan)
+        self.assertIn("EsRelation[logs]", result.plan)
+
+    def test_parse_error_includes_position(self) -> None:
+        # "WAT" is not an ES|QL command, so the parse stage is expected to reject the query.
+        result = self.validator.validate("FROM logs | WAT")
+        self.assertEqual(result.status, "parse_error")
+        self.assertGreaterEqual(len(result.errors), 1)
+        err = result.errors[0]
+        self.assertEqual(err.type, "ParsingException")
+        self.assertEqual(err.line, 1)
+        self.assertGreater(err.column or 0, 0)
+
+    def test_unknown_field_is_verify_error(self) -> None:
+        result = self.validator.validate(
+            "FROM logs | WHERE missing_field == 1",
+            indices={"logs": {"properties": {"foo": {"type": "integer"}}}},
+        )
+        self.assertEqual(result.status, "verify_error")
+        err = result.errors[0]
+        self.assertEqual(err.type, "VerificationException")
+        self.assertIn("missing_field", err.message)
+        self.assertEqual(err.line, 1)
+
+    def test_type_mismatch_is_verify_error(self) -> None:
+        # Comparing a keyword field with a number — Verifier should flag this.
+        result = self.validator.validate(
+            'FROM logs | WHERE name == 1',
+            indices={"logs": {"properties": {"name": {"type": "keyword"}}}},
+        )
+        self.assertEqual(result.status, "verify_error", msg=result.raw)
+        self.assertTrue(any("name" in e.message for e in result.errors), msg=result.errors)
+
+    def test_raise_for_status(self) -> None:
+        # A failed validation result is expected to raise ValidationError from raise_for_status().
+        result = self.validator.validate("FROM x | WAT")
+        with self.assertRaises(ValidationError):
+            result.raise_for_status()
+
+    def test_multiple_round_trips_share_daemon(self) -> None:
+        # Verify the long-running daemon doesn't break across many calls.
+        mapping = {"logs": {"properties": {"foo": {"type": "integer"}}}}
+        for i in range(10):
+            r = self.validator.validate(f"FROM logs | LIMIT {i + 1}", indices=mapping)
+            self.assertTrue(r.ok, msg=f"iteration {i}: {r.raw}")
+
+
+if __name__ == "__main__":
+    unittest.main()
From 3b44d7612bfbd520a77d3a1018365041431c259f Mon Sep 17 00:00:00 2001
From: eric-forte-elastic
Date: Wed, 27 May 2026 16:19:22 -0400
Subject: [PATCH 02/12] Add query column support

---
 detection_rules/esql_parser/validator.py | 7 ++++
 lib/esql-validator/README.md | 14 ++++----
 .../detectionrules/esqlvalidator/Main.java | 33 ++++++++++++++++++-
 3 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/detection_rules/esql_parser/validator.py b/detection_rules/esql_parser/validator.py
index b09fbf79e4c..62f1bbf4b1f 100644
--- a/detection_rules/esql_parser/validator.py
+++ b/detection_rules/esql_parser/validator.py
@@ -51,6 +51,12 @@ class ValidationResult:
 
     status: str  # 'ok' | 'parse_error' | 'verify_error' | 'request_error'
     plan: str | None = None
+    # Output columns, matching the shape returned by the ES|QL HTTP API:
+    # [{"name": "", "type": ""}, ...]. For fields whose underlying ES
+    # mapping is unsupported or in conflict, an entry also carries "original_types"
+    # (list[str]) and, when one can be inferred, "suggested_cast" (str). Only populated
+    # when status == 'ok'.
+ columns: list[dict[str, Any]] = field(default_factory=list) errors: list[_DiagnosticEntry] = field(default_factory=list) raw: dict[str, Any] = field(default_factory=dict) @@ -206,6 +212,7 @@ def validate( return ValidationResult( status=response.get("status", "unknown"), plan=response.get("plan"), + columns=response.get("columns", []), errors=[_DiagnosticEntry.from_json(e) for e in response.get("errors", [])], raw=response, ) diff --git a/lib/esql-validator/README.md b/lib/esql-validator/README.md index 0a4e9dd3938..be78d29ec1d 100644 --- a/lib/esql-validator/README.md +++ b/lib/esql-validator/README.md @@ -79,13 +79,13 @@ and response shape. **Response** -| `status` | Other fields | -|------------------|-------------------------------------------------------------| -| `ok` | `plan` — analyzed logical plan as text | -| `parse_error` | `errors[]` — `{type, message, line, column}` | -| `verify_error` | `errors[]` — one entry per Verifier diagnostic | -| `request_error` | `message` — malformed JSON or missing required field | -| `internal_error` | `message` — uncaught exception while serializing a response | +| `status` | Other fields | +|------------------|---------------------------------------------------------------------------------------------------------------| +| `ok` | `plan` — analyzed logical plan as text; `columns` — `[{name, type, original_types?, suggested_cast?}]`, matching the ES\|QL HTTP API response (the last two only appear for unsupported / type-conflicting fields) | +| `parse_error` | `errors[]` — `{type, message, line, column}` | +| `verify_error` | `errors[]` — one entry per Verifier diagnostic | +| `request_error` | `message` — malformed JSON or missing required field | +| `internal_error` | `message` — uncaught exception while serializing a response | ## Python interface diff --git a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/Main.java 
b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/Main.java index 591d50fe582..6640af099ab 100644 --- a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/Main.java +++ b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/Main.java @@ -22,6 +22,8 @@ import org.elasticsearch.xpack.esql.parser.QueryParam; import org.elasticsearch.xpack.esql.parser.QueryParams; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.UnsupportedAttribute; import org.elasticsearch.xpack.esql.core.type.DataType; import java.io.BufferedReader; @@ -33,6 +35,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; /** * Long-running daemon that validates ES|QL queries via line-delimited JSON over @@ -50,7 +54,8 @@ * } * *

Response shape on success: - *

{@code {"id": "1", "status": "ok", "plan": "..."}}
+ *
{@code {"id": "1", "status": "ok", "plan": "...",
+ *              "columns": [{"name": "foo", "type": "integer"}, ...]}}
* *

Response shape on failure: *

{@code {"id": "1", "status": "parse_error", "errors": [{"message": "...", "line": 1, "column": 5}]}}
@@ -133,10 +138,36 @@ private static String handle(String line, EsqlParser parser, InferenceSettings i req.indices, req.lookupIndices, req.enrichPolicies)); LogicalPlan analyzed = analyzer.analyze(parsed); String planText = analyzed.toString(); + List outputAttrs = analyzed.output(); return write(b -> { b.field("id", req.id); b.field("status", "ok"); b.field("plan", planText); + b.startArray("columns"); + for (Attribute attr : outputAttrs) { + b.startObject(); + b.field("name", attr.name()); + b.field("type", attr.dataType().outputType()); + // Mirror ColumnInfoImpl: unsupported fields surface their underlying ES + // types and a suggested cast, so callers can react to mapping conflicts. + if (attr instanceof UnsupportedAttribute ua) { + List originalTypes = ua.originalTypes(); + if (originalTypes != null && !originalTypes.isEmpty()) { + b.field("original_types", originalTypes); + DataType suggestedCast = DataType.suggestedCast( + originalTypes.stream() + .map(DataType::fromTypeName) + .filter(Objects::nonNull) + .collect(Collectors.toSet()) + ); + if (suggestedCast != null) { + b.field("suggested_cast", suggestedCast.typeName()); + } + } + } + b.endObject(); + } + b.endArray(); }); } catch (VerificationException ve) { return verifyErrorResponse(req.id, ve.getMessage()); From 41d3a34b948ea1861194799aafaa1baf8abe407f Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Wed, 27 May 2026 16:46:06 -0400 Subject: [PATCH 03/12] Update the python validation route --- detection_rules/esql_errors.py | 8 +- detection_rules/esql_parser/__init__.py | 16 ++- detection_rules/esql_parser/validator.py | 132 +++++++++++++++++----- detection_rules/index_mappings.py | 137 +++++++---------------- detection_rules/remote_validation.py | 4 +- detection_rules/rule_validators.py | 27 ++--- 6 files changed, 184 insertions(+), 140 deletions(-) diff --git a/detection_rules/esql_errors.py b/detection_rules/esql_errors.py index 8316888cc8c..5aa5e634876 100644 --- 
a/detection_rules/esql_errors.py +++ b/detection_rules/esql_errors.py @@ -38,12 +38,16 @@ def cleanup_empty_indices( class EsqlKibanaBaseError(ClientError): """Base class for ESQL exceptions with cleanup logic.""" + # elastic_client is optional: it's only used to clean up stale rule-test-* / + # test-* indices from previous remote runs. Local-only callers pass None and + # the cleanup is skipped. def __init__( self, message: str, - elastic_client: Elasticsearch, + elastic_client: Elasticsearch | None = None, ) -> None: - cleanup_empty_indices(elastic_client) + if elastic_client is not None: + cleanup_empty_indices(elastic_client) super().__init__(message, original_error=self) diff --git a/detection_rules/esql_parser/__init__.py b/detection_rules/esql_parser/__init__.py index 51228040b64..550107a54b6 100644 --- a/detection_rules/esql_parser/__init__.py +++ b/detection_rules/esql_parser/__init__.py @@ -9,6 +9,18 @@ line-delimited JSON over stdin/stdout to validate arbitrary ES|QL queries. 
""" -from .validator import EsqlValidator, ValidationError, ValidationResult +from .validator import ( + EsqlValidator, + ValidationError, + ValidationResult, + get_shared_validator, + shared_validator, +) -__all__ = ("EsqlValidator", "ValidationError", "ValidationResult") +__all__ = ( + "EsqlValidator", + "ValidationError", + "ValidationResult", + "get_shared_validator", + "shared_validator", +) diff --git a/detection_rules/esql_parser/validator.py b/detection_rules/esql_parser/validator.py index 62f1bbf4b1f..e29a4e39eb1 100644 --- a/detection_rules/esql_parser/validator.py +++ b/detection_rules/esql_parser/validator.py @@ -7,12 +7,15 @@ from __future__ import annotations +import atexit +import contextlib import itertools import json import os import subprocess import sys import threading +from collections.abc import Iterator from dataclasses import dataclass, field from pathlib import Path from typing import IO, Any @@ -74,15 +77,7 @@ def raise_for_status(self) -> None: class EsqlValidator: - """Long-running JVM daemon that parses and verifies ES|QL queries. - - Usage: - with EsqlValidator() as v: - result = v.validate("FROM logs | LIMIT 5", - indices={"logs": {"properties": {"foo": {"type": "integer"}}}}) - if not result.ok: - ... - """ + """Long-running JVM daemon that parses and verifies ES|QL queries.""" def __init__( self, @@ -92,6 +87,7 @@ def __init__( startup_timeout: float = 60.0, request_timeout: float = 30.0, build_if_missing: bool = True, + heap_size: str | None = "512m", ) -> None: self.validator_dir = Path(validator_dir or DEFAULT_VALIDATOR_DIR) self.es_home = Path(es_home or os.environ.get("ES_HOME") or DEFAULT_ES_HOME) @@ -99,12 +95,17 @@ def __init__( self.startup_timeout = startup_timeout self.request_timeout = request_timeout self.build_if_missing = build_if_missing + # Cap JVM heap so long-running daemons in bulk validation don't grow unbounded. 
+ self.heap_size = heap_size self._proc: subprocess.Popen[bytes] | None = None self._lock = threading.Lock() self._counter = itertools.count(1) self._stderr_thread: threading.Thread | None = None self._stderr_lines: list[str] = [] + # PID that spawned the current daemon; used to detect fork-inherited state. + self._started_pid: int | None = None + self._atexit_registered = False # --- public API --------------------------------------------------------- @@ -134,14 +135,23 @@ def start(self) -> None: env = os.environ.copy() env.setdefault("RUNTIME_JAVA_HOME", env.get("JAVA_HOME", "")) + cmd: list[str] = [self.java_bin] + if self.heap_size: + cmd.append(f"-Xmx{self.heap_size}") + cmd.extend(["-cp", classpath, "co.elastic.detectionrules.esqlvalidator.Main"]) + self._proc = subprocess.Popen( - [self.java_bin, "-cp", classpath, "co.elastic.detectionrules.esqlvalidator.Main"], + cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, bufsize=0, ) + self._started_pid = os.getpid() + if not self._atexit_registered: + atexit.register(self._atexit_stop) + self._atexit_registered = True assert self._proc.stdout is not None and self._proc.stdin is not None self._stderr_thread = threading.Thread( target=self._drain_stderr, args=(self._proc.stderr,), daemon=True @@ -172,6 +182,7 @@ def stop(self) -> None: self._proc.kill() finally: self._proc = None + self._started_pid = None def validate( self, @@ -182,17 +193,11 @@ def validate( enrich_policies: list[dict[str, Any]] | None = None, params: list[Any] | None = None, ) -> ValidationResult: - """Parse and verify an ES|QL query. - - Args: - query: the ES|QL query text. - indices: ``{index_pattern: es_mapping}`` for FROM targets. - ``es_mapping`` is the standard ES mapping JSON, e.g. - ``{"properties": {"foo": {"type": "integer"}}}``. - lookup_indices: same shape as ``indices``, for LOOKUP JOIN targets. - enrich_policies: list of ``{name, policy_type, match_field, index, mapping}``. 
- params: ES|QL positional query params (``?``). - """ + """Parse and verify an ES|QL query.""" + # indices: {pattern: es_mapping} for FROM targets, e.g. {"logs": {"properties": ...}}. + # lookup_indices: same shape, for LOOKUP JOIN targets. + # enrich_policies: list of {name, policy_type, match_field, index, mapping}. + # params: positional query params (?). request_id = str(next(self._counter)) payload: dict[str, Any] = {"id": request_id, "query": query} if indices: @@ -243,13 +248,55 @@ def _build(self) -> None: ) def _roundtrip(self, payload: dict[str, Any]) -> dict[str, Any]: + # One transparent restart if the daemon was inherited across a fork or died + # since the last call; surface the underlying failure on a second strike. + last_err: ValidationError | None = None with self._lock: - self._send(payload) - line = self._read_line(self.request_timeout) + for attempt in range(2): + try: + self._ensure_alive() + self._send(payload) + line = self._read_line(self.request_timeout) + try: + return json.loads(line) + except json.JSONDecodeError as e: + raise ValidationError(f"Daemon emitted non-JSON: {line!r}") from e + except (BrokenPipeError, ValidationError) as e: + last_err = e if isinstance(e, ValidationError) else ValidationError(str(e)) + # Force a clean respawn on the next attempt. + self._proc = None + self._started_pid = None + continue + assert last_err is not None + raise last_err + + def _ensure_alive(self) -> None: + """Spawn or respawn the daemon as needed.""" + # Handles three cases: never started, inherited via fork(), and crashed since + # the last call. Callers (currently _roundtrip) retry once after this. + if self._proc is None: + self.start() + return + if self._started_pid is not None and self._started_pid != os.getpid(): + # We're in a forked child that inherited the parent's pipes; don't reuse them. 
+ self._proc = None + self._started_pid = None + self._stderr_lines.clear() + self.start() + return + if self._proc.poll() is not None: + self._proc = None + self._started_pid = None + self._stderr_lines.clear() + self.start() + + def _atexit_stop(self) -> None: + # Best-effort cleanup on interpreter exit; swallow everything so we never + # interfere with shutdown. try: - return json.loads(line) - except json.JSONDecodeError as e: - raise ValidationError(f"Daemon emitted non-JSON: {line!r}") from e + self.stop() + except Exception: # noqa: BLE001 + pass def _send(self, payload: dict[str, Any]) -> None: if self._proc is None or self._proc.stdin is None: @@ -302,3 +349,36 @@ def _drain_stderr(self, stream: IO[bytes]) -> None: def _kill(self) -> None: if self._proc is not None and self._proc.poll() is None: self._proc.kill() + + +# --- Shared session --------------------------------------------------------- +# +# Keyed by os.getpid() so a child process never reuses a daemon inherited from +# its parent (a fork would have it share the same stdin/stdout pipes, which +# would corrupt the JSON protocol). +_SHARED: dict[int, EsqlValidator] = {} + + +def get_shared_validator() -> EsqlValidator | None: + """Return the validator registered for the current process, if any.""" + return _SHARED.get(os.getpid()) + + +@contextlib.contextmanager +def shared_validator(**kwargs: Any) -> Iterator[EsqlValidator]: + """Scope a single EsqlValidator to a block of work, reused across calls in it.""" + # Re-entrant: an inner `with shared_validator()` yields the outer instance and + # does not stop it on exit. Used by bulk validation to amortize JVM startup. 
+ pid = os.getpid() + existing = _SHARED.get(pid) + if existing is not None: + yield existing + return + v = EsqlValidator(**kwargs) + v.start() + _SHARED[pid] = v + try: + yield v + finally: + _ = _SHARED.pop(pid, None) + v.stop() diff --git a/detection_rules/index_mappings.py b/detection_rules/index_mappings.py index f3017145ad4..70f3cf0ca3f 100644 --- a/detection_rules/index_mappings.py +++ b/detection_rules/index_mappings.py @@ -6,17 +6,14 @@ """Validation logic for rules containing queries.""" import re -import time from collections.abc import Callable from copy import deepcopy from typing import Any -from elastic_transport import ObjectApiResponse from elasticsearch import Elasticsearch # type: ignore[reportMissingTypeStubs] -from elasticsearch.exceptions import BadRequestError from semver import Version -from . import ecs, integrations, misc, utils +from . import ecs, integrations, utils from .config import load_current_package_version from .esql import EventDataset from .esql_errors import ( @@ -26,15 +23,14 @@ EsqlTypeMismatchError, EsqlUnknownIndexError, EsqlUnsupportedTypeError, - cleanup_empty_indices, ) +from .esql_parser import EsqlValidator, get_shared_validator from .integrations import ( load_integrations_manifests, load_integrations_schemas, ) from .rule import RuleMeta from .schemas import get_stack_schemas -from .schemas.definitions import HTTP_STATUS_BAD_REQUEST from .utils import combine_dicts @@ -98,43 +94,6 @@ def get_rule_integrations(metadata: RuleMeta) -> list[str]: return rule_integrations -def create_index_with_index_mapping( - elastic_client: Elasticsearch, index_name: str, mappings: dict[str, Any] -) -> ObjectApiResponse[Any] | None: - """Create an index with the specified mappings and settings to support large number of fields and nested objects.""" - try: - return elastic_client.indices.create( - index=index_name, - mappings={"properties": mappings}, - settings={ - "index.mapping.total_fields.limit": 10000, - 
"index.mapping.nested_fields.limit": 500, - "index.mapping.nested_objects.limit": 10000, - }, - ) - except BadRequestError as e: - error_message = str(e) - if ( - e.status_code == HTTP_STATUS_BAD_REQUEST - and "validation_exception" in error_message - and "Validation Failed: 1: this action would add [2] shards" in error_message - ): - cleanup_empty_indices(elastic_client) - try: - return elastic_client.indices.create( - index=index_name, - mappings={"properties": mappings}, - settings={ - "index.mapping.total_fields.limit": 10000, - "index.mapping.nested_fields.limit": 500, - "index.mapping.nested_objects.limit": 10000, - }, - ) - except BadRequestError as retry_error: - raise EsqlSchemaError(str(retry_error), elastic_client) from retry_error - raise EsqlSchemaError(error_message, elastic_client) from e - - def get_existing_mappings(elastic_client: Elasticsearch, indices: list[str]) -> tuple[dict[str, Any], dict[str, Any]]: """Retrieve mappings for all matching existing index templates.""" existing_mappings: dict[str, Any] = {} @@ -329,62 +288,48 @@ def get_filtered_index_schema( # noqa: PLR0913 return combined_mappings, filtered_index_mapping -def create_remote_indices( - elastic_client: Elasticsearch, - existing_mappings: dict[str, Any], - index_lookup: dict[str, Any], - log: Callable[[str], None], -) -> str: - """Create remote indices for validation and return the index string.""" - - suffix = str(int(time.time() * 1000)) - test_index = f"rule-test-index-{suffix}" - response = create_index_with_index_mapping(elastic_client, test_index, existing_mappings) - log(f"Index `{test_index}` created: {response}") - full_index_str = test_index - - # create all integration indices - for index, properties in index_lookup.items(): - ind_index_str = f"test-{index.rstrip('*')}{suffix}" - response = create_index_with_index_mapping(elastic_client, ind_index_str, properties) - log(f"Index `{ind_index_str}` created: {response}") - full_index_str = f"{full_index_str}, 
{ind_index_str}" - - return full_index_str - - def execute_query_against_indices( - elastic_client: Elasticsearch, + elastic_client: Elasticsearch | None, query: str, - test_index_str: str, + indices: dict[str, dict[str, Any]], log: Callable[[str], None], - delete_indices: bool = True, -) -> tuple[list[Any], ObjectApiResponse[Any]]: - """Execute the ESQL query against the test indices on a remote Stack and return the columns.""" - try: - log(f"Executing a query against `{test_index_str}`") - response = elastic_client.esql.query(query=query) - log(f"Got query response: {response}") - query_columns = response.get("columns", []) - except BadRequestError as e: - error_msg = str(e) - if "parsing_exception" in error_msg: - raise EsqlSyntaxError(str(e), elastic_client) from None - if "Unknown column" in error_msg: - raise EsqlSchemaError(str(e), elastic_client) from None - if "verification_exception" in error_msg and "unsupported type" in error_msg: - raise EsqlUnsupportedTypeError(str(e), elastic_client) from None - if "verification_exception" in error_msg: - raise EsqlTypeMismatchError(str(e), elastic_client) from None - raise EsqlKibanaBaseError(str(e), elastic_client) from None - if delete_indices or not misc.getdefault("skip_empty_index_cleanup")(): - for index_str in test_index_str.split(","): - response = elastic_client.indices.delete(index=index_str.strip()) - log(f"Test index `{index_str}` deleted: {response}") - - query_column_names = [c["name"] for c in query_columns] - log(f"Got query columns: {', '.join(query_column_names)}") - return query_columns, response +) -> tuple[list[dict[str, Any]], dict[str, Any]]: + """Validate an ES|QL query locally via the embedded Java validator.""" + # indices: {pattern: {"properties": {...}}} for each FROM target. elastic_client + # is only forwarded to error classes for opportunistic cleanup of stale + # rule-test-* indices from older remote runs; no query is sent to the cluster. 
+ # Returns (columns, response) — columns matches the ES|QL HTTP API shape; response + # is a dict with a top-level "columns" key so callers expecting that wrapper work. + log(f"Validating ES|QL query locally against {len(indices)} index pattern(s)") + + shared = get_shared_validator() + if shared is not None: + result = shared.validate(query, indices=indices) + else: + with EsqlValidator() as v: + result = v.validate(query, indices=indices) + + if result.ok: + log(f"Got query columns: {', '.join(c.get('name', '') for c in result.columns)}") + return result.columns, {"columns": result.columns} + + # Map validator diagnostics back to the same exception types the remote path + # raised, so existing callers (and error-classification logic upstream) work + # unchanged. + first = result.errors[0] if result.errors else None + err_msg = first.message if first else f"status={result.status}" + if result.status == "parse_error": + raise EsqlSyntaxError(err_msg, elastic_client) from None + if result.status == "verify_error": + # Verifier messages are stable enough to substring-match. Unknown-column + # phrasing varies slightly (e.g. "Unknown column [x]" vs "unknown column"). 
+ lower = err_msg.lower() + if "unknown column" in lower or "unknown function" in lower: + raise EsqlSchemaError(err_msg, elastic_client) from None + if "unsupported type" in lower: + raise EsqlUnsupportedTypeError(err_msg, elastic_client) from None + raise EsqlTypeMismatchError(err_msg, elastic_client) from None + raise EsqlKibanaBaseError(err_msg, elastic_client) from None def find_nested_multifields(mapping: dict[str, Any], path: str = "") -> list[Any]: diff --git a/detection_rules/remote_validation.py b/detection_rules/remote_validation.py index 36fc78acffa..f28cd053287 100644 --- a/detection_rules/remote_validation.py +++ b/detection_rules/remote_validation.py @@ -190,7 +190,9 @@ def validate_esql(self, contents: TOMLRuleContents, index_replacement: bool = Fa if index_replacement: try: validator = ESQLValidator(contents.data.query) # type: ignore[reportIncompatibleMethodOverride] - response = validator.remote_validate_rule_contents(self.kibana_client, self.es_client, contents) + # Local validator returns a plain dict already shaped like the + # ES|QL HTTP API body ({"columns": [...]}); no .body unwrap below. 
+ return validator.remote_validate_rule_contents(self.kibana_client, self.es_client, contents) except Exception as exc: if isinstance(exc, elasticsearch.BadRequestError): raise ValidationError(f"ES|QL query failed: {exc} for rule: {rule_id}, query: \n{query}") from exc diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index 9b3f412c885..31a280845e4 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -15,7 +15,6 @@ import eql # type: ignore[reportMissingTypeStubs] import kql # type: ignore[reportMissingTypeStubs] -from elastic_transport import ObjectApiResponse from elasticsearch import Elasticsearch # type: ignore[reportMissingTypeStubs] from eql import ast # type: ignore[reportMissingTypeStubs] from eql.parser import ( # type: ignore[reportMissingTypeStubs] @@ -35,7 +34,6 @@ from .esql import get_esql_query_event_dataset_integrations from .esql_errors import EsqlTypeMismatchError from .index_mappings import ( - create_remote_indices, execute_query_against_indices, get_rule_integrations, prepare_mappings, @@ -861,7 +859,7 @@ def validate(self, data: "QueryRuleData", rule_meta: RuleMeta, force_remote_vali def remote_validate_rule_contents( self, kibana_client: Kibana, elastic_client: Elasticsearch, contents: TOMLRuleContents, verbosity: int = 0 - ) -> ObjectApiResponse[Any]: + ) -> dict[str, Any]: """Remote validate a rule's ES|QL query using an Elastic Stack.""" return self.remote_validate_rule( kibana_client=kibana_client, @@ -880,7 +878,7 @@ def remote_validate_rule( # noqa: PLR0913 metadata: RuleMeta, rule_id: str = "", verbosity: int = 0, - ) -> ObjectApiResponse[Any]: + ) -> dict[str, Any]: """Uses remote validation from an Elastic Stack to validate ES|QL a given rule""" self.rule_id = rule_id @@ -903,20 +901,23 @@ def remote_validate_rule( # noqa: PLR0913 f"{', '.join(str(integration) for integration in event_dataset_integrations)}" ) - # Get mappings for all matching existing index 
templates - existing_mappings, index_lookup, combined_mappings = prepare_mappings( + # Get mappings for all matching existing index templates. The middle + # index_lookup return is unused now that we no longer materialize remote + # test indices for the integration patterns. + existing_mappings, _index_lookup, combined_mappings = prepare_mappings( elastic_client, indices, event_dataset_integrations, metadata, stack_version, self.log ) self.log(f"Collected mappings: {len(existing_mappings)}") self.log(f"Combined mappings prepared: {len(combined_mappings)}") - # Create remote indices - full_index_str = create_remote_indices(elastic_client, existing_mappings, index_lookup, self.log) - - # Replace all sources with the test indices - query = query.replace(indices_str, full_index_str) # type: ignore[reportUnknownVariableType] - - query_columns, response = execute_query_against_indices(elastic_client, query, full_index_str, self.log) # type: ignore[reportUnknownVariableType] + # Validate the query locally via the embedded Java validator. The combined + # mapping covers every field across the rule's integrations, and the key + # matches the FROM clause exactly so the analyzer's IndexResolution lookup + # succeeds without any string rewriting. + validator_indices = {indices_str: {"properties": combined_mappings}} + query_columns, response = execute_query_against_indices( + elastic_client, query, validator_indices, self.log + ) self.esql_unique_fields = query_columns # Build a mapping lookup for all stack versions to validate against. 
From c7fa13ef6a7a5932887f75b193de8c91b2f6db3b Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Wed, 27 May 2026 17:03:33 -0400 Subject: [PATCH 04/12] Remove remote references as appropriate --- .github/workflows/esql-validation.yml | 72 +++++++--------------- .github/workflows/lock-versions.yml | 4 +- .github/workflows/pythonpackage.yml | 4 +- .github/workflows/release-fleet.yml | 2 +- CLI.md | 4 +- detection_rules/devtools.py | 86 +++++++++++---------------- detection_rules/index_mappings.py | 12 +++- detection_rules/main.py | 12 ++-- detection_rules/rule_validators.py | 65 ++++++++++---------- tests/test_rules_remote.py | 2 +- 10 files changed, 111 insertions(+), 152 deletions(-) diff --git a/.github/workflows/esql-validation.yml b/.github/workflows/esql-validation.yml index b0d7802edb6..30b9a5ec1e5 100644 --- a/.github/workflows/esql-validation.yml +++ b/.github/workflows/esql-validation.yml @@ -44,56 +44,28 @@ jobs: echo "run_esql=true" >> $GITHUB_ENV - - name: Check out repository - env: - DR_CLOUD_ID: ${{ secrets.dr_cloud_id }} - DR_API_KEY: ${{ secrets.dr_api_key }} - if: ${{ !env.DR_CLOUD_ID && !env.DR_API_KEY && env.run_esql == 'true' }} - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5 - with: - path: elastic-container - repository: peasead/elastic-container - - - name: Build and run containers - env: - DR_CLOUD_ID: ${{ secrets.dr_cloud_id }} - DR_API_KEY: ${{ secrets.dr_api_key }} - if: ${{ !env.DR_CLOUD_ID && !env.DR_API_KEY && env.run_esql == 'true' }} - run: | - cd elastic-container - GENERATED_PASSWORD=$(openssl rand -base64 16) - sed -i "s|changeme|$GENERATED_PASSWORD|" .env - echo "::add-mask::$GENERATED_PASSWORD" - echo "GENERATED_PASSWORD=$GENERATED_PASSWORD" >> $GITHUB_ENV - set -x - bash elastic-container.sh update-version - bash elastic-container.sh start - - - name: Get API Key and setup auth - env: - DR_CLOUD_ID: ${{ secrets.dr_cloud_id }} - DR_API_KEY: ${{ secrets.dr_api_key }} - DR_ELASTICSEARCH_URL: 
"https://localhost:9200" - ES_USER: "elastic" - ES_PASSWORD: ${{ env.GENERATED_PASSWORD }} - if: ${{ !env.DR_CLOUD_ID && !env.DR_API_KEY && env.run_esql == 'true' }} - run: | - cd detection-rules - response=$(curl -k -X POST -u "$ES_USER:$ES_PASSWORD" -H "Content-Type: application/json" -d '{ - "name": "tmp-api-key", - "expiration": "1d" - }' "$DR_ELASTICSEARCH_URL/_security/api_key") - - DR_API_KEY=$(echo "$response" | jq -r '.encoded') - echo "::add-mask::$DR_API_KEY" - echo "DR_API_KEY=$DR_API_KEY" >> $GITHUB_ENV - - name: Set up Python 3.13 if: ${{ env.run_esql == 'true' }} uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6 with: python-version: '3.13' + - name: Set up JDK 21 + if: ${{ env.run_esql == 'true' }} + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '21' + + - name: Check out elastic/elasticsearch for ES|QL validator build + if: ${{ env.run_esql == 'true' }} + uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5 + with: + repository: elastic/elasticsearch + path: elasticsearch + # Match the stack version the rules currently target. Update alongside + # detection_rules.schemas.get_latest_stack_version(). + - name: Install dependencies if: ${{ env.run_esql == 'true' }} run: | @@ -102,14 +74,12 @@ jobs: pip cache purge pip install .[dev] - - name: Remote Test ESQL Rules + - name: Test ES|QL Rules if: ${{ env.run_esql == 'true' }} env: - DR_CLOUD_ID: ${{ secrets.dr_cloud_id || '' }} - DR_KIBANA_URL: ${{ secrets.dr_cloud_id == '' && 'https://localhost:5601' || '' }} - DR_ELASTICSEARCH_URL: ${{ secrets.dr_cloud_id == '' && 'https://localhost:9200' || '' }} - DR_API_KEY: ${{ secrets.dr_api_key || env.DR_API_KEY }} - DR_IGNORE_SSL_ERRORS: ${{ secrets.dr_cloud_id == '' && 'true' || '' }} + # Point the validator's build script at the elasticsearch checkout above. + # The first compile is slow; gradle build cache speeds up later runs. 
+ ES_HOME: ${{ github.workspace }}/elasticsearch run: | cd detection-rules - python -m detection_rules dev test esql-remote-validation + python -m detection_rules dev test esql-validation diff --git a/.github/workflows/lock-versions.yml b/.github/workflows/lock-versions.yml index 9533a6ecfe1..e96a434dfe1 100644 --- a/.github/workflows/lock-versions.yml +++ b/.github/workflows/lock-versions.yml @@ -87,7 +87,7 @@ jobs: - name: Build release package with navigator files env: - DR_REMOTE_ESQL_VALIDATION: "true" + DR_ESQL_VALIDATION: "true" DR_CLOUD_ID: ${{ secrets.dr_cloud_id || '' }} DR_KIBANA_URL: ${{ secrets.dr_cloud_id == '' && 'https://localhost:5601' || '' }} DR_ELASTICSEARCH_URL: ${{ secrets.dr_cloud_id == '' && 'https://localhost:9200' || '' }} @@ -111,7 +111,7 @@ jobs: - name: Lock the versions env: BRANCHES: "${{github.event.inputs.branches}}" - DR_REMOTE_ESQL_VALIDATION: "true" + DR_ESQL_VALIDATION: "true" DR_CLOUD_ID: ${{ secrets.dr_cloud_id || '' }} DR_KIBANA_URL: ${{ secrets.dr_cloud_id == '' && 'https://localhost:5601' || '' }} DR_ELASTICSEARCH_URL: ${{ secrets.dr_cloud_id == '' && 'https://localhost:9200' || '' }} diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index f4dbcee348f..93fd7880ae8 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -40,8 +40,8 @@ jobs: env: # only run the test test_rule_change_has_updated_date on pull request events to main GITHUB_EVENT_NAME: "${{ github.event_name}}" - # only run remote validation if repo is set to do so otherwise defer to .github/workflows/esql-validation.yml - DR_REMOTE_ESQL_VALIDATION: "${{ vars.remote_esql_validation }}" + # only run ES|QL validation if the repo opts in; otherwise defer to .github/workflows/esql-validation.yml + DR_ESQL_VALIDATION: "${{ vars.esql_validation }}" DR_CLOUD_ID: ${{ secrets.dr_cloud_id }} DR_KIBANA_URL: ${{ secrets.dr_cloud_id }} DR_ELASTICSEARCH_URL: ${{ secrets.dr_cloud_id }} diff --git 
a/.github/workflows/release-fleet.yml b/.github/workflows/release-fleet.yml index 236fa6a0788..3912052c141 100644 --- a/.github/workflows/release-fleet.yml +++ b/.github/workflows/release-fleet.yml @@ -161,7 +161,7 @@ jobs: - name: Build release package env: - DR_REMOTE_ESQL_VALIDATION: "true" + DR_ESQL_VALIDATION: "true" DR_CLOUD_ID: ${{ secrets.dr_cloud_id || '' }} DR_KIBANA_URL: ${{ secrets.dr_cloud_id == '' && 'https://localhost:5601' || '' }} DR_ELASTICSEARCH_URL: ${{ secrets.dr_cloud_id == '' && 'https://localhost:9200' || '' }} diff --git a/CLI.md b/CLI.md index 2621da5ef95..36ad5c27110 100644 --- a/CLI.md +++ b/CLI.md @@ -55,9 +55,9 @@ In `_config.yaml`, `bypass_optional_elastic_validation: true` enables all of the Using the environment variable `DR_CLI_MAX_WIDTH` will set a custom max width for the click CLI. For instance, some users may want to increase the default value in cases where help messages are cut off. -Using the environment variable `DR_REMOTE_ESQL_VALIDATION` will enable remote ESQL validation for rules that use ESQL queries. This validation will be performed whenever the rule is loaded including for example the view-rule command. This requires the appropriate kibana_url or cloud_id, api_key, and es_url to be set in the config file or as environment variables. +Using the environment variable `DR_ESQL_VALIDATION` will enable ES|QL validation for rules that use ES|QL queries. This validation runs locally via the embedded Java validator (`lib/esql-validator`) and is performed whenever the rule is loaded — including, for example, the `view-rule` command. No Elasticsearch or Kibana credentials are required. -Using the environment variable `DR_SKIP_EMPTY_INDEX_CLEANUP` will disable the cleanup of remote testing indexes that are created as part of the remote ESQL validation. By default, these indexes are deleted after the validation is complete, or upon validation error. 
+Using the environment variable `DR_SKIP_EMPTY_INDEX_CLEANUP` will disable the cleanup of any stale `rule-test-*` / `test-*` indexes left over from older remote-validation runs. Current validation does not create any such indexes — the variable only affects opportunistic cleanup triggered when validation errors fire with a live Elasticsearch client configured. ## Importing rules into the repo diff --git a/detection_rules/devtools.py b/detection_rules/devtools.py index 26b6fdba4e2..4373ce6f4b1 100644 --- a/detection_rules/devtools.py +++ b/detection_rules/devtools.py @@ -26,7 +26,6 @@ import requests.exceptions import yaml from elasticsearch import BadRequestError, Elasticsearch -from elasticsearch import ConnectionError as ESConnectionError from eql.table import Table # type: ignore[reportMissingTypeStubs] from eql.utils import load_dump # type: ignore[reportMissingTypeStubs, reportUnknownVariableType] from kibana.connector import Kibana # type: ignore[reportMissingTypeStubs] @@ -43,6 +42,7 @@ from .esql_errors import ( ESQL_EXCEPTION_TYPES, ) +from .esql_parser import shared_validator from .eswrap import CollectEvents, add_range_to_dsl from .ghwrap import GithubClient, update_gist from .integrations import ( @@ -57,8 +57,6 @@ from .misc import ( PYTHON_LICENSE, add_client, - get_default_elasticsearch_client, - get_default_kibana_client, raise_client_error, ) from .packaging import CURRENT_RELEASE_PATH, PACKAGE_FILE, RELEASE_DIR, Package @@ -1414,18 +1412,20 @@ def rule_event_search( # noqa: PLR0913 raise_client_error("Rule is not a query rule!") -@test_group.command("esql-remote-validation") +@test_group.command("esql-validation") @click.option( "--verbosity", type=click.IntRange(0, 1), default=0, help="Set verbosity level: 0 for minimal output, 1 for detailed output.", ) -def esql_remote_validation( +def esql_validation( verbosity: int, ) -> None: - """Search using a rule file against an Elasticsearch instance.""" - + """Validate all production ES|QL rules 
locally via the embedded Java validator.""" + # Validation is fully local: no Kibana or Elasticsearch client is required. + # shared_validator() spawns one JVM daemon up front and reuses it across every + # rule, instead of paying ~2-3s startup per rule. rule_collection: RuleCollection = RuleCollection.default().filter(production_filter) esql_rules = [r for r in rule_collection if r.contents.data.type == "esql"] @@ -1433,51 +1433,35 @@ def esql_remote_validation( if not esql_rules: return - # TODO(eric-forte-elastic): @add_client https://github.com/elastic/detection-rules/issues/5156 # noqa: FIX002 - with get_default_kibana_client() as kibana_client, get_default_elasticsearch_client() as elastic_client: - if not kibana_client or not elastic_client: - raise_client_error("Skipping remote validation due to missing client") - - failed_count = 0 - fail_list: list[str] = [] - max_retries = 3 + + failed_count = 0 + fail_list: list[str] = [] + with shared_validator(): for r in esql_rules: - retry_count = 0 - while retry_count < max_retries: - try: - validator = ESQLValidator(r.contents.data.query) # type: ignore[reportIncompatibleMethodOverride] - _ = validator.remote_validate_rule_contents(kibana_client, elastic_client, r.contents, verbosity) - break - except (ValueError, BadRequestError, *ESQL_EXCEPTION_TYPES) as e: # type: ignore[reportUnknownMemberType] - e_type = type(e) # type: ignore[reportUnknownMemberType] - if isinstance(e, ESQL_EXCEPTION_TYPES): - click.echo(click.style(f"{r.contents.data.rule_id} ", fg="red", bold=True), nl=False) - _ = e.show() # type: ignore[reportUnknownMemberType] - else: - click.echo(f"FAILURE: {e_type}: {e}") # type: ignore[reportUnknownMemberType] - fail_list.append(f"{r.contents.data.rule_id} FAILURE: {e_type}: {e}") # type: ignore[reportUnknownMemberType] - failed_count += 1 - break - except ESConnectionError as e: - retry_count += 1 - click.echo(f"Connection error: {e}. 
Retrying {retry_count}/{max_retries}...") - time.sleep(30) - if retry_count == max_retries: - click.echo(f"FAILURE: {e} after {max_retries} retries") - fail_list.append(f"FAILURE: {e} after {max_retries} retries") - failed_count += 1 - - click.echo(f"Total rules: {len(esql_rules)}") - click.echo(f"Failed rules: {failed_count}") - - _ = Path("failed_rules.log").write_text("\n".join(fail_list), encoding="utf-8") - click.echo("Failed rules written to failed_rules.log") - if failed_count > 0: - click.echo("Failed rule IDs:") - uuids = {line.split()[0] for line in fail_list} - click.echo("\n".join(uuids)) - ctx = click.get_current_context() - ctx.exit(1) + try: + validator = ESQLValidator(r.contents.data.query) # type: ignore[reportIncompatibleMethodOverride] + _ = validator.remote_validate_rule_contents(None, None, r.contents, verbosity) + except (ValueError, BadRequestError, *ESQL_EXCEPTION_TYPES) as e: # type: ignore[reportUnknownMemberType] + e_type = type(e) # type: ignore[reportUnknownMemberType] + if isinstance(e, ESQL_EXCEPTION_TYPES): + click.echo(click.style(f"{r.contents.data.rule_id} ", fg="red", bold=True), nl=False) + _ = e.show() # type: ignore[reportUnknownMemberType] + else: + click.echo(f"FAILURE: {e_type}: {e}") # type: ignore[reportUnknownMemberType] + fail_list.append(f"{r.contents.data.rule_id} FAILURE: {e_type}: {e}") # type: ignore[reportUnknownMemberType] + failed_count += 1 + + click.echo(f"Total rules: {len(esql_rules)}") + click.echo(f"Failed rules: {failed_count}") + + _ = Path("failed_rules.log").write_text("\n".join(fail_list), encoding="utf-8") + click.echo("Failed rules written to failed_rules.log") + if failed_count > 0: + click.echo("Failed rule IDs:") + uuids = {line.split()[0] for line in fail_list} + click.echo("\n".join(uuids)) + ctx = click.get_current_context() + ctx.exit(1) @test_group.command("rule-survey") diff --git a/detection_rules/index_mappings.py b/detection_rules/index_mappings.py index 70f3cf0ca3f..8261f1e02b0 100644 
--- a/detection_rules/index_mappings.py +++ b/detection_rules/index_mappings.py @@ -94,10 +94,16 @@ def get_rule_integrations(metadata: RuleMeta) -> list[str]: return rule_integrations -def get_existing_mappings(elastic_client: Elasticsearch, indices: list[str]) -> tuple[dict[str, Any], dict[str, Any]]: +def get_existing_mappings( + elastic_client: Elasticsearch | None, indices: list[str] +) -> tuple[dict[str, Any], dict[str, Any]]: """Retrieve mappings for all matching existing index templates.""" + # When elastic_client is None we skip simulate_index_template entirely; callers + # fall back to local integration / ECS / custom schemas. existing_mappings: dict[str, Any] = {} index_lookup: dict[str, Any] = {} + if elastic_client is None: + return existing_mappings, index_lookup for index in indices: index_tmpl_mappings = get_simulated_index_template_mappings(elastic_client, index) index_lookup[index] = index_tmpl_mappings @@ -404,7 +410,7 @@ def get_ecs_schema_mappings(current_version: Version) -> dict[str, Any]: def prepare_mappings( # noqa: PLR0913 - elastic_client: Elasticsearch, + elastic_client: Elasticsearch | None, indices: list[str], event_dataset_integrations: list[EventDataset], metadata: RuleMeta, @@ -412,6 +418,8 @@ def prepare_mappings( # noqa: PLR0913 log: Callable[[str], None], ) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]: """Prepare index mappings for the given indices and rule integrations.""" + # When elastic_client is None, get_existing_mappings returns empty and we rely + # solely on local integration, ECS, non-ECS and custom schemas below. 
existing_mappings, index_lookup = get_existing_mappings(elastic_client, indices) # Collect mappings for the integrations diff --git a/detection_rules/main.py b/detection_rules/main.py index baf1d1de800..97010bc1122 100644 --- a/detection_rules/main.py +++ b/detection_rules/main.py @@ -489,21 +489,23 @@ def mass_update( @root.command("view-rule") @click.argument("rule-file", type=Path) @click.option("--api-format/--rule-format", default=True, help="Print the rule in final api or rule format") -@click.option("--esql-remote-validation", is_flag=True, default=False, help="Enable remote validation for the rule") +@click.option("--esql-validation", is_flag=True, default=False, help="Run local ES|QL validation on the rule") @click.pass_context def view_rule( - _: click.Context, rule_file: Path, api_format: str, esql_remote_validation: bool + _: click.Context, rule_file: Path, api_format: str, esql_validation: bool ) -> TOMLRule | DeprecatedRule: """View an internal rule or specified rule file.""" rule = RuleCollection().load_file(rule_file) + # Skip if the config-level auto-validation already ran during rule loading + # (avoids double-validating the same rule). 
if ( - esql_remote_validation + esql_validation and isinstance(rule.contents.data, ESQLRuleData) and isinstance(rule.contents.data.validator, ESQLValidator) and isinstance(rule.contents.metadata, RuleMeta) - and not getdefault("remote_esql_validation")() + and not getdefault("esql_validation")() ): - rule.contents.data.validator.validate(rule.contents.data, rule.contents.metadata, force_remote_validation=True) + rule.contents.data.validator.validate(rule.contents.data, rule.contents.metadata, force_validation=True) if api_format: click.echo(json.dumps(rule.contents.to_api_format(), indent=2, sort_keys=True)) diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index 31a280845e4..93333001f73 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -830,37 +830,31 @@ def validate_columns_index_mapping( return True - def validate(self, data: "QueryRuleData", rule_meta: RuleMeta, force_remote_validation: bool = False) -> None: # type: ignore[reportIncompatibleMethodOverride] + def validate(self, data: "QueryRuleData", rule_meta: RuleMeta, force_validation: bool = False) -> None: # type: ignore[reportIncompatibleMethodOverride] """Validate an ESQL query while checking TOMLRule.""" - if misc.getdefault("remote_esql_validation")() or force_remote_validation: - resolved_kibana_options = { - str(option.name): option.default() if callable(option.default) else option.default - for option in misc.kibana_options - if option.name is not None - } - - resolved_elastic_options = { - option.name: option.default() if callable(option.default) else option.default - for option in misc.elasticsearch_options - if option.name is not None - } - - with ( - misc.get_kibana_client(**resolved_kibana_options) as kibana_client, # type: ignore[reportUnknownVariableType] - misc.get_elasticsearch_client(**resolved_elastic_options) as elastic_client, # type: ignore[reportUnknownVariableType] - ): - _ = self.remote_validate_rule( - 
kibana_client, - elastic_client, - data.query, - rule_meta, - data.rule_id, - ) + if not (misc.getdefault("esql_validation")() or force_validation): + return + + # Validation runs locally via the embedded Java validator; no Kibana or + # Elasticsearch client is required. The remote-client setup that used to + # live here was the source of "Missing required --cloud-id/--kibana-url" + # errors when this was invoked from `view-rule --esql-validation`. + _ = self.remote_validate_rule( + kibana_client=None, + elastic_client=None, + query=data.query, + metadata=rule_meta, + rule_id=data.rule_id, + ) def remote_validate_rule_contents( - self, kibana_client: Kibana, elastic_client: Elasticsearch, contents: TOMLRuleContents, verbosity: int = 0 + self, + kibana_client: Kibana | None, + elastic_client: Elasticsearch | None, + contents: TOMLRuleContents, + verbosity: int = 0, ) -> dict[str, Any]: - """Remote validate a rule's ES|QL query using an Elastic Stack.""" + """Validate a rule's ES|QL query (clients optional; both may be None for local-only).""" return self.remote_validate_rule( kibana_client=kibana_client, elastic_client=elastic_client, @@ -872,23 +866,24 @@ def remote_validate_rule_contents( def remote_validate_rule( # noqa: PLR0913 self, - kibana_client: Kibana, - elastic_client: Elasticsearch, + kibana_client: Kibana | None, + elastic_client: Elasticsearch | None, query: str, metadata: RuleMeta, rule_id: str = "", verbosity: int = 0, ) -> dict[str, Any]: - """Uses remote validation from an Elastic Stack to validate ES|QL a given rule""" + """Validate an ES|QL rule using the embedded Java validator.""" self.rule_id = rule_id self.verbosity = verbosity - # Validate that all fields (columns) are either dynamic fields or correctly mapped - # against the combined mapping of all the indices - kibana_details: dict[str, Any] = kibana_client.get("/api/status", {}) # type: ignore[reportUnknownVariableType] - if "version" not in kibana_details: - raise ValueError("Failed to 
retrieve Kibana details.") + # When a Kibana client is supplied, sanity-check connectivity; otherwise we + # rely solely on local schemas and the embedded validator (no cluster needed). + if kibana_client is not None: + kibana_details: dict[str, Any] = kibana_client.get("/api/status", {}) # type: ignore[reportUnknownVariableType] + if "version" not in kibana_details: + raise ValueError("Failed to retrieve Kibana details.") stack_version = get_latest_stack_version() self.log(f"Validating against {stack_version} stack") diff --git a/tests/test_rules_remote.py b/tests/test_rules_remote.py index 3c1f6c712d7..0f4099afabb 100644 --- a/tests/test_rules_remote.py +++ b/tests/test_rules_remote.py @@ -29,7 +29,7 @@ @unittest.skipIf(get_default_config() is None, "Skipping remote validation due to missing config") @unittest.skipIf( - not getdefault("remote_esql_validation")(), "Skipping remote validation because remote_esql_validation is False" + not getdefault("esql_validation")(), "Skipping ES|QL validation because esql_validation is False" ) class TestRemoteRules(BaseRuleTest): """Test rules against a remote Elastic stack instance.""" From a3fbb891bc972450ef9a439e908cbcf81a43f84e Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Wed, 27 May 2026 17:21:50 -0400 Subject: [PATCH 05/12] Update for index proper handling --- detection_rules/rule_validators.py | 10 +++++----- .../detectionrules/esqlvalidator/AnalyzerFactory.java | 9 ++++++++- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index 93333001f73..7e5d711fbd3 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -887,7 +887,7 @@ def remote_validate_rule( # noqa: PLR0913 stack_version = get_latest_stack_version() self.log(f"Validating against {stack_version} stack") - indices_str, indices = self.get_esql_query_indices(query) # type: ignore[reportUnknownVariableType] + _indices_str, indices = 
self.get_esql_query_indices(query) # type: ignore[reportUnknownVariableType] self.log(f"Extracted indices from query: {', '.join(indices)}") event_dataset_integrations = get_esql_query_event_dataset_integrations(query) @@ -906,10 +906,10 @@ def remote_validate_rule( # noqa: PLR0913 self.log(f"Combined mappings prepared: {len(combined_mappings)}") # Validate the query locally via the embedded Java validator. The combined - # mapping covers every field across the rule's integrations, and the key - # matches the FROM clause exactly so the analyzer's IndexResolution lookup - # succeeds without any string rewriting. - validator_indices = {indices_str: {"properties": combined_mappings}} + # mapping covers every field across the rule's integrations. The key uses + # the same canonical no-whitespace comma-join the ES|QL parser produces, so + # the analyzer's IndexResolution lookup hits. + validator_indices = {",".join(indices): {"properties": combined_mappings}} query_columns, response = execute_query_against_indices( elastic_client, query, validator_indices, self.log ) diff --git a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java index 94d09729627..b2229406f1d 100644 --- a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java +++ b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java @@ -77,7 +77,14 @@ Analyzer build(ResolutionInputs inputs) { Map indexResolutions = new HashMap<>(); if (inputs.indices() != null) { for (Map.Entry> e : inputs.indices().entrySet()) { - indexResolutions.put(new IndexPattern(Source.EMPTY, e.getKey()), indexResolution(e.getKey(), e.getValue(), IndexMode.STANDARD)); + // The ES|QL parser canonicalizes multi-pattern FROM clauses by joining + // with "," (no whitespace) — see IdentifierBuilder.visitIndexPattern. 
+ // IndexPattern.equals is a strict string compare, so we must match that + // canonical form or the analyzer's resolution lookup misses and emits + // "[none specified]". Accept caller keys with arbitrary whitespace and + // normalize here for robustness. + String key = e.getKey().replaceAll("\\s*,\\s*", ",").trim(); + indexResolutions.put(new IndexPattern(Source.EMPTY, key), indexResolution(key, e.getValue(), IndexMode.STANDARD)); } } From ed3fef9959a266533d7ded110aa9cf53faca43f3 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Wed, 27 May 2026 17:25:25 -0400 Subject: [PATCH 06/12] minor version bump --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c7d46bb03ec..627e160ef4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.6.42" +version = "1.7.0" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." 
readme = "README.md" requires-python = ">=3.12" From 023bcf8f267b3ee7b69654684a30ac239131b4c1 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Wed, 27 May 2026 17:30:23 -0400 Subject: [PATCH 07/12] Update text handling --- .../detectionrules/esqlvalidator/MappingLoader.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/MappingLoader.java b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/MappingLoader.java index ecec0f8b0c8..fbfd1cc6f49 100644 --- a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/MappingLoader.java +++ b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/MappingLoader.java @@ -130,6 +130,13 @@ private static DataType resolveType(Map content) { if ("constant_keyword".equals(typeName) || "wildcard".equals(typeName)) { return KEYWORD; } + // Text-family storage types that field_caps surfaces as plain "text" at + // search time. Mirror that here so callers can pass raw index mappings + // (e.g. ECS's `message: match_only_text`) without the analyzer marking + // them UNSUPPORTED — which would then reject perfectly valid queries. 
+ if ("match_only_text".equals(typeName) || "annotated_text".equals(typeName)) { + return TEXT; + } Object metricsTypeParameter = content.get(TimeSeriesParams.TIME_SERIES_METRIC_PARAM); TimeSeriesParams.MetricType metricType = null; if (metricsTypeParameter instanceof String s) { From 1f0a5f40001a56e8fa84732ed5e91d16c590bd75 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Wed, 27 May 2026 19:22:40 -0400 Subject: [PATCH 08/12] Add support for COMPLETION rules --- detection_rules/esql_parser/validator.py | 46 +++++++++++++++++++ .../known_inference_endpoints.json | 10 ++++ .../esqlvalidator/AnalyzerFactory.java | 19 +++++++- .../detectionrules/esqlvalidator/Main.java | 17 ++++++- 4 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 lib/esql-validator/known_inference_endpoints.json diff --git a/detection_rules/esql_parser/validator.py b/detection_rules/esql_parser/validator.py index e29a4e39eb1..6382f854d76 100644 --- a/detection_rules/esql_parser/validator.py +++ b/detection_rules/esql_parser/validator.py @@ -23,6 +23,35 @@ DEFAULT_REPO_ROOT = Path(__file__).resolve().parents[2] DEFAULT_VALIDATOR_DIR = DEFAULT_REPO_ROOT / "lib" / "esql-validator" DEFAULT_ES_HOME = Path("/tmp/elasticsearch") +DEFAULT_INFERENCE_ENDPOINTS_FILE = DEFAULT_VALIDATOR_DIR / "known_inference_endpoints.json" + + +def _merge_inference_endpoints( + defaults: list[dict[str, str]], overrides: list[dict[str, str]] | None +) -> list[dict[str, str]]: + """Merge default and per-call inference endpoints; later entries win on inference_id.""" + by_id: dict[str, dict[str, str]] = {e["inference_id"]: e for e in defaults} + for entry in overrides or (): + if "inference_id" in entry and "task_type" in entry: + by_id[entry["inference_id"]] = entry + return list(by_id.values()) + + +def _load_default_inference_endpoints(path: Path) -> list[dict[str, str]]: + """Read the bundled whitelist of known-valid inference endpoints, if present.""" + # Missing or malformed file is non-fatal: 
callers can still pass endpoints + # explicitly via validate(..., inference_endpoints=...). + if not path.exists(): + return [] + try: + data = json.loads(path.read_text(encoding="utf-8")) + except json.JSONDecodeError: + return [] + out: list[dict[str, str]] = [] + for entry in data.get("endpoints", []): + if isinstance(entry, dict) and "inference_id" in entry and "task_type" in entry: + out.append({"inference_id": entry["inference_id"], "task_type": entry["task_type"]}) + return out class ValidationError(Exception): @@ -88,6 +117,7 @@ def __init__( request_timeout: float = 30.0, build_if_missing: bool = True, heap_size: str | None = "512m", + default_inference_endpoints: list[dict[str, str]] | None = None, ) -> None: self.validator_dir = Path(validator_dir or DEFAULT_VALIDATOR_DIR) self.es_home = Path(es_home or os.environ.get("ES_HOME") or DEFAULT_ES_HOME) @@ -97,6 +127,15 @@ def __init__( self.build_if_missing = build_if_missing # Cap JVM heap so long-running daemons in bulk validation don't grow unbounded. self.heap_size = heap_size + # Whitelist of inference endpoints to register on every validate() call. The + # daemon has no live cluster to resolve `.gp-llm-v2-completion` and similar, + # so we feed in a known-valid set from known_inference_endpoints.json. Pass + # an explicit list to override (e.g. for tests); pass [] to disable. 
+ if default_inference_endpoints is None: + default_inference_endpoints = _load_default_inference_endpoints( + self.validator_dir / "known_inference_endpoints.json" + ) + self.default_inference_endpoints = default_inference_endpoints self._proc: subprocess.Popen[bytes] | None = None self._lock = threading.Lock() @@ -191,12 +230,16 @@ def validate( indices: dict[str, dict[str, Any]] | None = None, lookup_indices: dict[str, dict[str, Any]] | None = None, enrich_policies: list[dict[str, Any]] | None = None, + inference_endpoints: list[dict[str, str]] | None = None, params: list[Any] | None = None, ) -> ValidationResult: """Parse and verify an ES|QL query.""" # indices: {pattern: es_mapping} for FROM targets, e.g. {"logs": {"properties": ...}}. # lookup_indices: same shape, for LOOKUP JOIN targets. # enrich_policies: list of {name, policy_type, match_field, index, mapping}. + # inference_endpoints: list of {inference_id, task_type}. Merged with the + # bundled whitelist (see default_inference_endpoints); per-call entries win + # on inference_id collision. # params: positional query params (?). request_id = str(next(self._counter)) payload: dict[str, Any] = {"id": request_id, "query": query} @@ -206,6 +249,9 @@ def validate( payload["lookup_indices"] = lookup_indices if enrich_policies: payload["enrich_policies"] = enrich_policies + merged_inference = _merge_inference_endpoints(self.default_inference_endpoints, inference_endpoints) + if merged_inference: + payload["inference_endpoints"] = merged_inference if params: payload["params"] = params diff --git a/lib/esql-validator/known_inference_endpoints.json b/lib/esql-validator/known_inference_endpoints.json new file mode 100644 index 00000000000..70d81636bb6 --- /dev/null +++ b/lib/esql-validator/known_inference_endpoints.json @@ -0,0 +1,10 @@ +{ + "_comment": "Inference endpoints that ES|QL queries may reference. 
The validator daemon doesn't have a live cluster to resolve these, so we whitelist the well-known Elastic-managed ones here. Add more entries as new rules need them. `task_type` values must match org.elasticsearch.inference.TaskType (case-insensitive): TEXT_EMBEDDING, SPARSE_EMBEDDING, RERANK, COMPLETION, CHAT_COMPLETION, EMBEDDING, ANY.", + "endpoints": [ + { + "inference_id": ".gp-llm-v2-completion", + "task_type": "COMPLETION", + "_note": "Elastic-managed General Purpose LLM v2 (gp-llm-v2). Used by COMPLETION command in LLM-backed detection rules." + } + ] +} diff --git a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java index b2229406f1d..61b22b0d5e0 100644 --- a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java +++ b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java @@ -15,6 +15,8 @@ import org.elasticsearch.xpack.esql.analysis.EnrichResolution; import org.elasticsearch.xpack.esql.analysis.UnmappedResolution; import org.elasticsearch.xpack.esql.analysis.Verifier; +import org.elasticsearch.xpack.esql.inference.ResolvedInference; +import org.elasticsearch.inference.TaskType; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy; @@ -62,7 +64,8 @@ EsqlFunctionRegistry functionRegistry() { record ResolutionInputs( Map> indices, Map> lookupIndices, - List enrichPolicies + List enrichPolicies, + List inferenceEndpoints ) {} record EnrichPolicyInput( @@ -73,6 +76,8 @@ record EnrichPolicyInput( Map mapping ) {} + record InferenceEndpointInput(String inferenceId, String taskType) {} + Analyzer build(ResolutionInputs inputs) { Map indexResolutions = new HashMap<>(); if (inputs.indices() != null) { @@ -109,6 +114,16 @@ Analyzer 
build(ResolutionInputs inputs) { } } + InferenceResolution.Builder inferenceBuilder = InferenceResolution.builder(); + if (inputs.inferenceEndpoints() != null) { + for (InferenceEndpointInput ep : inputs.inferenceEndpoints()) { + // Unknown task_type strings turn into a structured request_error rather + // than a silent fallback — easier to spot a typo in the whitelist file. + TaskType taskType = TaskType.valueOf(ep.taskType().toUpperCase(Locale.ROOT)); + inferenceBuilder.withResolvedInference(new ResolvedInference(ep.inferenceId(), taskType)); + } + } + Configuration cfg = buildConfiguration(); AnalyzerContext ctx = new AnalyzerContext( cfg, @@ -117,7 +132,7 @@ Analyzer build(ResolutionInputs inputs) { indexResolutions, lookupResolutions, enrichResolution, - InferenceResolution.builder().build(), + inferenceBuilder.build(), TransportVersion.current(), QuerySettings.UNMAPPED_FIELDS.defaultValue() ); diff --git a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/Main.java b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/Main.java index 6640af099ab..e37dd334cbd 100644 --- a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/Main.java +++ b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/Main.java @@ -135,7 +135,7 @@ private static String handle(String line, EsqlParser parser, InferenceSettings i // Stage 2: verify (analyze). 
try { Analyzer analyzer = analyzerFactory.build(new AnalyzerFactory.ResolutionInputs( - req.indices, req.lookupIndices, req.enrichPolicies)); + req.indices, req.lookupIndices, req.enrichPolicies, req.inferenceEndpoints)); LogicalPlan analyzed = analyzer.analyze(parsed); String planText = analyzed.toString(); List outputAttrs = analyzed.output(); @@ -298,6 +298,7 @@ static final class Request { Map> indices; Map> lookupIndices; List enrichPolicies; + List inferenceEndpoints; List params; boolean shutdown; boolean ping; @@ -335,6 +336,20 @@ static Request parse(String line) throws Exception { } } } + Object inference = raw.get("inference_endpoints"); + if (inference instanceof List il) { + r.inferenceEndpoints = new ArrayList<>(); + for (Object o : il) { + if (o instanceof Map m) { + Map em = (Map) m; + String inferenceId = (String) em.get("inference_id"); + String taskType = (String) em.get("task_type"); + if (inferenceId != null && taskType != null) { + r.inferenceEndpoints.add(new AnalyzerFactory.InferenceEndpointInput(inferenceId, taskType)); + } + } + } + } Object params = raw.get("params"); if (params instanceof List pl) { r.params = new ArrayList<>(pl); From 1c2aea1cae20128cf0f01d6cfede7afbd9880e4f Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Wed, 27 May 2026 21:30:54 -0400 Subject: [PATCH 09/12] Add multi field support --- detection_rules/index_mappings.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/detection_rules/index_mappings.py b/detection_rules/index_mappings.py index 8261f1e02b0..fb7fcef04c0 100644 --- a/detection_rules/index_mappings.py +++ b/detection_rules/index_mappings.py @@ -396,6 +396,12 @@ def get_ecs_schema_mappings(current_version: Version) -> dict[str, Any]: if info["type"] == "scaled_float": ecs_schema_scaled_floats.update({index: info["scaling_factor"]}) ecs_schema_flattened.update({index: info["type"]}) + # Expand ECS multi-fields (e.g. process.command_line.text). 
The ECS flat + # schema records them under each field's "multi_fields", but the iteration + # above only copies "type" — without this step, queries that reference a + # subfield like `process.command_line.text` hit "Unknown column". + for sub in info.get("multi_fields", []): + ecs_schema_flattened[f"{index}.{sub['name']}"] = sub["type"] ecs_schema = utils.convert_to_nested_schema(ecs_schema_flattened) for index, info in ecs_schema_scaled_floats.items(): parts = index.split(".") From 3ea6fdb7a0ecb79978bbbd4f46b8234b0d5f0b2a Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Wed, 27 May 2026 21:42:13 -0400 Subject: [PATCH 10/12] Add endpoint schemas, stack inherited them automatically --- detection_rules/index_mappings.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/detection_rules/index_mappings.py b/detection_rules/index_mappings.py index fb7fcef04c0..203444ce080 100644 --- a/detection_rules/index_mappings.py +++ b/detection_rules/index_mappings.py @@ -457,6 +457,11 @@ def prepare_mappings( # noqa: PLR0913 # These need to be handled separately as we need to be able to validate non-ecs fields as a whole # and also at a per index level as custom schemas can override non-ecs fields and/or indices non_ecs_schema = ecs.flatten(non_ecs_schema) + # Merge in Elastic Endpoint extension fields (process.Ext.*, file.Ext.*, dll.Ext.*, ...). + # The KQL/EQL paths in ecs.py already include these; ES|QL queries against + # logs-endpoint.* or .alerts-security.* legitimately reference them too and would + # otherwise hit "Unknown column" even though the field is valid on real indices. 
+ non_ecs_schema.update(ecs.flatten(ecs.get_endpoint_schemas())) non_ecs_schema = utils.convert_to_nested_schema(non_ecs_schema) non_ecs_schema = prune_mappings_of_unsupported_types("non-ecs", non_ecs_schema, log) From 59960431222f26ad14923679c42bcdbf2d33b90a Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Thu, 28 May 2026 10:56:13 -0400 Subject: [PATCH 11/12] Update docstring --- tests/test_esql_parser.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/test_esql_parser.py b/tests/test_esql_parser.py index 6b336b66848..9094121eab6 100644 --- a/tests/test_esql_parser.py +++ b/tests/test_esql_parser.py @@ -29,19 +29,22 @@ class TestEsqlValidator(unittest.TestCase): @classmethod def setUpClass(cls) -> None: + """Spawn one JVM daemon and reuse it across every test method in the class.""" cls.validator = EsqlValidator(build_if_missing=os.environ.get("DR_ESQL_BUILD") == "1") cls.validator.start() @classmethod def tearDownClass(cls) -> None: + """Reap the JVM daemon after the last test runs.""" cls.validator.stop() def test_ping(self) -> None: - # Smoke test of the JSON round-trip path. 
+ """Smoke-test the stdin/stdout JSON round-trip with a trivial query.""" result = self.validator.validate("FROM idx", indices={"idx": {"properties": {"a": {"type": "long"}}}}) self.assertEqual(result.status, "ok") def test_valid_query_returns_plan(self) -> None: + """A well-formed query returns ok with a populated analyzed-plan text.""" result = self.validator.validate( "FROM logs | WHERE foo == 1 | LIMIT 5", indices={"logs": {"properties": {"foo": {"type": "integer"}}}}, @@ -51,6 +54,7 @@ def test_valid_query_returns_plan(self) -> None: self.assertIn("EsRelation[logs]", result.plan) def test_parse_error_includes_position(self) -> None: + """Syntax errors surface as parse_error with structured line/column.""" result = self.validator.validate("FROM logs | WAT") self.assertEqual(result.status, "parse_error") self.assertGreaterEqual(len(result.errors), 1) @@ -60,6 +64,7 @@ def test_parse_error_includes_position(self) -> None: self.assertGreater(err.column or 0, 0) def test_unknown_field_is_verify_error(self) -> None: + """References to fields missing from the supplied mapping become verify_error.""" result = self.validator.validate( "FROM logs | WHERE missing_field == 1", indices={"logs": {"properties": {"foo": {"type": "integer"}}}}, @@ -71,7 +76,7 @@ def test_unknown_field_is_verify_error(self) -> None: self.assertEqual(err.line, 1) def test_type_mismatch_is_verify_error(self) -> None: - # Comparing a keyword field with a number — Verifier should flag this. 
+ """Comparing a keyword field with a number is flagged by the Verifier.""" result = self.validator.validate( 'FROM logs | WHERE name == 1', indices={"logs": {"properties": {"name": {"type": "keyword"}}}}, @@ -80,12 +85,13 @@ def test_type_mismatch_is_verify_error(self) -> None: self.assertTrue(any("name" in e.message for e in result.errors), msg=result.errors) def test_raise_for_status(self) -> None: + """raise_for_status() turns any non-ok result into a ValidationError.""" result = self.validator.validate("FROM x | WAT") with self.assertRaises(ValidationError): result.raise_for_status() def test_multiple_round_trips_share_daemon(self) -> None: - # Verify the long-running daemon doesn't break across many calls. + """The long-running daemon stays healthy across many sequential calls.""" mapping = {"logs": {"properties": {"foo": {"type": "integer"}}}} for i in range(10): r = self.validator.validate(f"FROM logs | LIMIT {i + 1}", indices=mapping) From 22c81d726c13099727d084bd3f9f891b75bc4d53 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Wed, 3 Jun 2026 15:42:36 -0400 Subject: [PATCH 12/12] minor update --- .../elastic/detectionrules/esqlvalidator/AnalyzerFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java index 61b22b0d5e0..da652397f29 100644 --- a/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java +++ b/lib/esql-validator/src/main/java/co/elastic/detectionrules/esqlvalidator/AnalyzerFactory.java @@ -50,7 +50,7 @@ final class AnalyzerFactory { private final EsqlFunctionRegistry functionRegistry = new EsqlFunctionRegistry(); - private final PromqlFunctionRegistry promqlFunctionRegistry = new PromqlFunctionRegistry(); + private final PromqlFunctionRegistry promqlFunctionRegistry = 
PromqlFunctionRegistry.INSTANCE; private final XPackLicenseState licenseState = new XPackLicenseState(() -> System.currentTimeMillis()); private final Verifier verifier = new Verifier( new Metrics(functionRegistry, /*isSnapshot*/ true, /*isServerless*/ true),