Comment exporter des données de Spark SQL vers CSV

cette commande fonctionne avec HiveQL:

insert overwrite directory '/data/home.csv' select * from testtable;

mais avec Spark SQL je reçois une erreur avec un org.apache.spark.sql.hive.HiveQl trace de la pile:

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable

Merci de me guider pour écrire les exporter au format CSV fonction Spark SQL.

37
demandé sur Daniel Darabos 2015-08-11 12:24:15

6 réponses

vous pouvez utiliser la déclaration ci-dessous pour écrire le contenu de dataframe dans le format CSV df.write.csv("/data/home/csv")

si vous avez besoin d'écrire toute la dataframe dans un seul fichier CSV, alors utilisez df.coalesce(1).write.csv("/data/home/sample.csv")

étincelle 1.x, vous pouvez utiliser spark-csv pour écrire les résultats dans les fichiers CSV

en dessous de scala extrait de code aiderait

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")

Pour écrire le contenu dans un seul fichier

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
60
répondu sag 2017-11-29 09:26:46

Depuis Étincelle 2.X spark-csv intégré natif de la source de données. Par conséquent, la déclaration nécessaire simplifie à (windows)

df.write
  .option("header", "true")
  .csv("file:///C:/out.csv")

ou UNIX

df.write
  .option("header", "true")
  .csv("/var/out.csv")
32
répondu Boern 2016-12-09 09:10:32

la réponse ci-dessus avec spark - csv est correcte mais il y a un problème-la bibliothèque crée plusieurs fichiers basés sur le partitionnement de la base de données. Et ce n'est pas ce dont nous avons habituellement besoin. Ainsi, vous pouvez combiner toutes les partitions en une seule:

df.coalesce(1).
    write.
    format("com.databricks.spark.csv").
    option("header", "true").
    save("myfile.csv")

et renommer la sortie de la lib (nom "part-00000") en un nom de fichier desire.

Ce billet de blog fournit plus de détails: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

22
répondu Dmitry Petrov 2016-12-07 22:54:41

la façon la plus simple est de cartographier le RDD De La base de données et d'utiliser mkString:

  df.rdd.map(x=>x.mkString(","))

Comme d'Étincelle 1.5 (ou même avant) df.map(r=>r.mkString(",")) faire la même chose si vous voulez que CSV s'échappe, vous pouvez utiliser apache commons lang pour cela. par exemple, voici le code que nous utilisons

 def DfToTextFile(path: String,
                   df: DataFrame,
                   delimiter: String = ",",
                   csvEscape: Boolean = true,
                   partitions: Int = 1,
                   compress: Boolean = true,
                   header: Option[String] = None,
                   maxColumnLength: Option[Int] = None) = {

    def trimColumnLength(c: String) = {
      val col = maxColumnLength match {
        case None => c
        case Some(len: Int) => c.take(len)
      }
      if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
    }
    def rowToString(r: Row) = {
      val st = r.mkString("~-~").replaceAll("[\p{C}|\uFFFD]", "") //remove control characters
      st.split("~-~").map(trimColumnLength).mkString(delimiter)
    }

    def addHeader(r: RDD[String]) = {
      val rdd = for (h <- header;
                     if partitions == 1; //headers only supported for single partitions
                     tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
      rdd.getOrElse(r)
    }

    val rdd = df.map(rowToString).repartition(partitions)
    val headerRdd = addHeader(rdd)

    if (compress)
      headerRdd.saveAsTextFile(path, classOf[GzipCodec])
    else
      headerRdd.saveAsTextFile(path)
  }
9
répondu Arnon Rotem-Gal-Oz 2016-06-16 18:42:16

le message d'erreur suggère qu'il ne s'agit pas d'une fonctionnalité supportée dans le langage de requête. Mais vous pouvez sauvegarder une base de données dans n'importe quel format comme d'habitude via l'interface RDD (df.rdd.saveAsTextFile). Ou vous pouvez vérifier l' https://github.com/databricks/spark-csv.

1
répondu Daniel Darabos 2015-08-11 10:45:04

avec l'aide de spark-csv nous pouvons écrire dans un fichier CSV.

val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`
1
répondu Uva Prakash P 2018-01-15 15:41:19