Temps d'écriture extrêmement lent S3 à partir D'EMR / Spark

j'écris pour voir si quelqu'un sait comment accélérer les temps d'écriture de S3 à partir de Spark courant dans EMR?

mon travail D'étincelle prend plus de 4 heures à compléter, cependant le faisceau est seulement sous la charge pendant les premières 1,5 heures.

enter image description here

<!-J'étais curieux de savoir ce que Spark faisait pendant tout ce temps. J'ai regardé les logs et j'ai trouvé beaucoup de s3 mv commandes, une pour chaque fichier. Puis je jette un coup d'oeil directement sur S3 je vois que tous mes fichiers sont dans un _temporaire répertoire.

secondaire, je suis préoccupé par mon coût de cluster, il semble que je dois acheter 2 heures de calcul pour cette tâche spécifique. Cependant, je finis par acheter jusqu'à 5 heures. Je suis curieux de savoir si EMR AutoScaling peut aider avec les coûts dans cette situation.

certains articles discutent de la modification de l'algorithme du committeur de sortie de fichier mais j'ai eu peu de succès avec cela.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

écrire au HDFS local est rapide. Je suis curieux si l'émission d'un la commande hadoop pour copier les données dans S3 serait plus rapide?

enter image description here

17
demandé sur jspooner 2017-03-16 02:14:35

4 réponses

Ce que vous voyez est un problème avec outputcommitter et s3. le travail de propagation s'applique fs.rename sur le dossier _temporary et puisque S3 ne supporte pas le renommage, cela signifie qu'une seule requête copie et supprime maintenant tous les fichiers de _temporary à sa destination finale..

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2") ne fonctionne qu'avec la version hadoop > 2.7. ce qu'il fait est de copier chaque fichier de _temporary on commit task et non de commit job de sorte qu'il est distribué et fonctionne assez rapidement.

Si vous utilisez l'ancienne version de hadoop J'utiliserais Spark 1.6 et utilisez:

sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

*notez que cela ne fonctionne pas avec la specualtion activée ou l'écriture en mode ajout

**notez aussi qu'il est déprécié dans Spark 2.0 (remplacé par algorithme.version=2)

BTW dans mon équipe nous écrivons en fait avec Spark à HDFS et utilisons les jobs DISTCP (spécifiquement s3-dist-cp) dans la production pour copier les fichiers à S3 mais ceci est fait pour plusieurs autres raisons (cohérence, tolérance de défaut) de sorte qu'il n'est pas nécessaire.. vous pouvez écrire à S3 assez rapidement en utilisant ce que j'ai suggéré.

7
répondu Tal Joffe 2018-09-07 16:03:02

le committer direct a été retiré de spark car il n'était pas résistant aux échecs. Je conseil vivement contre son utilisation.

il y a des travaux en cours à Hadoop, s3guard, pour ajouter 0-renommer les committers, qui seront O(1) et tolérants aux défauts; gardez un oeil sur HADOOP-13786.

Ignorer "la Magie committer" pour l'instant, Netflix base de mise en scène valider va premier navire (hadoop 2.9? 3,0?)

  1. ceci écrit le travail au FS local, en tâche commit
  2. questions non validées multipart mettre opérations d'écrire les données, mais ne pas se matérialiser.
  3. enregistre les informations nécessaires pour propager la PUT vers HDFS, en utilisant le "algorithme 1" de propagation de fichier
  4. implémente une commit de travail qui utilise la commit de sortie de fichier de HDFS pour décider quels PUTs doivent être complétés et lesquels doivent être annulés.

résultat: la tâche commit prend des données / bande passante en secondes, mais la tâche commit ne prend pas plus de le temps de faire 1-4 obtient sur le dossier de destination et un POST pour chaque dossier en attente, ce dernier étant parallélisé.

vous pouvez récupérer le committer sur lequel ce travail est basé,à partir de netflix, et probablement l'utiliser dans des étincelles aujourd'hui. Faire définir le fichier de commettre un algorithme = 1 (ce qui devrait être la valeur par défaut) ou il l'habitude de l'écriture de données.

5
répondu Steve Loughran 2017-03-16 13:47:31

j'ai eu cas d'utilisation similaire où j'ai utilisé spark pour écrire à s3 et avait des problèmes de performance. La raison principale était spark était la création de beaucoup de fichiers de pièce à zéro octet et le remplacement des fichiers temp par le nom réel du fichier ralentissait le processus d'écriture. Essayé ci-dessous approche comme le travail autour de

  1. Ecrire la sortie de spark à HDFS et utiliser Hive pour écrire à s3. Les performances étaient bien meilleures car hive créait moins de fichiers partiels. Le problème que j'ai eu est(a également eu le même problème lors de l'utilisation spark), supprimer l'action sur la Politique n'a pas été prévue dans prod env pour des raisons de sécurité. S3 bucket était crypté dans mon cas.

  2. écrire la sortie spark à HDFS et copié les fichiers hdfs à la copie locale et utilisé aws S3 pour pousser les données à s3. Deuxième meilleur résultat avec cette approche. Créé billet avec Amazon et ils ont suggéré d'aller avec celui-ci.

  3. utilisez S3 dist cp pour copier des fichiers de HDFS vers S3. Ce qui fonctionnait sans problèmes, mais pas performant

3
répondu Vikrame 2017-03-16 04:11:21

que voyez-vous dans la production d'étincelles? Si vous voyez beaucoup d'opérations de renommage, lisez

1
répondu Niros 2017-05-23 12:32:06