Comment fonctionne Spark aggregate function-aggregateByKey?

disons que j'ai un système de distribution sur 3 noeuds et que mes données sont distribuées entre ces noeuds. par exemple, j'ai un test.fichier csv qui existe sur les 3 noeuds et qui contient 2 colonnes de:

**row   | id,  c.**
---------------
row1  | k1 , c1  
row2  | k1 , c2  
row3  | k1 , c3  
row4  | k2 , c4  
row5  | k2 , c5  
row6  | k2 , c6  
row7  | k3 , c7  
row8  | k3 , c8  
row9  | k3 , c9  
row10 | k4 , c10   
row11 | k4 , c11  
row12 | k4 , c12 

alors J'utilise SparkContext.texte pour lire le fichier comme ca et si. Pour autant que je comprends, chaque étincelle travailleur nœud de lire la partie du fichier. Donc maintenant disons que chaque noeud va stocker:

  • noeud 1: Rangée 1~4
  • noeud 2: ligne 5~8
  • noeud 3: rangée 9~12

ma question Est que disons que je veux faire des calculs sur ces données, et il y a une étape dont j'ai besoin pour grouper la clé ensemble, donc la paire de valeurs de clé serait [k1 [{k1 c1} {k1 c2} {k1 c3}]].. et ainsi de suite.

Il y a une fonction appelée groupByKey() qui est très cher à utiliser, et aggregateByKey() est recommandé d'utiliser. Alors je me demandais comment groupByKey() et aggregateByKey() fonctionne sous le capot? Est-ce que quelqu'un peut utiliser l'exemple que j'ai fourni ci-dessus pour expliquer s'il vous plaît? Après le brassage, où se trouvent les rangées sur chaque noeud?

31
demandé sur gsamaras 2014-07-17 17:14:17

2 réponses

aggregateByKey() est presque identique à reduceByKey()(les deux appelant combineByKey() dans les coulisses), sauf que vous donnez une valeur de départ pour aggregateByKey(). La plupart des gens sont familiers avec reduceByKey(), donc je vais l'utiliser que dans l'explication.

La raison reduceByKey() est tant mieux, car il rend l'utilisation d'un MapReduce dispositif appelé un combinateur. N'importe quelle fonction comme + ou * peut être utilisé de cette façon parce que l'ordre des éléments, il est appelé n'a pas d'importance. Ce permet Étincelle pour démarrer "réduire" les valeurs avec la même clé, même si elles ne sont pas toutes dans la même partition encore.

Sur le revers de la médaille groupByKey() vous donne plus de polyvalence puisque vous écrivez une fonction qui prend une itérable, ce qui signifie que vous pourriez même tirer tous les éléments dans un tableau. Cependant, il est inefficace, parce que pour qu'il fonctionne, l'ensemble des (K,V,) les paires doivent être dans une partition.

L'étape qui déplace les données sur un type d'opération est généralement appelé le aléatoire, au niveau le plus simple, les données sont réparties sur chaque noeud (souvent avec un partitionneur de hachage), puis triées sur chaque noeud.

43
répondu aaronman 2016-12-23 19:57:12

aggregateByKey () est très différent de reduceByKey. Ce qui se passe, c'est que reduceByKey est en quelque sorte un cas particulier d'agrégatebykey.

aggregateByKey () combinera les valeurs pour une clé particulière, et le résultat d'une telle combinaison peut être n'importe quel objet que vous spécifiez. Vous devez spécifier comment les valeurs sont combinées ("ajoutées") à l'intérieur d'une partition (qui est exécutée dans le même noeud) et comment vous combinez le résultat de différentes partitions (qui peuvent être dans différents nœud.) reduceByKey est un cas particulier, dans le sens que le résultat de la combinaison (par exemple une somme) est du même type que les valeurs, et que l'opération lorsqu'elle est combinée à partir de différentes partitions est également la même que l'opération lors de la combinaison des valeurs à l'intérieur d'une partition.

Un exemple: Imaginez que vous ayez une liste de paires. Vous paralléliser:

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))

Maintenant vous voulez les "combiner" par la clé produisant une somme. Dans ce cas, reduceByKey et aggregateByKey sont les même:

val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))

//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))

Maintenant, imaginez que vous voulez l'agrégation d'un Ensemble de valeurs, qui est un autre type que les valeurs, qui sont des nombres entiers (la somme des nombres entiers est aussi des entiers):

import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the  _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])]  =Array((b,Set(7)), (a,Set(1, 5, 3)))
46
répondu Antoni 2016-02-26 12:11:44