Obtenir CSV à Spark dataframe

j'utilise python sur Spark et je voudrais mettre un csv dans une dataframe.

documentation pour le Spark SQL étrangement ne fournit pas d'explications pour le CSV comme une source.

j'ai trouvé Spark-CSV cependant j'ai des problèmes avec les deux parties de la documentation:

  • "This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3" Ai-je vraiment besoin d'ajouter cet argument chaque fois que je lance pyspark ou spark-submit? Cela semble très inélégant. N'est-il pas un moyen de l'importer en python plutôt que de le recharger à chaque fois?

  • df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv") même si je fais ce qui précède, ça ne marchera pas. Que signifie l'argument "source" dans cette ligne de code? Comment puis-je simplement charger un fichier local sur linux, par exemple "/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv"?

15
demandé sur Alexis Eggermont 2015-04-29 09:43:44

8 réponses

lire le fichier csv dans un RDD et ensuite générer un RowRDD à partir du RDD original.

créer le schéma représenté par un type de structure correspondant à la structure des lignes dans le RDD créé à L'Étape 1.

appliquez le schéma au RDD des lignes via la méthode createDataFrame fournie par SQLContext.

lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

source: GUIDE DE PROGRAMMATION SPARK

12
répondu None 2017-01-03 23:03:26
from pyspark.sql.types import StringType
from pyspark import SQLContext
sqlContext = SQLContext(sc)

Employee_rdd = sc.textFile("\..\Employee.csv")
               .map(lambda line: line.split(","))

Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])

Employee_df.show()
21
répondu Aravind Krishnakumar 2016-04-20 04:38:18

Avec les versions plus récentes de l'Étincelle (comme, je crois, 1.4), cela est devenu beaucoup plus facile. L'expression sqlContext.readDataFrameReader exemple .csv() méthode:

df = sqlContext.read.csv("/path/to/your.csv")

notez que vous pouvez également indiquer que le fichier csv a un en-tête en ajoutant le mot-clé argument header=True.csv() appel. Une poignée d'autres options sont disponibles, et décrit dans le lien ci-dessus.

12
répondu ohruunuruus 2017-01-13 15:45:33

si la dépendance supplémentaire du paquet ne vous dérange pas, vous pouvez utiliser Pandas pour analyser le fichier CSV. Il gère très bien les virgules internes.

Dépendances:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

lire tout le fichier immédiatement dans une fenêtre Spark DataFrame:

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# If no header:
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) 
s_df = sql_sc.createDataFrame(pandas_df)

Ou, encore plus de données, consciemment, vous pouvez segmenter les données en une Étincelle CA alors DF:

chunk_100k = pd.read_csv('file.csv', chunksize=100000)

for chunky in chunk_100k:
    Spark_temp_rdd = sc.parallelize(chunky.values.tolist())
    try:
        Spark_full_rdd += Spark_temp_rdd
    except NameError:
        Spark_full_rdd = Spark_temp_rdd
    del Spark_temp_rdd

Spark_DF = Spark_full_rdd.toDF(['column 1','column 2'])
9
répondu abby sobh 2016-09-25 22:23:35

après Spark 2.0, Il est recommandé d'utiliser une session Spark:

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a SparkSession
spark = SparkSession \
    .builder \
    .appName("basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), field1=str(fields[1].encode("utf-8")), field2=int(fields[2]), field3=int(fields[3]))

lines = spark.sparkContext.textFile("file.csv")
df = lines.map(mapper)

# Infer the schema, and register the DataFrame as a table.
schemaDf = spark.createDataFrame(df).cache()
schemaDf.createOrReplaceTempView("tablename")
6
répondu Florent 2016-12-09 04:05:17

pour Pyspark, en supposant que la première ligne du fichier csv contient un en-tête

spark = SparkSession.builder.appName('chosenName').getOrCreate()
df=spark.read.csv('fileNameWithPath', mode="DROPMALFORMED",inferSchema=True, header = True)
3
répondu Grant Shannon 2017-10-03 08:06:32

j'ai rencontré le même problème. La solution est d'ajouter une variable d'environnement appelée "PYSPARK_SUBMIT_ARGS" et de définir sa valeur à "--packages com.databricks: spark-csv_2.10: 1.4.0 pyspark-shell". Cela fonctionne avec le shell interactif Python de Spark.

assurez-vous de faire correspondre la version de spark-csv avec la version de Scala installée. Avec Scala 2.11, c'est spark-csv_2.11 et avec Scala 2.10 ou 2.10.5 c'est spark-csv_2.10.

j'Espère que ça fonctionne.

0
répondu mahima 2016-07-26 17:28:57

basé sur la réponse D'Aravind, mais beaucoup plus court, par exemple:

lines = sc.textFile("/path/to/file").map(lambda x: x.split(","))
df = lines.toDF(["year", "month", "day", "count"])
0
répondu JARS 2017-09-29 11:43:31