Spark-charger le fichier CSV en tant que DataFrame?

Je voudrais lire un CSV dans spark et le convertir en DataFrame et le stocker dans HDFS avec df.registerTempTable("table_name")

J'ai essayé:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

Erreur que j'ai eu:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Quelle est la bonne commande pour charger le fichier CSV en tant que DataFrame dans Apache Spark?

83
demandé sur mrsrinivas 2015-04-17 19:10:21

8 réponses

Spark-csv fait partie de la fonctionnalité principale de Spark et ne nécessite pas de bibliothèque séparée. Donc, vous pouvez simplement faire par exemple

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
77
répondu Shyamendra Solanki 2018-08-08 14:13:23

Analyse CSV en tant que DataFrame / DataSet avec Spark 2.x

Initialisez D'abord SparkSession objet par défaut, il sera disponible dans les shells comme spark

val spark = org.apache.spark.sql.SparkSession.builder
        .master("local")
        .appName("Spark CSV Reader")
        .getOrCreate;

Utilisez l'une des suivantes pour charger CSV comme DataFrame/DataSet

1. Faites-le de manière programmatique

 val df = spark.read
         .format("csv")
         .option("header", "true") //reading the headers
         .option("mode", "DROPMALFORMED")
         .load("hdfs:///csv/file/dir/file.csv")

2. Vous pouvez également faire cela de manière SQL

 val df = spark.sql("SELECT * FROM csv.`csv/file/path/in/hdfs`")

Dépendances:

 "org.apache.spark" % "spark-core_2.11" % 2.0.0,
 "org.apache.spark" % "spark-sql_2.11" % 2.0.0,


Version Spark
val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

Dépendances:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
116
répondu mrsrinivas 2018-07-22 02:22:30

Avec Spark 2.0, voici comment vous pouvez lire CSV

val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
  .config(conf = conf)
  .appName("spark session example")
  .getOrCreate()

val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
  csv(path)
10
répondu penny chan 2016-10-30 06:34:10

C'est Pour dont Hadoop est 2.6 et Spark est 1.6 et sans paquet "databricks".

import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;

val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))

val schema = new StructType()
    .add(StructField("id", StringType, true))
    .add(StructField("val", IntegerType, true))

val df = sqlContext.createDataFrame(rdd, schema)
10
répondu Eric Yiwei Liu 2017-03-20 09:07:40

En Java 1.8 cet extrait de code fonctionne parfaitement pour lire les fichiers CSV

POM.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
</dependency>

Java

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);

Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

        //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
7
répondu Rajeev Rathor 2016-10-30 06:32:37

L'exemple Spark 2 de Penny est la façon de le faire dans spark2. Il y a un truc de plus: avoir cet en-tête généré pour vous en faisant une analyse initiale des données, en définissant l'option inferSchema à true

Ici, en supposant que spark est une session spark que vous avez configurée, est l'opération à charger dans le fichier D'index CSV de toutes les images Landsat hébergées par amazon sur S3.

  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *    http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */

val csvdata = spark.read.options(Map(
    "header" -> "true",
    "ignoreLeadingWhiteSpace" -> "true",
    "ignoreTrailingWhiteSpace" -> "true",
    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
    "inferSchema" -> "true",
    "mode" -> "FAILFAST"))
  .csv("s3a://landsat-pds/scene_list.gz")

La mauvaise nouvelle est: cela déclenche une analyse à travers le fichier; pour quelque chose de grand comme ceci 20 + MB zippé CSV fichier, qui peut prendre 30s sur une connexion longue distance. Gardez cela à l'esprit: il vaut mieux coder manuellement le schéma une fois que vous l'avez entré.

(extrait de code Apache Software License 2.0 sous licence pour éviter toute ambiguïté; quelque chose que j'ai fait comme un test de démonstration / intégration de l'intégration S3)

4
répondu Steve Loughran 2016-10-18 19:34:02

Le format de fichier par défaut est Parquet avec spark.lire.. et la lecture de fichiers csv qui explique pourquoi vous obtenez l'exception. Spécifiez le format csv avec l'api que vous essayez d'utiliser

0
répondu tazak 2018-08-01 06:30:28

Il y a beaucoup de défis à l'analyse D'un fichier CSV, il continue à s'additionner si la taille du fichier est plus grande, s'il y a des caractères non-anglais/escape/separator/autres dans les valeurs de colonne, qui pourraient causer des erreurs d'analyse.

La magie est alors dans les options qui sont utilisées. Ceux qui ont fonctionné pour moi et hope devraient couvrir la plupart des cas de bord sont dans le code ci-dessous:

### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path, 
                         header=True, 
                         multiLine=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True, 
                         encoding="UTF-8",
                         sep=',',
                         quote='"', 
                         escape='"',
                         maxColumns=2,
                         inferSchema=True)

J'espère que ça aide. Pour plus d'informations, reportez-vous à: Utilisation de PySpark 2 pour lire CSV ayant une source HTML code

Remarque: le code ci-dessus provient de L'API Spark 2, où L'API de lecture de fichiers CSV est fournie avec des packages intégrés de Spark installable.

Remarque: PySpark est un wrapper Python pour Spark et partage la même API que Scala / Java.

0
répondu karthiks 2018-08-21 10:57:39