Comment faire de bons exemples de cadres de données reproductibles Apache Spark

j'ai passé beaucoup de temps à lire quelques questions avec les étiquettes et et très souvent je trouve que les affiches ne fournissent pas assez d'information pour vraiment comprendre leur question. Je commente habituellement leur demander de poster un MCVE mais parfois leur faire montrer des données d'entrée/sortie d'échantillon est comme tirer des dents. Par exemple: voir les commentaires sur question .

peut-être qu'une partie du problème est que les gens ne savent tout simplement pas comment créer facilement un MCVE pour spark-dataframes. Je pense qu'il serait utile d'avoir une version spark-dataframe de cette question pandas comme un guide qui peut être lié.

alors comment créer un bon exemple reproductible?

37
demandé sur desertnaut 2018-01-24 19:24:19

4 réponses

fournissent de petites données d'échantillon, qui peuvent être facilement recréées.

à tout le moins, les posters devraient fournir quelques lignes et colonnes sur leur dataframe et code qui peuvent être utilisés pour le créer facilement. Par facile, je veux dire couper et coller. Faites-le aussi petit que possible pour démontrer votre problème.


j'ai la base de données suivante:

+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+

qui peut être créé avec ce code:

df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)

afficher la sortie désirée.

posez votre question spécifique et montrez-nous votre résultat désiré.


Comment puis-je créer une nouvelle colonne 'is_divisible' qui a la valeur 'yes' si le jour du mois du 'date' plus 7 jours est divisible par la valeur dans la colonne 'X' , et 'no' sinon?

sortie désirée:

+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+

Expliquer comment obtenir votre sortie.

Expliquer, en détail, comment vous obtenez votre sortie désirée. Il aide à montrer un exemple de calcul.


par exemple, dans la rangée 1, X = 1 et date = 2017-01-01. L'ajout de 7 jours à ce jour donne 2017-01-08. Le jour du mois est 8 et puisque 8 est divisible par 1, la réponse est 'oui'.

de même, pour la dernière ligne X = 7 et la date = 2017-01-04. Si on ajoute 7 à la date, on obtient 11 Comme jour du mois. Depuis 11 % 7 n'est pas 0, la réponse est "non".


partagez votre code existant.

Montrez - nous ce que vous avez fait ou essayé, y compris tout* du code même si cela ne fonctionne pas. Dites-nous où vous êtes coincé et si vous recevez un message d'erreur, veuillez inclure le message d'erreur.

(*Vous pouvez laisser le code pour créer l'étincelle contexte, mais vous devez inclure toutes les importations.)


je sais comment ajouter une nouvelle colonne qui est date plus 7 jours mais j'ai du mal à obtenir le jour du mois comme un entier.

from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))

Inclure les versions, les importations, et l'utilisation de la syntaxe


Pour le réglage des performances des postes, inclure le plan d'exécution


l'Analyse de l'étincelle fichiers de sortie

  • MaxU a fourni le code utile dans cette réponse pour aider à analyser les fichiers de sortie Spark dans une base de données.

autres notes.

33
répondu pault 2018-02-02 03:23:14

optimisation des Performances

si la question est liée au réglage de la performance, veuillez inclure les informations suivantes.

Plan D'Exécution

il est préférable d'inclure plan d'exécution étendu . En Python:

df.explain(True) 

En Scala:

df.explain(true)

ou étendue du plan d'exécution avec les statistiques . En Python:

print(df._jdf.queryExecution().stringWithStats())

en Scala:

df.queryExecution.stringWithStats

Mode et informations sur le cluster

  • mode - local , client , `cluster.
  • Cluster manager (le cas échéant) néant (mode local), autonome, de FIL, de Mesos, Kubernetes.
  • informations de configuration de base (nombre de cœurs, mémoire de l'exécuteur).

Timing information

slow est relatif, surtout quand vous portez application non distribuée ou vous attendez une faible latence. Les horaires exacts pour différentes tâches et étapes, peuvent être récupérés à partir de Spark UI ( sc.uiWebUrl ) jobs ou Spark REST UI.

utiliser des noms normalisés pour les contextes

L'utilisation de noms établis pour chaque contexte nous permet de reproduire rapidement le problème.

  • sc - pour SparkContext .
  • sqlContext - pour SQLContext .
  • spark - pour SparkSession .

Fournir des informations de type ( Scala )

puissante inférence de type est l'une des caractéristiques les plus utiles de Scala, mais il rend difficile d'analyser le code sorti de son contexte. Même si le type est évident, d'après le contexte, il est il est préférable d'annoter les variables. Prefer

val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))

plus de

val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))

les outils les plus utilisés peuvent vous aider:

  • spark-shell / Scala shell

    utiliser :t

    scala> val rdd = sc.textFile("README.md")
    rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> :t rdd
    org.apache.spark.rdd.RDD[String]
    
  • InteliJ Idea

    Utiliser Alt + =

17
répondu user8371915 2018-01-27 10:41:28

Bonne question et réponse; quelques suggestions supplémentaires:

Inclure votre Spark version

Spark est encore en évolution, mais pas aussi rapidement que dans les jours de 1.x. Il est toujours (mais surtout si vous utilisez un peu ancienne version) une bonne idée d'inclure votre version de travail. Personnellement, je commence toujours mon réponses avec:

spark.version
# u'2.2.0'

ou

sc.version
# u'2.2.0'

incluant votre version Python, aussi, n'est jamais une mauvaise idée.


Comprennent toutes vos importations

si votre question n'est pas strictement sur Spark SQL & dataframes, par exemple si vous avez l'intention d'utiliser votre dataframe dans une opération d'apprentissage machine, être explicite sur vos importations - voir cette question , où les importations ont été ajoutées dans L'OP seulement après un échange approfondi dans le (maintenant supprimé) commentaires (et il s'est avéré que ces importations erronées étaient la cause profonde du problème).

Pourquoi est-ce nécessaire? Parce que, par exemple, ce LDA

from pyspark.mllib.clustering import LDA

est différents à partir de cette LDA:

from pyspark.ml.clustering import LDA

la première vient de L'ancienne API basée sur la technologie RDD (anciennement Spark MLlib), tandis que la seconde vient de la nouvelle API basée sur le datagramme (Spark ML).


Inclure le code soulignant

OK, je vais confesser que c'est subjectif: je crois que les questions PySpark ne devraient pas être étiquetées comme python par défaut ; le truc est, python tag donne automatiquement la mise en évidence du code (et je crois que c'est une raison principale pour ceux qui l'utilisent pour les questions PySpark). Quoi qu'il en soit, si vous êtes d'accord, et que vous souhaitez toujours un code en surbrillance, ajoutez simplement la directive markdown correspondante:

<!-- language-all: lang-python -->

quelque part dans votre post, avant votre premier extrait de code.

[mise à JOUR: j'ai demandé automatique de la syntaxe pour pyspark et sparkr tags - upvotes la plupart de bienvenue]

11
répondu desertnaut 2018-01-29 00:58:25

cette petite fonction d'aide pourrait aider à analyser les fichiers de sortie Spark dans DataFrame:

PySpark:

def read_spark_output(file_path):
    t = spark.read \
             .option("header","true") \
             .option("inferSchema","true") \
             .option("delimiter","|") \
             .option("parserLib","UNIVOCITY") \
             .option("ignoreLeadingWhiteSpace","true") \
             .option("ignoreTrailingWhiteSpace","true") \
             .option("comment","+") \
             .csv("file:///tmp/spark.out")
    # select not-null columns
    return t.select([c for c in t.columns if not c.startswith("_")])

Scala:

// read Spark Output Fixed width table:
def readSparkOutput(filePath: String) : org.apache.spark.sql.DataFrame = {
    val t = spark.read
                 .option("header","true")
                 .option("inferSchema","true")
                 .option("delimiter","|")
                 .option("parserLib","UNIVOCITY")
                 .option("ignoreLeadingWhiteSpace","true")
                 .option("ignoreTrailingWhiteSpace","true")
                 .option("comment","+")
                 .csv(filePath)
    t.select(t.columns.filterNot(_.startsWith("_c")).map(t(_)):_*)
}

Utilisation:

df = read_spark_output("file:///tmp/spark.out")
9
répondu MaxU 2018-01-26 00:15:47