mise en place de s3 pour les logs dans le flux d'air
j'utilise docker-compose pour mettre en place un cluster d'écoulement d'air évolutif. J'ai basé mon approche sur ce fichier Dockerfile https://hub.docker.com/r/puckel/docker-airflow/
mon problème est de configurer les logs pour écrire/lire à partir de s3. Quand un dag a terminé, je reçois une erreur comme celle-ci
*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.
*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00
j'ai créé une nouvelle section dans le fichier airflow.cfg
comme ceci
[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx
et puis spécifié le chemin s3 dans la section "remote logs" dans airflow.cfg
remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn
est-ce que j'ai mis ça en place correctement et il y a un bug? Est-il une recette pour le succès ici que je suis absent?
-- mise à Jour
j'ai essayé D'exporter dans les formats URI et JSON et ni l'un ni l'autre ne semblait fonctionner. J'ai ensuite exporté la clé aws_access_key_id et aws_secret_access_key, puis airflow a commencé à la récupérer. Maintenant, je reçois son erreur dans le travailleur journaux
6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00
-- mise à Jour
j'ai aussi trouvé ce lien https://www.mail-archive.com/dev@airflow.incubator.apache.org/msg00462.html
j'ai alors shelled dans l'une de mes machines de travail (séparé du serveur web et de l'ordonnanceur) et ai exécuté ce morceau de code en python
import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))
je reçois cette erreur.
boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden
j'ai essayé d'exporter plusieurs types différents de AIRFLOW_CONN_
envs comme expliqué ici dans la section Connexions https://airflow.incubator.apache.org/concepts.html et par d'autres réponses à cette question.
s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3
{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}
{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}
j'ai aussi exporté AWS_ACCESS_KEY_ID et AWS_SECRET_ACCESS_KEY sans succès.
ces justificatifs d'identité sont stockés dans une base de données donc une fois que je les ajoute dans L'IU, ils devraient être récupérés par les travailleurs mais ils ne sont pas capable d'écrire/lire les journaux pour une raison quelconque.
5 réponses
vous devez configurer la connexion s3 à travers L'interface utilisateur airflow. Pour cela, vous devez aller dans L'onglet Admin -> Connections de airflow UI et créer une nouvelle ligne pour votre connexion S3.
un exemple de configuration serait:
Conn Id: my_conn_S3
Type Conn: S3
: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}
NOTE: en date du flux D'Air 1.9.0 l'enregistrement à distance a été considérablement modifié . Il existe des plans pour faciliter la journalisation à l'avenir - par exemple autodetect cloud provider à partir d'une chaîne de seau. Ces changements ne sont pas encore en cours, mais surveillez de près les versions. Si vous utilisez 1.9.0, lire sur.
référence ici
Instructions Complètes:
-
créez un répertoire pour stocker les configs et placez-le de sorte qu'il puisse être trouvé dans PYTHONPATH. Un exemple est $AIRFLOW_HOME / config
-
créer des fichiers vides appelés $AIRFLOW_HOME/config/log_config.py et $ $ AIRFLOW_HOME/config/__init__.py
-
copier le contenu de airflow/config_templates/airflow_local_settings.py dans le log_config.py dossier qui a été créé dans l'étape ci-dessus.
-
personnaliser les parties suivantes du modèle:
#Add this variable to the top of the file. Note the trailing slash. S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/' Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG LOGGING_CONFIG = ... Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable 's3.task': { 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler', 'formatter': 'airflow.task', 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), 's3_log_folder': S3_LOG_FOLDER, 'filename_template': FILENAME_TEMPLATE, }, Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'. 'loggers': { 'airflow.task': { 'handlers': ['s3.task'], ... }, 'airflow.task_runner': { 'handlers': ['s3.task'], ... }, 'airflow': { 'handlers': ['console'], ... }, }
-
assurez-vous qu'un crochet de connexion s3 a été défini dans Airflow, conformément à la réponse ci-dessus . Le crochet devrait avoir accès en lecture et en écriture au seau s3 défini ci-dessus dans S3_LOG_FOLDER.
-
mise à Jour $AIRFLOW_HOME / airflow.cfg pour contenir:
task_log_reader = s3.task logging_config_class = log_config.LOGGING_CONFIG remote_log_conn_id = <name of the s3 platform hook>
-
redémarrez le serveur web Airflow et l'ordonnanceur, et déclenchez (ou attendez) une nouvelle exécution de tâche.
-
vérifiez que les journaux apparaissent pour les tâches nouvellement exécutées dans le seau que vous avez défini.
-
vérifiez que la visionneuse de stockage s3 fonctionne dans L'interface utilisateur. Tirer vers le haut d'une tâche nouvellement exécutée, et vérifier que vous voyez quelque chose comme:
*** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log. [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532 [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py'] [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
Voici une solution si vous ne voulez pas utiliser l'interface d'administration.
mon processus de déploiement est Dockerisé,et je ne touche jamais à l'interface D'administration. J'aime aussi placer des variables d'environnement spécifiques à L'écoulement D'air dans un script bash, qui l'emporte sur le .fichier cfg.
la circulation de l'air[s3]
tout d'abord, vous avez besoin du sous-paquet s3
installé pour écrire vos journaux de flux d'air à S3. ( boto3
fonctionne très bien pour les travaux Python dans vos DAGs, mais le S3Hook
dépend du sous-paquet s3.)
encore un côté note: conda install ne gère pas encore , donc je dois faire pip install airflow[s3]
.
variables D'environnement
dans un script bash, j'ai mis ces variables core
. À partir de ces instructions mais en utilisant la Convention d'appellation AIRFLOW__{SECTION}__{KEY}
pour les variables d'environnement, je fais:
export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False
S3 ID de connexion
s3_uri
est un identifiant de connexion que j'ai inventé. Dans Airflow, il correspond à une autre variable d'environnement, AIRFLOW_CONN_S3_URI
. La valeur de cela est votre chemin S3, qui doit être sous forme URI. C'est
s3://access_key:secret_key@bucket/key
stockez ceci cependant vous manipulez d'autres variables d'environnement sensibles.
avec cette configuration, Airflow écrira vos logs À S3. Ils suivront le chemin de s3://bucket/key/dag/task_id
.
pour compléter la réponse D'Arne avec les récentes mises à jour Airflow, vous n'avez pas besoin de définir task_log_reader
à une autre valeur que celle par défaut: task
comme si vous suiviez le modèle de journalisation par défaut airflow/config_templates/airflow_local_settings.py vous pouvez voir depuis ce commit (notez que le nom du gestionnaire a changé en 's3': {'task'...
au lieu de s3.task
) c'est la valeur sur le dossier distant ( REMOTE_BASE_LOG_FOLDER
) remplacera le gestionnaire avec la bonne:
REMOTE_LOGGING = conf.get('core', 'remote_logging')
if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
plus de détails sur la façon de se connecter à / lire à partir de S3: https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3
juste une note de côté pour quiconque suit les instructions très utiles dans la réponse ci-dessus : Si vous tombez sur cette question: "Modulenotfonderror: aucun module nommé 'la circulation de l'air.utils.journal.logging_mixin.RedirectStdHandler' " comme référencé ici (qui se produit lors de l'utilisation d'airflow 1.9), le correctif est simple-utilisez plutôt Ce modèle de base: https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py (et suivre toutes les autres instructions dans la réponse ci-dessus )
le modèle actuel incubator-airflow/airflow/config_templates/airflow_local_settings.py présente dans master branch contient un renvoi à la classe" airflow.utils.journal.s3_task_handler.S3TaskHandler", qui n'est pas présent dans apache-débit d'air==1.9.0 paquet python. Espérons que cette aide!