Comment créer une tâche conditionnelle dans Airflow
je voudrais créer une tâche conditionnelle dans Airflow comme décrit dans le schéma ci-dessous. Le scénario attendu est le suivant:
- Tâche 1 exécute
- si la tâche 1 réussit, alors exécutez la tâche 2a
- Sinon si la tâche 1 échoue, alors exécutez la tâche 2b
- enfin exécuter la tâche 3
Toutes les tâches ci-dessus sont Sshexecuteopérator. Je suppose que je devrais utiliser le ShortCircuitOperator et / ou XCom pour gérer la condition, mais je ne suis pas clair sur la façon de mettre en œuvre cela. Pourriez-vous décrire la solution?
2 réponses
Vous devez utiliser débit d'air des règles de déclenchement
tous les opérateurs ont un argument trigger_rule qui définit la règle par laquelle la tâche générée est déclenchée.
La règle de déclencheur possibilités:
ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
Ici, c'est l'idée pour résoudre votre problème:
from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook
sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
task_1 = SSHExecuteOperator(
task_id='task_1',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2 = SSHExecuteOperator(
task_id='conditional_task',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2a = SSHExecuteOperator(
task_id='task_2a',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2b = SSHExecuteOperator(
task_id='task_2b',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_FAILED,
ssh_hook=sshHook,
dag=dag)
task_3 = SSHExecuteOperator(
task_id='task_3',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ONE_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
Airflow a un BranchPythonOperator qui peut être utilisé pour exprimer la dépendance de branchement plus directement.
docs décrire son utilisation:
le BranchPythonOperator est un peu comme le PythonOperator sauf qu'il attend un python_callable qui renvoie un task_id. Le task_id retourné est suivi, et tous les autres chemins sont ignorés. Le task_id retourné par la fonction Python doit renvoyer une tâche directement en aval de la tâche BranchPythonOperator.
...
si vous voulez sauter quelques tâches, gardez à l'esprit que vous ne pouvez pas avoir un chemin vide, si donc faire une tâche factice.