Make the DAG processor access metadata exclusively through the API server#67878
Make the DAG processor access metadata exclusively through the API server#67878kaxil wants to merge 7 commits into
Conversation
a536afc to
47d927f
Compare
I am wary of giving the dag processor the ability to mint any tokens at all -- given it runs user code this seems like it's a huge security risk 🤔 |
| @router.post("/jobs", status_code=201) | ||
| def register_job(body: JobRegisterBody) -> dict: | ||
| """Register the processor's liveness Job row (server-side) and return its id.""" | ||
| job = Job() | ||
| job.job_type = body.job_type | ||
| with create_session() as session: | ||
| job.prepare_for_execution(session=session) | ||
| return {"job_id": job.id} | ||
|
|
||
|
|
||
| @router.post("/jobs/{job_id}/heartbeat") | ||
| def job_heartbeat(job_id: int) -> dict: | ||
| """Update the processor Job's latest_heartbeat so the health check sees it alive.""" | ||
| with create_session() as session: | ||
| job = session.get(Job, job_id) | ||
| if job is None: | ||
| raise HTTPException(status_code=404, detail="Job not found") | ||
| job.latest_heartbeat = timezone.utcnow() | ||
| session.merge(job) | ||
| return {"alive": True} | ||
|
|
||
|
|
||
| @router.post("/jobs/{job_id}/complete") | ||
| def complete_job(job_id: int, body: JobCompleteBody) -> dict: | ||
| """Record the processor Job's terminal state and end time.""" | ||
| with create_session() as session: | ||
| job = session.get(Job, job_id) | ||
| if job is not None: | ||
| job.end_date = timezone.utcnow() | ||
| job.state = body.state | ||
| session.merge(job) | ||
| return {"completed": True} |
There was a problem hiding this comment.
I'm not sure we want to "encode" this into the API -- mostly I'm not sure that it is a good pattern that we want to follow, especially for things like "static" dags which don't need re-parsing.
c7295a6 to
d58de85
Compare
| # The Execution API is task-instance scoped: its ``sub`` is validated as a UUID. The DAG processor | ||
| # is not a task instance, so its token carries an all-zero sentinel UUID rather than a real id. | ||
| DAG_PROCESSOR_TOKEN_SUBJECT = "00000000-0000-0000-0000-000000000000" |
There was a problem hiding this comment.
This feels like too much of a hack. There is nothing in JWT that says the sub claim must be a UUID, that is just our choice, so I think for dag processing the sub should be something else.
There was a problem hiding this comment.
Agreed it's a hack. It's a UUID only because the processor reuses the Execution API for parse-time Connection/Variable reads, and those routes go through CurrentTIToken -> TIToken(id: UUID) (the id also feeds team scoping), so a non-UUID sub is rejected before the route runs. The all-zero value is a stand-in non-TI principal, and the one token currently carries both the execution and dag-processing audiences.
The DAG Processing validator accepts any sub, so the UUID is only needed on the Execution side. Two ways to give dag processing a real sub:
- Two tokens: a dag-processing token with
sub=dag-processorand a separate execution token that keeps the UUID (the Execution API genuinely is TI-scoped). - Generalise the Execution principal so the read-only conn/var routes accept a non-TI
sub.
I lean to (2) if you're open to it (one token, no sentinel anywhere); otherwise I'll do (1). Which would you prefer?
There was a problem hiding this comment.
sub=dag-processor -- this leads to an interesting connudrum. I'd really like to be able to tie requests to an Connection or Variable to a specific file, which means some kind of token exchange to get a per-file scoped token.
I think the point about CurrentTIToken leads to a more sailient design question though: is it right to use the Execution API for this. The "Execution" part of the API is not really true for dag parsing. I also don't want us to have to duplicate things into the /dag-processing/ API (I'm already not happy with how much we have to duplicate from the public API to the Execution API, doing that a third time makes me sad.
I'm in favour of 2 generally, and probably the TIClaims and TIToken classes are a mistake/overly specific naming. Nothing seems to look at TIClaims that I can see.
$ rg 'token\.' airflow-core/src/airflow/api_fastapi/execution_api
airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
130: parent_ti = session.get(TaskInstance, token.id)
airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
55: token.id,
airflow-core/src/airflow/api_fastapi/execution_api/routes/connections.py
38: token.id,
airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
47: token.id,
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
318: if token.claims.scope == "workload":
airflow-core/src/airflow/api_fastapi/execution_api/datamodels/token.py
32: Validated JWT claims for a task identity token.
airflow-core/src/airflow/api_fastapi/execution_api/security.py
97: dedup or Cadwyn replays) return the cached token.
160: token_scope = token.claims.scope
187: if str(token.id) != ti_self_id:
239: return await session.scalar(_team_name_for_ti_stmt(token.id))
Also the "per-team connection" in exec API currently wouldn't work anyway as the team is looked up from task_instance -> dag_model -> dag_bundle -> dag_bundle.teams, so that wouldn't work with a fake uuid anyway.
…rver The standalone DAG processor no longer connects to the metadata database. It persists parse results and reads all metadata through the DAG Processing API (AIP-92): a single DagProcessingApiClient routes persistence, bundle state and sync, stale-DAG and warning sweeps, priority-parse and callback claims, and the processor Job lifecycle, with no ORM session in the manager. Bundle-initialization connection/variable reads resolve through the Execution API (the same path workers and triggerers use), so a git connection stored in the metadata database keeps working without direct DB access. The processor parses user code, so it does not hold the signing key or mint its own token: it presents a bearer token the deployment provisions, read from [dag_processor] api_token_path, to both the DAG Processing and Execution APIs. In standalone (a trusted launcher that already holds the signing key), the token is minted and provisioned automatically. Per-loop API calls are guarded so a transient API outage skips a cycle instead of crashing the processor, the heartbeat is throttled, the client retries transient failures, and startup waits for API readiness.
Mount a /dag-processing FastAPI app on the API server with the endpoints the DAG processor persists through: parsing-results, bundle reconcile/state/sync, stale-dags, purge-warnings, priority-parse and callback claim, and Job register/heartbeat/complete. Split into app.py (routes), datamodels.py, and security.py, matching the execution_api layout. The endpoints validate the bearer token the processor presents via JWTBearer, using the same get_sig_validation_args path as the Execution API, so [api_auth] trusted_jwks_url applies equally. /health is open for readiness probes.
The standalone DAG processor now requires an API server that mounts the dag-processing app, and a deployment-provisioned bearer token. List dag-processing in the api-server --apps options, note the requirement in the web-stack docs, and add a significant newsfragment for the breaking change.
The DAG processor parses user code, so it must not hold the JWT signing key or mint its own token. It now carries a bearer token a trusted component provisions to the file at [dag_processor] api_token_path and only reads that file, re-reading it as the token is rotated so a refreshed token is picked up without a restart. - DagProcessingApiClient reads its token via a callable bearer-auth on each request (short cache plus a 401-triggered re-read), and the Execution API client used for parse-time Connection/Variable reads is re-read per parser spawn. - airflow.api_fastapi.auth.dag_processor_token mints the dual-audience token. It signs with whatever get_signing_args resolves (symmetric secret or an asymmetric private key), so a JWKS-based control plane validates externally-issued tokens unchanged. - 'airflow provision-dag-processor-token' is the trusted minter CLI; airflow standalone now provisions through it. [dag_processor] jwt_expiration_time sets the token lifetime.
The DAG processor needs a bearer token but must not hold the signing key. These deployments now mint the token in a trusted step and share it with the processor: - Helm: an init container (which holds the signing key) mints the token into a shared emptyDir; the dag-processor container reads it read-only and is not given the key. Toggle with dagProcessor.apiToken.provisionViaInitContainer. - docker-compose (quick-start docs and task-sdk integration tests): airflow-init mints the token to a shared volume that the dag-processor reads.
The test ran the manager three times expecting one parse per run, relying on dag_path.touch() to beat the default 30s min_file_process_interval via an mtime comparison. Under load that filesystem-granularity race left a run waiting out the interval and tripping the per-test timeout. Pin min_file_process_interval=0 so each run re-parses unconditionally; the touch is no longer needed.
d58de85 to
327717c
Compare
| yield request | ||
|
|
||
|
|
||
| class DagProcessingApiClient: |
There was a problem hiding this comment.
Would it be beneficial to use API data models declared in datamodels.py here as type hints? Otherwise those are just free-form payloads and it's anyone's guess if they are current.
…stname The /dag-processing /jobs endpoint built the Job server-side, so it recorded the API server's hostname/pid rather than the processor's. The dag-processor health check (airflow jobs check --job-type DagProcessorJob --hostname <host>) filters by the processor's hostname, so it never matched the row and 'docker compose up --wait' timed out waiting for the processor to become healthy (the e2e and remote-logging PROD-image tests). The processor now reports its hostname/unixname/pid when registering and the endpoint records them, restoring the in-process behaviour.
Once this PR is merged, the standalone DAG processor (
airflow dag-processor) no longer connects to the metadatadatabase directly. It persists parse results and reads all metadata through the API server, the
same way workers already operate. This removes one of the last few components that
runs user-adjacent code while also holding a direct database connection.
Persistence (serialized DAGs, import errors, warnings), stale-DAG and orphaned-import-error
reconciliation, bundle sync and state, priority-parse-request and callback claiming, and the
processor's own
Jobliveness record all go through a new/dag-processingAPI app. Parse-timeand bundle-initialization
Connection/Variablereads resolve through the Execution API.What changed
/dag-processingFastAPI sub-app mounted on the API server(
airflow.api_fastapi.dag_processing), split intoapp.py(routes),datamodels.py, andsecurity.py.DagProcessingApiClient(httpx) used by the processor: pooled, with bounded retry/backoffand a startup readiness wait.
DagFileProcessorManagerroutes all persistence and metadata reads through the client.Bundle-initialization credentials resolve through the Execution API (the same path workers and
triggerers use), so a git connection stored in the metadata database keeps working without
direct DB access.
[core] dag_processing_api_server_url(defaults to the/dag-processingmount ofthe configured API server) and
[dag_processor] jwt_audience.Design notes
[dag_processor] jwt_audiencewith thedeployment signing key, and the endpoints validate it via
JWTBearer. Validation goes throughthe same
get_sig_validation_argspath as the Execution API, so a deployment that configures[api_auth] trusted_jwks_urlvalidates externally-issued tokens for/dag-processingexactlyas it does for
/execution./healthstays unauthenticated for readiness probes.of crashing the processor, the heartbeat is throttled, and startup waits for API readiness.
Config
Future Work
Ideally, the entire client side of the dag-processor will be moved to task-sdk in follow-up PRs.