Apache Spark: carte vs mapPartitions?

Quelle est la différence entre un RDD l' map et mapPartitions méthode? Et flatMap se comporte - t-il comme map ou comme mapPartitions? Grâce.

(modifier) c'est-à-dire quelle est la différence (sémantiquement ou en termes d'exécution) entre

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

Et:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
98
demandé sur mrsrinivas 2014-01-17 15:41:12

3 réponses

Quelle est la différence entre la carte D'un RDD et la méthode mapPartitions?

La méthode map convertit chaque élément de la source RDD en un seul élément du résultat RDD en appliquant une fonction. mapPartitions convertit chaque partition du RDD source en plusieurs éléments du résultat (éventuellement aucun).

Et flatMap se comporte-t-il comme map ou comme mapPartitions?

Ni l'un ni l'autre, flatMap fonctionne sur un seul élément (comme map) et produit plusieurs éléments de la suite (comme mapPartitions).

90
répondu Alexey Romanov 2015-12-20 22:09:52

Imp. Astuce:

Chaque fois que vous avez une initialisation lourde, cela devrait être fait une fois pour de nombreux éléments RDD plutôt qu'une fois par élément RDD, et si d'initialisation, comme la création d'objets à partir d'un tiers bibliothèque, ne peut pas être sérialisé (de sorte que Spark peut le transmettre à travers le cluster aux nœuds de travail), utilisez mapPartitions() au lieu de map(). mapPartitions() prévoit que l'initialisation doit être effectuée une fois par tâche/thread/partition de travail au lieu d'une fois par RDD données élément pour exemple : voir ci-dessous.

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

T2. flatMap se comporte - t-il comme une carte ou comme mapPartitions?

Oui. voir l'exemple 2 de flatmap.. son auto-explicatif.

T1. Quelle est la différence entre un CA de map et mapPartitions

map Fonctionne la fonction utilisée à un niveau par élément tout en mapPartitions exerce le fonction au niveau de la partition.

exemple de scénario : si nous avons 100K éléments dans une partition RDD particulière, nous déclencherons la fonction utilisée par la transformation de mappage 100K fois lorsque nous utilisons map.

Inversement, si nous utilisons mapPartitions, nous n'appellerons la fonction particulière qu'une seule fois, mais nous transmettrons tous les enregistrements 100K et récupérerons toutes les réponses dans un appel de fonction.

Il y aura un gain de performance puisque map Fonctionne sur une fonction particulière tant de fois, surtout si la fonction fait quelque chose de cher à chaque fois qu'elle n'aurait pas besoin de faire si nous passons tous les éléments à la fois (dans le cas de mappartitions).

Carte

Applique une fonction de transformation sur chaque élément du RDD et renvoie le résultat comme un nouveau RDD.

Liste Des Variantes

Def de la carte[U: ClassTag](f: T => U): CA[U]

Exemple :

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

MapPartitions

C'est une carte spécialisée qui n'est appelée qu'une seule fois pour chaque partition. L'ensemble du contenu des partitions respectives est disponible en tant que flux séquentiel de valeurs via l'argument input (Iterarator [T]). La fonction personnalisée doit retourner un autre itérateur [U]. Le combiné les itérateurs de résultat sont automatiquement convertis en un nouveau RDD. S'il vous plaît notez que les tuples (3,4) et (6,7) sont absents des éléments suivants grâce à le partitionnement que nous avons choisi.

preservesPartitioning indique si la fonction d'entrée conserve partitionneur, qui devrait être false sauf s'il s'agit d'une paire RDD et l'entrée la fonction ne modifie pas les touches.

Liste Des Variantes

Def mapPartitions[U: ClassTag](f: Iterator [T] = > Iterator [U], preservesPartitioning: Boolean = false): CA[U]

Exemple 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Exemple 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

Ce qui précède programme peut également être écrit en utilisant flatMap comme suit.

Exemple 2 en utilisant flatmap

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Conclusion:

mapPartitions la transformation est plus rapide que map car elle appelle votre fonction une fois / partition, pas une fois / élément..

67
répondu Ram Ghadiyaram 2017-05-23 12:26:22

Carte :

  1. Il traite une ligne à la fois, très similaire à la méthode map() de MapReduce.
  2. vous revenez de la transformation après chaque ligne.

MapPartitions

  1. Il traite la partition complète en une seule fois.
  2. Vous ne pouvez revenir de la fonction qu'une seule fois après le traitement de la partition entière.
  3. Tous les résultats intermédiaires doivent être conservés en mémoire partition entière.
  4. Vous fournit la fonction setup() map () et cleanup() de MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

9
répondu KrazyGautam 2017-08-09 10:05:29