Airflow: transmettre une valeur dynamique à L'opérateur Sub DAG

je suis nouveau à Airflow.

J'ai rencontré un scénario, où Dag Parent doit passer un certain nombre dynamique (disons n ) à Sub dag.

Où as SubDAG utilisera ce nombre pour créer dynamiquement des tâches parallèles n .

La documentation

ne couvre aucun moyen d'y parvenir. J'ai donc exploré plusieurs voies :

Option-1 (utilisant XCOM Pull)

j'ai essayé de passer comme valeur xcom, mais pour une raison quelconque SubDAG ne se résout pas à la valeur passée.

Fichier Parent Dag

def load_dag(**kwargs):
    number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
    dag_data = json.dumps({
        "number_of_runs": number_of_runs
    })
    return dag_data

# ------------------ Tasks ------------------------------
load_config = PythonOperator(
    task_id='load_config',
    provide_context=True,
    python_callable=load_dag,
    dag=dag)


t1 = SubDagOperator(
    task_id=CHILD_DAG_NAME,
    subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config') }}'" ),
    default_args=default_args,
    dag=dag,
)

Sous Dag Fichier

def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval=None)

    variabe_names = {}

    for i in range(num_of_runs):
        variabe_names['task' + str(i + 1)] =  DummyOperator(
        task_id='dummy_task',
        dag=dag_subdag,
    )

    return dag_subdag

Option-2

j'ai également essayé de passer number_of_runs comme une variable globale, qui ne fonctionnait pas.

Option-3

Aussi, nous avons essayé d'écrire cette valeur à un fichier de données. Mais sub DAG lance File doesn't exist error . C'est peut-être parce que nous générons dynamiquement ce fichier.

Peut-on m'aider avec cela.

4
demandé sur Maneesh Sharma 2017-06-05 12:25:10

4 réponses

Je l'ai fait avec L'Option 3. La clé est de retourner un dag valide sans tâches, si le fichier n'existe pas. Ainsi load_config générera un fichier avec votre nombre de tâches ou plus d'informations si nécessaire. Votre usine ressemblerait à quelque chose comme:

def subdag(...):
    sdag = DAG('%s.%s' % (parent, child), default_args=args, schedule_interval=timedelta(hours=1))
    file_path = "/path/to/generated/file"
    if os.path.exists(file_path):
        data_file = open(file_path)
        list_tasks = data_file.readlines()
        for task in list_tasks:
            DummyOperator(
                  task_id='task_'+task,
                  default_args=args,
                  dag=sdag,
            )
    return sdag

à la génération dag vous verrez un sous-DAG sans tâches. Lors de l'exécution dag, une fois que load_config est terminé, vous pouvez voir que vous avez généré dynamiquement un sous-dag

2
répondu Jaime 2017-09-21 05:56:31

L'Option 1 devrait fonctionner si vous changez simplement l'appel à xcom_pull pour inclure le dag_id du dag parent. Par défaut, l'appel xcom_pull cherchera le task_id 'load_config' dans son propre dag qui n'existe pas.

changez donc la macro d'appel x_com en:

subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config', dag_id='" + PARENT_DAG_NAME + "' }}'" ),
0
répondu randal25 2018-08-02 11:06:50

si le nom du fichier vers lequel vous écrivez n'est pas dynamique (par exemple, vous écrivez sur le même fichier encore et encore pour chaque instance de tâche), la réponse de Jaime fonctionnera:

file_path = "/path/to/generated/file"

mais si vous avez besoin d'un nom de fichier unique ou voulez un contenu différent écrit au fichier par chaque instance de tâche pour les tâches exécutées en parallèle, airflow ne fonctionnera pas pour ce cas, car il n'y a aucun moyen de passer la date d'exécution ou la variable en dehors d'un modèle. Jetez un oeil à ce post .

0
répondu Mark Matthews 2018-08-22 06:51:57

regardez ma réponse ici , dans laquelle je décris une façon de créer une tâche dynamiquement basée sur les résultats d'une tâche précédemment exécutée en utilisant xcoms et sous-dags.

0
répondu Christopher Beck 2018-08-23 03:16:55