Écrire un seul fichier CSV en utilisant spark-csv

j'utilise https://github.com/databricks/spark-csv , j'essaie d'écrire un seul CSV, mais pas capable, il fait un dossier.

a besoin d'une fonction Scala qui prendra les paramètres comme chemin et nom de fichier et écrira ce fichier CSV.

66
demandé sur user6910411 2015-07-28 14:08:20

7 réponses

il crée un dossier avec plusieurs fichiers, parce que chaque partition est sauvegardée individuellement. Si vous avez besoin d'un seul fichier de sortie (toujours dans un dossier) vous pouvez repartition (de préférence si les données en amont sont volumineuses, mais nécessite un shuffle):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

ou coalesce :

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

bloc de données avant de les enregistrer:

toutes les données seront écrites en mydata.csv/part-00000 . Avant d'utiliser cette option soyez sûr que vous comprendre ce qui se passe et quel est le coût de transfert de toutes les données à un seul travailleur . Si vous utilisez le système de fichiers distribués avec réplication, les données seront transférées plusieurs fois - d'abord récupéré à un seul travailleur et ensuite distribué sur les noeuds de stockage.

alternativement vous pouvez laisser votre code tel quel et utiliser des outils polyvalents comme cat ou HDFS getmerge pour fusionner simplement toutes les pièces ensuite.

105
répondu zero323 2018-01-13 02:48:09

si vous lancez Spark avec HDFS, j'ai résolu le problème en écrivant des fichiers csv normalement et en utilisant HDFS pour faire la fusion. Je le fais dans Spark (1.6) directement:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

ne me rappelle pas où j'ai appris ce truc, mais ça pourrait marcher pour toi.

22
répondu Minkymorgan 2018-01-13 02:54:14

je pourrais être un peu en retard au jeu ici, mais en utilisant coalesce(1) ou repartition(1) peut fonctionner pour les petits ensembles de données, mais les grands ensembles de données seraient tous jetés dans une partition sur un noeud. Cela est susceptible de jeter OOM erreurs, ou au mieux, le processus lentement.

je suggère fortement que vous utilisiez la fonction FileUtil.copyMerge() de L'API Hadoop. Cela va fusionner les sorties dans un seul fichier.

EDIT - This apporte efficacement les données au pilote plutôt qu'à un nœud exécuteur. Coalesce() serait bien si un seul exécuteur a plus de RAM à utiliser que le pilote.

EDIT 2: copyMerge() est supprimé dans Hadoop 3.0. Voir l'article suivant sur le débordement de la pile pour plus d'informations sur la façon de travailler avec la nouvelle version: Hadoop how to do CopyMerge in Hadoop 3.0

19
répondu etspaceman 2018-01-13 02:51:36

si vous utilisez des Databricks et que vous pouvez ajuster toutes les données en RAM sur un seul travailleur( et donc utiliser .coalesce(1) ), vous pouvez utiliser dbfs pour trouver et déplacer le fichier CSV résultant:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)

si votre fichier ne correspond pas à la mémoire vive du worker, vous pouvez considérer la suggestion de chaotic3quilibrium d'utiliser des FileUtils.copyMerge () . Je n'ai pas fait cela, et je ne sais pas encore si c'est possible ou non, par exemple, sur S3.

This la réponse est construit sur les précédentes réponses à cette question ainsi que mes propres tests de l'extrait de code. Je l'ai d'abord posté sur Databricks et je le republie ici.

la meilleure documentation pour l'option récursive de dbfs rm que j'ai trouvé est sur un forum de Databricks .

6
répondu Josiah Yoder 2018-01-13 02:54:33

répartition / coalesce à 1 partition avant d'Enregistrer (vous obtiendriez toujours un dossier mais il y aurait une partie du fichier)

2
répondu Arnon Rotem-Gal-Oz 2015-07-28 11:46:02

vous pouvez utiliser rdd.coalesce(1, true).saveAsTextFile(path)

il stockera les données comme fichier unique dans le chemin / part-00000

1
répondu Gourav 2016-09-16 17:02:49

il y a une autre façon D'utiliser Java

import java.io._

def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) 
  {
     val p = new java.io.PrintWriter(f);  
     try { op(p) } 
     finally { p.close() }
  } 

printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}
-2
répondu Sergio Alyoshkin 2017-04-04 07:35:00