Airflow: transmettre une valeur dynamique à L'opérateur Sub DAG
je suis nouveau à Airflow.
J'ai rencontré un scénario, où Dag Parent doit passer un certain nombre dynamique (disons n
) à Sub dag.
Où as SubDAG utilisera ce nombre pour créer dynamiquement des tâches parallèles n
.
ne couvre aucun moyen d'y parvenir. J'ai donc exploré plusieurs voies :
Option-1 (utilisant XCOM Pull)
j'ai essayé de passer comme valeur xcom, mais pour une raison quelconque SubDAG ne se résout pas à la valeur passée.
Fichier Parent Dag
def load_dag(**kwargs):
number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
dag_data = json.dumps({
"number_of_runs": number_of_runs
})
return dag_data
# ------------------ Tasks ------------------------------
load_config = PythonOperator(
task_id='load_config',
provide_context=True,
python_callable=load_dag,
dag=dag)
t1 = SubDagOperator(
task_id=CHILD_DAG_NAME,
subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config') }}'" ),
default_args=default_args,
dag=dag,
)
Sous Dag Fichier
def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=args,
schedule_interval=None)
variabe_names = {}
for i in range(num_of_runs):
variabe_names['task' + str(i + 1)] = DummyOperator(
task_id='dummy_task',
dag=dag_subdag,
)
return dag_subdag
Option-2
j'ai également essayé de passer number_of_runs
comme une variable globale, qui ne fonctionnait pas.
Option-3
Aussi, nous avons essayé d'écrire cette valeur à un fichier de données. Mais sub DAG lance File doesn't exist error
. C'est peut-être parce que nous générons dynamiquement ce fichier.
Peut-on m'aider avec cela.
4 réponses
Je l'ai fait avec L'Option 3. La clé est de retourner un dag valide sans tâches, si le fichier n'existe pas. Ainsi load_config générera un fichier avec votre nombre de tâches ou plus d'informations si nécessaire. Votre usine ressemblerait à quelque chose comme:
def subdag(...):
sdag = DAG('%s.%s' % (parent, child), default_args=args, schedule_interval=timedelta(hours=1))
file_path = "/path/to/generated/file"
if os.path.exists(file_path):
data_file = open(file_path)
list_tasks = data_file.readlines()
for task in list_tasks:
DummyOperator(
task_id='task_'+task,
default_args=args,
dag=sdag,
)
return sdag
à la génération dag vous verrez un sous-DAG sans tâches. Lors de l'exécution dag, une fois que load_config est terminé, vous pouvez voir que vous avez généré dynamiquement un sous-dag
L'Option 1 devrait fonctionner si vous changez simplement l'appel à xcom_pull
pour inclure le dag_id
du dag parent. Par défaut, l'appel xcom_pull
cherchera le task_id
'load_config'
dans son propre dag qui n'existe pas.
changez donc la macro d'appel x_com en:
subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config', dag_id='" + PARENT_DAG_NAME + "' }}'" ),
si le nom du fichier vers lequel vous écrivez n'est pas dynamique (par exemple, vous écrivez sur le même fichier encore et encore pour chaque instance de tâche), la réponse de Jaime fonctionnera:
file_path = "/path/to/generated/file"
mais si vous avez besoin d'un nom de fichier unique ou voulez un contenu différent écrit au fichier par chaque instance de tâche pour les tâches exécutées en parallèle, airflow ne fonctionnera pas pour ce cas, car il n'y a aucun moyen de passer la date d'exécution ou la variable en dehors d'un modèle. Jetez un oeil à ce post .
regardez ma réponse ici , dans laquelle je décris une façon de créer une tâche dynamiquement basée sur les résultats d'une tâche précédemment exécutée en utilisant xcoms et sous-dags.