概述

本篇章我们介绍一下如何在源码和 Docker 两种方式下安装 Airflow3.0.6 环境,Airflow 不支持在 windows 上的部署,所以在这里我们使用的是 Windows 上的 Linux 子系统。

至于说为什么不使用 Airflow 最近版本 3.1,这是因为博主在真实使用中踩到了最新版本的 bug,包括但不限于 fab 验证出错,数据库连接异常。

源码安装

系统说明

  • 操作系统:WSL Ubuntu22.04
  • Airflow 版本:Airflow3.0.6
  • Python 版本:3.10
  • 数据库:sqlite

更新系统组件

sudo apt-get update
sudo apt-get install -y build-essential libssl-dev libffi-dev python3-dev libsqlite3-dev

项目准备

# 安装 airflow
mkdir ~/airflow-repo
cd ~/airflow-repo

uv venv .venv -p 3.10
uv init
uv add apache-airflow==3.0.6

这将为你初始化一个虚拟环境,并安装Apache Airflow3.0.6我们可以测试一下是否安装成功了:

airflow verison  # 3.0.6

启动项目

现在我们已经有了airflow项目的环境,在我们启动airflow之前我们还需要设置 airflow的家目录,默认会在家目录下生成日志文件和配置文件等重要文件夹

export AIRFLOW_HOME=~/airflow-repo

airflow提供了一个命令行命令airflow standalone这将启动 airflow 所需的所有组件

airflow standalone

这将自动在 AIRFLOW_HOME 目录下生成 sqlite 数据库文件并迁移数据表和生成 airflow.cfg 配置文件,此时,你可以访问 localhost:8080 路径来访问Airflow 了。

默认的用户是admin默认密码在~/airflow-repo/simple_auth_manager_passwords.json文件中或者你可以在输出日志中找到账户和密码。

Docker 安装

我们前面介绍了怎么源码安装 airflow接下来让我们介绍一下如何使用 docker 安装 airflow 服务

# 拉取镜像
docker pull apache/airflow:3.0.6-python3.10

启动所有airflow组件

docker run -d --name airflow \
  -p 8080:8080 \
  apache/airflow:3.0.6-python3.10 \
  bash -c "\
        airflow standalone"

这样你就可以访问 localhost:8080来访问 airflow 了

  • 单独启动各组件

如果你不想一次性启动所有的组件,你可以复制出配置文件 airflow.cfg然后添加必须的文件夹挂载到容器中,注意,你需要将 logs``dags``plugins等文件夹添加可执行权限,测试建议 777一劳永逸。

docker run -d --name airflow \
  -p 8080:8080 \
  -v "$PARENT_DIR/logs:/opt/airflow/logs" \
  -v "$PARENT_DIR/dags:/opt/airflow/dags" \
  -v "$PARENT_DIR/plugins:/opt/airflow/plugins" \
  -v "$PARENT_DIR/airflow.cfg:/opt/airflow/airflow.cfg" \
  apache/airflow:3.0.6-python3.10 \
  bash -c "\
    airflow scheduler & \
    airflow triggerer & \
    airflow dag-processor & \
    airflow api-server"

你可以在 dags文件夹中添加你的定时任务,然后调式测试程序是否正常工作。

调试

断点调试

虽然我们还没有介绍过 Airflow 的任务,但是我们启动了airflow后,我们肯定想断点调试任务,我们可以这样:

或者这里可以调整到别的地方?

import datetime

import pendulum
from airflow.providers.standard.operators.python import PythonOperator

from airflow import DAG


def test_dag(**context):
    print(f"start_date:{datetime.datetime.now()}")
    print(f"start_date:{datetime.datetime.now()}")


with DAG(
    dag_id="dag_test",
    description="A test process DAG",
    schedule=None,
    start_date=pendulum.now(),
    tags=["dag_test"],
) as dag:
    t1 = PythonOperator(task_id="dag_test", python_callable=test_dag)

    t1


if __name__ == "__main__":
    dag.test()

使用VSCode或者Pycharm直接debug这个文件,这里最重要的就是 dag.test()方法