Parse CSV as DataFrame / DataSet avec Apache Spark et Java

je suis nouveau à spark, et je veux utiliser group-by & reduce pour trouver ce qui suit de CSV (une ligne par employé):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN
<!-Je voudrais simplifier le CSV environ avec le groupe par Ministère, Désignation, État avec des colonnes supplémentaires sum (costToCompany) et total Employeecount

Doit obtenir un résultat comme ceci:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

y a-t-il un moyen d'y parvenir en utilisant des transformations et des actions. Ou devrait nous allons pour CA?

17
demandé sur mrsrinivas 2014-08-18 16:07:52

4 réponses

Procédure

  • Créer une Classe (Schéma) pour encapsuler votre structure (il n'est pas nécessaire pour l'approche B, mais ce serait rendre votre code plus facile à lire si vous utilisez Java)

    public class Record implements Serializable {
      String department;
      String designation;
      long costToCompany;
      String state;
      // constructor , getters and setters  
    }
    
  • chargement du fichier CVS (JSON)

    JavaSparkContext sc;
    JavaRDD<String> data = sc.textFile("path/input.csv");
    //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions 
    SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified
    
    
    JavaRDD<Record> rdd_records = sc.textFile(data).map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             // Here you can use JSON
             // Gson gson = new Gson();
             // gson.fromJson(line, Record.class);
             String[] fields = line.split(",");
             Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]);
             return sd;
          }
    });
    

À ce stade, vous avez 2 méthodes:

A. SparkSQL

  • enregistrez une table (en utilisant votre schéma défini Classe)

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();
    
  • interrogez la table avec votre groupe D'interrogation souhaité par

    JavaSchemaRDD res = sqlContext.sql("
      select department,designation,state,sum(costToCompany),count(*) 
      from record_table 
      group by department,designation,state
    ");
    
  • ici vous pouvez aussi faire n'importe quelle autre requête que vous désirez, en utilisant une approche SQL

B. Étincelle

  • Cartographie à l'aide d'une clé composée: Department,Designation,State

    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD = 
    rdd_records.mapToPair(new
      PairFunction<Record, String, Tuple2<Long, Integer>>(){
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record){
          Tuple2<String, Tuple2<Long, Integer>> t2 = 
          new Tuple2<String, Tuple2<Long,Integer>>(
            record.Department + record.Designation + record.State,
            new Tuple2<Long, Integer>(record.costToCompany,1)
          );
          return t2;
    }
    

    });

  • reduceByKey à l'aide de la clé composite, résumant costToCompany la colonne, et l'accumulation du nombre d'enregistrements par la touche

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = 
     records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long,
     Integer>, Tuple2<Long, Integer>>() {
        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
        Tuple2<Long, Integer> v2) throws Exception {
            return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2);
        }
    });
    
38
répondu emecas 2018-04-17 21:12:46

le fichier CSV peut être divisé avec le lecteur CSV intégré Spark. Il sera de retour DataFrame / ensemble de données sur la lecture réussie du fichier. Sur le dessus de DataFrame/ DataSet, vous appliquez des opérations de type SQL facilement.

Utilisant Spark 2.x (et au-dessus) avec Java

Create SparkSession object alias spark

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL Example")
    .getOrCreate();

Créer un Schéma pour la Ligne avec StructType

import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("department", "string")
    .add("designation", "string")
    .add("ctc", "long")
    .add("state", "string");

créer une base de données à partir du fichier CSV et appliquer schéma

Dataset<Row> df = spark.read()
    .option("mode", "DROPMALFORMED")
    .schema(schema)
    .csv("path/input.csv");

plus d'options sur la lecture des données du fichier CSV

nous pouvons maintenant agréger les données de deux façons

1. Chemin SQL

enregistrer une table dans spark sql metastore pour effectuer L'opération SQL

df.createOrReplaceTempView("employee");

Exécuter une requête SQL sur les dataframe

Dataset<Row> sqlResult = spark.sql(
    "SELECT department, designation, state, SUM(ctc), COUNT(department)" 
        + " FROM employee GROUP BY department, designation, state");

sqlResult.show(); //for testing

nous pouvons même exécuter SQL directement sur le fichier CSV sans créer de table avec Spark SQL


2. Enchaînement d'objets ou programmation ou Java-like way

faire l'importation nécessaire pour les fonctions sql

import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

Utiliser groupBy et agg sur dataframe/dataset à effectuer count et sum sur les données

Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
    .agg(sum("ctc"), count("department"));
// After Spark 1.6 columns mentioned in group by will be added to result by default

dfResult.show();//for testing

bibliothèques dépendantes

"org.apache.spark" % "spark-core_2.11" % "2.0.0" 
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"
12
répondu mrsrinivas 2018-06-02 13:03:30

ce qui suit n'est peut-être pas tout à fait exact, mais cela devrait vous donner une idée de la façon de jongler avec les données. Il n'est pas joli, devrait être remplacé par des classes de cas, etc, mais comme un exemple rapide de la façon d'utiliser l'api spark, j'espère que c'est assez :)

val rawlines = sc.textfile("hdfs://.../*.csv")
case class Employee(dep: String, des: String, cost: Double, state: String)
val employees = rawlines
  .map(_.split(",") /*or use a proper CSV parser*/
  .map( Employee(row(0), row(1), row(2), row(3) )

# the 1 is the amount of employees (which is obviously 1 per line)
val keyVals = employees.map( em => (em.dep, em.des, em.state), (1 , em.cost))

val results = keyVals.reduceByKey{ a,b =>
    (a._1 + b._1, b._1, b._2) # (a.count + b.count , a.cost + b.cost )
}

#debug output
results.take(100).foreach(println)

results
  .map( keyval => someThingToFormatAsCsvStringOrWhatever )
  .saveAsTextFile("hdfs://.../results")
val sqlContext = new SQLContext(sparkContext)

# case classes can easily be registered as tables
employees.registerAsTable("employees")

val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*) 
  from employees 
  group by dep,des,state"""
4
répondu jkgeyti 2014-08-19 18:37:40

pour JSON, si votre fichier texte contient un objet JSON par ligne, vous pouvez utiliser sqlContext.jsonFile(path) pour Spark SQL charger un SchemaRDD (le schéma sera automatiquement déduite). Ensuite, vous pouvez l'enregistrer comme une table et des requêtes SQL. Vous pouvez également charger manuellement le fichier texte comme un RDD[String] contenant un objet JSON par enregistrement et utilisation sqlContext.jsonRDD(rdd) pour l'activer comme un SchemaRDD. jsonRDD est utile lorsque vous avez besoin de pré-traitement de vos données.

4
répondu yhuai 2016-11-01 17:52:50