pyspark en utilisant une tâche pour les mapPartitions lors de la conversion rdd en dataframe

Je ne comprends pas pourquoi il semble que Spark utilise 1 tâche pour rdd.mapPartitions lors de la conversion du RDD résultant en une base de données.

C'est un problème pour moi parce que je voudrais faire :

DataFrame --> RDD--> rdd.mapPartitions--> DataFrame

pour que je puisse lire dans data (DataFrame), appliquer une fonction non-SQL à des morceaux de données (mapPartitions sur RDD) et ensuite convertir de nouveau dans une DataFrame pour que je puisse utiliser le DataFrame.write processus.

je suis en mesure d'aller de DataFrame --> mapPartitions et ensuite utiliser un RDD écrivain comme saveAsTextFile mais c'est pas l'idéal car le DataFrame.write le processus peut faire des choses comme écraser et sauver des données dans le format Orc. Donc je voudrais savoir pourquoi c'est ce qui se passe, mais d'un point de vue pratique, je m'inquiète surtout de pouvoir passer d'une base de données --> mapParitions --> à l'utilisation de la base de données.le processus d'écriture.

Voici une reproduction exemple. Les travaux suivants comme prévu, avec 100 tâches pour le mapPartitions:

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession 
    .builder 
    .master("yarn-client") 
    .enableHiveSupport() 
    .getOrCreate()

sc = spark.sparkContext

df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]})
spark_df = spark.createDataFrame(df).repartition(100)

def f(part):
    return [(1,2)]

spark_df.rdd.mapPartitions(f).collect()

cependant si la dernière ligne est changée en quelque chose comme spark_df.rdd.mapPartitions(f).toDF().show() ensuite, il n'y aura qu'une seule tâche pour l' mapPartitions travail.

quelques screenshots illustrant ceci ci-dessous: enter image description here enter image description here

14
demandé sur bobolafrite 2016-11-22 19:51:15

1 réponses

DataFrame.show() affiche seulement le premier nombre de lignes de votre dataframe, par défaut seulement les 20 premiers. Si ce nombre est plus petit que le nombre de lignes par partition, Spark est paresseux et n'évalue qu'une seule partition, ce qui équivaut à une seule tâche.

Vous pouvez aussi le faire collect sur une base de données, pour calculer et collecter toutes les partitions et voir 100 tâches à nouveau.

Vous pourrez toujours voir l' runJob tâche d'abord comme avant, qui est causée par le toDF appel pour pouvoir pour déterminer le schéma de la dataframe résultante: il doit traiter une seule partition pour être en mesure de déterminer les types de sortie de votre fonction de mappage. Après cette étape initiale l'action réelle telle que collect se produira sur tous les partitons. Par exemple, pour moi qui exécute votre snippet avec la dernière ligne remplacée par spark_df.rdd.mapPartitions(f).toDF().collect() les résultats de ces étapes:

enter image description here

6
répondu sgvd 2016-11-22 17:41:31