Comment créer une tâche conditionnelle dans Airflow

je voudrais créer une tâche conditionnelle dans Airflow comme décrit dans le schéma ci-dessous. Le scénario attendu est le suivant:

  • Tâche 1 exécute
  • si la tâche 1 réussit, alors exécutez la tâche 2a
  • Sinon si la tâche 1 échoue, alors exécutez la tâche 2b
  • enfin exécuter la tâche 3

Conditional Task Toutes les tâches ci-dessus sont Sshexecuteopérator. Je suppose que je devrais utiliser le ShortCircuitOperator et / ou XCom pour gérer la condition, mais je ne suis pas clair sur la façon de mettre en œuvre cela. Pourriez-vous décrire la solution?

29
demandé sur Alexis.Rolland 2017-04-28 13:49:32

2 réponses

Vous devez utiliser débit d'air des règles de déclenchement

tous les opérateurs ont un argument trigger_rule qui définit la règle par laquelle la tâche générée est déclenchée.

La règle de déclencheur possibilités:

ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'

Ici, c'est l'idée pour résoudre votre problème:

from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
        task_id='task_1',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2 = SSHExecuteOperator(
        task_id='conditional_task',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2a = SSHExecuteOperator(
        task_id='task_2a',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

task_2b = SSHExecuteOperator(
        task_id='task_2b',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_FAILED,
        ssh_hook=sshHook,
        dag=dag)

task_3 = SSHExecuteOperator(
        task_id='task_3',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)


task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
23
répondu Jean S 2017-04-28 12:28:25

Airflow a un BranchPythonOperator qui peut être utilisé pour exprimer la dépendance de branchement plus directement.

docs décrire son utilisation:

le BranchPythonOperator est un peu comme le PythonOperator sauf qu'il attend un python_callable qui renvoie un task_id. Le task_id retourné est suivi, et tous les autres chemins sont ignorés. Le task_id retourné par la fonction Python doit renvoyer une tâche directement en aval de la tâche BranchPythonOperator.

...

si vous voulez sauter quelques tâches, gardez à l'esprit que vous ne pouvez pas avoir un chemin vide, si donc faire une tâche factice.

enter image description here

25
répondu villasv 2017-10-08 15:50:38