Charger le fichier CSV avec Spark
Je suis nouveau sur Spark et j'essaie de lire des données CSV à partir d'un fichier avec Spark. Voici ce que je fais:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
Je m'attendrais à ce que cet appel me donne une liste des deux premières colonnes de mon fichier mais je reçois cette erreur:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
Bien que mon fichier CSV comme plus d'une colonne.
10 réponses
Etes-vous sûr que tous les les lignes ont au moins 2 colonnes? Pouvez-vous essayer quelque chose comme, juste pour vérifier?:
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1])) \
.collect()
Alternativement, vous pouvez imprimer le coupable (le cas échéant):
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)<=1) \
.collect()
Spark 2.0.0+
Vous pouvez utiliser directement la source de données csv intégrée:
spark.read.csv(
"some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)
Ou
(spark.read
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.csv("some_input_file.csv"))
Sans inclure de dépendances externes.
Spark :
Au lieu de l'analyse manuelle, ce qui est loin d'être trivial dans un cas général, je recommanderais spark-csv
:
Assurez-vous que l'Étincelle CSV est inclus dans le chemin d'accès (--packages
, --jars
, --driver-class-path
)
Et chargez vos données comme suit:
(df = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferschema", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv"))
Il peut gérer chargement, inférence de schéma, suppression de lignes malformées et ne nécessite pas de passer des données de Python à la JVM.
Note:
Si vous connaissez le schéma, il est préférable d'éviter l'inférence du schéma et de le transmettre à DataFrameReader
. En supposant que vous avez trois colonnes-entier, double et chaîne:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
StructField("A", IntegerType()),
StructField("B", DoubleType()),
StructField("C", StringType())
])
(sqlContext
.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv"))
Le simple fractionnement par virgule divisera également les virgules qui se trouvent dans les champs (par exemple a,b,"1,2,3",c
), donc ce n'est pas recommandé. la réponse de zero323 est bonne si vous voulez utiliser L'API DataFrames, mais si vous voulez vous en tenir à base Spark, vous pouvez analyser csvs en base Python avec le modulecsv :
# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
EDIT: comme @muon mentionné dans les commentaires, cela traitera l'en-tête comme n'importe quelle autre ligne, vous devrez donc l'extraire manuellement. Par exemple, header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(assurez-vous de ne pas modifier header
avant le filtre est évalué). Mais à ce stade, vous êtes probablement mieux d'utiliser un analyseur csv intégré.
Et encore une autre option qui consiste à lire le fichier CSV en utilisant Pandas, puis à importer le DataFrame Pandas dans Spark.
Par exemple:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.read.csv("/home/stp/test1.csv",header=True,separator="|");
print(df.collect())
Ceci est en ligne avec ce que JP Mercier a initialement suggéré sur l'utilisation des Pandas, mais avec une modification majeure: si vous lisez des données dans des Pandas en morceaux, cela devrait être plus malléable. Ce qui signifie que vous pouvez analyser un fichier beaucoup plus grand que celui que les Pandas peuvent gérer en une seule pièce et le transmettre à Spark dans des tailles plus petites. (Cela répond également au commentaire sur la raison pour laquelle on voudrait utiliser Spark s'ils peuvent tout charger dans les Pandas de toute façon.)
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)
for chunky in chunk_100k:
Spark_Full += sc.parallelize(chunky.values.tolist())
YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
Maintenant, il y a aussi une autre option pour tout fichier csv général: https://github.com/seahboonsiew/pyspark-csv comme suit:
Supposons que nous ayons le contexte suivant
sc = SparkContext
sqlCtx = SQLContext or HiveContext
Tout d'abord, distribuer pyspark-csv.py aux exécuteurs utilisant SparkContext
import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')
Lire les données csv via SparkContext et les convertir en DataFrame
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
Si vos données csv ne contiennent aucun saut de ligne dans l'un des champs, vous pouvez charger vos données avec textFile()
et les Analyser
import csv
import StringIO
def loadRecord(line):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name1", "name2"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
Si vous voulez charger csv en tant que dataframe, vous pouvez effectuer les opérations suivantes:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true') \
.load('sampleFile.csv') # this is your csv file
Cela a bien fonctionné pour moi.
import pandas as pd
data1 = pd.read_csv("test1.csv")
data2 = pd.read_csv("train1.csv")