Apache Spark: carte vs mapPartitions?
Quelle est la différence entre un RDD l' map
et mapPartitions
méthode? Et flatMap
se comporte - t-il comme map
ou comme mapPartitions
? Grâce.
(modifier) c'est-à-dire quelle est la différence (sémantiquement ou en termes d'exécution) entre
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}
Et:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
3 réponses
Quelle est la différence entre la carte D'un RDD et la méthode mapPartitions?
La méthode map convertit chaque élément de la source RDD en un seul élément du résultat RDD en appliquant une fonction. mapPartitions convertit chaque partition du RDD source en plusieurs éléments du résultat (éventuellement aucun).
Et flatMap se comporte-t-il comme map ou comme mapPartitions?
Ni l'un ni l'autre, flatMap fonctionne sur un seul élément (comme map
) et produit plusieurs éléments de la suite (comme mapPartitions
).
Imp. Astuce:
Chaque fois que vous avez une initialisation lourde, cela devrait être fait une fois pour de nombreux éléments
RDD
plutôt qu'une fois par élémentRDD
, et si d'initialisation, comme la création d'objets à partir d'un tiers bibliothèque, ne peut pas être sérialisé (de sorte que Spark peut le transmettre à travers le cluster aux nœuds de travail), utilisezmapPartitions()
au lieu demap()
.mapPartitions()
prévoit que l'initialisation doit être effectuée une fois par tâche/thread/partition de travail au lieu d'une fois parRDD
données élément pour exemple : voir ci-dessous.
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close() // close dbconnection here
newPartition.iterator // create a new iterator
})
T2.
flatMap
se comporte - t-il comme une carte ou commemapPartitions
?
Oui. voir l'exemple 2 de flatmap
.. son auto-explicatif.
T1. Quelle est la différence entre un CA de
map
etmapPartitions
map
Fonctionne la fonction utilisée à un niveau par élément tout enmapPartitions
exerce le fonction au niveau de la partition.
exemple de scénario : si nous avons 100K éléments dans une partition RDD
particulière, nous déclencherons la fonction utilisée par la transformation de mappage 100K fois lorsque nous utilisons map
.
Inversement, si nous utilisons mapPartitions
, nous n'appellerons la fonction particulière qu'une seule fois, mais nous transmettrons tous les enregistrements 100K et récupérerons toutes les réponses dans un appel de fonction.
Il y aura un gain de performance puisque map
Fonctionne sur une fonction particulière tant de fois, surtout si la fonction fait quelque chose de cher à chaque fois qu'elle n'aurait pas besoin de faire si nous passons tous les éléments à la fois (dans le cas de mappartitions
).
Carte
Applique une fonction de transformation sur chaque élément du RDD et renvoie le résultat comme un nouveau RDD.
Liste Des Variantes
Def de la carte[U: ClassTag](f: T => U): CA[U]
Exemple :
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
MapPartitions
C'est une carte spécialisée qui n'est appelée qu'une seule fois pour chaque partition. L'ensemble du contenu des partitions respectives est disponible en tant que flux séquentiel de valeurs via l'argument input (Iterarator [T]). La fonction personnalisée doit retourner un autre itérateur [U]. Le combiné les itérateurs de résultat sont automatiquement convertis en un nouveau RDD. S'il vous plaît notez que les tuples (3,4) et (6,7) sont absents des éléments suivants grâce à le partitionnement que nous avons choisi.
preservesPartitioning
indique si la fonction d'entrée conserve partitionneur, qui devrait êtrefalse
sauf s'il s'agit d'une paire RDD et l'entrée la fonction ne modifie pas les touches.Liste Des Variantes
Def mapPartitions[U: ClassTag](f: Iterator [T] = > Iterator [U], preservesPartitioning: Boolean = false): CA[U]
Exemple 1
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Exemple 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext) {
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
}
res.iterator
}
x.mapPartitions(myfunc).collect
// some of the number are not outputted at all. This is because the random number generated for it is zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
Ce qui précède programme peut également être écrit en utilisant flatMap comme suit.
Exemple 2 en utilisant flatmap
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
Conclusion:
mapPartitions
la transformation est plus rapide que map
car elle appelle votre fonction une fois / partition, pas une fois / élément..
Carte :
- Il traite une ligne à la fois, très similaire à la méthode map() de MapReduce.
- vous revenez de la transformation après chaque ligne.
MapPartitions
- Il traite la partition complète en une seule fois.
- Vous ne pouvez revenir de la fonction qu'une seule fois après le traitement de la partition entière.
- Tous les résultats intermédiaires doivent être conservés en mémoire partition entière.
- Vous fournit la fonction setup() map () et cleanup() de MapReduce
Map Vs mapPartitions
http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/
Spark Map
http://bytepadding.com/big-data/spark/spark-map/
Spark mapPartitions
http://bytepadding.com/big-data/spark/spark-mappartitions/