@@ -45,6 +45,24 @@ The ``DagFileProcessorManager`` runs user codes. As a result, it runs as a stand
4. Return DagBag: Provide the ``DagFileProcessorManager`` a list of the discovered Dag objects


Communicating with the API server
----------------------------------

The Dag processor does not connect to the metadata database directly. It persists parse results and
reads metadata (including parse-time ``Connection``, ``Variable`` and ``XCom`` values) through the
API server, so an API server running the ``dag-processing`` app must be reachable. Start it with
``airflow api-server --apps all`` or include ``dag-processing`` explicitly; a subset such as
``--apps core,execution`` omits it and the Dag processor cannot run.

Because the Dag processor parses user code, it must not hold the signing key or mint its own token.
A trusted component mints a bearer token and writes it to the file named by
:ref:`config:dag_processor__api_token_path`; the Dag processor only reads that file, re-reading it
as the token is rotated, so a refreshed token is picked up without a restart. Mint the token with
``airflow provision-dag-processor-token`` from a trusted context that holds the signing key, and
re-run it before :ref:`config:dag_processor__jwt_expiration_time` elapses. The official Helm chart
(via an init container) and the docker-compose example do this for you.
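
The re-read behaviour described above can be sketched as follows. This is a minimal illustration, not the processor's actual implementation: ``TokenFileReader`` is a hypothetical name, and the change-detection strategy (comparing inode, mtime and size) is an assumption chosen because an atomic rename swaps in a new file.

```python
from __future__ import annotations

import os
from pathlib import Path


class TokenFileReader:
    """Hypothetical helper: return the current token, re-reading the file when it changes."""

    def __init__(self, path: str | os.PathLike[str]) -> None:
        self._path = Path(path)
        self._signature: tuple[int, int, int] | None = None
        self._token: str = ""

    def get(self) -> str:
        stat = self._path.stat()
        # A rotation done via temp file + rename produces a new inode, so the
        # (inode, mtime, size) triple changes even if the write is very fast.
        signature = (stat.st_ino, stat.st_mtime_ns, stat.st_size)
        if signature != self._signature:
            self._token = self._path.read_text().strip()
            self._signature = signature
        return self._token
```

A caller would invoke ``get()`` before each request, so a rotated token is picked up on the next call without restarting the process.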


Fine-tuning your Dag processor performance
------------------------------------------

24 changes: 20 additions & 4 deletions airflow-core/docs/administration-and-deployment/web-stack.rst
@@ -38,25 +38,41 @@ with the new prefix.
Separating API Servers
-----------------------

By default, both the Core API Server and the Execution API Server are served together:
By default, all applications are served together:

.. code-block:: bash

    airflow api-server
    # same as
    airflow api-server --apps all
    # or

``--apps all`` serves the Core API Server, the Execution API Server, and the DAG Processing
API Server. You can also select a subset of applications, for example to serve only the Core
and Execution API Servers:

.. code-block:: bash

    airflow api-server --apps core,execution

If you want to separate the Core API Server and the Execution API Server, you can run them
separately. This might be useful for scaling them independently or for deploying them on different machines.
If you want to separate the applications, you can run them separately. This might be useful for
scaling them independently or for deploying them on different machines.

.. code-block:: bash

    # serve only the Core API Server
    airflow api-server --apps core
    # serve only the Execution API Server
    airflow api-server --apps execution
    # serve only the DAG Processing API Server
    airflow api-server --apps dag-processing

.. note::

    The standalone DAG processor (``airflow dag-processor``) reads and writes its metadata through
    the DAG Processing API Server, so the API server it connects to must include the ``dag-processing``
    app. ``airflow api-server --apps all`` includes it; a subset such as ``--apps core,execution`` does
    not. If the ``dag-processing`` app is missing, the DAG processor's requests return ``404`` and it
    cannot run.
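
The rule in the note can be sketched as a small predicate. It mirrors the ``"all" in apps_list or "dag-processing" in apps_list`` check that this change adds to ``create_app``; the function name ``serves_dag_processing`` is hypothetical, and splitting ``--apps`` on commas is an assumption about how ``apps_list`` is built.

```python
def serves_dag_processing(apps: str = "all") -> bool:
    """Return True if an API server started with this --apps value mounts /dag-processing."""
    # Assumption: the --apps value is a comma-separated list of app names.
    apps_list = [name.strip() for name in apps.split(",")]
    return "all" in apps_list or "dag-processing" in apps_list
```

For example, ``serves_dag_processing("core,execution")`` is ``False``, which corresponds to the case where the DAG processor's requests return ``404``.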

Known Issues
------------
12 changes: 12 additions & 0 deletions airflow-core/docs/howto/docker-compose/docker-compose.yaml
@@ -66,6 +66,11 @@ x-airflow-common:
AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
AIRFLOW__API_AUTH__JWT_SECRET: ${AIRFLOW__API_AUTH__JWT_SECRET:-airflow_jwt_secret}
AIRFLOW__API_AUTH__JWT_ISSUER: ${AIRFLOW__API_AUTH__JWT_ISSUER:-airflow}
# The DAG processor parses user code, so it never mints its own API token: airflow-init mints
# one to this shared file and the processor reads it (re-reading it as it is rotated). For a
# hardened deployment, run the processor in a separate image that does not carry the signing
# key above; see the Helm chart for that pattern.
AIRFLOW__DAG_PROCESSOR__API_TOKEN_PATH: '/opt/airflow/api-token/dag-processor-token'
# yamllint disable rule:line-length
# Use simple http server on scheduler for health checks
# See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
@@ -81,6 +86,7 @@ x-airflow-common:
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
- airflow-api-token:/opt/airflow/api-token
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
@@ -270,6 +276,10 @@ services:
echo
/entrypoint airflow config list >/dev/null
echo
echo "Minting the DAG processor API token (the processor never mints its own)."
echo
/entrypoint airflow provision-dag-processor-token
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}
@@ -335,3 +345,5 @@ services:

volumes:
postgres-db-volume:
# Shares the DAG processor API token from airflow-init (which mints it) to the processor.
airflow-api-token:
63 changes: 63 additions & 0 deletions airflow-core/newsfragments/67878.significant.rst
@@ -0,0 +1,63 @@
DAG processor now reads and writes metadata exclusively through the API server

The standalone DAG processor (``airflow dag-processor``) no longer connects to the metadata
database directly. It persists parse results and reads all metadata through the API server,
mirroring how workers and triggerers already operate.

**What changed:**

- The DAG processor process opens no metadata-database connection. Persistence (serialized
DAGs, import errors, warnings), stale-DAG and orphaned-import-error reconciliation, bundle
synchronization and state, priority-parse-request and callback claiming, and the processor's
own ``Job`` liveness record all go through the API server.
- Parse-time ``Connection``/``Variable``/``XCom`` reads from DAG files resolve through the
Execution API rather than an in-process app backed by the local database.

**Behaviour changes:**

- An API server must be reachable for the DAG processor to run. A deployment that previously
ran ``airflow dag-processor`` with only a database connection (no API server) must now also
run the API server.
- The API server fronting the DAG processor must run the ``dag-processing`` app. Start it with
``airflow api-server --apps all`` or include ``dag-processing`` explicitly; a subset such as
``--apps core,execution`` omits it and the DAG processor cannot run.
- The DAG processor authenticates to the API server with a bearer token the deployment
provisions; it does not mint its own. Because it parses user code, it is not given the signing
key. A trusted component mints the token and writes it to the file referenced by
``[dag_processor] api_token_path``; the processor only reads that file, re-reading it as the
token is rotated, so a refreshed token is picked up without a restart.
- ``airflow provision-dag-processor-token`` mints the token and writes it to that file. The
bundled Helm chart (an init container) and docker-compose example run it for you; in a custom
deployment, run it from a trusted context that holds the signing key before the processor starts,
and re-run it before ``[dag_processor] jwt_expiration_time`` elapses to rotate the token.

**Configuration:**

- ``[core] dag_processing_api_server_url`` selects the DAG Processing API endpoint. It defaults
to the ``/dag-processing`` mount on the configured API server (``{BASE_URL}/dag-processing``),
so deployments that already run the API server need no extra configuration. Set it to point
the DAG processor at a different host.
- ``[dag_processor] api_token_path`` is the path to a file holding the bearer token the DAG
processor presents to the API server (for both the DAG Processing and Execution APIs).
- ``[dag_processor] jwt_expiration_time`` is the lifetime of the minted token (default 24h); the
provisioning step should re-mint before it elapses.
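
As a rough illustration of "re-mint before it elapses", a provisioning loop could compute how long to wait before running ``airflow provision-dag-processor-token`` again. This is only a sketch: ``seconds_until_remint`` and its one-hour safety ``margin`` are hypothetical and not part of Airflow.

```python
def seconds_until_remint(issued_at: float, lifetime: int, now: float, margin: int = 3600) -> float:
    """Seconds to wait so that the next mint happens `margin` seconds before expiry."""
    # The token expires at issued_at + lifetime; aim to rotate `margin` seconds earlier.
    deadline = issued_at + lifetime - margin
    return max(0.0, deadline - now)
```

With the default 24h lifetime (86400 seconds) and a token minted just now, this suggests waiting 82800 seconds (23 hours) before rotating.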

**Migration:**

- Ensure the API server is running and reachable from the DAG processor. If the DAG processor
runs on a host without the default API server, set ``[core] dag_processing_api_server_url``
(and ``[core] execution_api_server_url`` for parse-time metadata reads) to its address.
- Provision the DAG processor's token (``airflow provision-dag-processor-token``) from a trusted
context that holds the signing key, and point the processor at it with
``[dag_processor] api_token_path``. The Helm chart and docker-compose example do this by default.

* Types of change

* [ ] Dag changes
* [x] Config changes
* [x] CLI changes
* [x] Behaviour changes
* [ ] Plugin changes
* [ ] Dependency changes
* [x] Code interface changes
* [ ] API changes
6 changes: 5 additions & 1 deletion airflow-core/src/airflow/api_fastapi/app.py
@@ -34,6 +34,7 @@
init_middlewares,
init_views,
)
from airflow.api_fastapi.dag_processing.app import create_dag_processing_api_app
from airflow.api_fastapi.execution_api.app import create_task_execution_api_app
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
@@ -61,7 +62,7 @@ def get_cookie_path() -> str:


# Fast API apps mounted under these prefixes are not allowed
RESERVED_URL_PREFIXES = ["/api/v2", "/ui", "/execution", "/auth", "/pluginsv2"]
RESERVED_URL_PREFIXES = ["/api/v2", "/ui", "/execution", "/auth", "/pluginsv2", "/dag-processing"]

log = logging.getLogger(__name__)

@@ -107,6 +108,9 @@ def create_app(apps: str = "all") -> FastAPI:
        init_error_handlers(task_exec_api_app)
        app.mount("/execution", task_exec_api_app)

    if "all" in apps_list or "dag-processing" in apps_list:
        app.mount("/dag-processing", create_dag_processing_api_app())

    if "all" in apps_list or "core" in apps_list:
        app.state.dag_bag = dag_bag
        init_plugins(app)
103 changes: 103 additions & 0 deletions airflow-core/src/airflow/api_fastapi/auth/dag_processor_token.py
@@ -0,0 +1,103 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Mint and provision the bearer token the DAG processor presents to the API server (AIP-92).

The DAG processor parses (and forks) user code, so it must never hold the deployment signing key
or mint its own token. A *trusted* component runs the helpers here -- the deployment's provisioning
step (a Helm init container, a docker-compose init service) or ``airflow standalone`` -- mints the
token and writes it to ``[dag_processor] api_token_path``. The processor only ever reads that file
(re-reading it as it is rotated), so it carries a token without being able to forge one.
"""

from __future__ import annotations

import logging
import os
from pathlib import Path

from airflow.api_fastapi.auth.tokens import JWTGenerator, get_signing_args
from airflow.configuration import conf

log = logging.getLogger(__name__)

# The Execution API is task-instance scoped: its ``sub`` is validated as a UUID. The DAG processor
# is not a task instance, so its token carries an all-zero sentinel UUID rather than a real id.
DAG_PROCESSOR_TOKEN_SUBJECT = "00000000-0000-0000-0000-000000000000"
Comment on lines +38 to +40
Member

This feels like too much of a hack. There is nothing in JWT that says the sub claim must be a UUID, that is just our choice, so I think for dag processing the sub should be something else.

Member Author

Agreed it's a hack. It's a UUID only because the processor reuses the Execution API for parse-time Connection/Variable reads, and those routes go through CurrentTIToken -> TIToken(id: UUID) (the id also feeds team scoping), so a non-UUID sub is rejected before the route runs. The all-zero value is a stand-in non-TI principal, and the one token currently carries both the execution and dag-processing audiences.

The DAG Processing validator accepts any sub, so the UUID is only needed on the Execution side. Two ways to give dag processing a real sub:

  1. Two tokens: a dag-processing token with sub=dag-processor and a separate execution token that keeps the UUID (the Execution API genuinely is TI-scoped).
  2. Generalise the Execution principal so the read-only conn/var routes accept a non-TI sub.

I lean to (2) if you're open to it (one token, no sentinel anywhere); otherwise I'll do (1). Which would you prefer?

Member

sub=dag-processor -- this leads to an interesting conundrum. I'd really like to be able to tie requests for a Connection or Variable to a specific file, which means some kind of token exchange to get a per-file scoped token.

I think the point about CurrentTIToken leads to a more salient design question though: is it right to use the Execution API for this? The "Execution" part of the API is not really true for dag parsing. I also don't want us to have to duplicate things into the /dag-processing/ API (I'm already not happy with how much we have to duplicate from the public API to the Execution API; doing that a third time makes me sad).

I'm in favour of 2 generally, and probably the TIClaims and TIToken classes are a mistake/overly specific naming. Nothing seems to look at TIClaims that I can see.

$ rg 'token\.' airflow-core/src/airflow/api_fastapi/execution_api
airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
130:    parent_ti = session.get(TaskInstance, token.id)

airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
55:        token.id,

airflow-core/src/airflow/api_fastapi/execution_api/routes/connections.py
38:        token.id,

airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
47:        token.id,

airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
318:    if token.claims.scope == "workload":

airflow-core/src/airflow/api_fastapi/execution_api/datamodels/token.py
32:    Validated JWT claims for a task identity token.

airflow-core/src/airflow/api_fastapi/execution_api/security.py
97:    dedup or Cadwyn replays) return the cached token.
160:    token_scope = token.claims.scope
187:        if str(token.id) != ti_self_id:
239:        return await session.scalar(_team_name_for_ti_stmt(token.id))

Also, the "per-team connection" lookup in the exec API wouldn't work here anyway: the team is looked up from task_instance -> dag_model -> dag_bundle -> dag_bundle.teams, which a fake UUID cannot resolve.
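
The constraint discussed in this thread can be shown in isolation. The sketch below assumes only what the thread states, namely that the Execution API validates ``sub`` as a UUID; ``is_uuid_subject`` is a hypothetical stand-in for that validation, not the actual ``TIToken`` code.

```python
from uuid import UUID

# The sentinel subject used by the PR for the DAG processor token.
SENTINEL_SUBJECT = "00000000-0000-0000-0000-000000000000"


def is_uuid_subject(sub: str) -> bool:
    """Return True if `sub` parses as a UUID, as a UUID-typed token field would require."""
    try:
        UUID(sub)
    except ValueError:
        return False
    return True
```

Under this check the all-zero sentinel passes while a descriptive subject such as ``dag-processor`` is rejected, which is why option (2) requires generalising the Execution principal.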



def mint_dag_processor_token(
    *,
    valid_for: int | None = None,
    make_secret_key_if_needed: bool = False,
) -> str:
    """
    Mint the bearer token the DAG processor presents to the API server.

    Trusted callers only. The token carries *both* the Execution and DAG Processing audiences (the
    processor calls both APIs) under :data:`DAG_PROCESSOR_TOKEN_SUBJECT`. Signing uses whatever
    ``get_signing_args`` resolves -- the symmetric ``[api_auth] jwt_secret`` or, where configured, an
    asymmetric private key -- so the same minting works for a single deployment or a JWKS-based
    control plane without change here.

    :param valid_for: token lifetime in seconds; defaults to ``[dag_processor] jwt_expiration_time``.
    :param make_secret_key_if_needed: generate a signing key if none is configured. Leave ``False``
        in real deployments (the key must be the shared one the API server validates with); only
        ``airflow standalone``, which materialises that shared key itself, passes ``True``.
    """
    audiences = [
        conf.get_mandatory_list_value("execution_api", "jwt_audience")[0],
        conf.get_mandatory_list_value("dag_processor", "jwt_audience")[0],
    ]
    generator = JWTGenerator(
        valid_for=valid_for if valid_for is not None else conf.getint("dag_processor", "jwt_expiration_time"),
        # A JWT ``aud`` may be a list; the generator's hint is single-audience, but the processor
        # presents this one token to both the Execution and DAG Processing APIs.
        audience=audiences,  # type: ignore[arg-type]
        issuer=conf.get("api_auth", "jwt_issuer", fallback=None),
        **get_signing_args(make_secret_key_if_needed=make_secret_key_if_needed),
    )
    return generator.generate({"sub": DAG_PROCESSOR_TOKEN_SUBJECT})
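
To illustrate the multi-audience idea on its own, here is a standard-library sketch of an HS256 JWT whose ``aud`` claim is a list. It is not Airflow's ``JWTGenerator``: the helper names, the audience strings and the secret are all placeholders, and the decoder deliberately skips signature verification.

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def encode_hs256(claims: dict, secret: bytes) -> str:
    """Build a compact JWT signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    parts = [_b64url(json.dumps(p, separators=(",", ":")).encode("utf-8")) for p in (header, claims)]
    signing_input = ".".join(parts)
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def read_claims(token: str) -> dict:
    """Decode the payload WITHOUT verifying the signature (illustration only)."""
    payload = token.split(".")[1]
    return json.loads(base64.urlsafe_b64decode(payload + "=" * (-len(payload) % 4)))


def audience_matches(claims: dict, expected: str) -> bool:
    """A validator accepts the token if its own audience appears in `aud` (string or list)."""
    aud = claims.get("aud", [])
    return expected in ([aud] if isinstance(aud, str) else aud)


# Placeholder audiences and secret; real values come from the deployment's configuration.
token = encode_hs256(
    {"sub": "00000000-0000-0000-0000-000000000000", "aud": ["example-execution", "example-dag-processing"]},
    secret=b"not-a-real-secret",
)
```

Each API checks only for its own audience, so one token with a two-element ``aud`` list satisfies both validators, while a third service with a different audience would reject it.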


def provision_dag_processor_token_file(
    output: str | os.PathLike[str] | None = None,
    *,
    valid_for: int | None = None,
    make_secret_key_if_needed: bool = False,
) -> str:
    """
    Mint a DAG processor token and write it to ``output`` (or ``[dag_processor] api_token_path``).

    The token is written atomically (temp file then ``rename``) so the processor, which re-reads the
    file, never observes a half-written token. Re-run before the token expires to rotate it in place.

    :return: the path written.
    """
    target = output or conf.get("dag_processor", "api_token_path", fallback=None)
    if not target:
        raise ValueError(
            "No output path for the DAG processor token: pass output or set [dag_processor] api_token_path."
        )
    token = mint_dag_processor_token(valid_for=valid_for, make_secret_key_if_needed=make_secret_key_if_needed)
    path = Path(target)
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(f"{path.name}.tmp")
    tmp_path.write_text(token)
    os.replace(tmp_path, path)
    log.info("Wrote DAG processor API token to %s", path)
    return str(path)
25 changes: 25 additions & 0 deletions airflow-core/src/airflow/api_fastapi/dag_processing/__init__.py
@@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
DAG Processing API (AIP-92).

A FastAPI sub-app that lets the DAG processor persist parse results without a
direct metadata-DB connection. The processor parses files locally and POSTs the
results here; this app owns the DB writes.
"""

from __future__ import annotations