Spark-foreach Vs foreachPartitions quand utiliser quoi?
je voudrais savoir si le foreachPartitions
se traduira par une meilleure performance, en raison d'un niveau plus élevé de parallélisme, par rapport à la foreach
méthode considérant le cas dans lequel je traverse un RDD
pour effectuer quelques sommes dans une variable d'accumulateur.
5 réponses
foreach
exécuter automatiquement la boucle sur plusieurs noeuds.
Cependant, parfois, vous voulez faire quelques opérations sur chaque nœud. Par exemple, faites une connexion à la base de données. Vous ne pouvez pas juste faire une connexion et le passer dans le foreach
fonction: la connexion se fait sur un seul noeud.
foreachPartition
, vous pouvez faire une connexion à la base de données sur chaque noeud avant d'exécuter la boucle.
Il n'y a vraiment pas beaucoup de différence entre foreach
et foreachPartitions
. Sous les couvertures, tout ce qui foreach
fait appel à foreach
en utilisant la fonction fournie. foreachPartition
vous donne juste la possibilité de faire quelque chose en dehors de la boucle de l'itérateur, habituellement quelque chose de cher comme lancer une connexion de base de données ou quelque chose dans le même sens. Donc, si vous n'avez rien qui pourrait être fait une fois pour chaque itérateur de noeud et réutilisé tout au long, alors Je voudrais suggérer à l'aide de foreach
pour plus de clarté et moins de complexité.
foreach
et foreachPartitions
sont des actions.
foreach(fonction): Unité
une fonction générique pour invoquer des opérations avec effets secondaires. Pour chaque élément de la RDD, il appelle la fonction . Ceci est généralement utilisé pour manipuler des accumulateurs ou pour écrire à des commerces.
Note: modifier les variables autres que les accumulateurs à l'extérieur de la foreach()
peut résulter non défini comportement. Voir Comprendre les fermetures pour plus de détails.
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
foreachPartition(fonction): Unité
semblable à
foreach()
, mais au lieu d'appeler la fonction pour chaque élément, il l'appelle pour chaque partition. La fonction doit être capable pour accepter un itérateur. C'est plus efficace queforeach()
parce que il réduit le nombre d'appels de fonction (commemapPartitions
() ).
Exemple d'Utilisation de foreachPartition
: pour chaque partition une connexion de base de données (à l'intérieur pour chaque bloc de partition) vous voulez utiliser alors ceci est un exemple d'utilisation de la façon dont il peut être fait en utilisant scala.
df.repartition(numofpartitionsyouwant) // numPartitions ~ number of simultaneous DB connections you can planning to give...
def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {
val tableHeader: String = dataFrame.columns.mkString(",")
dataFrame.foreachPartition { partition =>
//NOTE : EACH PARTITION ONE CONNECTION (more better way is to use connection pools)
val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
//Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
partition.grouped(1000).foreach {
group =>
val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()
group.foreach {
record => insertString.append("('" + record.mkString(",") + "'),")
}
sqlExecutorConnection.createStatement()
.executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
+ insertString.stripSuffix(",")) }
sqlExecutorConnection.close() // close the connection so that connections wont exhaust.
}
}
Accumulateur échantillons extrait de jouer avec cela... par qui vous pouvez tester la performance
test("Foreach - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x)) assert(accum.value == 6L) } test("Foreach partition - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_))) assert(accum.value == 6L) }
Conclusion:
foreachPartition
opérations sur les cloisons donc évidemment il serait meilleure pointe deforeach
Règle de base :
foreachPartition
doit être utilisé lorsque vous accédez coûteux ressources telles que les connexions aux bases de données, etc.. qui aurait initialiser une par partition plutôt qu'une par élément(foreach
). quand il vient aux accumulateurs vous pouvez mesurer la performance par test ci-dessus méthodes, qui devraient fonctionner plus rapidement en cas d'accumulateurs comme bien..
aussi... voir carte vs mappartitions qui ont un concept similaire mais qui sont des transformations.
foreachPartition
ne signifie pas qu'il s'agit d'une activité par noeud mais plutôt qu'elle est exécutée pour chaque partition et il est possible que vous ayez un grand nombre de partitions par rapport au nombre de noeuds dans ce cas votre performance peut être dégradée. Si vous avez l'intention de faire une activité au niveau du noeud, la solution expliquée ici peut être utile même si elle n'est pas testé par moi
foreachPartition
n'est utile que lorsque vous itérez des données que vous Agrégez par partition.
un bon exemple est le traitement de Clickstream par utilisateur. Vous voudriez vider votre cache de calcul à chaque fois que vous terminez le flux d'événements d'un utilisateur, mais le garder entre les enregistrements du même utilisateur afin de calculer quelques aperçus du comportement de l'utilisateur.