Spark-foreach Vs foreachPartitions quand utiliser quoi?

je voudrais savoir si le foreachPartitions se traduira par une meilleure performance, en raison d'un niveau plus élevé de parallélisme, par rapport à la foreach méthode considérant le cas dans lequel je traverse un RDD pour effectuer quelques sommes dans une variable d'accumulateur.

26
demandé sur user3190018 2015-05-27 16:57:45

5 réponses

foreach exécuter automatiquement la boucle sur plusieurs noeuds.

Cependant, parfois, vous voulez faire quelques opérations sur chaque nœud. Par exemple, faites une connexion à la base de données. Vous ne pouvez pas juste faire une connexion et le passer dans le foreach fonction: la connexion se fait sur un seul noeud.

foreachPartition, vous pouvez faire une connexion à la base de données sur chaque noeud avant d'exécuter la boucle.

14
répondu Bin Wang 2015-06-03 10:21:19

Il n'y a vraiment pas beaucoup de différence entre foreach et foreachPartitions. Sous les couvertures, tout ce qui foreach fait appel à foreach en utilisant la fonction fournie. foreachPartition vous donne juste la possibilité de faire quelque chose en dehors de la boucle de l'itérateur, habituellement quelque chose de cher comme lancer une connexion de base de données ou quelque chose dans le même sens. Donc, si vous n'avez rien qui pourrait être fait une fois pour chaque itérateur de noeud et réutilisé tout au long, alors Je voudrais suggérer à l'aide de foreach pour plus de clarté et moins de complexité.

10
répondu Justin Pihony 2015-05-27 15:40:06

foreach et foreachPartitions sont des actions.

foreach(fonction): Unité

une fonction générique pour invoquer des opérations avec effets secondaires. Pour chaque élément de la RDD, il appelle la fonction . Ceci est généralement utilisé pour manipuler des accumulateurs ou pour écrire à des commerces.

Note: modifier les variables autres que les accumulateurs à l'extérieur de la foreach() peut résulter non défini comportement. Voir Comprendre les fermetures pour plus de détails.

exemple:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

foreachPartition(fonction): Unité

semblable à foreach() , mais au lieu d'appeler la fonction pour chaque élément, il l'appelle pour chaque partition. La fonction doit être capable pour accepter un itérateur. C'est plus efficace que foreach() parce que il réduit le nombre d'appels de fonction (comme mapPartitions() ).

Exemple d'Utilisation de foreachPartition: pour chaque partition une connexion de base de données (à l'intérieur pour chaque bloc de partition) vous voulez utiliser alors ceci est un exemple d'utilisation de la façon dont il peut être fait en utilisant scala.

df.repartition(numofpartitionsyouwant) // numPartitions ~ number of simultaneous DB connections you can planning to give...
def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {

      val tableHeader: String = dataFrame.columns.mkString(",")    
        dataFrame.foreachPartition { partition =>    
//NOTE : EACH PARTITION ONE CONNECTION (more better way is to use connection pools)
val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)    
        //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
          partition.grouped(1000).foreach {
            group =>    
              val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()

              group.foreach {
                record => insertString.append("('" + record.mkString(",") + "'),")
              }

              sqlExecutorConnection.createStatement()
                .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
                  + insertString.stripSuffix(","))              }


sqlExecutorConnection.close() // close the connection so that connections wont exhaust.
            }
        }

Accumulateur échantillons extrait de jouer avec cela... par qui vous pouvez tester la performance

     test("Foreach - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x))
        assert(accum.value == 6L)
      }

      test("Foreach partition - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_)))
        assert(accum.value == 6L)
      }

Conclusion:

foreachPartition opérations sur les cloisons donc évidemment il serait meilleure pointe de foreach

Règle de base :

foreachPartition doit être utilisé lorsque vous accédez coûteux ressources telles que les connexions aux bases de données, etc.. qui aurait initialiser une par partition plutôt qu'une par élément(foreach). quand il vient aux accumulateurs vous pouvez mesurer la performance par test ci-dessus méthodes, qui devraient fonctionner plus rapidement en cas d'accumulateurs comme bien..

aussi... voir carte vs mappartitions qui ont un concept similaire mais qui sont des transformations.

9
répondu Ram Ghadiyaram 2017-11-13 11:09:46

foreachPartition ne signifie pas qu'il s'agit d'une activité par noeud mais plutôt qu'elle est exécutée pour chaque partition et il est possible que vous ayez un grand nombre de partitions par rapport au nombre de noeuds dans ce cas votre performance peut être dégradée. Si vous avez l'intention de faire une activité au niveau du noeud, la solution expliquée ici peut être utile même si elle n'est pas testé par moi

1
répondu deenbandhu 2017-09-14 05:29:00

foreachPartition n'est utile que lorsque vous itérez des données que vous Agrégez par partition.

un bon exemple est le traitement de Clickstream par utilisateur. Vous voudriez vider votre cache de calcul à chaque fois que vous terminez le flux d'événements d'un utilisateur, mais le garder entre les enregistrements du même utilisateur afin de calculer quelques aperçus du comportement de l'utilisateur.

0
répondu Oren 2017-06-05 17:04:58