airflow 框架使用经验
主要总结 airflow 框架使用经验。
Beginner
-
Airflow Introduction
-
Run Airflow in Python Env
-
Run Airflow in Docker
-
Airflow Basics and Core Concepts
-
Airflow Task Lifecycle
-
Airflow Basic Architecture
-
Airflow DAG with Bash Operator
-
Airflow DAG with Python Operator
-
Data Sharing via Airflow XComs
-
Airflow Task Flow API
-
Airflow Catch Up and Backfill
-
Airflow Scheduler with Cron Expression
-
Airflow Connection to Postgres
-
Airflow Postgres Operator
-
Airflow Docker Install Python Package 2 ways
-
Airflow AWS S3 Sensor Operator
-
Airflow Hooks S3 PostgreSQL
reference
Operator
TriggerDagRunOperator
使用新的 TriggerDagRunOperator,可以等待触发的 DAG 完成。不再需要创建自己的自定义运算符。这与将参数 wait_for_completion 设置为 true 一样简单。此外,参数 reset_dag_run 还允许回填触发的 DAGRuns,这是绝对关键的。
reference
DockerOperator
Airflow DockerOperator 是一个非常强大的操作员。
它在 docker 容器中执行任务。使用 DockerOperator 有多种优势,例如:
-
测试任务的更简单方法
-
控制任务所需的资源
-
避免依赖关系冲突
以及更多。
reference
Test on Airflow
通过使用 airflow scheduler cli 可以方便的测试 dag 和 task.
1 | # in airflow scheduler |
reference
Airflow Observability
reference
Airflow on k8s
KubernetesExecutor
在本地多节点 Kubernetes 集群上使用 KubernetesExecutor 运行 Apache Airflow.
-
相比其它 executor, KubernetesExecutor 动态创建工作节点,避免资源浪费
-
任务间在 pod 间是隔离容错的
-
减小 scheduler 由于边缘驱动触发压力
deploy airflow with helm
-
non prod 和 prod 区分于 database 是否部署.
-
由统一的管理多配置部署管理.
-
配置参数化配置.
-
scheduler,web server,database 单 pod 部署.
1 | passing = KubernetesPodOperator( |
k8s pod operator
由 scheduler 调度 worker 创建工作容器.
-
通常在 executor_config 配置 KubernetesExecutor.
install kind & helm & airflow
kind 本地多集群管理工具
helm: 多集群服务脚本部署工具
-
使用 kind 创建本地集群
-
helm 添加 airflow repos
1 | # 添加 airflow helm 仓库 |
reference
reference
-
https://marclamberti.com/blog/airflow-on-kubernetes-get-started-in-10-mins/
-
https://varunbpatil.github.io/2020/10/01/airflow-on-kubernetes.html