定时任务管理

Posted by Hsz on May 10, 2017

定时任务

我们常有这样一种需求,每隔一段时间我们希望启动一个脚本让它实现一些信息收集工作并以此产生一些操作, 比如检查数据库中的数据量然后发送邮件,比如定期爬取数据保存进本地数据库等等.如果使用脚本语言比如python,node进行, 会要跑虚拟机,也就会额外占用资源,而且如果脚本多的话要管理起来也会比较麻烦.因此这种方案并不好用.

本文介绍两种定时任务的配置方案

  1. 使用linux工具crontab配置定时任务
  2. 使用Apache-airflow配置定时任务

使用linux工具crontab配置定时任务

crontab是定时任务的配置软件,可以运行在linux和mac osx下,其特点是执行简单,只需要命令行操作和修改配置文件就可以实现定时任务的配置.

crontab服务会有用户区分,要管理不同用户的定时任务,可以使用-u xxx来指定用户.

我们以系统自带的获取本地时间程序date来作为例子,看如何创建一个定时任务.

crontab定时任务配置文件

要创建定时任务必须配置crontab服务文件.使用crontab -e编辑当前用户的crontab服务文件(默认使用vim编辑)

crontab定时任务的配置文件语法

定时任务配置文件中一行即为一项任务.每项任务按分,时,日,月,周共5位来定时,取值范围如下:

小时 星期
0-59 0-23 1-31 1-12 0-6 (取值范围,0表示周日)

同时可以使用一些通配符定义定时逻辑

通配符 说明
* 代表取值范围内的数字,通常用来作为占位符
/n 代表”每”隔多久
n-m 代表从某个数字到某个数字,
, 分开几个离散的数字

比如我们定义每小时的0到30分之间,每隔2分钟运行一次,那么可以写成这样

0-30/2 * * * * command

我们要运行的是这样的一个任务:

*/2 * * * * /usr/bin/env date >> now.txt

它的意思是每2分钟记录下一条当前时间到now.txt文件夹

crontab定时任务的增删改查

  1. 启动crontab服务

    一般启动服务用 /sbin/service crond start 若是根用户的cron服务可以用 sudo service crond start,这里还是要注意下不同版本Linux系统启动的服务的命令也不同像我的虚拟机里只需用sudo service cron restart即可,若是在根用下直接键入service cron start就能启动服务

  2. 查看crontab服务是否已经运行

    ps -ax | grep cron

  3. 查看该用户下的crontab服务是否创建成功

    crontab -l

  4. 删除某个用户的cron服务

    crontab -r

使用Apache-airflow配置定时任务

airflow是apache基金会下的一个孵化产品,它的定位是一个工作流管理工具,它不止可以定义定时任务,更可以利用有向无环图来串联任务管理工作流.

详细的文档可以看Airflow中文文档

airflow有着以下天然优势:

  • 灵活易用,airflow由Python编写,可读性很强,且完全开源,如果真的需要我们甚至可以很容易的做到定制化扩展.
  • 功能强大,自带的Operators有15个以上,囊括了shell脚本,python程序,数据库操作,hive操作等方方面面的操作,即便使用这些原生的操作也已经足够完成大部分工作.
  • 优雅,作业的定义很简单明了,基于jinja模板引擎很容易做到脚本命令参数化,同时自带一个web端的管理界面,交互相当友好.
  • 极易扩展,airflow有多种执行器可供选择,其中CeleryExcutor使用了消息队列来编排多个工作节点(worker),可分布式部署多个 worker,只要消息中间件足够稳定,理论上airflow可以无限横向扩展
  • 丰富的命令工具,我们可以直接在终端敲命令就能完成测试->部署->运行->清理->重跑->追数等任务.

使用docker部署airflow

作为一个常驻的服务,我们还是使用docker对其进行部署可能更方便些.我们使用的镜像puckel/docker-airflow来部署.这边给出两种最常用excutor的部署方案

LocalExcutor

localExcutor适用于那种不需要做横向扩展的情况,一般就是单机部署,它的好处是相对更加轻量,对资源的消耗更低,也不需要依赖消息队列.适合业务较少的初期团队使用.

我们定义如下docker-compose.yml

version: '3.6'
services:
  postgres:
    image: postgres:latest
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

  webserver:
    image: puckel/docker-airflow:latest
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
    volumes:
      - ./dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - 8080:8080
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

这种方式相对结构简单,只对外暴露一个8080端口,最好数据库不要放在docker中,而是放在外部,并做好容灾和读写分离.

CeleryExcutor

CeleryExcutor可以借助消息队列实现横向扩展,适合有大量错综复杂任务流,且必须使用集群的的情况.我们的部署方案针对docker swarm

部署的话我们定义如下docker-compose.yml

version: '3.6'
services:
  redis: #如果有外部的redis则可以不创建
    image: redis:latest
  broker: #如果有外部的rabbitmq则可以不创建
    image: rabbitmq:3.7-management

  postgres: #如果有外部的pg则可以不创建
    image: postgres:latest
    environment:
        - POSTGRES_USER=airflow
        - POSTGRES_PASSWORD=airflow
        - POSTGRES_DB=airflow

  airflow-webserver:
    image: puckel/docker-airflow:latest
    restart: always
    depends_on:
      - postgres
      - broker
      - redis
    environment:
      - LOAD_EX=n
      - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
      - EXECUTOR=Celery
      - AIRFLOW__CELERY__BROKER_URL=amqp://guest:guest@broker:5672/
      - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres:3306/airflow
      # - POSTGRES_USER=airflow
      # - POSTGRES_PASSWORD=airflow
      # - POSTGRES_DB=airflow
      # - REDIS_PASSWORD=redispass
    volumes:
      - dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - 8080:8080
    networks:
      - net-output
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

  flower:
    image: puckel/docker-airflow:latest
    restart: always
    depends_on:
      - broker
      - redis
    environment:
      - EXECUTOR=Celery
      - AIRFLOW__CELERY__BROKER_URL=amqp://guest:guest@broker:5672/
      - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres:3306/airflow
      # - REDIS_PASSWORD=redispass
    networks:
      - net-output
    deploy:
      replicas: 1
      # restart_policy:
      #   condition: on-failure
    command: flower

  scheduler:
    image: puckel/docker-airflow:latest
    restart: always
    depends_on:
      - airflow-webserver
    volumes:
      - dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    environment:
      - LOAD_EX=n
      - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
      - EXECUTOR=Celery
      - AIRFLOW__CELERY__BROKER_URL=amqp://guest:guest@broker:5672/
      - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres:3306/airflow
      # - POSTGRES_USER=airflow
      # - POSTGRES_PASSWORD=airflow
      # - POSTGRES_DB=airflow
      # - REDIS_PASSWORD=redispass
    deploy:
      replicas: 1
      # restart_policy:
      #   condition: on-failure
    command: scheduler

  worker:
    image: puckel/docker-airflow:latest
    restart: always
    depends_on:
      - scheduler
    volumes:
      - dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    environment:
      - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
      - EXECUTOR=Celery
      - AIRFLOW__CELERY__BROKER_URL=amqp://guest:guest@broker:5672/
      - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres:3306/airflow
      # - POSTGRES_USER=airflow
      # - POSTGRES_PASSWORD=airflow
      # - POSTGRES_DB=airflow
      # - REDIS_PASSWORD=redispass
    deploy:
      replicas: 4
      # restart_policy:
      #   condition: on-failure
    command: worker

  networks:
    net-output:
      external: true
  volumes:
    dags:
      driver_opts:
      type: "nfs"
      o: "addr=10.40.0.199,nolock,soft,rw"
      device: ":/docker/dags"

这种方式我们需要借助nfs来在集群间共享一个文件夹,这个文件夹dags就是用于存放定义任务和工作流脚本的位置.

上面的例子中我们使用一个网络net-output来连接这个stack和反向代理,需要代理的端口有

  • flower:5555:flower的端口,用于监控celery
  • airflow-webserver:8080:airflow-webserver的端口,用于监控任务和工作流的情况

使用CeleryExcutor相对是比较复杂的,建议生产环境下将数据库,redis,以及消息队列这种有状态的服务都不要放在docker中,同时做好主备以及读写分离

配置airflow

配置airflow一般在$AIRFLOW_HOME目录中名为airflow.cfg的文件中配置,但在puckel/docker-airflow中我们可以通过设置环境变量来设置配置.其形式为:

AIRFLOW__{SECTION}__{key}={value}

常见的配置项有:

[core]
# 存放dags脚本文件的文件夹
dags_folder = /home/frappe/airflow/dags
# 保存log数据的文件夹
base_log_folder = /home/frappe/airflow/logs
# 插件的存放位置
plugins_folder = /home/frappe/airflow/plugins

# 保存meta数据的数据库连接
sql_alchemy_conn = mysql://airflow:xxx@airflow:3306/airflow
# sql_alchemy_conn配置的连接池大小
sql_alchemy_pool_size = 5
# sql_alchemy_conn连接回收时间
sql_alchemy_pool_recycle = 3600

# 最大并发数量
parallelism = 4
# 计划任务允许并发运行的任务实例数
dag_concurrency = 8
# dags在被创建时是否默认是暂停状态
dags_are_paused_at_creation = True
# 不指定pool时,任务在"default pool"中运行,其大小由此配置元素引导
non_pooled_task_slot_count = 128
# 每个DAG的最大活跃数
max_active_runs_per_dag = 16
# 是否加载示例DAG
load_examples = False
# 保存connections到db时password的加密秘钥
fernet_key = bNyVEIcfk-itUWnbErA9LaVbxH7eOqsOnuA=
# 是否不pickleDAGS
donot_pickle = False
# 在填充dagbag时,导入python文件要多长时间超时
dagbag_import_timeout = 30

[email]
email_backend = airflow.utils.email.send_email_smtp

[smtp]
#如果要设置邮箱用于发送一些提醒时可以用这些设置
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_user = airflow
smtp_port = 25
smtp_password = airflow
smtp_mail_from = airflow@airflow.com

[celery]
# 使用CeleryEXECUTOR时的设置
# celery app 名
celery_app_name = airflow.executors.celery_executor
# celery woeker的并发数
celeryd_concurrency = 16
# celery worker的龙服务器端口号
worker_log_server_port = 8793
# 消息队列中间件地址
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
result_backend = sqla+mysql://airflow:airflow@localhost:3306/airflow

RESTfulApi

airflow实验性的提供RESTful的api用于管理和调用DAGS,这些接口可以在web界面相同的host上访问,详细接口可以看官方文档

构造一个任务流

我们来构造一个简单的任务流,在airflow中有这么几个概念与任务流相关:

  • Operators算子/操作符,不同的Operator类实现了具体的功能,比如:
    • BashOperator: 可以执行用户指定的一个Bash命令
    • PythonOperator: 可以执行用户指定的一个python函数
    • EmailOperator: 可以进行邮件发送
    • SimpleHttpOperator: 发起一个http请求,并获取响应,基于requests
    • HiveOperator/MySqlOperator/MsSqlOperator/PostgresOperator:使用sql语句对数据库进行操作
    • DingdingOperator: 可以进行钉钉消息推送
  • Sensor: 感知器/触发器,一种特殊的Operator,可以定义触发条件和动作,在条件满足时执行某个动作.Airflow提供了更具体的Sensor,比如FileSensor,DatabaseSensor
  • Tasks: Operators的具体实例.
  • Task Instances: 一个Task的一次运行会产生一个实例对象
  • DAGS:有向无环图,包括一系列的tasks和tasks之间的链接关系,这用来构建一个工作流

由此可以看出使用Airflow的步骤就是定义以上概念的过程:

  1. 根据实际需要,使用不同的Operator,传入具体的参数,定义一系列的Tasks
  2. 定义Tasks间的关系,形成一个DAG
  3. 调度DAG运行,每个Task会行成一个Instance
  4. 使用命令行或者Web UI进行查看和管理

工作流定义使用python脚本一个典型的工作流定义文件为:

  • pyoper_test.py

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.mysql_operator import MySqlOperator
    from datetime import datetime, timedelta
    
    
    default_args = {
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2019, 5, 6),
        "email": ["airflow@airflow.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        'end_date': datetime(2019, 5, 7)
    }
    
    
    def print_context():
        print('Whatever you return gets printed in the logs')
    
    
    with DAG("opra_test", default_args=default_args,schedule_interval='@daily') as dag:
      step1 = PythonOperator(
          task_id='print_the_context',
          provide_context=True,
          python_callable=print_context
      )
      sql = """
      DESC comicfeature_current
      """
      step2 = MySqlOperator(
          task_id='basic_mysql',
          sql=sql,
          mysql_conn_id="test_149")
    
    step1 >> step2
    

在1.8版本后,airflow也支持使用上下文管理器来管理DAG:

  from airflow import DAG
  from airflow.operators.python_operator import PythonOperator
  from airflow.operators.mysql_operator import MySqlOperator
  from datetime import datetime, timedelta


  default_args = {
      "owner": "airflow",
      "depends_on_past": False,
      "start_date": datetime(2019, 5, 6),
      "email": ["airflow@airflow.com"],
      "email_on_failure": False,
      "email_on_retry": False,
      "retries": 1,
      "retry_delay": timedelta(minutes=5),
      # 'queue': 'bash_queue',
      # 'pool': 'backfill',
      # 'priority_weight': 10,
      'end_date': datetime(2019, 5, 7)
  }


  def print_context():
      print('Whatever you return gets printed in the logs')


  dag = DAG("opra_test", default_args=default_args,schedule_interval='@daily')
  step1 = PythonOperator(
      task_id='print_the_context',
      provide_context=True,
      python_callable=print_context,
      dag=dag,
  )


  sql = """
  DESC comicfeature_current
  """
  step2 = MySqlOperator(
      task_id='basic_mysql',
      sql=sql,
      mysql_conn_id="test_149",
      dag=dag)

  step1 >> step2

这个文件在容器中放在/usr/local/airflow/dags文件夹下,所以我们才需要使用一个volumes来映射这个文件夹.

其中我们定义了两个串联的任务,第一个任务会打印一串字符串,第二个则是在mysql中执行一条sql语句.它们的执行顺序则由step1 >> step2指明,符号x>>y表示y是x的下游,y可以是一个Operator对象也可以是Operator对象为元素的列表,同理x<<y表明x是y的上游.

设置任务定时

airflow和定时相关的参数有

  • schedule_interval设定定时器
  • start_date: 流程开始调度的时间,可以早于或者晚于当前时间,值为datetime.datetime
  • end_data: 流程结束调度的时间,值为datetime.datetime
  • catch_up: 如果指定的开始时间早于当前时间且catch_up设置为True,那么airflow会把过去‘遗漏’的调度执行一遍,值为bool

其中schedule_interval支持的格式有如下几种:

  • cron experssion格式的字符串
  • timedelta(n)用于指定间隔
  • None,专门用于外部触发事件
  • @once,只执行一次
  • @hourly,每小时执行一次,只在每小时的0分0秒执行
  • @daily,每天执行一次,只在午夜执行
  • @weekly,每周执行一次,每周日午夜执行
  • @monthly,每个月执行一次,每月1日午夜执行
  • @yearly,每年执行一次,每年1月1日午夜执行

测试部署

进入有scheduler的容器,执行python /usr/local/airflow/dags/pyoper_test.py就可以测试工作流是否配置正确.

具体的命令是:

docker exec -it deploy-airflow_webserver_1 python /usr/local/airflow/dags/pyoper_test.py

而部署就更简单了,我们将脚本放在dags文件夹下后稍等一会儿刷新下页面就可以看到工作流了.需要注意下工作流命名不能重复.实在不行可以用uuid来命名,只是这样可维护性就差了.

airflow的元数据

连接配置

在上面的脚本中我们的数据库操作MySqlOperator需要使用一个连接来连对应的mysql,airflow中的所有连接都需要预先在web界面的admin->Connections中配置,并给连接一个独有的名字.

连接配置

全局变量配置

在页面admin->Variables中我们可以导入全局的变量,它支持键值对形式的数据,值也可以是json格式的文本,并且可以使用json格式的文件批量导入.

在DAG描述脚本中我们可以使用airflow.models.Variable.get(key,deserialize_json=False,default_var=None)获取定义好的值,其中deserialize_json表示将数据按json格式解析,default_var则定义在取不到key的情况下获得什么值.airflow.models.Variable是一个sqlalchemy的model.因此你可以按sqlalchemy的model的操作来操作它.

而如果是使用jinjia2模板则可以使用模板宏或者来取Variables中定义的值.

  • val_test.py
from datetime import datetime,timedelta
import random
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    "start_date": datetime(2019, 5, 7),
    'depends_on_past': False,
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'end_date': datetime(2019, 5, 9)
}

with DAG(
    dag_id="val_test", 
    catchup=False,
    default_args=default_args,
    schedule_interval=timedelta(seconds=10)) as dag:

    def say_hello(*args,**kwargs):
        test_value = Variable.get("test_key",default_var=None)
        print("hello test_value: "+ test_value)
    
    t1 = PythonOperator(task_id="say_hello_val",python_callable=say_hello,provide_context=True)

XCOM配置

XCOM是更加通用的变量传递方式,其和variables类似,可以在页面admin->XCOM中定义,像PythonOperator的返回值默认会生成一个taskid与对应taskid一致,且key为return_value,xcom不光PythonOperator可以使用,多数操作符都可以使用,最通用的方法是使用jinja2的模板配合airflow中的宏ti.xcom_pull(task_ids='run_task_1')来实现

不同的Operator对xcom的支持程度和支持方式并不相同,具体的可以看相关文档,下面会介绍几个最常见的Operator如何支持XCOM

  • BashOperator

BashOperator的输入是一条bash命令的字符串,因此可以直接使用jinja2模板动态的生成;而输出则需要在参数中设置xcom_pushTrue,这样命令的最后一行写在stdout中的内容就会被存入xcom中对应task_id的return_value字段中

  • PythonOperator

PythonOperator应该是最灵活的Operator,它除了可以像上面一样让参数使用模本来获取上一步的结果外,还可以直接使用context['task_instance'].xcom_pull(task_ids='say_hello')来拉取结果,context是PythonOperator默认会传入的参数.我们在定义函数时这样定义其签名即可:def echo_hello(**context),PythonOperator的返回值会直接传入对应task_id的xcom中,如果要额外定义,则可以使用context['task_instance'].xcom_push(key='value from pusher 1', value=value_1)来实现.

  • SimpleHttpOperator

BashOperator类似,通过定义xcom_pushTrue将response结果放入xcom

  • MySqlOperator等数据库相关操作

这类操作符是执行sql语句的,而sql语句可以使用jinja2模板生成,这也就是说可以使用宏ti.xcom_pull(task_ids='run_task_1')来实现输入,但很遗憾这类操作都无法定义输出.

使用jinja2作为模板

airflow支持使用jinja2作为模板引擎,配合自带的宏实现动态的命令生成等操作,jinja2语法可以去其官网学习.

常用的Operator示例

下面介绍几个常见的操作类型

PythonOperator

python操作符的签名为airflow.operators.python_operator.PythonOperator(python_callable, op_args=None, op_kwargs=None, provide_context=False, templates_dict=None, templates_exts=None, *args, **kwargs)

  • python_test.py
from datetime import datetime,timedelta
import random
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    "start_date": datetime(2019, 5, 7),
    'depends_on_past': False,
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'end_date': datetime(2019, 5, 9)
}

with DAG(
    dag_id="pythonop_test", 
    catchup=False,
    default_args=default_args,
    schedule_interval=timedelta(seconds=10)) as dag:

    def say_hello(*args,**kwargs):
        print("Hello, guys! Your are the best!")
        return round(random.random(),2)

    def echo_hello(**context):
        var = str(context['task_instance'].xcom_pull(task_ids='say_hello'))
        print("echo "+ var)
        if float(var) > 0.5:
            return True
        else:
            return False
    
    t1 = PythonOperator(task_id="say_hello",python_callable=say_hello,provide_context=True)
    t2 = PythonOperator(task_id="echo_hello", python_callable=echo_hello,provide_context=True)

t2.set_upstream(t1)

PythonOperator中的函数只要有返回值就会自动将返回值放入对应task_idkeyreturn_value的XCOM中. 上面例子中我们使用context['task_instance'].xcom_pull(task_ids='say_hello')在下游取出上游的返回值,然后再对返回值做出判断

BashOperator

直接执行bash操作的操作符,其签名为:BashOperator(bash_command, xcom_push=False, env=None, output_encoding='utf-8', *args, **kwargs),

  • bash_test.py
from datetime import datetime,timedelta
import random
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    "start_date": datetime(2019, 5, 7),
    'depends_on_past': False,
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'end_date': datetime(2019, 5, 9)
}

with DAG(
    dag_id="bashop_test",
    catchup=False,
    default_args=default_args,
    schedule_interval=timedelta(seconds=10)) as dag:

    t1 = BashOperator(task_id="say_name",bash_command="echo hsz",xcom_push=True )
    t2 = BashOperator(task_id="say_hello",bash_command="echo hello ")

t2.set_upstream(t1)

复杂流程控制

我们的工作流往往不会是简单的一条路走到底,更多的时候我们需要根据一些条件来判断是否要执行一些任务,如何执行一些任务.

使用DummyOperator标记节点

DummyOperator什么都不会做,它一般用于标记节点,比如我们要标记任务开始,那就可以定义一个DummyOperator,其id为”start”

使用BranchPythonOperator控制执行分支

我们可以使用PythonOperator的子类BranchPythonOperator来控制流程,它要求返回一个下游task_id为内容的字符串,这样它就会跳过其他的task分支选择以返回值为分支名的分支执行.

一个完整的例子

  • complex_test.py
from datetime import datetime,timedelta
import random
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

default_args = {
    'owner': 'airflow',
    "start_date": datetime(2019, 5, 13),
    'depends_on_past': False,
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'end_date': datetime(2019, 5, 14)
}

with DAG(
    dag_id="bashop_test",
    catchup=False,
    default_args=default_args,
    schedule_interval=timedelta(seconds=10)) as dag:
    start = DummyOperator(task_id="start")

    def random_choice():
        options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']
        return random.choice(options)
    branching = BranchPythonOperator(task_id="random_choice",python_callable=random_choice)
    start >> branching

    end = DummyOperator(task_id="end")

    for option in ['branch_a', 'branch_b', 'branch_c', 'branch_d']:
        t1 = DummyOperator(
            task_id=option
        )
        t2 = BashOperator(task_id="say_hello_"+option,bash_command="echo hello "+option)
        branching >> t1 >> t2 >> end
#t2.set_upstream(t1)