date d'exécution dans airflow: besoin d'accéder en tant que variable
je suis vraiment un internaute novice dans ce forum. Mais je joue avec airflow, depuis un moment, pour notre compagnie. Désolé si cette question semble vraiment stupide.
j'écris un pipeline en utilisant un tas de BashOperators. Fondamentalement, pour chaque tâche, je veux simplement appeler une api REST en utilisant 'curl'
voici à quoi ressemble mon pipeline (version très simplifiée):
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime
datetime_obj = datetime.datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
'email': ['xxxx@xxx.xxx'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
}
current_datetime = datetime_obj.now(tz=tz.tzlocal())
dag = DAG(
'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))
curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'
t1 = BashOperator(
task_id='rest-api-1',
bash_command=curl_cmd,
dag=dag)
Si vous remarquez que je suis en train de faire current_datetime= datetime_obj.now(tz=tz.tzlocal())
Au lieu de ce que je veux, ici, 'execution_date'
comment utiliser 'execution_date' directement et l'assigner à une variable dans mon fichier python?
j'ai ce problème général d'accès à l'args. Toute aide sera vraiment appréciée.
Merci
5 réponses
BashOperator
bash_command
argument est un template. Vous pouvez accéder à execution_date
dans n'importe quel modèle comme un datetime
objetexecution_date
variable. Dans le modèle, vous pouvez utiliser n'importe quel jinja2
méthodes pour les manipuler.
en utilisant ce qui suit comme BashOperator
bash_command
chaîne:
# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}
# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}
si vous voulez juste l'équivalent string de la date d'exécution,ds
retournera un datastamp (AAAA-MM-JJ), ds_nodash
retourne la même chose sans tirets (AAAAMMJJ), etc. Plus sur macros
est disponible dans les Api Docs.
votre opérateur final ressemblerait à:
command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
le constructeur PythonOperator prend un paramètre 'provide_context' (voir https://pythonhosted.org/airflow/code.html). Si C'est vrai, alors il passe un certain nombre de paramètres dans python_callable via kwargs. kwargs['execution_date'] est ce que vous voulez, je crois.
quelque Chose comme ceci:
def python_method(ds, **kwargs):
Variable.set('execution_date', kwargs['execution_date'])
return
doit = PythonOperator(
task_id='doit',
provide_context=True,
python_callable=python_method,
dag=dag)
Je ne suis pas sûr de savoir comment le faire avec le BashOperator, mais vous pourriez commencer par ce numéro: https://github.com/airbnb/airflow/issues/775
je pense que vous ne pouvez pas assigner des variables avec des valeurs du contexte airflow en dehors d'une instance de tâche, elles ne sont disponibles qu'à l'exécution. Fondamentalement, il y a 2 étapes différentes quand un dag est chargé et exécuté dans airflow :
tout d'abord votre fichier dag est interprété et analysé. Il doit fonctionner et compiler et les définitions de tâches doivent être correctes (aucune erreur de syntaxe ou autre). Pendant cette étape, si vous faites des appels de fonction pour remplir certaines valeurs, ces fonctions ne seront pas être capable d'accéder au contexte airflow (la date d'exécution par exemple, encore plus si vous faites du remplissage).
la deuxième étape est l'exécution du dag. Ce n'est que lors de cette deuxième étape que les variables fournies par le flux d'air (
execution_date, ds, etc...
) sont disponibles car elles sont liées à une exécution de la dag.
donc vous ne pouvez pas initialiser les variables globales en utilisant le contexte Airflow, cependant, Airflow vous donne plusieurs mécanismes pour atteindre le même effet :
utiliser Jinja template dans votre commande (cela peut être dans une chaîne de caractères dans le code ou dans un fichier, les deux seront traités). Vous avez la liste des modèles disponibles ici:https://airflow.apache.org/code.html#default-variables. Notez que certaines fonctions sont également disponibles, en particulier pour le formatage des jours de calcul delta et date.
utiliser un PythonOperator dans lequel vous passez le contexte (avec le
provide_context
argument.) Cela vous permettra d'accéder à un même modèle avec la syntaxekwargs['<variable_name']
. Si vous en avez besoin, vous pouvez retourner une valeur d'un PythonOperator, celle-ci sera stockée dans une variable XCOM que vous pouvez utiliser plus tard dans n'importe quel modèle. L'accès aux variables XCOM utilise cette syntaxe:https://airflow.apache.org/concepts.html#xcomssi vous écrivez votre propre opérateur, vous pouvez accéder aux variables airflow avec le dict
context
.
def execute(self, context):
execution_date = context.get("execution_date")
ceci doit être dans la méthode execute () de L'opérateur