mise en place de s3 pour les logs dans le flux d'air

j'utilise docker-compose pour mettre en place un cluster d'écoulement d'air évolutif. J'ai basé mon approche sur ce fichier Dockerfile https://hub.docker.com/r/puckel/docker-airflow/

mon problème est de configurer les logs pour écrire/lire à partir de s3. Quand un dag a terminé, je reçois une erreur comme celle-ci

*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.

*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00

j'ai créé une nouvelle section dans le fichier airflow.cfg comme ceci

[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx

et puis spécifié le chemin s3 dans la section "remote logs" dans airflow.cfg

remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn

est-ce que j'ai mis ça en place correctement et il y a un bug? Est-il une recette pour le succès ici que je suis absent?

-- mise à Jour

j'ai essayé D'exporter dans les formats URI et JSON et ni l'un ni l'autre ne semblait fonctionner. J'ai ensuite exporté la clé aws_access_key_id et aws_secret_access_key, puis airflow a commencé à la récupérer. Maintenant, je reçois son erreur dans le travailleur journaux

6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00

-- mise à Jour

j'ai aussi trouvé ce lien https://www.mail-archive.com/dev@airflow.incubator.apache.org/msg00462.html

j'ai alors shelled dans l'une de mes machines de travail (séparé du serveur web et de l'ordonnanceur) et ai exécuté ce morceau de code en python

import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))

je reçois cette erreur.

boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden

j'ai essayé d'exporter plusieurs types différents de AIRFLOW_CONN_ envs comme expliqué ici dans la section Connexions https://airflow.incubator.apache.org/concepts.html et par d'autres réponses à cette question.

s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3

{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}

{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}

j'ai aussi exporté AWS_ACCESS_KEY_ID et AWS_SECRET_ACCESS_KEY sans succès.

ces justificatifs d'identité sont stockés dans une base de données donc une fois que je les ajoute dans L'IU, ils devraient être récupérés par les travailleurs mais ils ne sont pas capable d'écrire/lire les journaux pour une raison quelconque.

26
demandé sur JackStat 2017-06-27 15:49:26

5 réponses

vous devez configurer la connexion s3 à travers L'interface utilisateur airflow. Pour cela, vous devez aller dans L'onglet Admin -> Connections de airflow UI et créer une nouvelle ligne pour votre connexion S3.

un exemple de configuration serait:

Conn Id: my_conn_S3

Type Conn: S3

: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}

13
répondu Him 2017-06-28 07:33:29

NOTE: en date du flux D'Air 1.9.0 l'enregistrement à distance a été considérablement modifié . Il existe des plans pour faciliter la journalisation à l'avenir - par exemple autodetect cloud provider à partir d'une chaîne de seau. Ces changements ne sont pas encore en cours, mais surveillez de près les versions. Si vous utilisez 1.9.0, lire sur.

référence ici

Instructions Complètes:

  1. créez un répertoire pour stocker les configs et placez-le de sorte qu'il puisse être trouvé dans PYTHONPATH. Un exemple est $AIRFLOW_HOME / config

  2. créer des fichiers vides appelés $AIRFLOW_HOME/config/log_config.py et $ $ AIRFLOW_HOME/config/__init__.py

  3. copier le contenu de airflow/config_templates/airflow_local_settings.py dans le log_config.py dossier qui a été créé dans l'étape ci-dessus.

  4. personnaliser les parties suivantes du modèle:

    #Add this variable to the top of the file. Note the trailing slash.
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
    
    Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
    LOGGING_CONFIG = ...
    
    Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    's3.task': {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': S3_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
    
     Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }
    
  5. assurez-vous qu'un crochet de connexion s3 a été défini dans Airflow, conformément à la réponse ci-dessus . Le crochet devrait avoir accès en lecture et en écriture au seau s3 défini ci-dessus dans S3_LOG_FOLDER.

  6. mise à Jour $AIRFLOW_HOME / airflow.cfg pour contenir:

    task_log_reader = s3.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the s3 platform hook>
    
  7. redémarrez le serveur web Airflow et l'ordonnanceur, et déclenchez (ou attendez) une nouvelle exécution de tâche.

  8. vérifiez que les journaux apparaissent pour les tâches nouvellement exécutées dans le seau que vous avez défini.

  9. vérifiez que la visionneuse de stockage s3 fonctionne dans L'interface utilisateur. Tirer vers le haut d'une tâche nouvellement exécutée, et vérifier que vous voyez quelque chose comme:

    *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
    
23
répondu Arne Huang 2018-06-27 21:44:54

Voici une solution si vous ne voulez pas utiliser l'interface d'administration.

mon processus de déploiement est Dockerisé,et je ne touche jamais à l'interface D'administration. J'aime aussi placer des variables d'environnement spécifiques à L'écoulement D'air dans un script bash, qui l'emporte sur le .fichier cfg.

la circulation de l'air[s3]

tout d'abord, vous avez besoin du sous-paquet s3 installé pour écrire vos journaux de flux d'air à S3. ( boto3 fonctionne très bien pour les travaux Python dans vos DAGs, mais le S3Hook dépend du sous-paquet s3.)

encore un côté note: conda install ne gère pas encore , donc je dois faire pip install airflow[s3] .

variables D'environnement

dans un script bash, j'ai mis ces variables core . À partir de ces instructions mais en utilisant la Convention d'appellation AIRFLOW__{SECTION}__{KEY} pour les variables d'environnement, je fais:

export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

S3 ID de connexion

s3_uri est un identifiant de connexion que j'ai inventé. Dans Airflow, il correspond à une autre variable d'environnement, AIRFLOW_CONN_S3_URI . La valeur de cela est votre chemin S3, qui doit être sous forme URI. C'est

s3://access_key:secret_key@bucket/key

stockez ceci cependant vous manipulez d'autres variables d'environnement sensibles.

avec cette configuration, Airflow écrira vos logs À S3. Ils suivront le chemin de s3://bucket/key/dag/task_id .

4
répondu Niels Joaquin 2017-12-22 20:39:34

pour compléter la réponse D'Arne avec les récentes mises à jour Airflow, vous n'avez pas besoin de définir task_log_reader à une autre valeur que celle par défaut: task

comme si vous suiviez le modèle de journalisation par défaut airflow/config_templates/airflow_local_settings.py vous pouvez voir depuis ce commit (notez que le nom du gestionnaire a changé en 's3': {'task'... au lieu de s3.task ) c'est la valeur sur le dossier distant ( REMOTE_BASE_LOG_FOLDER ) remplacera le gestionnaire avec la bonne:

REMOTE_LOGGING = conf.get('core', 'remote_logging')

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])

plus de détails sur la façon de se connecter à / lire à partir de S3: https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3

1
répondu Paul Leclercq 2018-05-31 19:06:36

juste une note de côté pour quiconque suit les instructions très utiles dans la réponse ci-dessus : Si vous tombez sur cette question: "Modulenotfonderror: aucun module nommé 'la circulation de l'air.utils.journal.logging_mixin.RedirectStdHandler' " comme référencé ici (qui se produit lors de l'utilisation d'airflow 1.9), le correctif est simple-utilisez plutôt Ce modèle de base: https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py (et suivre toutes les autres instructions dans la réponse ci-dessus )

le modèle actuel incubator-airflow/airflow/config_templates/airflow_local_settings.py présente dans master branch contient un renvoi à la classe" airflow.utils.journal.s3_task_handler.S3TaskHandler", qui n'est pas présent dans apache-débit d'air==1.9.0 paquet python. Espérons que cette aide!

1
répondu diogoa 2018-06-22 17:09:22