Utiliser reduceByKey dans Apache Spark (Scala)
j'ai une liste de Tuples de type : (identifiant, nom, comte de).
Par exemple,
val x = sc.parallelize(List(
("a", "b", 1),
("a", "b", 1),
("c", "b", 1),
("a", "d", 1))
)
j'essaie de réduire cette collection à un type où chaque nom de l'élément est compté.
ainsi, au-dessus de val x est converti en :
(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))
Voici le code que j'utilise actuellement :
val byKey = x.map({case (id,uri,count) => (id,uri)->count})
val grouped = byKey.groupByKey
val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))}
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey
grouped2.foreach(println)
j'essaie d'utiliser reduceByKey car il fonctionne plus vite que groupByKey.
comment reduceByKey peut-il être implémenté au lieu de code ci-dessus pour fournir le même de la cartographie ?
3 réponses
suivant votre code:
val byKey = x.map({case (id,uri,count) => (id,uri)->count})
Tu pourrais faire:
val reducedByKey = byKey.reduceByKey(_ + _)
scala> reducedByKey.collect.foreach(println)
((a,d),1)
((a,b),2)
((c,b),1)
PairRDDFunctions[K,V].reduceByKey
prend une fonction de réduction associative qui peut être appliquée au type V du RDD[(K,V)]. En d'autres termes, vous avez besoin d'une fonction f[V](e1:V, e2:V) : V
. Dans ce cas particulier avec la somme sur les Entiers: (x:Int, y:Int) => x+y
ou _ + _
en résumé, notation underscore.
Pour l'enregistrement: reduceByKey
fonctionne mieux que groupByKey
parce qu'il tente d'appliquer la fonction de réduction localement avant la lecture aléatoire/réduire phase. groupByKey
va forcer un mélange de tous les éléments avant de grouper.
votre structure de données d'origine est: RDD [(String, String, Int)], et reduceByKey
ne peut être utilisé que si la structure des données est RDD[(K, V)].
val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)]
val reduced = kv.reduceByKey(_ + _) // reduced is RDD[((String, String), Int)]
val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]
val grouped = kv2.groupByKey() // grouped is RDD[(String, Iterable[(String, Int)])]
grouped.foreach(println)
La syntaxe est la suivante:
reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V],
qui dit que pour la même clé dans un RDD il prend les valeurs (qui seront certainement du même type) effectue l'opération fournie dans le cadre de la fonction et renvoie la valeur du même type que de RDD parent.