Mode autonome Spark: comment compresser la sortie spark écrite sur HDFS

Lié à mon autre question, mais distinct:

someMap.saveAsTextFile("hdfs://HOST:PORT/out")

Si j'enregistre un RDD sur HDFS, Comment puis-je dire à spark de compresser la sortie avec gzip? Dans Hadoop, il est possible de définir

mapred.output.compress = true

Et choisissez l'algorithme de compression avec

mapred.output.compression.codec = <<classname of compression codec>>

Comment ferais - je cela dans spark? Ce travail sera ainsi?

Edit: à l'aide de l'étincelle-0.7.2

22
demandé sur ptikobj 2013-06-21 21:37:00

3 réponses

La méthode saveAsTextFile prend un paramètre optionnel supplémentaire de la classe codec à utiliser. Donc, pour votre exemple, il devrait être quelque chose comme ça d'utiliser gzip:

someMap.saveAsTextFile("hdfs://HOST:PORT/out", classOf[GzipCodec])

Mise à jour

Puisque vous utilisez 0.7.2, vous pourriez être en mesure de porter le code de compression via les options de configuration que vous définissez au démarrage. Je ne suis pas sûr si cela fonctionnera exactement, mais vous devez partir de ceci:

conf.setCompressMapOutput(true)
conf.set("mapred.output.compress", "true")
conf.setMapOutputCompressorClass(c)
conf.set("mapred.output.compression.codec", c.getCanonicalName)
conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)

À quelque chose comme ceci:

System.setProperty("spark.hadoop.mapred.output.compress", "true")
System.setProperty("spark.hadoop.mapred.output.compression.codec", "true")
System.setProperty("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
System.setProperty("spark.hadoop.mapred.output.compression.type", "BLOCK")

Si vous le faites fonctionner, publiez votre configuration serait probablement utile aux autres aussi.

21
répondu Noah 2015-05-11 13:06:27

Une autre façon d'enregistrer les fichiers compressés sur le système de répertoires HDFS ou Amazon S3 consiste à utiliser la méthode saveAsHadoopFile.

SomeMap est RDD [(K, V)], si vous avez someMap comme RDD[V], vous pouvez appeler someMap.map (line = >(line, "") pour utiliser la méthode saveAsHadoopFile.

import org.apache.hadoop.io.compress.GzipCodec

someMap.saveAsHadoopFile(output_folder_path, classOf[String], classOf[String], classOf[MultipleTextOutputFormat[String, String]], classOf[GzipCodec])
2
répondu Gongqin Shen 2015-05-04 20:59:41

Pour les versions plus récentes de Spark, procédez comme suit dans vos paramètres par défaut de spark.fichier xml. (mapred est derecated).

<property>
    <name>mapreduce.output.fileoutputformat.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress.codec</name>
    <value>GzipCodec</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress.type</name>
    <value>BLOCK</value>
</property>
1
répondu nikk 2016-08-18 21:21:37