Trying Out Airflow's Task Alerting Features

Mar 14, 2021, 12:56 pm

1. Introduction

Airflow is an excellent task scheduling and monitoring platform, open-sourced by Airbnb. This article focuses on its monitoring and alerting features; installation is covered in a separate article.
Using the management of a single DAG as a case study, it shows how Airflow can raise alerts in a variety of situations.

The DAG tasks

The case consists of 5 tasks with the following dependencies:

Task 3 depends on tasks 1 and 2, and task 5 depends on tasks 3 and 4.

2. Task implementation in various scenarios

Test 1: run the tasks normally and send an email notification on completion
The DAG script is as follows:

#coding: utf-8

import datetime
import time
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.email import send_email
from airflow.operators.python import PythonOperator

# email account for notifications
mail_to = "your_mail_account"
attachments = []


def task_success_function(context):
    """Callback fired each time a task completes successfully."""
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    send_email(
        mail_to,
        "Task completed!",
        f"DAG: {dag_id}, TASK_ID: {task_id} complete! {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        attachments
    )


def task_retry_function(context):
    """Callback fired each time a task is retried."""
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    send_email(
        mail_to,
        "Task Retry!",
        f"DAG: {dag_id}, TASK_ID: {task_id} retry! {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        attachments
    )


def task_failure_function(context):
    """Callback fired each time a task fails."""
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    send_email(
        mail_to,
        "Task Failure!",
        f"DAG: {dag_id}, TASK_ID: {task_id} failure! {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        attachments
    )


def task_sla_function(*args, **kwargs):
    """Callback fired when a task exceeds the time it should have succeeded in (SLA miss)."""
    send_email(
        mail_to,
        "Task Exceeded!",
        f"DAG: exceed! {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        attachments
    )


# default arguments shared by all tasks
default_args = {
    "owner": "zhnagjinfei",
    "retries": 1,
    "retry_delay": timedelta(seconds=10),
    "execution_timeout": timedelta(seconds=30),
    "on_failure_callback": task_failure_function,
    "on_success_callback": task_success_function,
    "on_retry_callback": task_retry_function,
    "sla_miss_callback": task_sla_function
}

# create the DAG
dag = DAG(
    'test_task',
    default_args=default_args,
    description="a tutorial",
    tags=['test'],
    start_date=days_ago(2)
)

# completes in about 10 seconds on average; notify if not complete in 30s,
# retry if not complete in 60s, max 1 retry
t1 = BashOperator(
    task_id='task1',
    bash_command='/home/hadoop/tmp/task.sh 1 2',
    dag=dag,
    retries=1,                                # task1 fails easily
    sla=timedelta(seconds=30),                # expected completion time
    execution_timeout=timedelta(seconds=60),  # overrides default_args
)

t2 = BashOperator(
    task_id='task2',
    bash_command='/home/hadoop/tmp/task.sh 2 1',
    dag=dag
)


def defer_run(*args, **kwargs):
    """Sleep 10 seconds before the task runs (used as a pre_execute hook in test 3)."""
    time.sleep(10)
    print("slept 10s before running the task")
    print("continuing")


t3 = BashOperator(
    task_id='task3',
    bash_command='/home/hadoop/tmp/task.sh 3 2',
    dag=dag
)

t4 = BashOperator(
    task_id='task4',
    bash_command='/home/hadoop/tmp/task.sh 4 2',
    dag=dag
)

t5 = BashOperator(
    task_id='task5',
    bash_command='/home/hadoop/tmp/task.sh 5 1',
    dag=dag
)

dag.doc_md = __doc__

# task3 depends on task1 and task2; task5 depends on task3 and task4
[t1, t2] >> t3
[t3, t4] >> t5

task.sh is a shell script that takes 2 arguments, a task id and a number of seconds to sleep; after sleeping for that many seconds it appends a record to a log file.

#!/bin/bash

if [ $# != 2 ]; then
    echo "exactly 2 arguments are required, e.g. 'task.sh 0 0'"
    exit 1
fi

startTime=`date +"%Y-%m-%d %H:%M:%S"`
startTime_s=`date +%s`
echo "${startTime} task $1 started" >> /home/hadoop/tmp/task.log

task_index=$1
seconds=$2

sleep ${seconds}

endTime=`date +"%Y-%m-%d %H:%M:%S"`
endTime_s=`date +%s`
t_s=$(( endTime_s - startTime_s ))
echo "${endTime} task ${task_index} finished, took ${t_s} seconds." >> /home/hadoop/tmp/task.log

exit 0

Execution result

All tasks completed successfully, and 5 completion notification emails were received.

Test 2: set a task timeout; tasks that exceed it should trigger a notification and be retried
For this test, a new DAG is created. It is very simple: a single task that calls a script which takes more than 35 seconds to run.

#coding: utf-8

import datetime
import time
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.email import send_email

# email account for notifications
mail_to = "****@163.com"
attachments = []


def task_retry_function(context):
    """Callback fired each time a task is retried."""
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    send_email(
        mail_to,
        "Task Retry!",
        f"DAG: {dag_id}, TASK_ID: {task_id} retry! {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        attachments
    )


def task_failure_function(context):
    """Callback fired each time a task fails."""
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    send_email(
        mail_to,
        "Task Failure!",
        f"DAG: {dag_id}, TASK_ID: {task_id} failure! {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        attachments
    )


def task_sla_function(*args, **kwargs):
    """Callback fired when a task exceeds the time it should have succeeded in (SLA miss)."""
    send_email(
        mail_to,
        "Task Exceeded!",
        f"DAG: exceed! {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        attachments
    )


# default arguments shared by all tasks
default_args = {
    "owner": "user",
    "retries": 1,
    "retry_delay": timedelta(seconds=10),
    "execution_timeout": timedelta(seconds=30),
    "on_failure_callback": task_failure_function,
    "on_success_callback": None,
    "on_retry_callback": task_retry_function
}

# create the DAG; note that sla_miss_callback is set on the DAG, not in default_args
dag = DAG(
    'test_sla_task',
    default_args=default_args,
    description="a tutorial",
    tags=['test'],
    schedule_interval="*/2 * * * *",
    start_date=datetime.datetime(2021, 3, 16, 8, 0, 0),
    sla_miss_callback=task_sla_function
)

# completes in about 5 seconds on average; notify if not complete in 20s,
# retry if not complete in 30s, max 1 retry
t1 = BashOperator(
    task_id='task1',
    bash_command='/home/hadoop/tmp/task.sh 1 35',
    dag=dag,
    sla=timedelta(seconds=20),  # expected completion time
)

For this task, the following behavior is expected:

  • if the task has not finished within 20 s, send an SLA-miss (timeout) notification;
  • if the task has not finished within 30 s, retry it once and send a retry notification;
  • send a failure notification when the task finally fails.

Note that the task still exceeds the SLA after the retry, so the SLA-miss notification is sent twice.
The figure below shows the notification emails received after the task ran:

The key to this test is the sla parameter of BashOperator (inherited from BaseOperator), which sets the time within which the task is expected to finish, together with the SLA-miss callback sla_miss_callback set on the DAG itself (note: not in default_args).
The SLA feature only applies to scheduled runs, i.e. runs triggered by schedule_interval; manually triggered runs are not covered.
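
For reference, in Airflow 2.x the SLA-miss callback runs at the DAG level and, per the documentation, receives several positional arguments rather than a task-instance context. A more explicit version of task_sla_function might look roughly like the sketch below (assuming that signature; the email wording is only illustrative):

def task_sla_function(dag, task_list, blocking_task_list, slas, blocking_tis):
    """DAG-level callback invoked when one or more task instances miss their SLA."""
    send_email(
        mail_to,
        "Task Exceeded!",
        f"DAG: {dag.dag_id} has SLA misses:\n{task_list}\n"
        f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        attachments
    )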

Test 3: after tasks 1 and 2 have both completed, wait 10 seconds before running task 3
Task 3 is changed to:

t3 = BashOperator(
    task_id='task3',
    bash_command='/home/hadoop/tmp/task.sh 3 2',
    dag=dag
)

t3.pre_execute = defer_run

The execution result shows that task 3 slept for 10 s before it ran, which is the difference between the two underlined timestamps in the figure.

Test 4: if task 5 has not started running by a given time, send an email notification
There are several ways to implement this. For example, have pre_execute perform some action before the task starts and then check at the expected time whether that action has happened (a sketch of this idea follows below), or call Airflow's REST API at the expected time to check whether the task has run.
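
A minimal sketch of the pre_execute idea, assuming a hypothetical marker-file path and a separate job that checks the marker at the expected time (neither appears in the DAG above):

import datetime
import pathlib

# hypothetical marker file used only for this illustration
MARKER = pathlib.Path("/home/hadoop/tmp/task5_started.marker")


def mark_started(*args, **kwargs):
    """pre_execute hook: record the moment task5 starts."""
    MARKER.write_text(datetime.datetime.now().isoformat())


t5.pre_execute = mark_started

# A separate cron job (or another DAG) scheduled at the expected time can then
# alert, e.g. via send_email, if MARKER does not exist yet.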

This article uses the API approach. For details on calling Airflow's API, see:

http://{deployment IP}/redoc#tag/TaskInstance

Airflow provides an endpoint that returns the task instances of a DAG run.

The endpoint takes a dag_id and a dag_run_id and returns a list of task instances; each instance includes a state field, and a state of null means the task has not started running yet. A sample task instance:

{
    "dag_id": "test_task",
    "duration": null,
    "end_date": null,
    "execution_date": "2021-03-15T23:51:43.951007+00:00",
    "executor_config": "{}",
    "hostname": "",
    "max_tries": 1,
    "operator": "BashOperator",
    "pid": null,
    "pool": "default_pool",
    "pool_slots": 1,
    "priority_weight": 1,
    "queue": "default",
    "queued_when": null,
    "sla_miss": null,
    "start_date": null,
    "state": null,
    "task_id": "task5",
    "try_number": 0,
    "unixname": "hadoop"
}

For example, at the time by which task5 is expected to have started, fetch the task-instance list; if its state is null, the task has not started yet, so raise an alert. A sketch of such a check follows.
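
A minimal sketch of that check, assuming the stable REST API is enabled with basic auth; the host, credentials, and dag_run_id below are placeholders, not values from this deployment:

#coding: utf-8

import requests

# placeholders for illustration only
AIRFLOW_HOST = "http://airflow-host:8080"
AUTH = ("admin", "admin")
DAG_ID = "test_task"
DAG_RUN_ID = "scheduled__2021-03-15T23:51:43.951007+00:00"

url = f"{AIRFLOW_HOST}/api/v1/dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}/taskInstances"
resp = requests.get(url, auth=AUTH)
resp.raise_for_status()

# alert if task5 has not started yet (its state is still null/None)
for ti in resp.json()["task_instances"]:
    if ti["task_id"] == "task5" and ti["state"] is None:
        print("task5 has not started yet -- send an alert email here")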

3. Summary

After getting to know Airflow, I was impressed by how powerful it is; reportedly many large companies build their internal task-management platforms on top of it. Mastering Airflow's basics will greatly improve our ability to monitor tasks. This article only illustrates its use with a few cases; for more, please visit the official site, where the documentation is fairly complete.