pyspark en utilisant une tâche pour les mapPartitions lors de la conversion rdd en dataframe
Je ne comprends pas pourquoi il semble que Spark utilise 1 tâche pour rdd.mapPartitions
lors de la conversion du RDD résultant en une base de données.
C'est un problème pour moi parce que je voudrais faire :
DataFrame
--> RDD
--> rdd.mapPartitions
--> DataFrame
pour que je puisse lire dans data (DataFrame), appliquer une fonction non-SQL à des morceaux de données (mapPartitions sur RDD) et ensuite convertir de nouveau dans une DataFrame pour que je puisse utiliser le DataFrame.write
processus.
je suis en mesure d'aller de DataFrame --> mapPartitions et ensuite utiliser un RDD écrivain comme saveAsTextFile mais c'est pas l'idéal car le DataFrame.write
le processus peut faire des choses comme écraser et sauver des données dans le format Orc. Donc je voudrais savoir pourquoi c'est ce qui se passe, mais d'un point de vue pratique, je m'inquiète surtout de pouvoir passer d'une base de données --> mapParitions --> à l'utilisation de la base de données.le processus d'écriture.
Voici une reproduction exemple. Les travaux suivants comme prévu, avec 100 tâches pour le mapPartitions
:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession
.builder
.master("yarn-client")
.enableHiveSupport()
.getOrCreate()
sc = spark.sparkContext
df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]})
spark_df = spark.createDataFrame(df).repartition(100)
def f(part):
return [(1,2)]
spark_df.rdd.mapPartitions(f).collect()
cependant si la dernière ligne est changée en quelque chose comme spark_df.rdd.mapPartitions(f).toDF().show()
ensuite, il n'y aura qu'une seule tâche pour l' mapPartitions
travail.
1 réponses
DataFrame.show()
affiche seulement le premier nombre de lignes de votre dataframe, par défaut seulement les 20 premiers. Si ce nombre est plus petit que le nombre de lignes par partition, Spark est paresseux et n'évalue qu'une seule partition, ce qui équivaut à une seule tâche.
Vous pouvez aussi le faire collect
sur une base de données, pour calculer et collecter toutes les partitions et voir 100 tâches à nouveau.
Vous pourrez toujours voir l' runJob
tâche d'abord comme avant, qui est causée par le toDF
appel pour pouvoir pour déterminer le schéma de la dataframe résultante: il doit traiter une seule partition pour être en mesure de déterminer les types de sortie de votre fonction de mappage. Après cette étape initiale l'action réelle telle que collect
se produira sur tous les partitons. Par exemple, pour moi qui exécute votre snippet avec la dernière ligne remplacée par spark_df.rdd.mapPartitions(f).toDF().collect()
les résultats de ces étapes: