Callback#
Overview#
Callback 是 Airflow 的一个机制, 可以在某些事件发生时调用一个 Python 函数. 例如在 task 失败的时候发 email 提醒等. Airflow 支持下列 callback:
on_success_callback
: Invoked when the task succeedson_failure_callback
: Invoked when the task failssla_miss_callback
: Invoked when a task misses its defined SLA (Service Level Agreements)on_retry_callback
: Invoked when the task is up for retryon_execute_callback
: Invoked right before the task begins executing.
这些参数是要在你用 @task
decorator 或是 BaseOperator
的时候传入的 (所有的 Operator 都支持这些参数). 不然是不会有作用的.
这里有一个小坑. 如果你用了 @dag
+ @task
定义了你的 dag 和 task, 按照官方文档, 如果你在 dag 级别定义了任何 callback, 所有的 task 是可以自动获得这些 callback 的. 但是官方文档没有说, 你不许用 with DAG(...)
以及 PythonOperator(...)
这样的形式定义才能有这个效果. 而用 decorator 是没有这个效果的, 你需要给每个 task 一个个的定义.
下面我们给出了一个示例 dag:
1# -*- coding: utf-8 -*-
2
3"""
4学习如何使用 callback 函数在 任务成功, 失败, 重试的时候执行一些操作.
5
6Reference:
7
8- https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/callbacks.html
9"""
10
11import pendulum
12
13from airflow.decorators import dag, task
14
15# 由于 decorator 的参数由于其实现原理比较 tricky, IDE 不能自动提示,
16# 但我们知道它的参数跟 Operator class 的参数一致, 所以我这里 import 进来供我点击跳转查看参数.
17from airflow.operators.python import BaseOperator
18
19
20def on_success_callback(context):
21 print("I called on_success_callback")
22
23
24def on_failure_callback(context):
25 print("I called on_failure_callback")
26
27
28def on_retry_callback(context):
29 print("I called on_retry_callback")
30
31
32@dag(
33 dag_id="dag_0002_callback",
34 start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), # pendulum 是一个更可靠的时区库
35 schedule=None, # 不自动执行, 你点了 Trigger DAG 才会执行
36 catchup=False,
37)
38def my_dag():
39 @task(
40 task_id="task1",
41 on_success_callback=on_success_callback,
42 )
43 def task1():
44 """
45 Task 1 永远成功.
46 """
47 print("Start task1")
48 print("End task1")
49
50 @task(
51 task_id="task2",
52 retries=10,
53 retry_delay=1,
54 on_retry_callback=on_retry_callback,
55 )
56 def task2():
57 """
58 Task 2 只有 10% 的概率成功. 但是我们重试 10 次, 按概率 10 次全部失败的概率是 35%, 并不低.
59 """
60 import random
61
62 print(f"Start task2")
63 value = random.randint(1, 100)
64 print(f"Generated value is {value}")
65 if value <= 10:
66 pass
67 else:
68 raise ValueError("Randomly failed")
69
70 print("End task2")
71
72 @task(
73 task_id="task3",
74 on_failure_callback=on_failure_callback,
75 )
76 def task3():
77 """
78 Task 3 永远失败.
79 """
80 print("Start task3")
81 raise Exception("Always failed")
82 print("End task3")
83
84 run_task1 = task1()
85 run_task2 = task2()
86 run_task3 = task3()
87
88 run_task1 >> run_task2 >> run_task3
89
90
91run_dag = my_dag() # 你最后必须要实例化这个 DAG 对象 (它是被 @dag 装饰器装饰的函数的返回值, 不是原本的函数了)