Spark-charger le fichier CSV en tant que DataFrame?
Je voudrais lire un CSV dans spark et le convertir en DataFrame et le stocker dans HDFS avec df.registerTempTable("table_name")
J'ai essayé:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Erreur que j'ai eu:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Quelle est la bonne commande pour charger le fichier CSV en tant que DataFrame dans Apache Spark?
8 réponses
Spark-csv fait partie de la fonctionnalité principale de Spark et ne nécessite pas de bibliothèque séparée. Donc, vous pouvez simplement faire par exemple
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
Analyse CSV en tant que DataFrame / DataSet avec Spark 2.x
Initialisez D'abord SparkSession
objet par défaut, il sera disponible dans les shells comme spark
val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate;
Utilisez l'une des suivantes pour charger CSV comme
DataFrame/DataSet
1. Faites-le de manière programmatique
val df = spark.read
.format("csv")
.option("header", "true") //reading the headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")
2. Vous pouvez également faire cela de manière SQL
val df = spark.sql("SELECT * FROM csv.`csv/file/path/in/hdfs`")
Dépendances:
"org.apache.spark" % "spark-core_2.11" % 2.0.0,
"org.apache.spark" % "spark-sql_2.11" % 2.0.0,
Version Spark
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");
Dépendances:
"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
Avec Spark 2.0, voici comment vous pouvez lire CSV
val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
.config(conf = conf)
.appName("spark session example")
.getOrCreate()
val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
csv(path)
C'est Pour dont Hadoop est 2.6 et Spark est 1.6 et sans paquet "databricks".
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;
val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val", IntegerType, true))
val df = sqlContext.createDataFrame(rdd, schema)
En Java 1.8 cet extrait de code fonctionne parfaitement pour lire les fichiers CSV
POM.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>
Java
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);
Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
//("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
L'exemple Spark 2 de Penny est la façon de le faire dans spark2. Il y a un truc de plus: avoir cet en-tête généré pour vous en faisant une analyse initiale des données, en définissant l'option inferSchema
à true
Ici, en supposant que spark
est une session spark que vous avez configurée, est l'opération à charger dans le fichier D'index CSV de toutes les images Landsat hébergées par amazon sur S3.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
val csvdata = spark.read.options(Map(
"header" -> "true",
"ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
"inferSchema" -> "true",
"mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")
La mauvaise nouvelle est: cela déclenche une analyse à travers le fichier; pour quelque chose de grand comme ceci 20 + MB zippé CSV fichier, qui peut prendre 30s sur une connexion longue distance. Gardez cela à l'esprit: il vaut mieux coder manuellement le schéma une fois que vous l'avez entré.
(extrait de code Apache Software License 2.0 sous licence pour éviter toute ambiguïté; quelque chose que j'ai fait comme un test de démonstration / intégration de l'intégration S3)
Le format de fichier par défaut est Parquet avec spark.lire.. et la lecture de fichiers csv qui explique pourquoi vous obtenez l'exception. Spécifiez le format csv avec l'api que vous essayez d'utiliser
Il y a beaucoup de défis à l'analyse D'un fichier CSV, il continue à s'additionner si la taille du fichier est plus grande, s'il y a des caractères non-anglais/escape/separator/autres dans les valeurs de colonne, qui pourraient causer des erreurs d'analyse.
La magie est alors dans les options qui sont utilisées. Ceux qui ont fonctionné pour moi et hope devraient couvrir la plupart des cas de bord sont dans le code ci-dessous:
### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path,
header=True,
multiLine=True,
ignoreLeadingWhiteSpace=True,
ignoreTrailingWhiteSpace=True,
encoding="UTF-8",
sep=',',
quote='"',
escape='"',
maxColumns=2,
inferSchema=True)
J'espère que ça aide. Pour plus d'informations, reportez-vous à: Utilisation de PySpark 2 pour lire CSV ayant une source HTML code
Remarque: le code ci-dessus provient de L'API Spark 2, où L'API de lecture de fichiers CSV est fournie avec des packages intégrés de Spark installable.
Remarque: PySpark est un wrapper Python pour Spark et partage la même API que Scala / Java.