Bonne façon de créer des flux de travail dynamiques dans le flux D'Air

problème

est - il possible dans Airflow de créer un flux de travail tel que le nombre de tâches B.* est inconnu jusqu'à l'achèvement de la tâche A? J'ai regardé les sous-dags mais il semble qu'il ne peut fonctionner qu'avec un ensemble statique de tâches qui doivent être déterminées à la création de Dag.

est-ce que dag déclenchera le travail? Et si oui, pourriez-vous donner un exemple.

j'ai un problème où il est impossible de connaître le nombre de tâches B cela sera nécessaire pour calculer la tâche C jusqu'à ce que la tâche a soit terminée. Chaque tâche B.* prend plusieurs heures à calculer et ne peut être combinée.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

idée #1

Je n'aime pas cette solution parce que je dois créer un masque externe de blocage et toute la tâche B.* prendra entre 2-24 heures à accomplir. Je ne considère donc pas cela comme une solution viable. Il y a sûrement un moyen plus facile? Ou Airflow n'a-t-il pas été conçu pour cela?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Edit 1:

à ce jour, cette question n'a toujours pas une grande réponse . J'ai été contacté par plusieurs personnes à la recherche d'une solution.

35
demandé sur costrouc 2017-01-07 07:32:52

4 réponses

Voici comment je l'ai fait avec une requête similaire sans aucune sous-étiquette:

créez D'abord une méthode qui retourne les valeurs que vous voulez

def values_function():
     return values

méthode de création suivante qui générera les emplois dynamiquement:

def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)

et puis les combiner:

push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        push_func >> group(i) >> complete
10
répondu Oleg Yamin 2017-01-13 02:37:07

j'ai trouvé un moyen de créer des workflows basés sur le résultat des tâches précédentes.

Fondamentalement, ce que vous voulez faire est d'avoir deux sous-Tags avec ce qui suit:

  1. Xcom pousser une liste (ou ce que jamais vous avez besoin pour créer la dynamique de flux de travail plus tard) dans le subdag qui est exécutée en premier (cf. test1.py def return_list() )
  2. Passer le principal dag objet en tant que paramètre à votre deuxième subdag
  3. Maintenant, si vous avez le principal objet dag, vous pouvez l'utiliser pour obtenir une liste de ses tâches instances. À partir de cette liste d'instances de tâches, vous pouvez filtrer une tâche de l'exécution courante en utilisant parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1] ), on pourrait probablement ajouter plus de filtres ici.
  4. avec cette instance de tâche, vous pouvez utiliser XCOM pull pour obtenir la valeur dont vous avez besoin en spécifiant le dag_id à celui du premier sous-dag: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. utilisez la liste / valeur pour créer vos tâches dynamiquement

maintenant j'ai testé cela dans mon installation locale de flux d'air et cela fonctionne très bien. Je ne sais pas si la partie XCOM pull aura des problèmes s'il y a plus d'une instance du dag tournant en même temps, mais alors vous devrez probablement utiliser une clé unique ou quelque chose comme ça pour identifier de façon unique la valeur xcom que vous voulez. On pourrait probablement optimiser les 3. étape pour être sûr à 100% pour obtenir une tâche spécifique de l'actuel de dag, mais pour mon utilisation c' fonctionne assez bien, je pense qu'on n'a besoin que d'un seul objet task_instance pour utiliser xcom_pull.

je nettoie aussi les xcoms pour le premier subdag avant chaque exécution, juste pour m'assurer que je n'obtiens pas accidentellement une mauvaise valeur.

je suis assez mauvais pour expliquer, donc j'espère que le code suivant fera tout clair:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag

test2.py

from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag

et le workflow principal:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)

test1 >> test2
2
répondu Christopher Beck 2018-08-23 03:23:29

OA: "Est-il un chemin dans la circulation de l'Air pour créer un flux de travail tels que le nombre de tâches B.* est inconnue jusqu'à la fin de la Tâche?"

réponse Courte est non. Airflow va construire le flux de DAG avant de commencer à le faire fonctionner.

Qui dit que nous sommes arrivés à une conclusion simple, c'est que nous n'avons pas besoin. Quand vous voulez pour paralléliser peu de travail, vous devez évaluer les ressources disponibles et non pas le nombre d'éléments à traiter.

Nous l'avons fait: nous générer dynamiquement un nombre fixe de tâches, disons, 10, qui va diviser le travail. Par exemple, si nous devons traiter 100 fichiers, chaque tâche en traitera 10. Je posterai le code plus tard dans la journée.

1
répondu Ena 2018-06-14 13:44:10

j'ai trouvé cette Moyen post , qui est très similaire à cette question. Cependant, il est plein de fautes de frappe, et ne fonctionne pas lorsque j'ai essayé de le mettre en œuvre.

ma réponse à ce qui précède est la suivante:

si vous créez des tâches de façon dynamique, vous devez le faire en itérant sur quelque chose qui n'est pas créé par une tâche en amont, ou qui peut être défini indépendamment de cette tâche. j'ai appris que vous ne pouvez pas passer les dates d'exécution ou d'autres variables de flux d'air à quelque chose en dehors d'un modèle (par exemple, une tâche) comme beaucoup d'autres l'ont souligné auparavant. Voir aussi ce post .

0
répondu Mark Matthews 2018-08-21 14:49:51