Comment fonctionne HashPartitioner?
j'ai lu sur la documentation de HashPartitioner
. Malheureusement, rien n'a été expliqué à part les appels API. Je suis sous l'hypothèse que HashPartitioner
partitionne l'ensemble distribué basé sur le hachage des clés. Par exemple, si mes données sont comme
(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
donc partitionneur mettrait ceci dans des partitions différentes avec les mêmes clés tombant dans la même partition. Cependant je ne comprends pas la signification de la argument du constructeur
new HashPartitoner(numPartitions) //What does numPartitions do?
pour l'ensemble de données ci-dessus, en quoi les résultats différeraient-ils si je le faisais
new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)
alors comment fonctionne HashPartitioner
?
3 réponses
Eh bien, permet de rendre votre ensemble de données légèrement plus intéressant:
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
nous avons six éléments:
rdd.count
Long = 6
pas de programme de partitionnement:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
et huit partitions:
rdd.partitions.length
Int = 8
permet maintenant de définir petit helper pour compter le nombre d'éléments par partition:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
Puisque nous n'avons pas notre ensemble de données est distribué uniformément entre les partitions ( schéma de partitionnement par défaut dans Spark ):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
permet maintenant de repartitionner notre ensemble de données:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
depuis que le paramètre est passé à HashPartitioner
définit le nombre de partitions, nous nous attendons à une partition:
rddOneP.partitions.length
Int = 1
comme nous n'avons qu'une partition, elle contient tous les éléments:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
noter que l'ordre des valeurs après le mélange n'est pas déterministe.
même façon si nous utilisons HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
nous aurons 2 partitions:
rddTwoP.partitions.length
Int = 2
Puisque rdd
est divisé par des données clés ne sera plus distribué uniformément plus:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
parce qu'avec trois touches et seulement deux valeurs différentes de hashCode
mod numPartitions
il n'y a rien d'inattendu ici:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
juste pour confirmer ce qui précède:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
enfin avec HashPartitioner(7)
nous obtenons sept partitions, trois non vides avec 2 éléments chacun:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
résumé et Notes
-
HashPartitioner
prend un seul argument qui définit le nombre de partitions -
les valeurs sont affectées à partitions utilisant
hash
de touches.hash
fonction peut différer selon la langue (Scala RDD peut utiliserhashCode
,DataSets
utiliser MurmurHash 3, PySpark,portable_hash
).dans un cas simple comme celui-ci, où la clé est un petit entier, vous pouvez supposer que
hash
est une identité (i = hash(i)
).L'API de Scala utilise
nonNegativeMod
pour déterminer la partition basée sur hash calculé, -
si la distribution des clés n'est pas uniforme, vous pouvez vous retrouver dans des situations où une partie de votre cluster est inactif
-
les clés doivent être hachables. Vous pouvez vérifier ma réponse pour une liste comme une clé pour reduceByKey de PySpark pour lire à propos de PySpark questions spécifiques. Un autre problème possible est souligné par documentation Hashparer :
les matrices Java ont des hashCodes qui sont basés sur l'identité des matrices plutôt que sur leur contenu, donc tenter de partager un RDD[Array[ ]] ou RDD[(Array[ ], _)] en utilisant un Hashpartioner produira un résultat inattendu ou incorrect.
-
en Python 3, vous devez vous assurer que le hachage est cohérent. Voir Qu'est-ce qui fait Exception: L'aléatoire de hachage de chaîne doit être désactivé via PYTHONHASHSEED moyen dans pyspark?
-
n'est ni injectif ni surjectif. Plusieurs clés peuvent être attribuées à une seule partition et certaines partitions peuvent rester vides.
-
veuillez noter que les méthodes actuellement basées sur le hash ne fonctionnent pas dans Scala lorsqu'elles sont combinées avec des classes de cas définies par REPL ( égalité des classes de cas en Apache Spark ).
-
HashPartitioner
(ou tout autrePartitioner
) mélange les données. À moins que le partitionnement soit réutilisé entre plusieurs opérations, il ne réduit pas la quantité de données à mélanger.
RDD est distribuée cela signifie qu'il est fractionné sur un certain nombre de pièces. Chacune de ces cloisons est potentiellement sur une machine différente. Hash partitioner avec arument numPartitions
tuyaux sur quelle partition placer la paire (key, value)
de la manière suivante:
- crée exactement
numPartitions
partitions. - Places
(key, value)
dans la partition avec le numéroHash(key) % numPartitions
la méthode HashPartitioner.getPartition
prend comme argument une clé et renvoie le index de la partition à laquelle la clé appartient. Le partitioner doit savoir ce que sont les indices valides, de sorte qu'il retourne des nombres dans la bonne gamme. Le nombre de partitions est spécifié par l'argument du constructeur numPartitions
.
l'implémentation renvoie approximativement key.hashCode() % numPartitions
. Voir Outil De Partitionnement.scala pour plus de détails.