-
Notifications
You must be signed in to change notification settings - Fork 17.2k
Make the DAG processor access metadata exclusively through the API server #67878
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
kaxil
wants to merge
7
commits into
apache:main
Choose a base branch
from
astronomer:dag-processor-api-persistence
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
4eff3ac
Make the DAG processor access metadata exclusively through the API se…
kaxil 71025e4
Add the DAG Processing API server app (AIP-92)
kaxil 19abc7c
Document the DAG processor's API-server requirement
kaxil 6a25dca
Provision the DAG processor's API token instead of self-signing it
kaxil b99b667
Provision the DAG processor token in the Helm chart and docker-compose
kaxil 327717c
Stabilize test_stats_total_parse_time against the reparse interval
kaxil 5e76e70
Register the DAG processor's liveness Job with the processor's own ho…
kaxil File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| DAG processor now reads and writes metadata exclusively through the API server | ||
|
|
||
| The standalone DAG processor (``airflow dag-processor``) no longer connects to the metadata | ||
| database directly. It persists parse results and reads all metadata through the API server, | ||
| mirroring how workers and triggerers already operate. | ||
|
|
||
| **What changed:** | ||
|
|
||
| - The DAG processor process opens no metadata-database connection. Persistence (serialized | ||
| DAGs, import errors, warnings), stale-DAG and orphaned-import-error reconciliation, bundle | ||
| synchronization and state, priority-parse-request and callback claiming, and the processor's | ||
| own ``Job`` liveness record all go through the API server. | ||
| - Parse-time ``Connection``/``Variable``/``XCom`` reads from DAG files resolve through the | ||
| Execution API rather than an in-process app backed by the local database. | ||
|
|
||
| **Behaviour changes:** | ||
|
|
||
| - An API server must be reachable for the DAG processor to run. A deployment that previously | ||
| ran ``airflow dag-processor`` with only a database connection (no API server) must now also | ||
| run the API server. | ||
| - The API server fronting the DAG processor must run the ``dag-processing`` app. Start it with | ||
| ``airflow api-server --apps all`` or include ``dag-processing`` explicitly; a subset such as | ||
| ``--apps core,execution`` omits it and the DAG processor cannot run. | ||
| - The DAG processor authenticates to the API server with a bearer token the deployment | ||
| provisions; it does not mint its own. Because it parses user code, it is not given the signing | ||
| key. A trusted component mints the token and writes it to the file referenced by | ||
| ``[dag_processor] api_token_path``; the processor only reads that file, re-reading it as the | ||
| token is rotated, so a refreshed token is picked up without a restart. | ||
| - ``airflow provision-dag-processor-token`` mints the token and writes it to that file. The | ||
| bundled Helm chart (an init container) and docker-compose example run it for you; in a custom | ||
| deployment, run it from a trusted context that holds the signing key before the processor starts, | ||
| and re-run it before ``[dag_processor] jwt_expiration_time`` elapses to rotate the token. | ||
|
|
||
| **Configuration:** | ||
|
|
||
| - ``[core] dag_processing_api_server_url`` selects the DAG Processing API endpoint. It defaults | ||
| to the ``/dag-processing`` mount on the configured API server (``{BASE_URL}/dag-processing``), | ||
| so deployments that already run the API server need no extra configuration. Set it to point | ||
| the DAG processor at a different host. | ||
| - ``[dag_processor] api_token_path`` is the path to a file holding the bearer token the DAG | ||
| processor presents to the API server (for both the DAG Processing and Execution APIs). | ||
| - ``[dag_processor] jwt_expiration_time`` is the lifetime of the minted token (default 24h); the | ||
| provisioning step should re-mint before it elapses. | ||
|
|
||
| **Migration:** | ||
|
|
||
| - Ensure the API server is running and reachable from the DAG processor. If the DAG processor | ||
| runs on a host without the default API server, set ``[core] dag_processing_api_server_url`` | ||
| (and ``[core] execution_api_server_url`` for parse-time metadata reads) to its address. | ||
| - Provision the DAG processor's token (``airflow provision-dag-processor-token``) from a trusted | ||
| context that holds the signing key, and point the processor at it with | ||
| ``[dag_processor] api_token_path``. The Helm chart and docker-compose example do this by default. | ||
|
|
||
| * Types of change | ||
|
|
||
| * [ ] Dag changes | ||
| * [x] Config changes | ||
| * [x] CLI changes | ||
| * [x] Behaviour changes | ||
| * [ ] Plugin changes | ||
| * [ ] Dependency changes | ||
| * [x] Code interface changes | ||
| * [ ] API changes |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
103 changes: 103 additions & 0 deletions
103
airflow-core/src/airflow/api_fastapi/auth/dag_processor_token.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| """ | ||
| Mint and provision the bearer token the DAG processor presents to the API server (AIP-92). | ||
|
|
||
| The DAG processor parses (and forks) user code, so it must never hold the deployment signing key | ||
| or mint its own token. A *trusted* component runs the helpers here -- the deployment's provisioning | ||
| step (a Helm init container, a docker-compose init service) or ``airflow standalone`` -- mints the | ||
| token and writes it to ``[dag_processor] api_token_path``. The processor only ever reads that file | ||
| (re-reading it as it is rotated), so it carries a token without being able to forge one. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| import os | ||
| from pathlib import Path | ||
|
|
||
| from airflow.api_fastapi.auth.tokens import JWTGenerator, get_signing_args | ||
| from airflow.configuration import conf | ||
|
|
||
| log = logging.getLogger(__name__) | ||
|
|
||
| # The Execution API is task-instance scoped: its ``sub`` is validated as a UUID. The DAG processor | ||
| # is not a task instance, so its token carries an all-zero sentinel UUID rather than a real id. | ||
| DAG_PROCESSOR_TOKEN_SUBJECT = "00000000-0000-0000-0000-000000000000" | ||
|
|
||
|
|
||
| def mint_dag_processor_token( | ||
| *, | ||
| valid_for: int | None = None, | ||
| make_secret_key_if_needed: bool = False, | ||
| ) -> str: | ||
| """ | ||
| Mint the bearer token the DAG processor presents to the API server. | ||
|
|
||
| Trusted callers only. The token carries *both* the Execution and DAG Processing audiences (the | ||
| processor calls both APIs) under :data:`DAG_PROCESSOR_TOKEN_SUBJECT`. Signing uses whatever | ||
| ``get_signing_args`` resolves -- the symmetric ``[api_auth] jwt_secret`` or, where configured, an | ||
| asymmetric private key -- so the same minting works for a single deployment or a JWKS-based | ||
| control plane without change here. | ||
|
|
||
| :param valid_for: token lifetime in seconds; defaults to ``[dag_processor] jwt_expiration_time``. | ||
| :param make_secret_key_if_needed: generate a signing key if none is configured. Leave ``False`` | ||
| in real deployments (the key must be the shared one the API server validates with); only | ||
| ``airflow standalone``, which materialises that shared key itself, passes ``True``. | ||
| """ | ||
| audiences = [ | ||
| conf.get_mandatory_list_value("execution_api", "jwt_audience")[0], | ||
| conf.get_mandatory_list_value("dag_processor", "jwt_audience")[0], | ||
| ] | ||
| generator = JWTGenerator( | ||
| valid_for=valid_for if valid_for is not None else conf.getint("dag_processor", "jwt_expiration_time"), | ||
| # A JWT ``aud`` may be a list; the generator's hint is single-audience, but the processor | ||
| # presents this one token to both the Execution and DAG Processing APIs. | ||
| audience=audiences, # type: ignore[arg-type] | ||
| issuer=conf.get("api_auth", "jwt_issuer", fallback=None), | ||
| **get_signing_args(make_secret_key_if_needed=make_secret_key_if_needed), | ||
| ) | ||
| return generator.generate({"sub": DAG_PROCESSOR_TOKEN_SUBJECT}) | ||
|
|
||
|
|
||
| def provision_dag_processor_token_file( | ||
| output: str | os.PathLike[str] | None = None, | ||
| *, | ||
| valid_for: int | None = None, | ||
| make_secret_key_if_needed: bool = False, | ||
| ) -> str: | ||
| """ | ||
| Mint a DAG processor token and write it to ``output`` (or ``[dag_processor] api_token_path``). | ||
|
|
||
| The token is written atomically (temp file then ``rename``) so the processor, which re-reads the | ||
| file, never observes a half-written token. Re-run before the token expires to rotate it in place. | ||
|
|
||
| :return: the path written. | ||
| """ | ||
| target = output or conf.get("dag_processor", "api_token_path", fallback=None) | ||
| if not target: | ||
| raise ValueError( | ||
| "No output path for the DAG processor token: pass output or set [dag_processor] api_token_path." | ||
| ) | ||
| token = mint_dag_processor_token(valid_for=valid_for, make_secret_key_if_needed=make_secret_key_if_needed) | ||
| path = Path(target) | ||
| path.parent.mkdir(parents=True, exist_ok=True) | ||
| tmp_path = path.with_name(f"{path.name}.tmp") | ||
| tmp_path.write_text(token) | ||
| os.replace(tmp_path, path) | ||
| log.info("Wrote DAG processor API token to %s", path) | ||
| return str(path) | ||
25 changes: 25 additions & 0 deletions
25
airflow-core/src/airflow/api_fastapi/dag_processing/__init__.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| """ | ||
| DAG Processing API (AIP-92). | ||
|
|
||
| A FastAPI sub-app that lets the DAG processor persist parse results without a | ||
| direct metadata-DB connection. The processor parses files locally and POSTs the | ||
| results here; this app owns the DB writes. | ||
| """ | ||
|
|
||
| from __future__ import annotations |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels like too much of a hack. There is nothing in JWT that says the
subclaim must be a UUID, that is just our choice, so I think for dag processing the sub should be something else.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed it's a hack. It's a UUID only because the processor reuses the Execution API for parse-time
Connection/Variablereads, and those routes go throughCurrentTIToken->TIToken(id: UUID)(theidalso feeds team scoping), so a non-UUIDsubis rejected before the route runs. The all-zero value is a stand-in non-TI principal, and the one token currently carries both the execution and dag-processing audiences.The DAG Processing validator accepts any
sub, so the UUID is only needed on the Execution side. Two ways to give dag processing a realsub:sub=dag-processorand a separate execution token that keeps the UUID (the Execution API genuinely is TI-scoped).sub.I lean to (2) if you're open to it (one token, no sentinel anywhere); otherwise I'll do (1). Which would you prefer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sub=dag-processor-- this leads to an interesting connudrum. I'd really like to be able to tie requests to an Connection or Variable to a specific file, which means some kind of token exchange to get a per-file scoped token.I think the point about
CurrentTITokenleads to a more sailient design question though: is it right to use the Execution API for this. The "Execution" part of the API is not really true for dag parsing. I also don't want us to have to duplicate things into the /dag-processing/ API (I'm already not happy with how much we have to duplicate from the public API to the Execution API, doing that a third time makes me sad.I'm in favour of 2 generally, and probably the
TIClaimsandTITokenclasses are a mistake/overly specific naming. Nothing seems to look at TIClaims that I can see.Also the "per-team connection" in exec API currently wouldn't work anyway as the team is looked up from task_instance -> dag_model -> dag_bundle -> dag_bundle.teams, so that wouldn't work with a fake uuid anyway.