Comment exporter des données de Spark SQL vers CSV
cette commande fonctionne avec HiveQL:
insert overwrite directory '/data/home.csv' select * from testtable;
mais avec Spark SQL je reçois une erreur avec un org.apache.spark.sql.hive.HiveQl
trace de la pile:
java.lang.RuntimeException: Unsupported language features in query:
insert overwrite directory '/data/home.csv' select * from testtable
Merci de me guider pour écrire les exporter au format CSV fonction Spark SQL.
6 réponses
vous pouvez utiliser la déclaration ci-dessous pour écrire le contenu de dataframe dans le format CSV
df.write.csv("/data/home/csv")
si vous avez besoin d'écrire toute la dataframe dans un seul fichier CSV, alors utilisez
df.coalesce(1).write.csv("/data/home/sample.csv")
étincelle 1.x, vous pouvez utiliser spark-csv pour écrire les résultats dans les fichiers CSV
en dessous de scala extrait de code aiderait
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")
Pour écrire le contenu dans un seul fichier
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
Depuis Étincelle 2.X
spark-csv
intégré natif de la source de données. Par conséquent, la déclaration nécessaire simplifie à (windows)
df.write
.option("header", "true")
.csv("file:///C:/out.csv")
ou UNIX
df.write
.option("header", "true")
.csv("/var/out.csv")
la réponse ci-dessus avec spark - csv est correcte mais il y a un problème-la bibliothèque crée plusieurs fichiers basés sur le partitionnement de la base de données. Et ce n'est pas ce dont nous avons habituellement besoin. Ainsi, vous pouvez combiner toutes les partitions en une seule:
df.coalesce(1).
write.
format("com.databricks.spark.csv").
option("header", "true").
save("myfile.csv")
et renommer la sortie de la lib (nom "part-00000") en un nom de fichier desire.
Ce billet de blog fournit plus de détails: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/
la façon la plus simple est de cartographier le RDD De La base de données et d'utiliser mkString:
df.rdd.map(x=>x.mkString(","))
Comme d'Étincelle 1.5 (ou même avant)
df.map(r=>r.mkString(","))
faire la même chose
si vous voulez que CSV s'échappe, vous pouvez utiliser apache commons lang pour cela. par exemple, voici le code que nous utilisons
def DfToTextFile(path: String,
df: DataFrame,
delimiter: String = ",",
csvEscape: Boolean = true,
partitions: Int = 1,
compress: Boolean = true,
header: Option[String] = None,
maxColumnLength: Option[Int] = None) = {
def trimColumnLength(c: String) = {
val col = maxColumnLength match {
case None => c
case Some(len: Int) => c.take(len)
}
if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
}
def rowToString(r: Row) = {
val st = r.mkString("~-~").replaceAll("[\p{C}|\uFFFD]", "") //remove control characters
st.split("~-~").map(trimColumnLength).mkString(delimiter)
}
def addHeader(r: RDD[String]) = {
val rdd = for (h <- header;
if partitions == 1; //headers only supported for single partitions
tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
rdd.getOrElse(r)
}
val rdd = df.map(rowToString).repartition(partitions)
val headerRdd = addHeader(rdd)
if (compress)
headerRdd.saveAsTextFile(path, classOf[GzipCodec])
else
headerRdd.saveAsTextFile(path)
}
le message d'erreur suggère qu'il ne s'agit pas d'une fonctionnalité supportée dans le langage de requête. Mais vous pouvez sauvegarder une base de données dans n'importe quel format comme d'habitude via l'interface RDD (df.rdd.saveAsTextFile
). Ou vous pouvez vérifier l' https://github.com/databricks/spark-csv.
avec l'aide de spark-csv nous pouvons écrire dans un fichier CSV.
val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`