How we parse Apache Airflow DAGs without importing Airflow
TL;DR: Leoflow runs a Go control plane that never imports Apache Airflow, yet compiles standard airflow.sdk DAGs. It does this with a structural shim: a pure-stdlib stand-in for airflow that the parser puts on the import path before it execs your dag.py to record the graph, without running task bodies or installing a single provider. Arbitrary provider operators are captured by class path and kwargs at compile time and run for real in the task pod at runtime. This is the engineering behind Leoflow v0.1.0.
The constraint that forces the design. Leoflow’s scheduler is written in Go, with no GIL and no Python in the hot path (that’s the whole point: Airflow’s Python control plane is what makes it slow). But a Leoflow DAG is a standard Apache Airflow 3.2 DAG, written against airflow.sdk:
from airflow.sdk import DAG, task
from airflow.providers.standard.operators.bash import BashOperator

with DAG("etl", schedule="@daily"):
    pull = BashOperator(task_id="pull", bash_command="echo '[1,2,3]' > /tmp/raw.json")

    @task
    def transform() -> int:
        import json
        return len(json.load(open("/tmp/raw.json")))

    pull >> transform()
So: how does a control plane that never imports Airflow read a DAG written against the Airflow SDK? Importing real Airflow into the parser would drag in the GIL, the dependency tree, and parse-time side effects — exactly what we’re escaping. The answer (ADR 0024) is to not import Airflow at all.
The shim: a structural stand-in for airflow. The parser ships a pure-standard-library package that looks like airflow — same import paths, same attribute surface the compiler reads — and nothing else. It’s put ahead of any real Airflow on the import path, and then the parser simply exec’s your dag.py:
import runpy
runpy.run_path("dag.py", run_name="__leoflow_dag__") # `airflow` resolves to the shim
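How the shim ends up ahead of any real Airflow is not spelled out here. One plausible way, sketched below under stated assumptions, is to prepend the shim's directory to sys.path before calling runpy. The shim contents, the temporary directory layout, and the parse helper are all hypothetical and exist only to make the example runnable.

```python
import pathlib
import runpy
import sys
import tempfile
import textwrap

# Hypothetical, drastically reduced shim: just enough to record DAG ids.
SHIM_SDK = textwrap.dedent('''
    COLLECTED = {}

    class DAG:
        def __init__(self, dag_id, **kwargs):
            COLLECTED[dag_id] = self
        def __enter__(self):
            return self
        def __exit__(self, *exc):
            return False
''')

DAG_FILE = textwrap.dedent('''
    from airflow.sdk import DAG

    with DAG("etl", schedule="@daily"):
        pass
''')


def _is_airflow(name):
    return name == "airflow" or name.startswith("airflow.")


def parse(dag_source):
    """Run dag_source with a shim `airflow` package first on the import path."""
    with tempfile.TemporaryDirectory() as tmp:
        root = pathlib.Path(tmp)
        shim_pkg = root / "shim" / "airflow"
        shim_pkg.mkdir(parents=True)
        (shim_pkg / "__init__.py").write_text("")
        (shim_pkg / "sdk.py").write_text(SHIM_SDK)
        dag_path = root / "dag.py"
        dag_path.write_text(dag_source)

        # Set aside any already-imported airflow modules so the shim is resolved.
        saved = {n: m for n, m in sys.modules.items() if _is_airflow(n)}
        for name in saved:
            del sys.modules[name]
        shim_dir = str(root / "shim")
        sys.path.insert(0, shim_dir)  # ahead of site-packages
        try:
            runpy.run_path(str(dag_path), run_name="__leoflow_dag__")
            return sorted(sys.modules["airflow.sdk"].COLLECTED)
        finally:
            sys.path.remove(shim_dir)
            for name in [n for n in sys.modules if _is_airflow(n)]:
                del sys.modules[name]
            sys.modules.update(saved)


dag_ids = parse(DAG_FILE)
```

Because the shim directory sits at index 0 of sys.path, the DAG file's from airflow.sdk import DAG never reaches an installed Airflow, even if one is present.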
Running the file builds structure. Here’s the core of the shim (paraphrased):
_CURRENT: list = []   # stack of DAGs being defined
COLLECTED: dict = {}  # dag_id -> DAG, filled as each DAG is constructed

class DAG:
    def __init__(self, dag_id, schedule=None, tags=None, **kw):
        self.dag_id, self.schedule, self.task_dict = dag_id, schedule, {}
        COLLECTED[dag_id] = self

    def __enter__(self):
        _CURRENT.append(self)
        return self

    def __exit__(self, *e):
        _CURRENT.pop()

class BaseOperator:
    def __init__(self, task_id, **kwargs):
        # Keep the id and kwargs: edges refer to task_id, dag.json needs both.
        self.task_id, self.kwargs = task_id, kwargs
        self.upstream_task_ids, self.downstream_task_ids = set(), set()
        dag = kwargs.get("dag") or (_CURRENT[-1] if _CURRENT else None)
        if dag:
            dag.task_dict[task_id] = self

    def __rshift__(self, other):
        # a >> b records the edge in both directions and returns b for chaining
        self.downstream_task_ids.add(other.task_id)
        other.upstream_task_ids.add(self.task_id)
        return other
with DAG(...) registers; constructing an operator attaches it to the active DAG and stores its kwargs; >> records edges; @task builds the node but never runs the body. The compiler then reads COLLECTED and emits an immutable dag.json.
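The @task decorator and the dag.json emission step are not shown in the paraphrased shim. The following is a minimal sketch of how they could work, not Leoflow's actual code: the Node class, the emit function, and every field name in the JSON are assumptions made for illustration.

```python
import json

_CURRENT = []   # stack of DAGs being defined
COLLECTED = {}  # dag_id -> DAG


class DAG:
    def __init__(self, dag_id, schedule=None):
        self.dag_id, self.schedule, self.task_dict = dag_id, schedule, {}
        COLLECTED[dag_id] = self

    def __enter__(self):
        _CURRENT.append(self)
        return self

    def __exit__(self, *exc):
        _CURRENT.pop()


class Node:
    """Hypothetical stand-in for the shim's operator base class."""

    def __init__(self, task_id, kind):
        self.task_id, self.kind = task_id, kind
        self.downstream_task_ids = set()
        if _CURRENT:
            _CURRENT[-1].task_dict[task_id] = self

    def __rshift__(self, other):
        self.downstream_task_ids.add(other.task_id)
        return other


def task(fn):
    """Calling the decorated function registers a node; fn itself is never called."""
    def build_node(*args, **kwargs):
        return Node(fn.__name__, kind="python")
    return build_node


def emit(dag):
    """Serialize one collected DAG to JSON (field names are assumptions)."""
    return json.dumps(
        {
            "dag_id": dag.dag_id,
            "schedule": dag.schedule,
            "tasks": [
                {
                    "task_id": t.task_id,
                    "kind": t.kind,
                    "downstream": sorted(t.downstream_task_ids),
                }
                for t in dag.task_dict.values()
            ],
        },
        sort_keys=True,
    )


ran = []  # stays empty unless a task body actually executes

with DAG("etl", schedule="@daily"):
    pull = Node("pull", kind="bash")

    @task
    def transform():
        ran.append("transform")

    pull >> transform()

dag_json = emit(COLLECTED["etl"])
```

Calling transform() inside the DAG context only creates a node named after the function, so the ran list stays empty while the edge from pull to transform still lands in the output.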
Two properties fall straight out of this:
- Unsupported constructs can’t be faked. A from airflow.<thing> the shim doesn’t model raises ModuleNotFoundError, which the loader turns into a clear “not supported by Leoflow” error at compile time, never a silent half-run.
- Parsing has no side effects. @task bodies never execute during parsing, so a DAG file can’t trigger its own work just by being read, which is the thing that makes Airflow’s DAG parsing both slow and risky.
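The error translation in the first property could look roughly like the sketch below. The exception type UnsupportedByLeoflow and the load_dag_source helper are hypothetical names, and the demo uses a bare stand-in airflow module that models nothing, so any submodule import fails the way an unmodeled one would.

```python
import sys
import types


class UnsupportedByLeoflow(Exception):
    """Hypothetical error type for constructs the shim does not model."""


def load_dag_source(source, filename="dag.py"):
    """Exec DAG source; turn missing airflow.* modules into a clear compile error."""
    try:
        exec(compile(source, filename, "exec"), {"__name__": "__leoflow_dag__"})
    except ModuleNotFoundError as err:
        missing = err.name or ""
        if missing == "airflow" or missing.startswith("airflow."):
            raise UnsupportedByLeoflow(
                f"{missing} is not supported by Leoflow"
            ) from err
        raise  # unrelated missing module: leave the original error alone


# Demo: register a bare `airflow` module, then restore whatever was there before.
saved = sys.modules.get("airflow")
sys.modules["airflow"] = types.ModuleType("airflow")
try:
    try:
        load_dag_source("from airflow.not_modeled import Thing")
        message = None
    except UnsupportedByLeoflow as err:
        message = str(err)
finally:
    if saved is None:
        del sys.modules["airflow"]
    else:
        sys.modules["airflow"] = saved
```

Checking err.name rather than parsing the message keeps the translation limited to airflow imports, so a DAG file that is missing some unrelated dependency still surfaces the ordinary ModuleNotFoundError.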
The control plane now has the graph without importing Airflow or installing one provider.
The long tail: capture, don’t reimplement. Modeling all 1,500+ provider operators in the shim would be a treadmill. So for anything beyond the native handful (bash, python, http, empty), the shim has a meta-path finder (ADR 0040) that synthesizes any airflow.providers.<x>.{operators,sensors,transfers}.<Class> on demand. It doesn’t implement the operator — it captures it: records the operator’s real dotted class path and its constructor kwargs, then registers it like any node.
No provider is installed in the parser. The dotted path and kwargs are just data in dag.json.
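To make the capture idea concrete, here is a sketch of a meta-path finder that synthesizes provider modules and operator classes on demand. It is an illustration built on Python's importlib, not the code from ADR 0040: CapturedOperator, CAPTURED, the record layout, and the lowercase/CamelCase heuristic for telling modules from classes are all assumptions.

```python
import importlib.abc
import importlib.util
import sys
import types

PREFIX = "airflow.providers"
KINDS = {"operators", "sensors", "transfers"}
CAPTURED = []  # one record per constructed operator (hypothetical layout)


class CapturedOperator:
    """Records how it was constructed; implements no operator behaviour."""
    class_path = None

    def __init__(self, task_id, **kwargs):
        self.task_id, self.kwargs = task_id, kwargs
        CAPTURED.append(
            {"task_id": task_id, "class_path": self.class_path, "kwargs": kwargs}
        )


class _ProviderModule(types.ModuleType):
    """Module whose CamelCase attributes are synthesized on first access."""

    def __getattr__(self, name):
        in_kind = KINDS.intersection(self.__name__.split("."))
        if not (in_kind and name[:1].isupper()):
            raise AttributeError(name)
        cls = type(
            name, (CapturedOperator,), {"class_path": f"{self.__name__}.{name}"}
        )
        setattr(self, name, cls)  # cache so repeated imports return the same class
        return cls


class ProviderFinder(importlib.abc.MetaPathFinder, importlib.abc.Loader):
    def find_spec(self, fullname, path=None, target=None):
        under_prefix = fullname == PREFIX or fullname.startswith(PREFIX + ".")
        # Heuristic: module names are lowercase, so CamelCase names are not modules.
        if under_prefix and fullname.rpartition(".")[2].islower():
            return importlib.util.spec_from_loader(fullname, self, is_package=True)
        return None

    def create_module(self, spec):
        return _ProviderModule(spec.name)

    def exec_module(self, module):
        pass  # nothing to execute; attributes appear lazily


# Demo wiring: a bare `airflow` package stands in for the rest of the shim.
root = types.ModuleType("airflow")
root.__path__ = []
sys.modules.setdefault("airflow", root)
sys.meta_path.insert(0, ProviderFinder())

from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator

op = S3CreateBucketOperator(task_id="make_bucket", bucket_name="raw-data")
```

The Amazon provider is not installed anywhere in this process. The import succeeds because the finder fabricates each package level, and constructing the class only appends a record that a compiler could write into dag.json.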
The seam: the real operator runs in the pod. At runtime, inside the task’s own pod — where the provider is installed, baked into that DAG’s image — the agent reconstructs and runs the genuine operator: import_string(dotted_class)(**captured_kwargs).execute(context). The real Airflow operator executes, with the real provider, against the real connection — while the control plane that scheduled it never imported either.
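The reconstruction step can be sketched with nothing but importlib. This is an approximation under assumptions: import_string is reimplemented locally rather than taken from Airflow, the record layout (class_path plus kwargs) is hypothetical, and EchoOperator in a fabricated demo_provider module stands in for a real provider operator so the example runs without any provider installed.

```python
import importlib
import sys
import types


def import_string(dotted_path):
    """Resolve 'pkg.module.ClassName' to the class object."""
    module_path, _, class_name = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_path), class_name)


def run_captured(record, context):
    """Rebuild the operator from its captured record and execute it."""
    operator_cls = import_string(record["class_path"])
    return operator_cls(**record["kwargs"]).execute(context)


# Stand-in for an installed provider; in a task pod this would be the real package.
class EchoOperator:
    def __init__(self, task_id, message):
        self.task_id, self.message = task_id, message

    def execute(self, context):
        return f"{context['run_id']}:{self.task_id}:{self.message}"


demo_provider = types.ModuleType("demo_provider")
demo_provider.EchoOperator = EchoOperator
sys.modules["demo_provider"] = demo_provider

record = {
    "class_path": "demo_provider.EchoOperator",
    "kwargs": {"task_id": "say_hi", "message": "hello"},
}
result = run_captured(record, context={"run_id": "manual_1"})
```

The record is plain data, so the side that produced it needs no knowledge of EchoOperator. Only the process that calls run_captured has to be able to import the class.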
Compile time: structure, dependency-free, in Go’s world. Run time: the real Airflow operator, in an isolated pod. That split is the entire design — it’s how you get Airflow’s ecosystem fidelity without Airflow’s control plane.
Why it matters:
- No GIL, no Airflow imports in scheduling — the control plane stays fast and Go-native.
- No dependency hell — each DAG owns its image; the parser needs zero providers.
- No parse-time surprises — reading a DAG can’t run it.
- Full operator fidelity — the actual provider operator runs in the pod.
It’s all open source (Apache 2.0): github.com/neochaotic/leoflow. ADR 0024 (the shim) and ADR 0040 (operator capture) have the gory details.