Charger le fichier CSV avec Spark

Je suis nouveau sur Spark et j'essaie de lire des données CSV à partir d'un fichier avec Spark. Voici ce que je fais:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

Je m'attendrais à ce que cet appel me donne une liste des deux premières colonnes de mon fichier mais je reçois cette erreur:

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range

Bien que mon fichier CSV comme plus d'une colonne.

63
demandé sur Kernael 2015-02-28 17:41:00

10 réponses

Etes-vous sûr que tous les les lignes ont au moins 2 colonnes? Pouvez-vous essayer quelque chose comme, juste pour vérifier?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

Alternativement, vous pouvez imprimer le coupable (le cas échéant):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()
45
répondu G Quintana 2016-12-30 18:25:57

Spark 2.0.0+

Vous pouvez utiliser directement la source de données csv intégrée:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

Ou

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

Sans inclure de dépendances externes.

Spark :

Au lieu de l'analyse manuelle, ce qui est loin d'être trivial dans un cas général, je recommanderais spark-csv:

Assurez-vous que l'Étincelle CSV est inclus dans le chemin d'accès (--packages, --jars, --driver-class-path)

Et chargez vos données comme suit:

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

Il peut gérer chargement, inférence de schéma, suppression de lignes malformées et ne nécessite pas de passer des données de Python à la JVM.

Note:

Si vous connaissez le schéma, il est préférable d'éviter l'inférence du schéma et de le transmettre à DataFrameReader. En supposant que vous avez trois colonnes-entier, double et chaîne:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))
116
répondu zero323 2017-08-30 08:33:21

Le simple fractionnement par virgule divisera également les virgules qui se trouvent dans les champs (par exemple a,b,"1,2,3",c), donc ce n'est pas recommandé. la réponse de zero323 est bonne si vous voulez utiliser L'API DataFrames, mais si vous voulez vous en tenir à base Spark, vous pouvez analyser csvs en base Python avec le modulecsv :

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

EDIT: comme @muon mentionné dans les commentaires, cela traitera l'en-tête comme n'importe quelle autre ligne, vous devrez donc l'extraire manuellement. Par exemple, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (assurez-vous de ne pas modifier header avant le filtre est évalué). Mais à ce stade, vous êtes probablement mieux d'utiliser un analyseur csv intégré.

10
répondu Galen Long 2017-05-23 12:34:37

Et encore une autre option qui consiste à lire le fichier CSV en utilisant Pandas, puis à importer le DataFrame Pandas dans Spark.

Par exemple:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
9
répondu JP Mercier 2015-11-14 00:39:52
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,separator="|");

print(df.collect())
9
répondu y durga prasad 2016-12-30 19:09:48

Ceci est en ligne avec ce que JP Mercier a initialement suggéré sur l'utilisation des Pandas, mais avec une modification majeure: si vous lisez des données dans des Pandas en morceaux, cela devrait être plus malléable. Ce qui signifie que vous pouvez analyser un fichier beaucoup plus grand que celui que les Pandas peuvent gérer en une seule pièce et le transmettre à Spark dans des tailles plus petites. (Cela répond également au commentaire sur la raison pour laquelle on voudrait utiliser Spark s'ils peuvent tout charger dans les Pandas de toute façon.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
3
répondu abby sobh 2017-05-23 12:03:05

Maintenant, il y a aussi une autre option pour tout fichier csv général: https://github.com/seahboonsiew/pyspark-csv comme suit:

Supposons que nous ayons le contexte suivant

sc = SparkContext
sqlCtx = SQLContext or HiveContext

Tout d'abord, distribuer pyspark-csv.py aux exécuteurs utilisant SparkContext

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

Lire les données csv via SparkContext et les convertir en DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
3
répondu optimist 2017-08-01 09:09:29

Si vos données csv ne contiennent aucun saut de ligne dans l'un des champs, vous pouvez charger vos données avec textFile() et les Analyser

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)
2
répondu iec2011007 2015-11-23 05:02:48

Si vous voulez charger csv en tant que dataframe, vous pouvez effectuer les opérations suivantes:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

Cela a bien fonctionné pour moi.

1
répondu Jeril 2017-11-09 10:09:02
import pandas as pd

data1 = pd.read_csv("test1.csv")
data2 = pd.read_csv("train1.csv")
-4
répondu hey kay 2017-07-31 18:01:43