Airflow Logs BrokenPipeException

j'utilise un environnement de flux d'Air groupé où j'ai quatre EC2-instances AWS pour les serveurs.

ec2-instances

  • Serveur 1: Serveur Web, Planificateur, Redis File D'Attente, PostgreSQL Base De Données
  • Serveur 2: Serveur Web
  • Le Serveur 3: Travailleur
  • Le Serveur 4: Travailleur

mon installation fonctionne parfaitement bien depuis trois mois maintenant mais sporadiquement environ une fois par semaine je reçois un tuyau cassé Exception lorsque le flux d'Air Tente d'enregistrer quelque chose.

*** Log file isn't local.
*** Fetching here: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-13T00:00:00/1.log

[2018-07-16 00:00:15,521] {cli.py:374} INFO - Running on host ip-1-2-3-4
[2018-07-16 00:00:15,698] {models.py:1197} INFO - Dependencies all met for <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
[2018-07-16 00:00:15,710] {models.py:1197} INFO - Dependencies all met for <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
[2018-07-16 00:00:15,710] {models.py:1407} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2018-07-16 00:00:15,719] {models.py:1428} INFO - Executing <Task(OmegaFileSensor): task_1> on 2018-07-13 00:00:00
[2018-07-16 00:00:15,720] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'airflow run foobar task_1 2018-07-13T00:00:00 --job_id 1320 --raw -sd DAGS_FOLDER/datalake_digitalplatform_arl_workflow_schedule_test_2.py']
[2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,532] {configuration.py:206} WARNING - section/key [celery/celery_ssl_active] not found in config
[2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,532] {default_celery.py:41} WARNING - Celery Executor will run without SSL
[2018-07-16 00:00:16,534] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,533] {__init__.py:45} INFO - Using executor CeleryExecutor
[2018-07-16 00:00:16,597] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,597] {models.py:189} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/datalake_digitalplatform_arl_workflow_schedule_test_2.py
[2018-07-16 00:00:16,768] {cli.py:374} INFO - Running on host ip-1-2-3-4
[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - --- Logging error ---

[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - Traceback (most recent call last):

[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING -   File "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
    self.flush()

[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING -   File "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
    self.stream.flush()

[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - BrokenPipeError: [Errno 32] Broken pipe

[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - Call stack:

[2018-07-16 00:16:24,933] {logging_mixin.py:84} WARNING -   File "/usr/bin/airflow", line 27, in <module>
    args.func(args)

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
    result = task_copy.execute(context=context)

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 78, in execute
    while not self.poke(context):

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
    directory = os.listdir(full_path)

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 36, in handle_timeout
    self.log.error("Process timed out")

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - Message: 'Process timed out'
Arguments: ()

[2018-07-16 00:16:24,942] {models.py:1595} ERROR - Timeout
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 78, in execute
    while not self.poke(context):
  File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
    directory = os.listdir(full_path)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout
[2018-07-16 00:16:24,942] {models.py:1624} INFO - Marking task as FAILED.
[2018-07-16 00:16:24,956] {models.py:1644} ERROR - Timeout

parfois l'erreur dira aussi

*** Log file isn't local.
*** Fetching here: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log
*** Failed to fetch log file from worker. 404 Client Error: NOT FOUND for url: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log

Je ne sais pas pourquoi les logs fonctionnent ~95% du temps mais échouent au hasard à d'autres moments. Voici mes paramètres de journal dans mon flux d'Air.fichier cfg,

# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /home/ec2-user/airflow/logs

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id =
encrypt_s3_logs = False

# Logging level
logging_level = INFO

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =

# Log format
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Name of handler to read task instance logs.
# Default to use file task handler.
task_log_reader = file.task

# Log files for the gunicorn webserver. '-' means log to stderr.
access_logfile = -
error_logfile = 

# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
log_fetch_timeout_sec = 5

# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
# the port on which the logs are served. It needs to be unused, and open
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793

# How often should stats be printed to the logs
print_stats_interval = 30

child_process_log_directory = /home/ec2-user/airflow/logs/scheduler

je me demande si peut-être je devrais essayer une technique différente pour ma journalisation telle que l'écriture dans un seau S3 ou s'il y a quelque chose d'autre que je peux faire pour corriger ceci question.

mise à Jour:

L'écriture des logs à S3 n'a pas résolu ce problème. Aussi, l'erreur est plus cohérent maintenant (encore sporadiques). Ça arrive plus que 50% du temps maintenant. Une chose que j'ai remarqué, c'est que la tâche sur laquelle ça se passe est ma tâche de création D'EMR AWS. Le démarrage d'un cluster EMR AWS prend environ 20 minutes, puis la tâche doit attendre que les commandes Spark s'exécutent sur le cluster EMR. La tâche unique dure environ 30 minutes. Je suis je me demande si c'est trop long pour qu'une tâche de flux D'air soit exécutée et si c'est pour cela qu'elle commence à ne pas écrire dans les journaux. Si c'est le cas, alors je pourrais briser la tâche EMR de sorte qu'il ya une tâche pour la création EMR, puis une autre tâche pour les commandes Spark sur le cluster EMR.

Remarque:

j'ai aussi créé un nouveau billet bug sur la Jira d'Airflow icihttps://issues.apache.org/jira/browse/AIRFLOW-2844

9
demandé sur Kyle Bridenstine 2018-07-16 19:01:15

1 réponses

Ce problème est un symptôme d'un autre problème, j'ai juste réglé ici AirflowException: la commande Celery a échoué - le nom d'hôte enregistré ne correspond pas au nom d'hôte de cette instance.

Je n'ai pas vu le AirflowException: Céleri échec de la commande parce qu'il a montré sur le débit d'air travailleur journaux. Ce n'est que lorsque j'ai regardé les logs de l'airflow worker en temps réel que j'ai vu que L'erreur était lancée que J'ai aussi eu la BrokenPipeException dans ma tâche.

mais ça devient un peu plus bizarre. Je tiens seulement à voir le BrokenPipeException levée si je n'ai print("something to log")etAirflowException: Celery command failed... une erreur s'est produite sur le noeud Worker. Quand j'ai changé tous mes impression instructions d'utilisation import logging ... logging.info("something to log") je ne voudrais pas voir le BrokenPipeException mais la tâche échouerait encore à cause de AirflowException: Celery command failed... erreur. Mais si je n'avais pas vu la BrokenPipeException être jetée dans mon journal des tâches D'Airflow Je n'aurais pas su pourquoi la tâche échouait parce qu'une fois que j'ai éliminé les instructions d'impression, je n'ai jamais vu d'erreur dans les journaux des tâches de flux D'Air (seulement sur le $préposé à l'écoulement de l'air journaux)

pour faire court, il y a quelques prises.

  1. Ne pas le faire print("something to log") utilisez Airflow intégré à la journalisation en important la journalisation et en utilisant ensuite la classe de journalisation comme import logginglogging.info("something to log")

  2. si vous utilisez une Instance AWS EC2 comme serveur pour Airflow alors vous peut-être que vous éprouvez ce problème:https://github.com/apache/incubator-airflow/pull/2484 un correctif à cette question a déjà été intégré dans la version 1.10 de Airflow (j'utilise actuellement la Version 1.9 de Airflow). Donc mise à niveau de votre version Airflow à 1.10. Vous pouvez également utiliser la commandepip install git+git://github.com/apache/incubator-airflow.git@v1-10-stable. En outre, si vous ne voulez pas mettre à jour votre version Airflow, vous pouvez suivre les étapes sur le github question pour mettre à jour manuellement fichier avec le fix ou le fork Airflow et choisir la propagation qui le corrige.

2
répondu Kyle Bridenstine 2018-08-13 16:19:54