Comment écrire le RDD résultant dans un fichier csv en Python Spark

j'ai un résultat de RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions). C'est la sortie dans ce format:

[(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....]

ce que je veux c'est créer un fichier CSV avec une colonne pour labels (la première partie du tuple dans la sortie ci-dessus) et un pour predictions(deuxième partie de la sortie tuple). Mais je ne sais pas comment écrire dans un fichier CSV en Spark en utilisant Python.

comment créer un fichier CSV avec la sortie ci-dessus?

20
demandé sur Daniel Darabos 2015-08-09 00:53:51

3 réponses

map les lignes de la RDD (labelsAndPredictions) dans les chaînes (les lignes du CSV) puis utilisez rdd.saveAsTextFile().

def toCSVLine(data):
  return ','.join(str(d) for d in data)

lines = labelsAndPredictions.map(toCSVLine)
lines.saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv')
31
répondu Daniel Darabos 2015-08-10 11:29:08

je sais que c'est un vieux post. Mais pour aider quelqu'un à chercher la même chose, voici comment j'écris un RDD de deux colonnes à un seul fichier CSV dans PySpark 1.6.2

La CA:

>>> rdd.take(5)
[(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')]

Maintenant le code:

# First I convert the RDD to dataframe
from pyspark import SparkContext
df = sqlContext.createDataFrame(rdd, ['count', 'word'])

The DF:

>>> df.show()
+-----+-----------+
|count|       word|
+-----+-----------+
|73342|      cells|
|62861|       cell|
|61714|    studies|
|61377|        aim|
|60168|   clinical|
|59275|          2|
|59221|          1|
|58274|       data|
|58087|development|
|56579|     cancer|
|50243|    disease|
|49817|   provided|
|49216|   specific|
|48857|     health|
|48536|      study|
|47827|    project|
|45573|description|
|45455|  applicant|
|44739|    program|
|44522|   patients|
+-----+-----------+
only showing top 20 rows

maintenant, écrivez à CSV

# Write CSV (I have HDFS storage)
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out')

P. S: Je ne suis qu'un débutant qui apprend à partir des messages postés ici dans Stackoverflow. Donc je ne sais pas si c'est le meilleur moyen. Mais il a travaillé pour moi et j'espère que ça aidera quelqu'un!

16
répondu Insilico 2017-02-21 22:13:05

il n'est pas bon de simplement se joindre par des virgules parce que si les champs contiennent des virgules, ils ne seront pas correctement cités, par exemple ','.join(['a', 'b', '1,2,3', 'c'])a,b,1,2,3,c si vous voulez a,b,"1,2,3",c. Vous devriez plutôt utiliser le module csv de Python pour convertir chaque liste de la RDD en une chaîne csv correctement formatée:

# python 3
import csv, io

def list_to_csv_str(x):
    """Given a list of strings, returns a properly-csv-formatted string."""
    output = io.StringIO("")
    csv.writer(output).writerow(x)
    return output.getvalue().strip() # remove extra newline

# ... do stuff with your rdd ...
rdd = rdd.map(list_to_csv_str)
rdd.saveAsTextFile("output_directory")

puisque le module csv n'écrit que pour les objets file, nous devons créer un "fichier" vide avec io.StringIO("") et le dire au csv.Ecrivez la chaîne formatée par csv. Ensuite, nous utilisons output.getvalue() pour obtenir la chaîne de caractères que nous venons d'écrire à la "fichier". Pour que ce code fonctionne avec Python 2, Il suffit de remplacer io par le module StringIO.

si vous utilisez L'API Spark DataFrames, vous pouvez aussi regarder dans fonction de sauvegarde des banques de données, qui a un format csv.

8
répondu Galen Long 2017-01-27 15:33:57