Performance Spark pour Scala vs Python

je préfère Python à Scala. Mais, comme Spark est nativement écrit en Scala, Je m'attendais à ce que mon code tourne plus vite en Scala qu'en Python pour des raisons évidentes.

avec cette hypothèse, j'ai pensé apprendre et écrire la version Scala d'un code de prétraitement très commun pour environ 1 Go de données. Les données sont tirées du concours de SpringLeaf sur Kaggle . Juste pour donner un aperçu des données (il contient 1936 dimensions et 145232 lignes). Les données sont composées de différents types comme int, float, string, boolean. J'utilise 6 noyaux sur 8 pour le traitement des étincelles; c'est pourquoi j'ai utilisé minPartitions=6 pour que chaque noyau ait quelque chose à traiter.

Code Scala

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = ""151900920"1"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Code Python

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '"151910920"1'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scala Performance Étape 0 (38 minutes), Étape 1 (18 secondes)) enter image description here

Python Performance Étape 0 (11 minutes), Étape 1 (7 sec) enter image description here

les deux produits graphiques de visualisation dag différents (en raison de laquelle les deux photos montrent différentes fonctions de stade 0 pour Scala ( map ) et Python ( reduceByKey ))

mais, essentiellement les deux codes tentent de transformer les données en (dimension_id, chaîne de la liste des valeurs) RDD et enregistrer sur disque. Le résultat sera utilisé pour calculer diverses statistiques pour chaque dimension.

du point de vue de la Performance, le code Scala pour ce genre de données réelles semble s'exécuter 4 fois plus lent que la version Python. La bonne nouvelle pour moi est que cela m'a donné une bonne motivation pour rester avec Python. La mauvaise nouvelle, c'est que je n'ai pas tout à fait compris pourquoi?

135
demandé sur Community 2015-09-08 20:46:02

1 réponses


la réponse originale concernant le code se trouve ci-dessous.


tout d'abord, vous devez faire la distinction entre différents types D'API, chacun avec ses propres considérations de performance.

RDD API

(pur Python structures avec JVM en fonction d'orchestration)

C'est le composant qui sera le plus affecté par la performance du code Python et les détails de L'implémentation de PySpark. Bien que la performance de Python soit plutôt peu susceptible d'être un problème, il y a au moins quelques facteurs dont vous devez tenir compte:

    "1519450920-dessus de la tête de la JVM de la communication. Pratiquement toutes les données provenant de Python executor doivent passer par un socket et un worker JVM. Bien qu'il s'agisse d'une communication locale relativement efficace, elle n'est toujours pas gratuite.
  • basée sur les Processus exécuteurs testamentaires (Python) par rapport à filetage selon (une seule JVM plusieurs threads) les exécuteurs testamentaires (Scala). Chaque exécuteur Python exécute son propre processus. Comme effet secondaire, il fournit une isolation plus forte que son homologue JVM et un certain contrôle sur le cycle de vie de l'exécuteur, mais potentiellement une utilisation de mémoire significativement plus élevée:

    • empreinte mémoire de l'interprète
    • empreinte des bibliothèques chargées
    • moins diffusion efficace (chaque procédé nécessite une copie propre d'une émission)
  • Performance du code Python lui-même. En général, Scala est plus rapide que Python, mais il varie d'une tâche à l'autre. En outre, vous avez plusieurs options, y compris les JITs comme Numba , c extensions ( Cython ) ou les bibliothèques spécialisées comme Theano . Enfin, si vous n'utilisez pas ML / MLlib (ou simplement la pile de NumPy) , envisagez d'utiliser PyPy comme interprète alternatif. Voir SPARK-3094 .

  • la configuration PySpark fournit l'option spark.python.worker.reuse qui peut être utilisée pour choisir entre le processus de bifurcation Python pour chaque tâche et la réutilisation du processus existant. Cette dernière option semble utile pour éviter un ramassage coûteux des ordures (c'est plus une impression qu'une résultat de tests systématiques), tandis que le premier (par défaut) est optimal pour en cas d'Émissions et d'importations coûteuses.
  • Le comptage de référence
  • , utilisé comme méthode de collecte des ordures de première ligne dans le CPython, fonctionne assez bien avec les charges de travail à étincelles typiques (traitement en flux, pas de cycles de référence) et réduit le risque de longues pauses GC.

MLlib

(mix Python and JVM execution)

considérations de Base sont à peu près la même qu'avant avec quelques questions supplémentaires. Alors que les structures de base utilisées avec MLlib sont des objets RDD Python, tous les algorithmes sont exécutés directement en utilisant Scala.

signifie un coût supplémentaire pour convertir des objets Python en objets Scala et l'inverse, une utilisation accrue de la mémoire et quelques limitations supplémentaires que nous couvrirons plus tard.

dès maintenant (Spark 2.x), le L'API basée sur le RDD est en mode maintenance et doit être retirée de dans Spark 3.0 .

API DataFrame et Spark ML

(exécution JVM avec le code Python limité au pilote)

ce sont probablement les meilleurs choix pour les tâches standard de traitement de données. Puisque le code Python est principalement limité aux opérations logiques de haut niveau sur le pilote, il ne devrait pas y avoir de performances différence entre Python et Scala.

une seule exception est l'utilisation de UDFs Python en ligne qui sont significativement moins efficaces que leurs équivalents Scala. Bien qu'il y ait une certaine chance d'amélioration (il y a eu un développement important dans Spark 2.0.0), la plus grande limitation est un aller-retour complet entre la représentation interne (JVM) et l'interpréteur Python. Si possible, vous devriez préférer une composition d'expressions intégrées ( exemple . Le comportement de Python UDF a été amélioré dans Spark 2.0.0, mais il est encore sous-optimal par rapport à l'exécution native. Cela pourrait s'améliorer à l'avenir avec l'introduction du UDFs vectorisés (SPARK-21190) .

veillez également à éviter toute transmission inutile de données entre DataFrames et RDDs . Cela nécessite une sérialisation et une desérialisation coûteuses, sans parler du transfert de données vers et depuis l'interpréteur Python.

It il est à noter que les appels Py4J ont une latence assez élevée. Cela inclut des appels simples comme:

from pyspark.sql.functions import col

col("foo")

en général, cela ne devrait pas avoir d'importance (les frais généraux sont constants et ne dépendent pas de la quantité de données) mais dans le cas d'applications en temps réel, vous pouvez envisager de mettre en cache/réutiliser des paquets Java.

GraphX d'Allumage et de jeux de données

As for now (Spark 1.6 2.1) nobody provides PySpark API donc vous pouvez dire que PySpark est infiniment pire que Scala.

GraphX

dans la pratique, le développement de GraphX s'est arrêté presque complètement et le projet est actuellement en mode maintenance avec billets JIRA liés fermés en tant que ne sera pas fixer . La bibliothèque GraphFrames fournit une bibliothèque de traitement graphique alternative avec des fixations Python.

Dataset

Subjectivement parlant, il y n'est pas beaucoup de place pour statically typed Datasets en Python et même s'il y avait l'implémentation actuelle de Scala est trop simpliste et ne fournit pas les mêmes avantages de performance que DataFrame .

Streaming

D'après ce que j'ai vu jusqu'à présent, je recommande fortement D'utiliser Scala sur Python. Elle pourrait changer dans le futur si PySpark obtient le support pour les flux structurés mais L'API Scala semble être beaucoup plus robuste, complète et efficace. Mon expérience est assez limitée.

Structuré streaming Spark 2.x semblent réduire l'écart entre les langues, mais pour l'instant, on en est encore à ses premiers jours. Néanmoins, L'API basée sur la DRD est déjà référencée comme "legacy streaming" dans la "Databricks Documentation (date d'accès 2017-03-03)) de sorte qu'il est raisonnable de s'attendre à d'autres efforts d'unification.

Non-performance "considérations sur le 1519370920" Caractéristique parité

toutes les fonctionnalités de Spark ne sont pas exposées via L'API PySpark. Assurez-vous de vérifier si les pièces dont vous avez besoin sont déjà implémentées et essayez de comprendre les limitations possibles.

il est particulièrement important lorsque vous utilisez MLlib et des contextes mixtes similaires (voir appelant la fonction Java/Scala à partir d'une tâche ). Pour être honnête, certaines parties de L'API PySpark, comme mllib.linalg , fournit un ensemble de méthodes plus complet que Scala.

Conception d'API

l'API PySpark reflète fidèlement son équivalent Scala et n'est donc pas exactement pythonique. Cela signifie qu'il est assez facile de faire des correspondances entre les langues, mais en même temps, le code Python peut être beaucoup plus difficile à comprendre.

L'architecture complexe

PySpark de flux de données est relativement complexe par rapport à la pure exécution de la JVM. Il est beaucoup plus difficile de raisonner sur les programmes PySpark ou de déboguer. En outre au moins compréhension de base de Scala et JVM en le général est un must.

Spark 2.x et au-delà

le passage continu vers L'API Dataset , avec L'API RDD gelée, apporte à la fois des opportunités et des défis pour les utilisateurs de Python. Alors que les parties de haut niveau de L'API sont beaucoup plus faciles à exposer en Python, les fonctionnalités plus avancées sont à peu près impossibles à utiliser directement .

de plus les fonctions natives Python continuent d'être des citoyens de seconde classe dans le SQL monde. Espérons que cela s'améliorera à l'avenir avec la sérialisation Apache Arrow ( current efforts target data collection mais UDF serde est un long term goal ).

pour les projets qui dépendent fortement de la base de codes Python, les alternatives pures Python (comme Dask ou Ray ) pourraient être une alternative intéressante.

il ne doit pas être un contre l'autre

L'API Spark DataFrame (SQL, Dataset) fournit une façon élégante d'intégrer le code Scala/Java dans L'application PySpark. Vous pouvez utiliser DataFrames pour exposer des données à un code JVM natif et relire les résultats. J'ai expliqué quelques options ailleurs et vous pouvez trouver un exemple de travail de Python-Scala roundtrip dans comment utiliser une classe Scala à l'intérieur de Pyspark .

il peut être en outre augmenté par l'introduction de Types définis par L'Utilisateur (voir comment définir le schéma pour le type personnalisé dans Spark SQL? ).


Quel est le problème avec le code fourni dans la question

(Avertissement: Pythonista point de vue. Très probablement j'ai manqué quelques trucs de Scala)

tout d'abord, il y a une partie de votre code qui n'a aucun sens. Si vous avez déjà des paires (key, value) créées en utilisant zipWithIndex ou enumerate à quoi bon créer une chaîne de caractères juste pour la diviser juste après? flatMap ne fonctionne pas de façon récursive donc vous pouvez tout simplement céder tuples et sauter la suite map que ce soit.

une autre partie que je trouve problématique est reduceByKey . En général, reduceByKey est utile si l'application de la fonction agrégée peut réduire la quantité de données qui doit être mélangée. Puisque vous simplement concaténer des chaînes il n'y a rien à gagner ici. En ignorant les choses de bas niveau, comme le nombre de références, la quantité de données que vous devez transférer est exactement la même que pour groupByKey .

normalement Je ne m'attarderais pas là-dessus, mais pour autant que je puisse dire c'est un goulot d'étranglement dans votre code Scala. Joindre des chaînes sur JVM est une opération assez coûteuse (voir par exemple: est-ce que la concaténation de chaînes en scala est aussi coûteuse qu'en Java? ). Il cela signifie que quelque chose comme ce _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) qui est équivalent à input4.reduceByKey(valsConcat) dans votre code n'est pas une bonne idée.

Si vous voulez éviter de groupByKey vous pouvez essayer d'utiliser aggregateByKey avec StringBuilder . Quelque chose de semblable devrait faire l'affaire:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

mais je doute que ça en vaille la peine.

compte tenu de ce qui précède, j'ai réécrit votre code comme suit:

Scala :

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python :

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

résultats

in local[6] mode (Intel( R) Xeon (R) CPU E3-1245 V2 @ 3.40 GHz) with 4GB memory per executor it takes (n = 3):

  • Scala - dire: 250.00 s, la fonction ecartype: 12.49
  • Python - dire: 246.66 s, la fonction ecartype: 1.15

Je suis à peu près sûr que la plupart de ce temps est consacré à mélanger, sérialiser, désérialiser et d'autres tâches secondaires. Juste pour le plaisir, voici du code simple fileté naïf en Python qui exécute la même tâche sur cette machine en moins d'une minute:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])
269
répondu zero323 2017-09-01 10:44:44