Airflow Source Code Walkthrough: The Scheduler

Airflow changed a great deal between versions 1.x and 2.0.

To build a deeper understanding of Airflow, the posts that follow walk through the source code of both Airflow 1.x and Airflow 2.0.

A brief introduction to Airflow

Airflow is a distributed, workflow-oriented task scheduling framework that can orchestrate complex workflows. I had to evaluate Airflow for work, so I am recording the problems I ran into while learning it, along with the implementation techniques worth absorbing. The walkthrough goes component by component, digging into the source code. Between the 1.x and 2.0 releases the architecture changed substantially, so I analyze both major versions side by side; comparing them is also a good way to see the developers' design choices and the direction of their optimizations.

First, an overall Airflow architecture diagram:

[Figure: overall Airflow architecture]

Versions 2.0 and 1.x are similar in overall structure. The analysis will roughly be split into a scheduler part, a worker (executor) part, a DAG-processor part and a webserver part.

The rest of this series assumes some familiarity with Airflow's basic concepts; if you have not met them yet, I may write a separate post on the concepts when I find the time.

Building a workflow with Airflow roughly goes as follows. Note that Airflow is a command-driven framework: almost every mechanism is kicked off by issuing a command.

  1. Define the DAG and its operators in a Python file (a minimal example follows this list).
  2. Fill in the basic Airflow configuration: the database connection, the webserver port, connection-pool and parallelism settings, the location of the DAG folder to scan, and so on.
  3. Start the scheduler, the webserver and the executor. The framework then reads the DAG and operator definitions from the configured DAG folder, builds the DAGs and tasks, and stores them in the database.
  4. Finally, log in to the Airflow webserver to watch how each DAG run is doing.
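For readers who have never written one: a DAG file is just a Python module dropped into the configured DAG folder. A minimal sketch in the Airflow 1.x style might look like the following; the file name, dag_id, schedule and bash command are all made up for illustration.

# dags/example_hello.py -- a hypothetical DAG file placed in the configured DAG folder
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # import path as in Airflow 1.x

# The DAG object groups tasks and carries the schedule
dag = DAG(
    dag_id="example_hello",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
)

# Each operator instance becomes a task of the DAG
hello = BashOperator(
    task_id="say_hello",
    bash_command="echo hello",
    dag=dag,
)

On the next scan of the DAG folder the scheduler picks this file up and creates the DAG and its single task in the database, after which the runs show up in the web UI.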

The Airflow 1.x scheduler

The version analyzed here is 1.8.2.

Let's start with the overall layout of the Airflow source tree.

[Figure: top-level directory layout of the Airflow 1.8.2 source tree]

A rough description of what each directory is for:

  • api: methods exposed for programmatic (in-code) API calls
  • bin: the command-line entry points
  • contrib: presumably third-party / community-contributed extensions, including sensors, executors, hooks and so on
  • dag: base classes for DAGs
  • example_dags: sample DAG templates
  • executors: the concrete executors
  • hooks: hooks that are called from inside operators, which keeps non-core functionality loosely coupled
  • macros: mostly Hive-related helpers; I am not very familiar with them yet
  • migrations: database schema migrations (skipped here)
  • operators: the ready-made operators, i.e. task templates
  • security: user authentication and login
  • task_runner: runs the airflow command-line tool to execute a task, a bit like a shell wrapper
  • ti_deps: dependency rules and states for task instances
  • utils: utility code
  • www: everything related to the web UI

As you can see there are quite a few directories, so we need a main thread to follow. This post takes the scheduler as the entry point, so let's start with how the scheduler is created.

Almost every mechanism in Airflow is triggered from the command line, so we go straight to the CLI entry point, bin/cli.py.

In cli.py, the CLIFactory class defines a large number of commands; we will come back to it again and again later on. For now, let's look directly at how the scheduler is started.
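Before diving into the real code, it helps to keep in mind what CLIFactory boils down to: an argparse-based registry that maps each subcommand name and its flags to a handler function such as scheduler(args). The stripped-down sketch below only illustrates that idea; the names, flags and structure are simplifications, not the actual CLIFactory code.

import argparse

def scheduler(args):
    # In the real code this constructs a SchedulerJob and calls job.run()
    print("would start the scheduler for dag_id={}".format(args.dag_id))

# Register each subcommand with its flags and bind it to a handler,
# which is roughly what CLIFactory does when building the parser for every airflow command.
parser = argparse.ArgumentParser(prog="airflow")
subparsers = parser.add_subparsers()
sched = subparsers.add_parser("scheduler")
sched.add_argument("-d", "--dag_id", default=None)
sched.set_defaults(func=scheduler)

if __name__ == "__main__":
    # e.g. "python cli_sketch.py scheduler -d example_hello" dispatches to scheduler()
    args = parser.parse_args()
    args.func(args)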

[Figure: the scheduler subcommand as defined in bin/cli.py]

def scheduler(args):
    print(settings.HEADER)
    # The core of the scheduler: a SchedulerJob
    job = jobs.SchedulerJob(
        dag_id=args.dag_id,
        subdir=process_subdir(args.subdir),
        run_duration=args.run_duration,
        num_runs=args.num_runs,
        do_pickle=args.do_pickle)
    # If running as a daemon, attach the process to a DaemonContext
    if args.daemon:
        pid, stdout, stderr, log_file = setup_locations("scheduler", args.pid, args.stdout, args.stderr, args.log_file)
        handle = setup_logging(log_file)
        stdout = open(stdout, 'w+')
        stderr = open(stderr, 'w+')

        ctx = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pid, -1),
            files_preserve=[handle],
            stdout=stdout,
            stderr=stderr,
        )
        with ctx:
            job.run()

        stdout.close()
        stderr.close()
    # Otherwise run in the foreground, installing signal handlers
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)
        signal.signal(signal.SIGQUIT, sigquit_handler)
        job.run()

As you can see, the core is the creation of a SchedulerJob, passing in basic parameters such as the dag_id, the run duration and the number of runs.

Stepping into it, we land in jobs.py.

Let's first look at SchedulerJob's parent class, BaseJob.

class BaseJob(Base, LoggingMixin):

    __tablename__ = "job"

    id = Column(Integer, primary_key=True)
    dag_id = Column(String(ID_LEN),)
    state = Column(String(20))
    job_type = Column(String(30))
    start_date = Column(DateTime())
    end_date = Column(DateTime())
    latest_heartbeat = Column(DateTime())
    executor_class = Column(String(500))
    hostname = Column(String(500))
    unixname = Column(String(1000))

    def __init__(self, *args, **kwargs):  # constructor arguments omitted here
        pass

    def kill(self):
        pass

    def on_kill(self):
        pass

    def heartbeat_callback(self, session=None):
        pass

    def heartbeat(self):
        pass

    def run(self):
        pass

    def _execute(self):
        pass

    @provide_session
    def reset_state_for_orphaned_tasks(self, dag_run, session=None):
        pass

Notice that a job is first of all a row in a database table that stores the job's state and execution details (this is the SQLAlchemy declarative mapping at work).
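If the SQLAlchemy declarative pattern is new to you: a class that inherits from Base and declares Column attributes is mapped to a database table, and rows are read and written through a Session. Below is a tiny self-contained illustration; the table and field names are invented for the demo rather than copied from Airflow.

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class DemoJob(Base):               # plays the role of BaseJob here
    __tablename__ = "demo_job"     # each row of this table is one "job"
    id = Column(Integer, primary_key=True)
    state = Column(String(20))

engine = create_engine("sqlite://")        # in-memory database, demo only
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

session = Session()
session.add(DemoJob(state="running"))      # run() persists the job state in the same way
session.commit()
print(session.query(DemoJob).filter_by(state="running").count())  # -> 1
session.close()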

The main methods are:

  • is_alive: uses the heartbeat timestamp to decide whether the job is still alive
  • kill: terminates the job through its record in the database
  • on_kill: meant to be overridden by subclasses to do extra cleanup when a job is killed
  • heartbeat_callback: heartbeat callback, to be overridden by subclasses
  • heartbeat: sends a heartbeat; covered in detail below
  • run: marks the job as running in the table, then calls _execute
  • _execute: to be overridden by subclasses; defines how the job actually runs
  • reset_state_for_orphaned_tasks: resets orphaned task instances

Now let's look at heartbeat.

def heartbeat(self):
    session = settings.Session()
    job = session.query(BaseJob).filter_by(id=self.id).one()
    make_transient(job)
    session.commit()
    session.close()

    if job.state == State.SHUTDOWN:
        self.kill()

    # Figure out how long to sleep for
    sleep_for = 0
    if job.latest_heartbeat:
        sleep_for = max(
            0,
            self.heartrate - (datetime.now() - job.latest_heartbeat).total_seconds())

    # Don't keep session open while sleeping as it leaves a connection open
    session.close()
    sleep(sleep_for)

    # Update last heartbeat time
    session = settings.Session()
    job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
    job.latest_heartbeat = datetime.now()
    session.merge(job)
    session.commit()

    self.heartbeat_callback(session=session)
    session.close()
    self.logger.debug('[heart] Boom.')

The heartbeat is sent periodically while the scheduler runs; by checking the job table, other components can tell whether the scheduler is still healthy.
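The flip side of sending heartbeats is checking them: is_alive essentially compares latest_heartbeat with the current time, allowing some grace on top of the configured heartbeat interval. A rough sketch of that check follows; the interval and grace factor below are chosen arbitrarily for illustration, not quoted from the source.

from datetime import datetime, timedelta

# A job is considered alive if its latest_heartbeat is recent enough.
# The real is_alive() applies a grace factor to the configured heartbeat
# interval; these particular numbers are illustrative only.
HEARTRATE = timedelta(seconds=5)
GRACE_FACTOR = 2.1

def looks_alive(latest_heartbeat):
    return datetime.now() - latest_heartbeat < HEARTRATE * GRACE_FACTOR

print(looks_alive(datetime.now()))                          # True: heartbeat just arrived
print(looks_alive(datetime.now() - timedelta(minutes=5)))   # False: scheduler presumed dead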

@provide_session
def reset_state_for_orphaned_tasks(self, dag_run, session=None):
    # Task instances queued in the executor, waiting to run
    queued_tis = self.executor.queued_tasks

    # Also consider task instances currently running in the executor
    running = self.executor.running
    tis = list()
    tis.extend(dag_run.get_task_instances(state=State.SCHEDULED, session=session))
    tis.extend(dag_run.get_task_instances(state=State.QUEUED, session=session))

    # Task instances that are queued or running in the executor are healthy
    # and left alone; everything else is orphaned and has its state reset
    for ti in tis:
        if ti.key not in queued_tis and ti.key not in running:
            self.logger.debug("Rescheduling orphaned task {}".format(ti))
            ti.state = State.NONE
    session.commit()

Resetting orphaned tasks handles the various abnormal situations that can occur during scheduling: if the scheduler process dies suddenly, or no executor ever picks a task up, a task instance can end up with nobody able to run it. Such orphaned tasks are reclaimed before the next scheduling pass and scheduled again.

With that, we can move on to SchedulerJob itself. Let's go straight to its core method.

def _execute(self):
    self.logger.info("Starting the scheduler")
    # Pessimistic DB connection handling
    pessimistic_connection_handling()

    logging.basicConfig(level=logging.DEBUG)

    # DAGs can be pickled by some executors for easier remote execution
    pickle_dags = False
    if self.do_pickle and self.executor.__class__ not in \
            (executors.LocalExecutor, executors.SequentialExecutor):
        pickle_dags = True

    # Use multiple processes to parse and generate tasks for the
    # DAGs in parallel. By processing them in separate processes,
    # we can get parallelism and isolation from potentially harmful
    # user code.

    # (several informational log lines omitted)

    # Build up a list of Python files that could contain DAGs
    self.logger.info("Searching for files in {}".format(self.subdir))
    # Walk the DAG directory and collect Python files
    known_file_paths = list_py_file_paths(self.subdir)
    self.logger.info("There are {} files in {}"
                     .format(len(known_file_paths), self.subdir))

    def processor_factory(file_path, log_file_path):
        return DagFileProcessor(file_path,
                                pickle_dags,
                                self.dag_ids,
                                log_file_path)

    # The core piece: the manager that turns DAG files into DAG objects
    processor_manager = DagFileProcessorManager(self.subdir,
                                                known_file_paths,
                                                self.max_threads,
                                                self.file_process_interval,
                                                self.child_process_log_directory,
                                                self.num_runs,
                                                processor_factory)

    # Run the scheduling loop
    try:
        self._execute_helper(processor_manager)
    finally:
        self.logger.info("Exited execute loop")

        # Scheduling is over at this point; kill the child processes and exit
        pids_to_kill = processor_manager.get_all_pids()
        if len(pids_to_kill) > 0:
            # First try SIGTERM
            this_process = psutil.Process(os.getpid())

            # Only look at child processes, so that a dead child whose PID was
            # reused does not cause us to kill the wrong process
            child_processes = [x for x in this_process.children(recursive=True)
                               if x.is_running() and x.pid in pids_to_kill]
            for child in child_processes:
                self.logger.info("Terminating child PID: {}".format(child.pid))
                child.terminate()
            timeout = 5
            self.logger.info("Waiting up to {}s for processes to exit..."
                             .format(timeout))
            # Wait for the processes to terminate
            try:
                psutil.wait_procs(child_processes, timeout)
            except psutil.TimeoutExpired:
                self.logger.debug("Ran out of time while waiting for "
                                  "processes to exit")

            # Then SIGKILL
            child_processes = [x for x in this_process.children(recursive=True)
                               if x.is_running() and x.pid in pids_to_kill]
            if len(child_processes) > 0:
                for child in child_processes:
                    self.logger.info("Killing child PID: {}".format(child.pid))
                    child.kill()
                    child.wait()

So the flow of _execute is: take the lock -> obtain the parsed DAGs -> run the scheduling loop -> kill the child processes once scheduling ends.

DAG parsing deserves a post of its own, so here we go straight into _execute_helper to see what happens there.

def _execute_helper(self, processor_manager):

    self.executor.start()

    session = settings.Session()
    self.logger.info("Resetting state for orphaned tasks")
    # grab orphaned tasks and make sure to reset their state
    active_runs = DagRun.find(
        state=State.RUNNING,
        external_trigger=False,
        session=session,
        no_backfills=True,
    )
    for dr in active_runs:
        self.logger.info("Resetting {} {}".format(dr.dag_id,
                                                  dr.execution_date))
        self.reset_state_for_orphaned_tasks(dr, session=session)

    session.close()

    execute_start_time = datetime.now()

    # Last time stats were printed
    last_stat_print_time = datetime(2000, 1, 1)
    # Last time that self.heartbeat() was called.
    last_self_heartbeat_time = datetime.now()
    # Last time that the DAG dir was traversed to look for files
    last_dag_dir_refresh_time = datetime.now()

    # Use this value initially
    known_file_paths = processor_manager.file_paths

    # For the execute duration, parse and schedule DAGs
    while (datetime.now() - execute_start_time).total_seconds() < \
            self.run_duration or self.run_duration < 0:
        self.logger.debug("Starting Loop...")
        loop_start_time = time.time()

        # Traverse the DAG directory for Python files containing DAGs
        # periodically
        elapsed_time_since_refresh = (datetime.now() -
                                      last_dag_dir_refresh_time).total_seconds()

        if elapsed_time_since_refresh > self.dag_dir_list_interval:
            # Build up a list of Python files that could contain DAGs
            self.logger.info("Searching for files in {}".format(self.subdir))
            known_file_paths = list_py_file_paths(self.subdir)
            last_dag_dir_refresh_time = datetime.now()
            self.logger.info("There are {} files in {}"
                             .format(len(known_file_paths), self.subdir))
            processor_manager.set_file_paths(known_file_paths)

            self.logger.debug("Removing old import errors")
            self.clear_nonexistent_import_errors(known_file_paths=known_file_paths)

        # Kick of new processes and collect results from finished ones
        self.logger.info("Heartbeating the process manager")
        simple_dags = processor_manager.heartbeat()

        if self.using_sqlite:
            # For the sqlite case w/ 1 thread, wait until the processor
            # is finished to avoid concurrent access to the DB.
            self.logger.debug("Waiting for processors to finish since we're "
                              "using sqlite")
            processor_manager.wait_until_finished()

        # Send tasks for execution if available
        if len(simple_dags) > 0:
            simple_dag_bag = SimpleDagBag(simple_dags)

            # Handle cases where a DAG run state is set (perhaps manually) to
            # a non-running state. Handle task instances that belong to
            # DAG runs in those states

            # If a task instance is up for retry but the corresponding DAG run
            # isn't running, mark the task instance as FAILED so we don't try
            # to re-run it.
            self._change_state_for_tis_without_dagrun(simple_dag_bag,
                                                      [State.UP_FOR_RETRY],
                                                      State.FAILED)
            # If a task instance is scheduled or queued, but the corresponding
            # DAG run isn't running, set the state to NONE so we don't try to
            # re-run it.
            self._change_state_for_tis_without_dagrun(simple_dag_bag,
                                                      [State.QUEUED,
                                                       State.SCHEDULED],
                                                      State.NONE)

            self._execute_task_instances(simple_dag_bag,
                                         (State.SCHEDULED,))

        # Call hearbeats
        self.logger.info("Heartbeating the executor")
        self.executor.heartbeat()

        # Process events from the executor
        self._process_executor_events()

        # Heartbeat the scheduler periodically
        time_since_last_heartbeat = (datetime.now() -
                                     last_self_heartbeat_time).total_seconds()
        if time_since_last_heartbeat > self.heartrate:
            self.logger.info("Heartbeating the scheduler")
            self.heartbeat()
            last_self_heartbeat_time = datetime.now()

        # Occasionally print out stats about how fast the files are getting processed
        if ((datetime.now() - last_stat_print_time).total_seconds() >
                self.print_stats_interval):
            if len(known_file_paths) > 0:
                self._log_file_processing_stats(known_file_paths,
                                                processor_manager)
            last_stat_print_time = datetime.now()

        loop_end_time = time.time()
        self.logger.debug("Ran scheduling loop in {:.2f}s"
                          .format(loop_end_time - loop_start_time))
        self.logger.debug("Sleeping for {:.2f}s"
                          .format(self._processor_poll_interval))
        time.sleep(self._processor_poll_interval)

        # Exit early for a test mode
        if processor_manager.max_runs_reached():
            self.logger.info("Exiting loop as all files have been processed "
                             "{} times".format(self.num_runs))
            break

    # Stop any processors
    processor_manager.terminate()

    # Verify that all files were processed, and if so, deactivate DAGs that
    # haven't been touched by the scheduler as they likely have been
    # deleted.
    all_files_processed = True
    for file_path in known_file_paths:
        if processor_manager.get_last_finish_time(file_path) is None:
            all_files_processed = False
            break
    if all_files_processed:
        self.logger.info("Deactivating DAGs that haven't been touched since {}"
                         .format(execute_start_time.isoformat()))
        models.DAG.deactivate_stale_dags(execute_start_time)

    self.executor.end()

    settings.Session.remove()

That is a lot of code; it breaks down into these parts:

  • handling special cases before scheduling starts
  • scheduling and parsing DAGs
  • executing task instances
  • cleanup after scheduling ends

First, the pre-scheduling part.

self.executor.start()

session = settings.Session()
self.logger.info("Resetting state for orphaned tasks")
# grab orphaned tasks and make sure to reset their state
active_runs = DagRun.find(
    state=State.RUNNING,
    external_trigger=False,
    session=session,
    no_backfills=True,
)
for dr in active_runs:
    self.logger.info("Resetting {} {}".format(dr.dag_id,
                                               dr.execution_date))
    self.reset_state_for_orphaned_tasks(dr, session=session)

session.close()

execute_start_time = datetime.now()

# Last time stats were printed
last_stat_print_time = datetime(2000, 1, 1)
# Last time that self.heartbeat() was called.
last_self_heartbeat_time = datetime.now()
# Last time that the DAG dir was traversed to look for files
last_dag_dir_refresh_time = datetime.now()

# Use this value initially
known_file_paths = processor_manager.file_paths

The scheduler first looks up the DAG runs. A DAG run is the instance created for each execution of a DAG; by walking the running DAG runs, the scheduler cleans up the orphaned tasks left behind by previous scheduler processes.

Orphans aside, consider which tasks are still legitimately in flight when scheduling starts (summarized in the sketch after this list):

  1. The DAG run is active and the task instance is still SCHEDULED: these are simply picked up again by this scheduling pass.
  2. The DAG run is active and the task instance is already QUEUED: these are handled by the executor's queue.
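Putting the two cases together, the decision made for each SCHEDULED or QUEUED task instance boils down to the little helper below, which merely paraphrases the loop in reset_state_for_orphaned_tasks shown earlier, with simplified types (plain strings and sets instead of task-instance keys and executor collections).

# ti_key stands in for a task instance key; queued and running stand in for the
# executor's queued_tasks and running collections used in reset_state_for_orphaned_tasks.
def is_orphaned(ti_key, queued, running):
    # Known to the executor in either collection: still healthy, leave it alone.
    if ti_key in queued or ti_key in running:
        return False
    # Otherwise nobody will ever run it: reset it to NONE so it gets rescheduled.
    return True

print(is_orphaned("ti-1", {"ti-1"}, set()))  # False: the executor will run it
print(is_orphaned("ti-2", set(), set()))     # True: orphaned, will be reset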


The Airflow 2.0 scheduler

Because Airflow 2.0 is developed on Python 3, it takes advantage of many newer language features, and the overall file layout of the project has also changed considerably.
