date d'exécution dans airflow: besoin d'accéder en tant que variable

je suis vraiment un internaute novice dans ce forum. Mais je joue avec airflow, depuis un moment, pour notre compagnie. Désolé si cette question semble vraiment stupide.

j'écris un pipeline en utilisant un tas de BashOperators. Fondamentalement, pour chaque tâche, je veux simplement appeler une api REST en utilisant 'curl'

voici à quoi ressemble mon pipeline (version très simplifiée):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xxxx@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)

Si vous remarquez que je suis en train de faire current_datetime= datetime_obj.now(tz=tz.tzlocal()) Au lieu de ce que je veux, ici, 'execution_date'

comment utiliser 'execution_date' directement et l'assigner à une variable dans mon fichier python?

j'ai ce problème général d'accès à l'args. Toute aide sera vraiment appréciée.

Merci

28
demandé sur Roger 2016-04-20 01:36:39

5 réponses

BashOperatorbash_command argument est un template. Vous pouvez accéder à execution_date dans n'importe quel modèle comme un datetimeobjetexecution_date variable. Dans le modèle, vous pouvez utiliser n'importe quel jinja2 méthodes pour les manipuler.

en utilisant ce qui suit comme BashOperator bash_commandchaîne:

# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}

si vous voulez juste l'équivalent string de la date d'exécution,ds retournera un datastamp (AAAA-MM-JJ), ds_nodash retourne la même chose sans tirets (AAAAMMJJ), etc. Plus sur macros est disponible dans les Api Docs.


votre opérateur final ressemblerait à:

command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
21
répondu Erik Schuchmann 2018-08-01 07:39:23

le constructeur PythonOperator prend un paramètre 'provide_context' (voir https://pythonhosted.org/airflow/code.html). Si C'est vrai, alors il passe un certain nombre de paramètres dans python_callable via kwargs. kwargs['execution_date'] est ce que vous voulez, je crois.

quelque Chose comme ceci:

def python_method(ds, **kwargs):
    Variable.set('execution_date', kwargs['execution_date'])
    return

doit = PythonOperator(
    task_id='doit',
    provide_context=True,
    python_callable=python_method,
    dag=dag)

Je ne suis pas sûr de savoir comment le faire avec le BashOperator, mais vous pourriez commencer par ce numéro: https://github.com/airbnb/airflow/issues/775

19
répondu Ziggy Eunicien 2016-04-20 20:50:15

je pense que vous ne pouvez pas assigner des variables avec des valeurs du contexte airflow en dehors d'une instance de tâche, elles ne sont disponibles qu'à l'exécution. Fondamentalement, il y a 2 étapes différentes quand un dag est chargé et exécuté dans airflow :

  • tout d'abord votre fichier dag est interprété et analysé. Il doit fonctionner et compiler et les définitions de tâches doivent être correctes (aucune erreur de syntaxe ou autre). Pendant cette étape, si vous faites des appels de fonction pour remplir certaines valeurs, ces fonctions ne seront pas être capable d'accéder au contexte airflow (la date d'exécution par exemple, encore plus si vous faites du remplissage).

  • la deuxième étape est l'exécution du dag. Ce n'est que lors de cette deuxième étape que les variables fournies par le flux d'air (execution_date, ds, etc...) sont disponibles car elles sont liées à une exécution de la dag.

donc vous ne pouvez pas initialiser les variables globales en utilisant le contexte Airflow, cependant, Airflow vous donne plusieurs mécanismes pour atteindre le même effet :

  1. utiliser Jinja template dans votre commande (cela peut être dans une chaîne de caractères dans le code ou dans un fichier, les deux seront traités). Vous avez la liste des modèles disponibles ici:https://airflow.apache.org/code.html#default-variables. Notez que certaines fonctions sont également disponibles, en particulier pour le formatage des jours de calcul delta et date.

  2. utiliser un PythonOperator dans lequel vous passez le contexte (avec le provide_context argument.) Cela vous permettra d'accéder à un même modèle avec la syntaxe kwargs['<variable_name']. Si vous en avez besoin, vous pouvez retourner une valeur d'un PythonOperator, celle-ci sera stockée dans une variable XCOM que vous pouvez utiliser plus tard dans n'importe quel modèle. L'accès aux variables XCOM utilise cette syntaxe:https://airflow.apache.org/concepts.html#xcoms

  3. si vous écrivez votre propre opérateur, vous pouvez accéder aux variables airflow avec le dict context.

7
répondu Babcool 2017-12-12 08:45:57
def execute(self, context):
    execution_date = context.get("execution_date")

ceci doit être dans la méthode execute () de L'opérateur

4
répondu l0n3r4ng3r 2017-11-29 01:04:51

Le execution_date, (datetime.datetime)

 {{ execution_date }}
0
répondu 田咖啡 2017-07-23 03:56:13