Expliquer la fonctionnalité de l'agrégat dans Spark

je cherche une meilleure explication des fonctionnalités agrégées disponibles via spark en python.

l'exemple que j'ai est comme suit (en utilisant pyspark de la version 1.2.0 de Spark)

sc.parallelize([1,2,3,4]).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Sortie:

(10, 4)

- je obtenir le résultat attendu (10,4) qui est la somme de 1+2+3+4 et 4 éléments. Si je change la valeur initiale passée à la fonction d'agrégation (1,0)(0,0) j'ai le résultat

sc.parallelize([1,2,3,4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Sortie:

(19, 4)

La valeur augmente de 9. Si je le change en (2,0), la valeur va (28,4) et ainsi de suite.

quelqu'un Peut m'expliquer comment cette valeur est-elle calculée? Je m'attendais à la valeur de 1 à 9, qui devrait voir (11,4) au lieu de cela je vois (19,4).

37
demandé sur zero323 2015-01-30 19:49:18

7 réponses

je n'ai pas assez de points de réputation pour commenter la réponse précédente par Maasg. En fait, la valeur zéro devrait être "neural" vers le seqop, ce qui signifie qu'elle n'interférerait pas avec le résultat du seqop, comme 0 vers add, ou 1 vers *;

vous ne devriez jamais essayer avec des valeurs non-neurales car il pourrait être appliqué des temps arbitraires. Ce comportement n'est pas seulement liée à nombre de partitions.

j'ai essayé la même expérience que celle mentionnée dans la question. avec 1 partition, le zéro la valeur a été appliquée 3 fois. avec 2 cloisons, 6 fois. avec 3 partitions, 9 fois et ça va continuer.

14
répondu John Knight 2017-08-15 11:53:15

Je n'ai pas été entièrement convaincu par la réponse acceptée, et la réponse de JohnKnight a aidé, donc voici mon point de vue:

tout d'Abord, nous allons expliquer aggregate () dans mes mots:

Prototype:

aggregate (zeroValue, seqOp, combOp)

Description:

aggregate() vous permet de prendre un RDD et de générer une valeur unique qui est d'un type différent de ce qui était stocké dans la version originale de la DRD.

Paramètres:

  1. zeroValue: La valeur d'initialisation pour votre résultat, dans le souhaité format.
  2. seqOp: l'opération que vous souhaitez appliquer aux enregistrements RDD. S'exécute une fois pour chaque enregistrement dans une partition.
  3. combOp: définit comment les objets résultants (un pour chaque partition), se combine.

Exemple:

Calculer la somme d'une liste et de la longueur de la liste. Renvoyer le résultat dans une paire de (sum, length).

dans un shell Spark, j'ai d'abord créé une liste avec 4 éléments, avec 2 partitions:

listRDD = sc.parallelize([1,2,3,4], 2)

puis j'ai défini ma seqOp:

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

et mon combOp:

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

et puis j'ai agrégées:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

comme vous pouvez le voir, j'ai donné des noms descriptifs à mes variables, mais laissez-moi expliquez plus en détail:

la première partition a la sous-liste [1, 2]. Nous appliquerons le seqOp à chaque élément de cette liste et cela produira un résultat local, une paire de (sum, length), qui reflétera le résultat localement, seulement dans cette première partition.

alors, commençons:local_result est initialisé à zeroValue paramètre, nous avons fourni l' aggregate(): (0, 0) et list_element est le premier élément de la liste, i.e. 1. La suite c'est ce que il se passe:

0 + 1 = 1
0 + 1 = 1

Maintenant, le résultat est (1, 1), ce qui signifie que jusqu'à présent, pour la 1ère partition, après le traitement, seul le premier élément, la somme est 1 et la longueur 1. Un avis, que local_result est mis à jour de (0, 0), (1, 1).

1 + 2 = 3
1 + 1 = 2

et aujourd'hui, la suite est (3, 2), qui sera le résultat final de la 1ère partition, car ils sont plus d'autres éléments dans la sous-liste de la 1ère partition.

faire la même chose pour la 2ème partition, nous get (7, 2).

maintenant nous appliquons le combOp à chaque résultat local, pour que nous puissions former, le résultat final, global, comme ceci:(3,2) + (7,2) = (10, 4)


Exemple décrit dans la figure:

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

Inspiré par ce grand exemple.


alors maintenant si le zeroValue n'est pas (0, 0), mais (1, 0), on pourrait s'attendre à obtenir (8 + 4, 2 + 2) = (12, 4), ce qui ne veut pas expliquer ce que vous ressentez. Même si nous modifions le nombre de partitions de mon exemple, Je ne serai pas en mesure d'obtenir cela à nouveau.

la clé ici est la réponse de JohnKnight, qui déclare que le zeroValue est non seulement analogue au nombre de partitions, mais peut être appliqué plus de fois que vous attendez.

56
répondu gsamaras 2017-09-09 12:16:49

Aggregate vous permet de transformer et de combiner les valeurs du RDD à volonté.

Il utilise deux fonctions:

le premier transforme et ajoute les éléments de la collection originale [T] dans un agrégat local [U] et prend la forme: (U,T) => U. Vous pouvez le voir comme un pli et donc il nécessite aussi un zéro pour cette opération. Cette opération est appliquée localement à chaque partition en parallèle.

voici où se trouve la clé de la question: la seule valeur il convient d'utiliser ici la valeur zéro pour l'opération de réduction. Cette opération est exécutée localement sur chaque partition, par conséquent, ajouter quoi que ce soit à cette valeur zéro ajoutera au résultat multiplié par le nombre de partitions du RDD.

la seconde opération prend 2 valeurs du type de résultat de l'opération précédente [U] et la combine en une valeur. Cette opération réduira les résultats partiels de chaque partition et produira le total réel.

Pour exemple: Avec un RDD de Strings:

val rdd:RDD[String] = ???

disons que vous voulez l'agrégat de la longueur des chaînes dans ce RDD, donc vous feriez:

1) la première opération transformera les chaînes en taille (int) et accumulera les valeurs pour la taille.

val stringSizeCummulator: (Int, String) => Int  = (total, string) => total + string.lenght`

2) Fournir le zéro pour l'opération d'addition (0)

val ZERO = 0

3) une opération pour ajouter deux nombres entiers ensemble:

val add: (Int, Int) => Int = _ + _

Mettre tous ensemble:

rdd.aggregate(ZERO, stringSizeCummulator, add)

alors, pourquoi le Zéro est-il nécessaire? Quand la fonction cummulator est appliquée au premier élément d'une partition, il n'y a pas de Total en cours d'exécution. Le Zéro est utilisé ici.

par exemple. Mon RDD est: - La Partition 1: ["Sauter", "plus de"] - La Partition 2: ["le", "mur"]

résultat:

P1:

  1. stringSizeCummulator(ZÉRO, un "Saut") = 4
  2. stringSizeCummulator(4, "plus de") = 8

P2:

  1. stringSizeCummulator(ZÉRO, "") = 3
  2. stringSizeCummulator(3, "mur") = 7

réduire: ajouter (P1, P2) = 15

26
répondu maasg 2015-01-30 18:09:42

bonnes explications, ça m'a vraiment aidé à comprendre le fonctionnement en dessous de la fonction agrégée. J'ai joué avec pendant un certain temps et a trouvé comme ci-dessous.

  • si vous utilisez l'acc, (0,0), alors il ne va pas changer le résultat de la sortie de la fonction.

  • si l'accumulateur initial est modifié alors il va traiter le résultat comme ci-dessous

[somme des éléments RDD + valeur initiale de l'acc * no. de partitions RDD + acc valeur initiale ]

pour la question ici, je suggérerais de vérifier les partitions car le nombre de partitions devrait être de 8 selon mon interprétation car chaque fois que nous traitons la seq op sur une partition de RDD, cela commencera avec la somme initiale du résultat de l'acc et aussi quand il va faire l'opération de combination il utilisera de nouveau la valeur initiale de l'acc une fois.

pour par ex. Liste (1,2,3,4) et acc (1,0)

Obtenir partitions de scala par RDD.partition.taille

si les Partitions sont 2 et le nombre d'éléments est 4 alors => [ 10 + 1 * 2 + 1 ] => (13,4)

si la Partition est 4 et le nombre d'éléments est 4 alors => [ 10 + 1 * 4 + 1 ] => (15,4)

j'Espère que cette aide, vous pouvez vérifier ici pour l'explication. Grâce.

1
répondu iSingh 2016-11-23 18:11:28

vous pouvez utiliser le code suivant (en scala) pour voir précisément ce que aggregate est en train de faire. Il construit un arbre de toutes les plus et les opérations de fusion:

sealed trait Tree[+A]
case class Leaf[A](value: A) extends Tree[A]
case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A]

val zero : Tree[Int] = Leaf(0)
val rdd = sc.parallelize(1 to 4).repartition(3)

Et puis, dans le shell:

scala> rdd.glom().collect()
res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3))

Donc, nous avons ces 3 partitions: [4], [1,2], et [3].

scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r))
res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2)))

vous pouvez représenter le résultat sous forme d'arbre:

+
| \__________________
+                    +
| \________          | \
+          +         +   2
| \        | \       | \         
0  +       0  3      0  1
   | \
   0  4

vous pouvez voir qu'un premier élément Zéro est créé sur le noeud du pilote (à la gauche de l'arbre), et puis, le résultats de toutes les partitions sont fusionnées un par un. Vous voyez également que si vous remplacez 0 par 1 comme vous l'avez fait dans votre question, il ajoute 1 à chaque résultat sur chaque partition, et aussi d'ajouter 1 à la valeur initiale du conducteur. Donc, le nombre total de fois où le zéro la valeur que vous donnez est utilisé est:

number of partitions + 1.

Donc, dans votre cas, le résultat de

aggregate(
  (X, Y),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

sera:

(sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y)

la mise en oeuvre de aggregate est tout à fait simple. Elle est définie dans RDD.scala, ligne 1107:

  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
}
1
répondu lovasoa 2017-06-09 19:07:34

pour les personnes recherchant un code équivalent à Scala pour l'exemple ci - dessus-le voici. Même logique, même d'entrée/de résultat.

scala> val listRDD = sc.parallelize(List(1,2,3,4), 2)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21

scala> listRDD.collect()
res7: Array[Int] = Array(1, 2, 3, 4)

scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2))
res10: (Int, Int) = (10,4)
0
répondu Prasanna Saraswathi Krishnan 2016-10-25 03:39:06

j'essaie beaucoup d'expériences sur cette question. Il est préférable de définir le num de partition pour l'agrégat. le seqOp traitera chaque partion et appliquera la valeur initiale, qui plus est, combOp appliquera également la valeur initiale quand combine toutes les partitions. Donc, je vous présente le format de cette question:

final result = sum(list) + num_Of_Partitions * initial_Value + 1
0
répondu W.Sen 2017-02-21 07:39:06