airflow2.8.4快速入门第一个dag

airflow2.8.4快速入门第一个dag

首页冒险解谜你好访客2更新时间:2024-05-09

AirFlow

Author

Date

Version

WangJian

2024-3-29

1.0/init

快速入门安装 miniconda

它只包含最基本的内容——python与conda,以及相关的必须依赖项,对于空间要求严格的用户,Miniconda是一种选择。就只包含最基本的东西,其他的库得自己装。

下载安装文件:

$ wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh

执行安装,注意不要使用sudo前缀,否则会安装给root用户

$Bash Miniconda3-latest-Linux-x86_64.sh

安装完成:

测试,输出source ~/.bashrs将会进入miniconda的环境,进入后,前面多了一个(base):

wyj@ubuntu2204:~$ source .bashrc (base) wyj@ubuntu2204:~$

退出conda环境:

(base) wyj@ubuntu2204:~$ conda deactivate wyj@ubuntu2204:~$

关闭conda的base环境,我们不需要每次执行source ~/.bashrs时进入这个环境

wyj@ubuntu2204:~$ conda config --set auto_activate_base false wyj@ubuntu2204:~$

给conda添加国内源:

# 阿里镜像源

conda config --add channels https://mirrors.aliyun.com/pypi/simple/

# 显示检索路径,每次安装包时会将包源路径显示出来

conda config --set show_channel_urls yes conda config --set always_yes True

#执行以下命令清除索引缓存,保证用的是镜像站提供的索引

conda clean -i

# 显示所有镜像通道路径命令

conda config --show channels

设置显示urls:

wyj@ubuntu2204:~$ conda config --set show_channel_urls yes2、创建python3.11的环境

根据airflow官网所说,3.7~3.11版本的python都支持。

创建:

wyj@ubu:~/software$ conda create --name airflow python=3.113、安装airflow

安装依赖:

(airflow) wyj@ubu:~/software$ pip install numpy -i https://pypi.tuna.tsinghua.edu.cn/simple

然后创建pip的配置文件,使得修改的pip源生效:

sudo mkdir ~/.pip sudo vim ~/.pip/pip.conf

添加以下内容:

[global] index-url = https://pypi.tuna.tsinghua.edu.cn/simple [install] trusted-host = https://pypi.tuna.tsinghua.edu.cn

安装airflow:到2024-3-28 最新版本的airflow为:2.8.4:

(airflow) wyj@ubu:~/software$ pip install "apache-airflow==2.8.4"

初始化数据库:默认使用sqlite:

(airflow) wyj@ubu:~/software$ airflow db init

创建用户:

airflow users create \ --username wyj \ --firstname wyj \ --lastname wyj \ --role Admin \ --email 549051701@qq.com

查看版本:

(airflow) wyj@ubu:~/airflow$ airflow version 2.8.4

启动airflow web服务:airflow webserver -p 8089 -D(为防止8080端口被占用,选用不常用的端口)

(airflow) wyj@ubu:~/airflow$ airflow webserver -p 8089 -D

启动airflow调度:airflow scheduler -D

(airflow) wyj@ubu:~/airflow$ airflow scheduler -D

可选的创建启动和停止脚本:

#!/bin/bash case $1 in "start"){ echo " --------启动 airflow-------" ssh 你的服务器名称 "conda activate airflow;airflow webserver -p 8089 -D;airflow scheduler -D; conda deactivate" };; "stop"){ echo " --------关闭 airflow-------" ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -15 };; esac

访问: http://192.168.56.100:8090

4、配置使用mysql8数据库管理元数据

启动mysql8数据库,指定skip_ssl参数:(docker-compose.yaml)

services: mysql8: image: mysql:8.3.0 ports: - 3306:3306 hostname: mysql8 container_name: mysql8 command: - --default-authentication-plugin=mysql_native_password - --skip-ssl environment: MYSQL_ROOT_PASSWORD: 123456 TZ: Asia/Shanghai volumes: - ${PWD}/data:/var/lib/mysql

安装python mysql驱动:(注意先进入miniconda的airflow环境)

$(airflow) wyj@ubu:$ pip install mysql-connector-python

修改airflow配置文件:

$ vim ~/airflow/airflow.cfg

修改以下内容:

sql_alchemy_conn = mysql mysqlconnector://root:hadoop@服务器名称:3306/airflow_db executor = LocalExecutor

重新启动airflow:

(airflow) wyj@ubu:~/shell$ ./airflow.sh stop (airflow) wyj@ubu:~/shell$ ./airflow.sh start

配置用户:

airflow users create \ --username wyj \ --firstname wyj \ --lastname wyj \ --role Admin \ --email 549051701@qq.com

登录:

http://192.168.56.100:8090

到此安装已经完成。

开发第一个dag

参考官网:https://airflow.apache.org/docs/apache-airflow/stable/index.html

步1:进入airflow环境:(即已经安装好的python3.11和airflow2.8.4的miniconda的环境)

wyj@ubu:~/airflow$ conda activate airflow (airflow) wyj@ubu:~/airflow$

步2:在任意的位置,编写一个python脚本,参考官网:

from datetime import datetime from airflow import DAG from airflow.decorators import task from airflow.operators.bash import BashOperator # A DAG represents a workflow, a collection of tasks with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag: # Tasks are represented as operators hello = BashOperator(task_id="hello", bash_command="echo hello") @task() def airflow(): print("airflow") # Set dependencies between tasks hello >> airflow()

步3:测试是否可运行

使用python运行,如果没有出错,则为正常可用:

(airflow) wyj@ubu:~/shell/dags$ python demo01.py (airflow) wyj@ubu:~/shell/dags$

步4:放到airflow配置的dags目录下

$ cp demo01.py ~/airflow/airflow/dags/demo01.py

步5:查询airflow web ui界面

启动运行,并查看效果:

到此,第一个dag运成功。

查看全文
大家还看了
也许喜欢
更多游戏

Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved