Parse CSV as DataFrame / DataSet avec Apache Spark et Java
je suis nouveau à spark, et je veux utiliser group-by & reduce pour trouver ce qui suit de CSV (une ligne par employé):
Department, Designation, costToCompany, State
Sales, Trainee, 12000, UP
Sales, Lead, 32000, AP
Sales, Lead, 32000, LA
Sales, Lead, 32000, TN
Sales, Lead, 32000, AP
Sales, Lead, 32000, TN
Sales, Lead, 32000, LA
Sales, Lead, 32000, LA
Marketing, Associate, 18000, TN
Marketing, Associate, 18000, TN
HR, Manager, 58000, TN
<!-Je voudrais simplifier le CSV environ avec le groupe par Ministère, Désignation, État avec des colonnes supplémentaires sum (costToCompany) et total Employeecount
Doit obtenir un résultat comme ceci:
Dept, Desg, state, empCount, totalCost
Sales,Lead,AP,2,64000
Sales,Lead,LA,3,96000
Sales,Lead,TN,2,64000
y a-t-il un moyen d'y parvenir en utilisant des transformations et des actions. Ou devrait nous allons pour CA?
4 réponses
Procédure
Créer une Classe (Schéma) pour encapsuler votre structure (il n'est pas nécessaire pour l'approche B, mais ce serait rendre votre code plus facile à lire si vous utilisez Java)
public class Record implements Serializable { String department; String designation; long costToCompany; String state; // constructor , getters and setters }
chargement du fichier CVS (JSON)
JavaSparkContext sc; JavaRDD<String> data = sc.textFile("path/input.csv"); //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified JavaRDD<Record> rdd_records = sc.textFile(data).map( new Function<String, Record>() { public Record call(String line) throws Exception { // Here you can use JSON // Gson gson = new Gson(); // gson.fromJson(line, Record.class); String[] fields = line.split(","); Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]); return sd; } });
À ce stade, vous avez 2 méthodes:
A. SparkSQL
enregistrez une table (en utilisant votre schéma défini Classe)
JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class); table.registerAsTable("record_table"); table.printSchema();
interrogez la table avec votre groupe D'interrogation souhaité par
JavaSchemaRDD res = sqlContext.sql(" select department,designation,state,sum(costToCompany),count(*) from record_table group by department,designation,state ");
ici vous pouvez aussi faire n'importe quelle autre requête que vous désirez, en utilisant une approche SQL
B. Étincelle
Cartographie à l'aide d'une clé composée:
Department
,Designation
,State
JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD = rdd_records.mapToPair(new PairFunction<Record, String, Tuple2<Long, Integer>>(){ public Tuple2<String, Tuple2<Long, Integer>> call(Record record){ Tuple2<String, Tuple2<Long, Integer>> t2 = new Tuple2<String, Tuple2<Long,Integer>>( record.Department + record.Designation + record.State, new Tuple2<Long, Integer>(record.costToCompany,1) ); return t2; }
});
reduceByKey à l'aide de la clé composite, résumant
costToCompany
la colonne, et l'accumulation du nombre d'enregistrements par la toucheJavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() { public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1, Tuple2<Long, Integer> v2) throws Exception { return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2); } });
le fichier CSV peut être divisé avec le lecteur CSV intégré Spark. Il sera de retour DataFrame / ensemble de données sur la lecture réussie du fichier. Sur le dessus de DataFrame/ DataSet, vous appliquez des opérations de type SQL facilement.
Utilisant Spark 2.x (et au-dessus) avec Java
Create SparkSession object alias spark
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL Example")
.getOrCreate();
Créer un Schéma pour la Ligne avec StructType
import org.apache.spark.sql.types.StructType;
StructType schema = new StructType()
.add("department", "string")
.add("designation", "string")
.add("ctc", "long")
.add("state", "string");
créer une base de données à partir du fichier CSV et appliquer schéma
Dataset<Row> df = spark.read()
.option("mode", "DROPMALFORMED")
.schema(schema)
.csv("path/input.csv");
plus d'options sur la lecture des données du fichier CSV
nous pouvons maintenant agréger les données de deux façons
1. Chemin SQL
enregistrer une table dans spark sql metastore pour effectuer L'opération SQL
df.createOrReplaceTempView("employee");
Exécuter une requête SQL sur les dataframe
Dataset<Row> sqlResult = spark.sql( "SELECT department, designation, state, SUM(ctc), COUNT(department)" + " FROM employee GROUP BY department, designation, state"); sqlResult.show(); //for testing
nous pouvons même exécuter SQL directement sur le fichier CSV sans créer de table avec Spark SQL
2. Enchaînement d'objets ou programmation ou Java-like way
faire l'importation nécessaire pour les fonctions sql
import static org.apache.spark.sql.functions.count; import static org.apache.spark.sql.functions.sum;
Utiliser
groupBy
etagg
sur dataframe/dataset à effectuercount
etsum
sur les donnéesDataset<Row> dfResult = df.groupBy("department", "designation", "state") .agg(sum("ctc"), count("department")); // After Spark 1.6 columns mentioned in group by will be added to result by default dfResult.show();//for testing
bibliothèques dépendantes
"org.apache.spark" % "spark-core_2.11" % "2.0.0"
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"
ce qui suit n'est peut-être pas tout à fait exact, mais cela devrait vous donner une idée de la façon de jongler avec les données. Il n'est pas joli, devrait être remplacé par des classes de cas, etc, mais comme un exemple rapide de la façon d'utiliser l'api spark, j'espère que c'est assez :)
val rawlines = sc.textfile("hdfs://.../*.csv")
case class Employee(dep: String, des: String, cost: Double, state: String)
val employees = rawlines
.map(_.split(",") /*or use a proper CSV parser*/
.map( Employee(row(0), row(1), row(2), row(3) )
# the 1 is the amount of employees (which is obviously 1 per line)
val keyVals = employees.map( em => (em.dep, em.des, em.state), (1 , em.cost))
val results = keyVals.reduceByKey{ a,b =>
(a._1 + b._1, b._1, b._2) # (a.count + b.count , a.cost + b.cost )
}
#debug output
results.take(100).foreach(println)
results
.map( keyval => someThingToFormatAsCsvStringOrWhatever )
.saveAsTextFile("hdfs://.../results")
val sqlContext = new SQLContext(sparkContext)
# case classes can easily be registered as tables
employees.registerAsTable("employees")
val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*)
from employees
group by dep,des,state"""
pour JSON, si votre fichier texte contient un objet JSON par ligne, vous pouvez utiliser sqlContext.jsonFile(path)
pour Spark SQL charger un SchemaRDD
(le schéma sera automatiquement déduite). Ensuite, vous pouvez l'enregistrer comme une table et des requêtes SQL. Vous pouvez également charger manuellement le fichier texte comme un RDD[String]
contenant un objet JSON par enregistrement et utilisation sqlContext.jsonRDD(rdd)
pour l'activer comme un SchemaRDD
. jsonRDD
est utile lorsque vous avez besoin de pré-traitement de vos données.