Spark: produire RDD [(X, X)] de toutes les combinaisons possibles à partir de RDD[X]

Est-il possible d'Étincelle à mettre en œuvre."la fonction des combinaisons des collections scala?

   /** Iterates over combinations.
   *
   *  @return   An Iterator which traverses the possible n-element combinations of this $coll.
   *  @example  `"abbbc".combinations(2) = Iterator(ab, ac, bb, bc)`
   */

par exemple comment passer de RDD[X] à RDD[List[X]] ou RDD[(X,X)] pour des combinaisons de taille = 2. Et supposons que toutes les valeurs de RDD sont uniques.

20
demandé sur Eugene Zhulenev 2014-10-25 03:51:09

4 réponses

produit Cartésien et les combinaisons sont deux choses différentes, le produit cartésien va créer un EDR de taille rdd.size() ^ 2 et les combinaisons créeront un RDD de taille rdd.size() choose 2

val rdd = sc.parallelize(1 to 5)
val combinations = rdd.cartesian(rdd).filter{ case (a,b) => a < b }`.
combinations.collect()

Remarque: cela ne fonctionne que si une commande est défini sur les éléments de la liste, puisque nous utilisons <. Celui-ci ne fonctionne que pour choisir deux, mais peut facilement être étendu en s'assurant que la relation a < b pour tous a et b dans la séquence

27
répondu aaronman 2014-10-25 18:57:11

Comme expliqué, cartesian vous donnera n^2 éléments du produit cartésien du RDD avec lui-même. Cet algorithme calcule les combinaisons (n,2) d'un RDD sans avoir à calculer les éléments n^2 d'abord: (chaîne utilisée comme type, généraliser à un type T prend un peu de plomberie avec des fermoirs qui obscurciraient le but ici)

C'est probablement moins efficace que cartésien + filtrage en raison de l'itératif count et take actions qui forcent le calcul de le RDD, mais plus efficace de l'espace comme il calcule seulement le C(n,2) = n!/(2*(n-2))! = (n*(n-1)/2) au lieu de n^2 du produit cartésien.

 import org.apache.spark.rdd._

 def combs(rdd:RDD[String]):RDD[(String,String)] = {
    val count = rdd.count
    if (rdd.count < 2) { 
        sc.makeRDD[(String,String)](Seq.empty)
    } else if (rdd.count == 2) {
        val values = rdd.collect
        sc.makeRDD[(String,String)](Seq((values(0), values(1))))
    } else {
        val elem = rdd.take(1)
        val elemRdd = sc.makeRDD(elem)
        val subtracted = rdd.subtract(elemRdd)  
        val comb = subtracted.map(e  => (elem(0),e))
        comb.union(combs(subtracted))
    } 
 }
3
répondu maasg 2014-10-25 20:06:17

ceci est supporté nativement par un Spark RDD avec le cartesian transformation.

e.g.:

val rdd = sc.parallelize(1 to 5)
val cartesian = rdd.cartesian(rdd)
cartesian.collect

Array[(Int, Int)] = Array((1,1), (1,2), (1,3), (1,4), (1,5), 
(2,1), (2,2), (2,3), (2,4), (2,5), 
(3,1), (3,2), (3,3), (3,4), (3,5), 
(4,1), (4,2), (4,3), (4,4), (4,5), 
(5,1), (5,2), (5,3), (5,4), (5,5))
2
répondu maasg 2014-10-25 08:03:37

cela crée toutes les combinaisons (n, 2) et fonctionne pour n'importe quel RDD sans exiger n'importe quel ordre sur les éléments de RDD.

val rddWithIndex = rdd.zipWithIndex
rddWithIndex.cartesian(rddWithIndex).filter{case(a, b) => a._2 < b._2}.map{case(a, b) => (a._1, b._1)}

A. _2 et B. _2 sont les indices, tandis que A. _1 et B. _1 sont les éléments de la DRD originale.

Exemple:

notez qu'aucun ordre n'est défini sur les cartes ici.

val m1 = Map('a' -> 1, 'b' -> 2)
val m2 = Map('c' -> 3, 'a' -> 4)
val m3 = Map('e' -> 5, 'c' -> 6, 'b' -> 7)
val rdd = sc.makeRDD(Array(m1, m2, m3))
val rddWithIndex = rdd.zipWithIndex
rddWithIndex.cartesian(rddWithIndex).filter{case(a, b) => a._2 < b._2}.map{case(a, b) => (a._1, b._1)}.collect

Sortie:

Array((Map(a -> 1, b -> 2),Map(c -> 3, a -> 4)), (Map(a -> 1, b -> 2),Map(e -> 5, c -> 6, b -> 7)), (Map(c -> 3, a -> 4),Map(e -> 5, c -> 6, b -> 7)))
1
répondu Behzad Behzadan 2017-03-10 11:37:51