Comment sauvegarder une DataFrame sous forme compressée (gzipped) CSV?

J'utilise Spark 1.6.0 et Scala.

je veux enregistrer une base de données en format CSV compressé.

Voici ce que j'ai jusqu'à présent (j'ai déjà df et scSparkContext):

//set the conf to the codec I want
sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

df.write
  .format("com.databricks.spark.csv")
  .save(my_directory)

La sortie n'est pas dans gz format.

10
demandé sur Jacek Laskowski 2016-10-20 23:32:03

4 réponses

https://github.com/databricks/spark-csv

On peut lire:

codec: codec de compression à utiliser lors de la sauvegarde dans le fichier. Devrait être le nom entièrement qualifié d'un organisme de mise en œuvre de classe.apache.hadoop.io.compresse.CompressionCodec ou un nom de raccourci insensible à la casse (bzip2, gzip, lz4, et snappy). Par défaut, aucune compression lorsqu'un codec n'est pas spécifié.

Dans votre cas, cela devrait travail: df.write.format("com.databricks.spark.csv").codec("gzip")\ .save('my_directory/my_file.gzip')

5
répondu Alex-Antoine Fortin 2016-11-28 19:37:42

ce code fonctionne pour Spark 2.1, où .codec n'est pas disponible.

df.write
  .format("com.databricks.spark.csv")
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
  .save(my_directory)

pour Spark 2.2, vous pouvez utiliser le df.write.csv(...,codec="gzip") option décrite ici: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=codec

16
répondu Ravi Kant Saini 2017-11-30 16:06:59

Avec Spark 2.0+, c'est devenu un peu plus simple:

df.write.csv("path", compression="gzip")

vous n'avez plus besoin du paquet CSV des Databricks externes.

csv() writer supporte un certain nombre d'options pratiques. Par exemple:

  • sep: pour définir le caractère du séparateur.
  • quote: Si et comment citer des valeurs.
  • header: doit-on inclure une ligne d'en-tête?

il y a aussi un certain nombre d'autres vous pouvez utiliser des codecs de compression, en plus de gzip:

  • bzip2
  • lz4
  • snappy
  • deflate

Le plein d'Étincelle docs pour l' csv() l'écrivain est ici: Python/ Scala

7
répondu Nick Chammas 2017-11-14 17:52:19

Pour écrire le fichier CSV avec les en-têtes et de renommer la partie-000 fichier .csv.gzip

DF.coalesce(1).write.format("com.databricks.spark.csv").mode("overwrite")
.option("header","true")
.option("codec",org.apache.hadoop.io.compress.GzipCodec").save(tempLocationFileName)

copyRename(tempLocationFileName, finalLocationFileName)

def copyRename(srcPath: String, dstPath: String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
  // the "true" setting deletes the source files once they are merged into the new output
}

si vous n'avez pas besoin de l'en-tête alors mettez-le à false et vous n'auriez pas besoin de faire la coalesce non plus. Il sera plus rapide à écrire aussi.

1
répondu morfious902002 2017-11-14 18:30:22