airflow 框架使用经验

主要总结 airflow 框架使用经验。

Beginner

  • Airflow Introduction

  • Run Airflow in Python Env

  • Run Airflow in Docker

  • Airflow Basics and Core Concepts

  • Airflow Task Lifecycle

  • Airflow Basic Architecture

  • Airflow DAG with Bash Operator

  • Airflow DAG with Python Operator

  • Data Sharing via Airflow XComs

  • Airflow Task Flow API

  • Airflow Catch Up and Backfill

  • Airflow Scheduler with Cron Expression

  • Airflow Connection to Postgres

  • Airflow Postgres Operator

  • Airflow Docker Install Python Package 2 ways

  • Airflow AWS S3 Sensor Operator

  • Airflow Hooks S3 PostgreSQL

reference

Operator

TriggerDagRunOperator

使用新的 TriggerDagRunOperator,可以等待触发的 DAG 完成。不再需要创建自己的自定义运算符。这与将参数 wait_for_completion 设置为 true 一样简单。此外,参数 reset_dag_run 还允许回填触发的 DAGRuns,这是绝对关键的。

reference

DockerOperator

Airflow DockerOperator 是一个非常强大的操作员。

它在 docker 容器中执行任务。使用 DockerOperator 有多种优势,例如:

  • 测试任务的更简单方法

  • 控制任务所需的资源

  • 避免依赖关系冲突

以及更多。

reference

Test on Airflow

通过使用 airflow scheduler cli 可以方便的测试 dag 和 task.

1
2
3
4
# in airflow scheduler
airflow task test [dag_id] [task_id] [execution date,如-1表示当前时间]
# 如果在容器外部
docker exec [container_id] airflow tasks test [dag_id] [task_id] -1

reference

Airflow Observability

reference

Airflow on k8s

KubernetesExecutor

在本地多节点 Kubernetes 集群上使用 KubernetesExecutor 运行 Apache Airflow.

  • 相比其它 executor, KubernetesExecutor 动态创建工作节点,避免资源浪费

  • 任务间在 pod 间是隔离容错的

  • 减小 scheduler 由于边缘驱动触发压力

deploy airflow with helm

  • non prod 和 prod 区分于 database 是否部署.

  • 由统一的管理多配置部署管理.

  • 配置参数化配置.

  • scheduler,web server,database 单 pod 部署.

    image.

1
2
3
4
5
6
7
8
9
10
11
passing = KubernetesPodOperator(
namespace="default",
image="python:3.8-bullseye-slim",
cmds=["python", "-c"],
arguments=["print('hello word')"],
labels={"foo": "bar"},
name="passing-test",
task_id="passing-task",
get_logs=True,
dag=dag,
)

k8s pod operator

由 scheduler 调度 worker 创建工作容器.

image.

  • 通常在 executor_config 配置 KubernetesExecutor.

install kind & helm & airflow

kind 本地多集群管理工具
helm: 多集群服务脚本部署工具

  • 使用 kind 创建本地集群

  • helm 添加 airflow repos

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 添加 airflow helm 仓库
helm repo add airflow https://marclamberti.github.io/airflow-eks-helm-char
helm repo update
helm repo list
helm search airflow
# 打印到本地配置 values.yaml
helm show values airflow/airflow > value.yaml
# helm 部署 airflow
helm install -f values.yaml --kube-context kind-airflow-cluster airflow airflow/airflow
# 查看 airflow 部署
kubectl get pods --context kind-airflow-cluster
# 导出本地访问端口
kubectl port-forward svc/airflow-webserver 8080:8080 --context kind-airflow-cluster

# 构建镜像 在values.yaml指定镜像重新部署
# FROM apache/airflow:1.10.10.1-alpha2-python3.7
# COPY dags $AIRFLOW_HOME/dags
docker build -t airflow-image:v1.0.0
# 删除本地集群
# kind delete cluster --name=airflow-cluster
helm upgrade -f values.yaml --kube-context kind-airflow-cluster airflow airflow/airflow

reference

reference

reference