Comment sauvegarder une DataFrame sous forme compressée (gzipped) CSV?
J'utilise Spark 1.6.0 et Scala.
je veux enregistrer une base de données en format CSV compressé.
Voici ce que j'ai jusqu'à présent (j'ai déjà df
et sc
SparkContext
):
//set the conf to the codec I want
sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
df.write
.format("com.databricks.spark.csv")
.save(my_directory)
La sortie n'est pas dans gz
format.
4 réponses
On peut lire:
codec
: codec de compression à utiliser lors de la sauvegarde dans le fichier. Devrait être le nom entièrement qualifié d'un organisme de mise en œuvre de classe.apache.hadoop.io.compresse.CompressionCodec ou un nom de raccourci insensible à la casse (bzip2, gzip, lz4, et snappy). Par défaut, aucune compression lorsqu'un codec n'est pas spécifié.
Dans votre cas, cela devrait travail:
df.write.format("com.databricks.spark.csv").codec("gzip")\
.save('my_directory/my_file.gzip')
ce code fonctionne pour Spark 2.1, où .codec
n'est pas disponible.
df.write
.format("com.databricks.spark.csv")
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.save(my_directory)
pour Spark 2.2, vous pouvez utiliser le df.write.csv(...,codec="gzip")
option décrite ici: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=codec
Avec Spark 2.0+, c'est devenu un peu plus simple:
df.write.csv("path", compression="gzip")
vous n'avez plus besoin du paquet CSV des Databricks externes.
csv()
writer supporte un certain nombre d'options pratiques. Par exemple:
sep
: pour définir le caractère du séparateur.quote
: Si et comment citer des valeurs.header
: doit-on inclure une ligne d'en-tête?
il y a aussi un certain nombre d'autres vous pouvez utiliser des codecs de compression, en plus de gzip
:
bzip2
lz4
snappy
deflate
Le plein d'Étincelle docs pour l' csv()
l'écrivain est ici: Python/ Scala
Pour écrire le fichier CSV avec les en-têtes et de renommer la partie-000 fichier .csv.gzip
DF.coalesce(1).write.format("com.databricks.spark.csv").mode("overwrite")
.option("header","true")
.option("codec",org.apache.hadoop.io.compress.GzipCodec").save(tempLocationFileName)
copyRename(tempLocationFileName, finalLocationFileName)
def copyRename(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
si vous n'avez pas besoin de l'en-tête alors mettez-le à false et vous n'auriez pas besoin de faire la coalesce non plus. Il sera plus rapide à écrire aussi.