ShortCircuit#

Overview#

断路器在编排系统中是一个常见的模式. 它能让整个流程提前终止.

Airflow 有一个 ShortCircuitOperator 就是干这个事情的. 它本质上是一个 task, 同时也是一个 callable, 跟普通的 Python 函数一样可以有任何参数, 只不过它必须返回 Ture 或者 False. 如果是 True 则表示我们继续执行, 当它不存在. 而如果是 False 则表示我们不执行后续的 task, 直接结束整个 DAG 的执行. 同时把所有后续的 Task 标记为 skipped, 并且整个 DAG 视为 succeeded.

我们这里有一个示例 DAG:

dag_0004_short_circuit.py
 1# -*- coding: utf-8 -*-
 2
 3"""
 4This example shows how to use
 5`ShortCircuitOperator <https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#shortcircuitoperator>`_
 6to skip downstream tasks.
 7"""
 8
 9import pendulum
10
11from airflow.decorators import dag
12# you need to import ShortCircuitOperator
13from airflow.operators.python import ShortCircuitOperator
14from airflow.operators.empty import EmptyOperator
15
16
17@dag(
18    dag_id="dag_0004_short_circuit",
19    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),  # pendulum 是一个更可靠的时区库
20    schedule=None,  # 不自动执行, 你点了 Trigger DAG 才会执行
21    catchup=False,
22)
23def my_dag():
24    task1 = EmptyOperator(
25        task_id="task1",
26    )
27    # short circuit operator is just a callable function, however,
28    # it has to return a boolean value, True or False.
29    # If True, then keep going,
30    # If False, then skip all downstream tasks.
31    cond = ShortCircuitOperator(
32        task_id="condition",
33        python_callable=lambda: False,
34    )
35    task2 = EmptyOperator(
36        task_id="task2",
37    )
38    task1 >> cond >> task2
39
40
41run_dag = my_dag()  # 你最后必须要实例化这个 DAG 对象 (它是被 @dag 装饰器装饰的函数的返回值, 不是原本的函数了)

ShortCircuit Use Case 1 - Custom Exception Handling#

那么这个模式可以用来做什么呢? 我这里举个例子, 有的时候你有的 task 会因为各种系统原因, 例如依赖系统暂时不可用, 过一段时间你重试就好了. 所以你希望将其标记为成功, 而不希望抛出异常, 因为你认为这种情况是可以接受的. 但是这个 task 后续有很多 task, 你应该怎么做呢? 很简单. 先用 Mark Task as Succeeded for Certain Error 中的方法让这个 task 返回的值中包含一个 boolean flag 用于表示是否需要执行断路器. 然后把一个 ShortCircuitOperator 放在后面, 并且接受前一个 task 的输出作为输入即可.

我们这里有一个示例 DAG:

dag_0005_conditional_short_circuit.py
 1# -*- coding: utf-8 -*-
 2
 3"""
 4This is an advanced example of using
 5`ShortCircuitOperator <https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#shortcircuitoperator>`_. We will learn how to use custom if else logic to control the execution of downstream tasks.
 6"""
 7
 8import random
 9import pendulum
10
11from airflow.models.dag import DAG
12from airflow.decorators import dag, task
13from airflow.operators.empty import EmptyOperator
14from airflow.operators.python import PythonOperator, ShortCircuitOperator
15
16
17# ------------------------------------------------------------------------------
18# Method 1
19# ------------------------------------------------------------------------------
20# @dag(
21#     dag_id="dag_0005_conditional_short_circuit",
22#     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),  # pendulum 是一个更可靠的时区库
23#     schedule=None,  # 不自动执行, 你点了 Trigger DAG 才会执行
24#     catchup=False,
25# )
26# def my_dag():
27#     @task(
28#         task_id="task1",
29#     )
30#     def task1():
31#         """
32#         该任务 50% 几率会返回 True, 50% 几率返回 False.
33#         """
34#         print("Start task1")
35#         value = random.randint(1, 100)
36#         print(f"rnd value is {value}")
37#         do_we_stop_earlier = not (value > 50)
38#         return do_we_stop_earlier
39#
40#     # short circuit operator is just a callable function
41#     # it can take arbitrary number of arguments from previous steps
42#     # you just need to return True or False
43#     @task.short_circuit(
44#         task_id="conditional_short_circuit",
45#     )
46#     def conditional_short_circuit(flag: bool):
47#         return flag
48#
49#     run_task1 = task1()
50#     run_task2 = EmptyOperator(
51#         task_id="task2",
52#     )
53#     # conditional_short_circuit(run_task1) is the syntax to pass the returned
54#     # value from task1 to conditional_short_circuit
55#     run_task1 >> conditional_short_circuit(run_task1) >> run_task2
56#
57#
58# run_dag = my_dag()  # 你最后必须要实例化这个 DAG 对象 (它是被 @dag 装饰器装饰的函数的返回值, 不是原本的函数了)
59
60
61# ------------------------------------------------------------------------------
62# Method 2
63# ------------------------------------------------------------------------------
64with DAG(
65    dag_id="dag_0005_conditional_short_circuit",
66    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),  # pendulum 是一个更可靠的时区库
67    schedule=None,  # 不自动执行, 你点了 Trigger DAG 才会执行
68    catchup=False,
69) as my_dag:
70
71    def task1():
72        """
73        该任务 50% 几率会返回 True, 50% 几率返回 False.
74        """
75        print("Start task1")
76        value = random.randint(1, 100)
77        print(f"rnd value is {value}")
78        do_we_stop_earlier = not (value > 50)
79        return do_we_stop_earlier
80
81    run_task1 = PythonOperator(
82        task_id="task1",
83        python_callable=task1,
84    )
85
86    run_conditional_short_circuit = ShortCircuitOperator(
87        task_id="conditional_short_circuit",
88        python_callable=lambda flag: flag,
89        op_kwargs={"flag": run_task1.output},
90    )
91
92    run_task2 = EmptyOperator(
93        task_id="task2",
94    )
95
96    run_task1 >> run_conditional_short_circuit >> run_task2