Keep learning, keep living...

0%

Apache Airflow介绍

很多业务场景都需要在后台定期执行任务,如数据ETL(Extract-Transform-Load)操作。简单处理可通过crontab来管理。当任务需要在多台机器上执行,或者任务之间有依赖关系时,crontab便不太能满足需求。这种场景下需要分布式任务调度系统来组织任务编排,管理任务依赖,调度任务工作流和监视任务执行状态。

比较优秀的开源解决方案有:

Azkaban和Oozie都更聚集在大数据处理平台上的任务调度,Airflow的应用场景更为通用。本文简单介绍Airflow。

Airflow使用Python开发,它通过DAGs(Directed Acyclic Graph, 有向无环图)来表达一个工作流中所要执行的任务,以及任务之间的关系和依赖。比如,如下的工作流中,任务T1执行完成,T2T3才能开始执行,T2T3都执行完成,T4才能开始执行。

Airflow使用Code as Configuration的理念,DAG由Python源码文件来定义, 如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
dag_id='dummy_dag',
schedule_interval=dt.timedelta(days=1),
start_date=days_ago(1)
)

t1 = DummyOperator(task_id='task1', dag=dag)
t2 = DummyOperator(task_id='task2', dag=dag)
t3 = DummyOperator(task_id='task3', dag=dag)
t4 = DummyOperator(task_id='task4', dag=dag)

t1 >> [t2, t3] >> t4

Etsy提供了一个工具,可以从YAML文件来生成DAG文件,如果不习惯使用Python来定义DAG文件,可以使用该工具。

DAG的一次执行叫做DAG run, 包括了本次工作流执行中的任务执行实例。DAG文件定义了工作流逻辑,任务本身的实现由Operator定义。Operator一般是原子任务,两个Operator之间一般不共享资源。DAG保证他们之间的执行顺序,而他们实际完全有可能在不同的机器上运行。如果两个Operator之间确实共享信息,可以使用airflow的XComs机制

Airflow提供了各种Operator实现,可以完成各种任务实现:

  • BashOperator: 执行Bash命令
  • PythonOperator: 执行Python函数
  • EmailOperator: 发送邮件
  • SimpleHttpOperator: 发送HTTP请求
  • ……

一些情况下,我们需要根据执行结果执行不同的任务,这样工作流会产生分支。如:

这种需求可以使用BranchPythonOperator来实现。参考:

Airflow的整体架构主要由三部分:

  • WebServer:提供WEB服务,会定时扫描指定目录下的DAG文件
  • Scheduler: 定期检查所有DAG及其中的任务,如果发现符合执行条件,则发起相应的任务供worker接收
  • Worker: 接收任务并执行任务

Worker的具体实现由配置文件中的executor来指定,airflow支持多种Executor:

  • SequentialExecutor: 单进程顺序执行,一般只用来测试
  • LocalExecutor: 本地多进程执行
  • CeleryExecutor: 使用Celery进行分布式任务调度
  • DaskExecutor:使用Dask进行分布式任务调度
  • KubernetesExecutor: 1.10.0新增, 创建临时POD执行每次任务

生产环境一般使用CeleryExecutorKubernetesExecutor

使用CeleryExecutor的架构如图:

使用KubernetesExecutor的架构如图:

可以根据实际任务的执行环境要求来进行选择,当然kubernetes平台已经非常成熟和便捷,资源使用弹性更加具备优势。但目前KubernetesExecutor还不太稳定,简单场景可以通过使用LocalExecutor执行kubectl相关操作或者使用KubernetesPodOperator来变相实现在Kubernetes集群中运行任务的需求。

下面介绍在Kubernetes环境中使用Helm来安装Airflow.

Bitnami提供了Apache Airflow Helm Chart

它需要一个GIT仓库地址来拉取DAG文件,需要提前准备好:

1
export REPOSITORY_URL=https://github.com/flygoast/airflow-dag-examples

在Kubernetes集群安装:

1
2
3
4
5
helm install airflow bitnami/airflow \
--set airflow.cloneDagFilesFromGit.enabled=true \
--set airflow.cloneDagFilesFromGit.repository=$REPOSITORY_URL \
--set airflow.cloneDagFilesFromGit.branch=master \
--set airflow.baseUrl=http://127.0.0.1:8080

等待安装完成:

1
2
3
4
5
6
7
8
9
flygoast@bogon devel % kubectl get pods
NAME READY STATUS RESTARTS AGE
airflow-postgresql-0 1/1 Running 0 16m
airflow-redis-master-0 1/1 Running 0 16m
airflow-redis-slave-0 1/1 Running 0 16m
airflow-redis-slave-1 1/1 Running 0 15m
airflow-scheduler-8557d55695-wlkd9 2/2 Running 0 16m
airflow-web-766bb56d6d-8fl7l 2/2 Running 0 16m
airflow-worker-0 2/2 Running 0 16m

配置端口映射:

1
kubectl port-forward --namespace default svc/airflow 8080:8080

获取密码:

1
2
flygoast@bogon devel % echo Password: $(kubectl get secret --namespace default airflow -o jsonpath="{.data.airflow-password}" | base64 --decode)
Password: c9Yq6z17tk

使用user和密码登录http://127.0.0.1:8080, airflow安装完成:

当我们更新了GIT仓库中的DAG文件后,需要执行helm upgrade:

1
2
3
4
5
helm upgrade airflow bitnami/airflow \
--set airflow.cloneDagFilesFromGit.enabled=true \
--set airflow.cloneDagFilesFromGit.repository=$REPOSITORY_URL \
--set airflow.cloneDagFilesFromGit.branch=master \
--set airflow.baseUrl=http://127.0.0.1:8080

参考链接: