Comment définir le partitionnement de DataFrame?

J'ai commencé à utiliser Spark SQL et DataFrames dans Spark 1.4.0. Je veux définir un partitionneur personnalisé sur les DataFrames, dans Scala, mais je ne vois pas comment le faire.

L'une des tables de données avec lesquelles je travaille contient une liste de transactions, par compte, silimar à l'exemple suivant.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

Au moins au début, la plupart des calculs se feront entre les transactions au sein d'un compte. Donc, je voudrais avoir les données segmentées de sorte que toutes les transactions pour un compte se trouve dans la même partition Spark.

Mais je ne vois pas de moyen de définir cela. La classe DataFrame a une méthode appelée ' repartition (Int)', où vous pouvez spécifier le nombre de partitions à créer. Mais je ne vois aucune méthode disponible pour définir un partitionneur personnalisé pour un DataFrame, tel que peut être spécifié pour un RDD.

Les données source sont stockées dans Parquet. J'ai vu que lors de l'écriture d'un DataFrame sur Parquet, vous pouvez spécifier une colonne à partitionner, donc vraisemblablement Je pourrais dire à Parquet de partitionner ses données par la colonne 'Compte'. Mais il pourrait y avoir des millions de comptes, et si je comprends bien Parquet, cela créerait un répertoire distinct pour chaque compte, ce qui ne semblait pas être une solution raisonnable.

Existe-t-il un moyen D'obtenir Spark pour partitionner ce DataFrame afin que toutes les données d'un compte soient dans la même partition?

98
demandé sur Community 2015-06-23 09:48:22

5 réponses

Étincelle >= 2.3.0

SPARK-22614 expose le partitionnement de plage.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 expose le partitionnement de format externe dans l'API de source de données V2 .

Étincelle >= 1.6.0

Dans Spark > = 1.6, il est possible d'utiliser le partitionnement par colonne pour la requête et la mise en cache. Voir: SPARK-11410 et SPARK-4849 utiliser repartition méthode:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

Contrairement à RDDs Spark Dataset (y compris Dataset[Row] alias DataFrame) ne peut pas utilisez le partitionneur personnalisé comme pour l'instant. Vous pouvez généralement résoudre ce problème en créant une colonne de partitionnement artificiel, mais cela ne vous donnera pas la même flexibilité.

Étincelle

Une chose que vous pouvez faire est de pré-partitionner les données d'entrée avant de créer un DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Puisque DataFrame la création à partir d'un RDD ne nécessite qu'une simple phase de map, la disposition de partition existante doit être préservée*:

assert(df.rdd.partitions == partitioned.partitions)

De la même manière que vous pouvez repartitionner existant DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

Il semble donc que ce n'est pas impossible. La question reste de savoir si cela a du sens. Je dirais que la plupart du temps, il ne le fait pas:

  1. Le repartitionnement est un processus coûteux. Dans un scénario typique, la plupart des données doivent être sérialisées, mélangées et désérialisées. D'autre part, le nombre d'opérations pouvant bénéficier d'une donnée pré-partitionnée est relativement faible et est encore limité si L'API interne n'est pas conçue pour tirer parti de cela propriété.

    • se joint à certains scénarios, mais cela nécessiterait un support interne,
    • les fonctions de fenêtre appellent avec le partitionneur correspondant. Comme ci-dessus, limité à une définition de guichet unique. Il est déjà partitionné en interne, donc le pré-partitionnement peut être redondant,
    • agrégations simples avec GROUP BY - Il est possible de réduire l'empreinte mémoire des tampons temporaires**, mais le coût global est beaucoup plus élevé. Plus ou moins équivalent à groupByKey.mapValues(_.reduce) (comportement actuel) vs reduceByKey (pré-partitionnement). Peu probable d'être utile dans la pratique.
    • compression de données avec SqlContext.cacheTable. Comme il semble qu'il utilise l'encodage de longueur d'exécution, l'application de OrderedRDDFunctions.repartitionAndSortWithinPartitions pourrait améliorer le taux de compression.
  2. Les performances dépendent fortement d'une distribution des clés. S'il est biaisé, il en résultera une utilisation des ressources sous-optimale. Dans le pire des cas, il sera impossible de terminer le travail à tous.

  3. tout un point d'utiliser un haut L'API déclarative de niveau est de vous isoler d'un détail d'implémentation de bas niveau. Comme déjà mentionné par @dwysakowicz et @RomiKuntsman, une optimisation est un travail de l' Catalyseur Optimiseur de. C'est une bête assez sophistiquée et je doute vraiment que vous puissiez facilement améliorer cela sans plonger beaucoup plus profondément dans ses internes.

Concepts connexes

Partitionnement avec les sources JDBC :

Prise en charge des sources de données JDBC predicates l'argument. Il peut être utilisé comme suit:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

Il crée une seule partition JDBC par prédicat. Gardez à l'esprit que si les ensembles créés à l'aide de prédicats individuels ne sont pas disjoints, vous verrez des doublons dans la table résultante.

partitionBy méthode DataFrameWriter:

Spark DataFrameWriter fournit la méthode partitionBy qui peut être utilisée pour "partitionner" les données en écriture. Il sépare les données en écriture en utilisant l'ensemble de colonnes fourni

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

Cela permet prédicat pousser vers le bas en lecture pour les requêtes basées sur la clé:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

Mais ce n'est pas équivalent à DataFrame.repartition. En particulier les agrégations comme:

val cnts = df1.groupBy($"k").sum()

Nécessitera toujours TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy méthode DataFrameWriter (Spark >= 2.0):

bucketBy a des applications similaires à partitionBy mais il n'est disponible que pour les tables (saveAsTable). Écopage informations peuvent être utilisées pour optimiser les jointures:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* Par disposition de partition je veux dire seulement de données distribution. partitioned RDD n'a plus de partitionneur. ** En supposant qu'il n'y ait pas de projection anticipée. Si l'agrégation ne couvre qu'un petit sous-ensemble de colonnes, il n'y a probablement aucun gain.

144
répondu zero323 2018-04-12 22:42:14

Dans Spark HiveContext, pas l'ancien SqlContext Vous pouvez utiliser le HiveQL DISTRIBUTE BY colX... (assure que chacun des n réducteurs obtient des plages de X qui ne se chevauchent pas) & CLUSTER BY colX... (raccourci pour distribuer par et Trier par) par exemple;

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

Je ne sais pas comment cela s'intègre avec L'api Spark DF. Ces mots-clés ne sont pas pris en charge dans le SqlContext normal (notez que vous n'avez pas besoin d'avoir un méta-magasin hive pour utiliser le HiveContext)

EDIT: Spark 1.6 + a maintenant ceci dans le natif API DataFrame

8
répondu NightWolf 2016-01-21 04:11:43

Utilisez le DataFrame renvoyé par:

yourDF.orderBy(account)

Il N'y a pas de moyen explicite d'utiliser partitionBy sur un DataFrame, seulement sur un PairRDD, mais lorsque vous triez un DataFrame, il l'utilisera dans son LogicalPlan et cela vous aidera lorsque vous avez besoin de faire des calculs sur chaque compte.

Je suis juste tombé sur le même problème exact, avec un dataframe que je veux partitionner par compte. Je suppose que lorsque vous dites " voulez que les données soient partitionnées de sorte que toutes les transactions pour un compte soient en la même partition Spark", vous le voulez pour l'échelle et la performance, mais votre code n'en dépend pas (comme utiliser mapPartitions() etc), n'est-ce pas?

6
répondu Romi Kuntsman 2015-08-06 08:42:51

J'ai pu le faire en utilisant RDD. Mais je ne sais pas si c'est une solution acceptable pour vous. Une fois que vous avez le DF disponible en tant que RDD, vous pouvez postuler repartitionAndSortWithinPartitions pour effectuer un repartitionnement personnalisé des données.

Voici un échantillon que j'ai utilisé:

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)
4
répondu Developer 2015-10-03 22:53:02

Donc, pour commencer avec une sorte de réponse : ) - Vous ne pouvez pas

Je ne suis pas un expert, mais pour autant que je comprenne les DataFrames, ils ne sont pas égaux à rdd et DataFrame n'a pas de partitionneur.

Généralement, L'idée de DataFrame est de fournir un autre niveau d'abstraction qui gère de tels problèmes lui-même. Les requêtes sur DataFrame sont traduites en plan logique qui est traduit en opérations sur RDDs. Le partitionnement que vous avez suggéré sera probablement appliqué automatiquement ou au moins devrait être.

Si vous ne faites pas confiance à SparkSQL qu'il fournira une sorte de travail optimal, vous pouvez toujours transformer DataFrame en RDD[Row] comme suggéré dans les commentaires.

2
répondu Dawid Wysakowicz 2015-09-29 20:26:49