很多业务场景都需要在后台定期执行任务,如数据ETL(Extract-Transform-Load)操作。简单处理可通过crontab来管理。当任务需要在多台机器上执行,或者任务之间有依赖关系时,crontab便不太能满足需求。这种场景下需要分布式任务调度系统来组织任务编排,管理任务依赖,调度任务工作流和监视任务执行状态。
比较优秀的开源解决方案有:
Azkaban和Oozie都更聚集在大数据处理平台上的任务调度,Airflow的应用场景更为通用。本文简单介绍Airflow。
Airflow使用Python开发,它通过DAGs(Directed Acyclic Graph, 有向无环图)
来表达一个工作流中所要执行的任务,以及任务之间的关系和依赖。比如,如下的工作流中,任务T1
执行完成,T2
和T3
才能开始执行,T2
和T3
都执行完成,T4
才能开始执行。
Airflow使用Code as Configuration
的理念,DAG
由Python源码文件来定义, 如:
1 | from datetime import datetime, timedelta |
Etsy提供了一个工具,可以从YAML文件来生成DAG文件,如果不习惯使用Python来定义DAG文件,可以使用该工具。
DAG的一次执行叫做DAG run
, 包括了本次工作流执行中的任务执行实例。DAG文件定义了工作流逻辑,任务本身的实现由Operator定义。Operator一般是原子任务,两个Operator之间一般不共享资源。DAG保证他们之间的执行顺序,而他们实际完全有可能在不同的机器上运行。如果两个Operator之间确实共享信息,可以使用airflow的XComs机制。
Airflow提供了各种Operator实现,可以完成各种任务实现:
- BashOperator: 执行Bash命令
- PythonOperator: 执行Python函数
- EmailOperator: 发送邮件
- SimpleHttpOperator: 发送HTTP请求
- ……
一些情况下,我们需要根据执行结果执行不同的任务,这样工作流会产生分支。如:
这种需求可以使用BranchPythonOperator
来实现。参考:
Airflow的整体架构主要由三部分:
- WebServer:提供WEB服务,会定时扫描指定目录下的DAG文件
- Scheduler: 定期检查所有DAG及其中的任务,如果发现符合执行条件,则发起相应的任务供worker接收
- Worker: 接收任务并执行任务
Worker的具体实现由配置文件中的executor
来指定,airflow支持多种Executor
:
- SequentialExecutor: 单进程顺序执行,一般只用来测试
- LocalExecutor: 本地多进程执行
- CeleryExecutor: 使用Celery进行分布式任务调度
- DaskExecutor:使用Dask进行分布式任务调度
- KubernetesExecutor: 1.10.0新增, 创建临时POD执行每次任务
生产环境一般使用CeleryExecutor
和KubernetesExecutor
。
使用CeleryExecutor
的架构如图:
使用KubernetesExecutor
的架构如图:
可以根据实际任务的执行环境要求来进行选择,当然kubernetes
平台已经非常成熟和便捷,资源使用弹性更加具备优势。但目前KubernetesExecutor
还不太稳定,简单场景可以通过使用LocalExecutor执行kubectl相关操作或者使用KubernetesPodOperator
来变相实现在Kubernetes集群中运行任务的需求。
下面介绍在Kubernetes环境中使用Helm来安装Airflow.
Bitnami提供了Apache Airflow Helm Chart。
它需要一个GIT仓库地址来拉取DAG文件,需要提前准备好:
1 | export REPOSITORY_URL=https://github.com/flygoast/airflow-dag-examples |
在Kubernetes集群安装:
1 | helm install airflow bitnami/airflow \ |
等待安装完成:
1 | flygoast@bogon devel % kubectl get pods |
配置端口映射:
1 | kubectl port-forward --namespace default svc/airflow 8080:8080 |
获取密码:
1 | flygoast@bogon devel % echo Password: $(kubectl get secret --namespace default airflow -o jsonpath="{.data.airflow-password}" | base64 --decode) |
使用user
和密码登录http://127.0.0.1:8080
, airflow安装完成:
当我们更新了GIT仓库中的DAG文件后,需要执行helm upgrade
:
1 | helm upgrade airflow bitnami/airflow \ |
参考链接: