Callback#

Overview#

Callback 是 Airflow 的一个机制, 可以在某些事件发生时调用一个 Python 函数. 例如在 task 失败的时候发 email 提醒等. Airflow 支持下列 callback:

  • on_success_callback: Invoked when the task succeeds

  • on_failure_callback: Invoked when the task fails

  • sla_miss_callback: Invoked when a task misses its defined SLA (Service Level Agreements)

  • on_retry_callback: Invoked when the task is up for retry

  • on_execute_callback: Invoked right before the task begins executing.

这些参数是要在你用 @task decorator 或是 BaseOperator 的时候传入的 (所有的 Operator 都支持这些参数). 不然是不会有作用的.

这里有一个小坑. 如果你用了 @dag + @task 定义了你的 dag 和 task, 按照官方文档, 如果你在 dag 级别定义了任何 callback, 所有的 task 是可以自动获得这些 callback 的. 但是官方文档没有说, 你不许用 with DAG(...) 以及 PythonOperator(...) 这样的形式定义才能有这个效果. 而用 decorator 是没有这个效果的, 你需要给每个 task 一个个的定义.

下面我们给出了一个示例 dag:

 1# -*- coding: utf-8 -*-
 2
 3"""
 4学习如何使用 callback 函数在 任务成功, 失败, 重试的时候执行一些操作.
 5
 6Reference:
 7
 8- https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/callbacks.html
 9"""
10
11import pendulum
12
13from airflow.decorators import dag, task
14
15# 由于 decorator 的参数由于其实现原理比较 tricky, IDE 不能自动提示,
16# 但我们知道它的参数跟 Operator class 的参数一致, 所以我这里 import 进来供我点击跳转查看参数.
17from airflow.operators.python import BaseOperator
18
19
20def on_success_callback(context):
21    print("I called on_success_callback")
22
23
24def on_failure_callback(context):
25    print("I called on_failure_callback")
26
27
28def on_retry_callback(context):
29    print("I called on_retry_callback")
30
31
32@dag(
33    dag_id="dag_0002_callback",
34    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),  # pendulum 是一个更可靠的时区库
35    schedule=None,  # 不自动执行, 你点了 Trigger DAG 才会执行
36    catchup=False,
37)
38def my_dag():
39    @task(
40        task_id="task1",
41        on_success_callback=on_success_callback,
42    )
43    def task1():
44        """
45        Task 1 永远成功.
46        """
47        print("Start task1")
48        print("End task1")
49
50    @task(
51        task_id="task2",
52        retries=10,
53        retry_delay=1,
54        on_retry_callback=on_retry_callback,
55    )
56    def task2():
57        """
58        Task 2 只有 10% 的概率成功. 但是我们重试 10 次, 按概率 10 次全部失败的概率是 35%, 并不低.
59        """
60        import random
61
62        print(f"Start task2")
63        value = random.randint(1, 100)
64        print(f"Generated value is {value}")
65        if value <= 10:
66            pass
67        else:
68            raise ValueError("Randomly failed")
69
70        print("End task2")
71
72    @task(
73        task_id="task3",
74        on_failure_callback=on_failure_callback,
75    )
76    def task3():
77        """
78        Task 3 永远失败.
79        """
80        print("Start task3")
81        raise Exception("Always failed")
82        print("End task3")
83
84    run_task1 = task1()
85    run_task2 = task2()
86    run_task3 = task3()
87
88    run_task1 >> run_task2 >> run_task3
89
90
91run_dag = my_dag()  # 你最后必须要实例化这个 DAG 对象 (它是被 @dag 装饰器装饰的函数的返回值, 不是原本的函数了)