Airflow Installation and Usage

Abstract: Airflow is a workflow (data pipeline) management platform originally developed inside Airbnb; it is now an Apache incubator project.

1. Installation Environment

CentOS-6.5 Python-2.7.12 setuptools-29.0.1 pip-9.0.1


2. Compiling Python

sudo yum install -y gcc
sudo yum install -y gcc-c++
sudo yum install -y wget
sudo yum install -y mysql
sudo yum install -y mysql-devel
sudo yum install -y python-devel
sudo yum install -y zlib-devel
sudo yum install -y openssl-devel
sudo yum install -y sqlite-devel

wget https://www.python.org/ftp/python/2.7.12/Python-2.7.12.tgz
sudo mkdir /usr/local/python27
sudo tar zxfv Python-2.7.12.tgz -C /usr/local/
cd /usr/local/Python-2.7.12/
./configure --prefix=/usr/local/python27
make
sudo make install

# Keep the system Python 2.6 for yum, and point "python" at the new build
sudo mv /usr/bin/python /usr/bin/python2.6
sudo ln -sf /usr/local/python27/bin/python /usr/bin/python

# yum requires the system Python, so change its shebang accordingly
vim /usr/bin/yum
#!/usr/bin/python2.6

vim /etc/profile
export PYTHON_HOME=/usr/local/python27
export PATH=$PYTHON_HOME/bin:$PATH

wget https://pypi.python.org/packages/59/88/2f3990916931a5de6fa9706d6d75eb32ee8b78627bb2abaab7ed9e6d0622/setuptools-29.0.1.tar.gz#md5=28ecfd0f2574b489b9a18343879a7324
tar zxfv setuptools-29.0.1.tar.gz
cd setuptools-29.0.1
python setup.py install

wget https://pypi.python.org/packages/11/b6/abcb525026a4be042b486df43905d6893fb04f05aac21c32c638e939e447/pip-9.0.1.tar.gz#md5=35f01da33009719497f01a4ba69d63c9
tar zxfv pip-9.0.1.tar.gz
cd pip-9.0.1
python setup.py install
pip install --upgrade pip

wget https://pypi.python.org/packages/a5/e9/51b544da85a36a68debe7a7091f068d802fc515a3a202652828c73453cad/MySQL-python-1.2.5.zip#md5=654f75b302db6ed8dc5a898c625e030c
unzip MySQL-python-1.2.5.zip
cd MySQL-python-1.2.5
python setup.py install

# Third-party packages are installed under /usr/local/python27/lib/python2.7/site-packages

3. Installation

Airflow can be conveniently installed onto the system via pip.

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=/usr/local/airflow

# install from pypi using pip
pip install airflow
pip install airflow[hive]

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080

4. Using MySQL as the Metadata Database

# First install the MySQL client
sudo yum install -y mysql
sudo yum install -y mysql-devel

-- Create the user and databases in MySQL
CREATE USER airflow;
CREATE DATABASE airflow;
CREATE DATABASE celery_result_airflow;
GRANT all privileges on airflow.* TO 'airflow'@'%' IDENTIFIED BY 'airflow';
GRANT all privileges on celery_result_airflow.* TO 'airflow'@'%' IDENTIFIED BY 'airflow';

# Install the MySQL Python module
wget https://pypi.python.org/packages/a5/e9/51b544da85a36a68debe7a7091f068d802fc515a3a202652828c73453cad/MySQL-python-1.2.5.zip#md5=654f75b302db6ed8dc5a898c625e030c
unzip MySQL-python-1.2.5.zip
cd MySQL-python-1.2.5
python setup.py install

# Configure MySQL as the metadata store in the airflow config file
sudo vi $AIRFLOW_HOME/airflow.cfg
# Change the database connection:
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
# The fields are:
# dialect+driver://username:password@host:port/database

# Initialize the metadata database
airflow initdb
# Reset the metadata database
airflow resetdb
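As a quick illustration of the `dialect+driver://username:password@host:port/database` format above, here is a small Python sketch that assembles such a URI from its parts. The helper name `build_conn_uri` is invented for this example; the credential values are the example ones created above, not anything Airflow mandates:

```python
# Build a SQLAlchemy-style connection URI from its parts.
def build_conn_uri(dialect, username, password, host, port, database, driver=None):
    # The scheme is either "dialect" or "dialect+driver"
    scheme = dialect if driver is None else "%s+%s" % (dialect, driver)
    return "%s://%s:%s@%s:%d/%s" % (scheme, username, password, host, port, database)

# Matches the sql_alchemy_conn value used in this guide
uri = build_conn_uri("mysql", "airflow", "airflow", "localhost", 3306, "airflow")
print(uri)  # mysql://airflow:airflow@localhost:3306/airflow
```

The same helper produces the `sqla+mysql://...` broker URL shown later by passing `driver="mysql"` with `dialect="sqla"`.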

5. Installing the Login Module

# Install the password module
pip install airflow[password]

# Enable authentication in the airflow config file
sudo vi $AIRFLOW_HOME/airflow.cfg
[webserver]
authenticate = True
filter_by_owner = True
auth_backend = airflow.contrib.auth.backends.password_auth

Run the following code to write a username and password into the metadata database:

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'quzhengpeng'
user.email = 'quzhengpeng@163.com'
user.password = 'quzhengpeng'

session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

6. Starting the Scheduler Daemon

Airflow can only monitor task scheduling in real time once the background scheduler daemon is running. Put task scripts under ${AIRFLOW_HOME}/dags and their execution status will appear in the web UI.

airflow scheduler

7. Starting the Web Server

# Start the web server process
airflow webserver -p 8080

# Stop the firewall on CentOS 6
sudo service iptables stop
# Disable SELinux on CentOS 6
setenforce 0

# Stop the firewall on CentOS 7
systemctl stop firewalld.service
# Prevent firewalld from starting on boot
systemctl disable firewalld.service

 

Celery+MySQL

# Celery documentation: http://docs.jinkan.org/docs/celery/index.html
# Celery 4.0.0 has some problems with airflow, so install Celery 3
pip install -U Celery==3.1.24
pip install airflow[celery]

Modify the configuration file

vi airflow.cfg
[core]
executor = CeleryExecutor
[celery]
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow

Start airflow

airflow webserver -p 8080
airflow scheduler
# Run the worker as a non-root user
airflow worker
# Start the Celery web UI (Flower) to inspect celery tasks
airflow flower
# http://localhost:5555/

 

Celery+RabbitMQ

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
# Install RabbitMQ's dependencies
yum install erlang
yum install socat
# If the rabbitmq yum repository is configured
sudo yum install -y rabbitmq-server
# Otherwise install from the downloaded rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

Start the RabbitMQ Service

# Start the rabbitmq service
sudo service rabbitmq-server start
# or
sudo rabbitmq-server
# Add the -detached flag to run it in the background (note: only one dash)
sudo rabbitmq-server -detached
# Start the rabbitmq service on boot
chkconfig rabbitmq-server on
# Never stop the RabbitMQ server with kill; use rabbitmqctl instead
sudo rabbitmqctl stop

Configure RabbitMQ

# Create a RabbitMQ user
rabbitmqctl add_user airflow airflow
# Create a RabbitMQ virtual host
rabbitmqctl add_vhost vairflow
# Give the user the admin role
rabbitmqctl set_user_tags airflow admin
# Allow the user to access the virtual host
rabbitmqctl set_permissions -p vairflow airflow ".*" ".*" ".*"
# Optional: enable the management web UI
rabbitmq-plugins enable rabbitmq_management

Modify the airflow configuration to support Celery

vi $AIRFLOW_HOME/airflow.cfg
# Change the executor to CeleryExecutor
executor = CeleryExecutor
# Change broker_url
broker_url = amqp://airflow:airflow@localhost:5672/vairflow
# Format: transport://userid:password@hostname:port/virtual_host
# Change celery_result_backend
celery_result_backend = amqp://airflow:airflow@localhost:5672/vairflow
# Format: transport://userid:password@hostname:port/virtual_host
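To see how the `transport://userid:password@hostname:port/virtual_host` format decomposes, the broker URL above can be pulled apart with Python's standard `urlparse`; this is just a sketch to label the fields, using the example URL from this guide:

```python
try:
    from urlparse import urlparse       # Python 2 (the version used in this guide)
except ImportError:
    from urllib.parse import urlparse   # Python 3

broker_url = "amqp://airflow:airflow@localhost:5672/vairflow"
parts = urlparse(broker_url)

print(parts.scheme)    # amqp      -> transport
print(parts.username)  # airflow   -> userid
print(parts.password)  # airflow   -> password
print(parts.hostname)  # localhost -> hostname
print(parts.port)      # 5672      -> port
print(parts.path)      # /vairflow -> virtual_host (with a leading slash)
```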

Install airflow's celery and rabbitmq modules

pip install airflow[celery]
pip install airflow[rabbitmq]

 

Airflow uses DAGs (Directed Acyclic Graphs) to manage workflows.

# Create a DAG
from datetime import datetime, timedelta
from airflow.models import DAG

# Start date: midnight seven days ago
seven_days_ago = datetime.combine(datetime.today() - timedelta(7), datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
    'email': ['airflow@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 3,
    'retry_delay': timedelta(seconds=60),
    'depends_on_past': True
}

dag = DAG(
    dag_id='dag',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60)
)

Create tasks and add them to the DAG

from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

demo = DummyOperator(
    task_id='demo',
    dag=dag
)

last_execute = BashOperator(
    task_id='last_execute',
    bash_command='echo 1',
    dag=dag
)

Configure the dependencies between tasks

demo.set_downstream(last_execute)
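`set_downstream` means `demo` must finish before `last_execute` runs. The scheduling idea behind a DAG can be sketched in plain Python with a topological sort; this is only an illustration of the concept, not Airflow's actual scheduler:

```python
# Toy DAG: map each task to the tasks immediately downstream of it.
dag = {
    'demo': ['last_execute'],   # demo.set_downstream(last_execute)
    'last_execute': [],
}

def topological_order(graph):
    """Return tasks ordered so every upstream task precedes its downstream tasks."""
    # Count incoming edges (unmet upstream dependencies) per task
    indegree = {task: 0 for task in graph}
    for downstream_tasks in graph.values():
        for t in downstream_tasks:
            indegree[t] += 1
    # Repeatedly run a task whose dependencies are all satisfied
    ready = [t for t, d in indegree.items() if d == 0]
    order = []
    while ready:
        task = ready.pop()
        order.append(task)
        for t in graph[task]:
            indegree[t] -= 1
            if indegree[t] == 0:
                ready.append(t)
    return order

print(topological_order(dag))  # ['demo', 'last_execute']
```

Because the graph is acyclic, this always yields a valid execution order; with a cycle, some tasks would never become ready, which is why Airflow requires DAGs rather than arbitrary graphs.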

 
