Utiliser reduceByKey dans Apache Spark (Scala)

j'ai une liste de Tuples de type : (identifiant, nom, comte de).

Par exemple,

val x = sc.parallelize(List(
    ("a", "b", 1),
    ("a", "b", 1),
    ("c", "b", 1),
    ("a", "d", 1))
)

j'essaie de réduire cette collection à un type où chaque nom de l'élément est compté.

ainsi, au-dessus de val x est converti en :

(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))

Voici le code que j'utilise actuellement :

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

val grouped = byKey.groupByKey
val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))}
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey

grouped2.foreach(println)

j'essaie d'utiliser reduceByKey car il fonctionne plus vite que groupByKey.

comment reduceByKey peut-il être implémenté au lieu de code ci-dessus pour fournir le même de la cartographie ?

20
demandé sur Seonghyeon Cho 2014-06-06 02:58:08

3 réponses

suivant votre code:

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

Tu pourrais faire:

val reducedByKey = byKey.reduceByKey(_ + _)

scala> reducedByKey.collect.foreach(println)
((a,d),1)
((a,b),2)
((c,b),1)

PairRDDFunctions[K,V].reduceByKey prend une fonction de réduction associative qui peut être appliquée au type V du RDD[(K,V)]. En d'autres termes, vous avez besoin d'une fonction f[V](e1:V, e2:V) : V . Dans ce cas particulier avec la somme sur les Entiers: (x:Int, y:Int) => x+y ou _ + _ en résumé, notation underscore.

Pour l'enregistrement: reduceByKey fonctionne mieux que groupByKey parce qu'il tente d'appliquer la fonction de réduction localement avant la lecture aléatoire/réduire phase. groupByKey va forcer un mélange de tous les éléments avant de grouper.

26
répondu maasg 2014-06-06 08:43:36

votre structure de données d'origine est: RDD [(String, String, Int)], et reduceByKey ne peut être utilisé que si la structure des données est RDD[(K, V)].

val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)]
val reduced = kv.reduceByKey(_ + _)       // reduced is RDD[((String, String), Int)]
val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]
val grouped = kv2.groupByKey()            // grouped is RDD[(String, Iterable[(String, Int)])]
grouped.foreach(println)
5
répondu cloud 2014-06-06 08:58:10

La syntaxe est la suivante:

reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V],

qui dit que pour la même clé dans un RDD il prend les valeurs (qui seront certainement du même type) effectue l'opération fournie dans le cadre de la fonction et renvoie la valeur du même type que de RDD parent.

0
répondu napster 2017-03-13 06:45:27