comment exporter une table dataframe dans pyspark vers csv?
j'utilise spark-1.3.1 (pyspark) et j'ai généré une table en utilisant une requête SQL. J'ai maintenant un objet qui est une base de données. Je veux exporter cet objet DataFrame (Je l'ai appelé "table") vers un fichier csv pour pouvoir le manipuler et tracer les colonnes. Comment exporter le DataFrame "table" vers un fichier csv?
Merci!
5 réponses
si data frame entre dans la mémoire d'un pilote et que vous voulez sauvegarder dans le système de fichiers local, vous pouvez convertir Spark DataFrame local Pandas DataFrame en utilisant toPandas
méthode et puis tout simplement utiliser to_csv
:
df.toPandas().to_csv('mycsv.csv')
Sinon, vous pouvez utiliser spark-csv:
étincelle 1.3
df.save('mycsv.csv', 'com.databricks.spark.csv')
Spark 1.4+
df.write.format('com.databricks.spark.csv').save('mycsv.csv')
dans Spark 2.0+ vous pouvez utilisez csv
source des données directement:
df.write.csv('mycsv.csv')
pour Apache Spark 2+, afin de sauvegarder dataframe dans un seul fichier csv. Utilisez la commande suivante
query.repartition(1).write.csv("cc_out.csv", sep='|')
Ici 1
indiquer que j'ai besoin d'une partition de csv. vous pouvez les modifier en fonction de vos besoins.
df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")
si vous avez besoin de manipuler des cordes avec des linebreaks ou des virgule qui ne fonctionneront pas. Utiliser ceci:
import csv
import cStringIO
def row2csv(row):
buffer = cStringIO.StringIO()
writer = csv.writer(buffer)
writer.writerow([str(s).encode("utf-8") for s in row])
buffer.seek(0)
return buffer.read().strip()
df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")
Que Diriez-vous de ceci (dans vous ne voulez pas un seul liner) ?
for row in df.collect():
d = row.asDict()
s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"])
f.write(s)
f est un descripteur de fichier ouvert. Aussi le séparateur est un char D'onglet, mais il est facile de changer à tout ce que vous voulez.
vous devez repartitionner le datagramme dans une seule partition et ensuite définir le format, le chemin et d'autres paramètres pour le fichier dans le format du système de fichiers Unix et ici vous allez,
df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')
en savoir plus sur le fonction de répartition Lire plus sur le enregistrer la fonction