Effectuer une jointure dactylographiée dans Scala avec les ensembles de données Spark
j'aime les ensembles de données Spark car ils me donnent des erreurs d'analyse et des erreurs de syntaxe au moment de la compilation et me permettent également de travailler avec getters au lieu de noms/nombres codés en dur. La plupart des calculs peuvent être effectués avec les API de haut niveau de Dataset. Par exemple, il est beaucoup plus simple d'effectuer des opérations agg, select, sum, avg, map, filter ou groupBy en accédant à un ensemble d'objets dactylographiés que d'utiliser les champs de données des lignes RDD.
cependant la jointure opération est absent de ceci, je lis que je peux faire une jointure comme ceci
ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")
mais ce n'est pas ce que je veux car je préférerais le faire via l'interface de classe de cas, donc quelque chose de plus comme ça
ds1.joinWith(ds2, ds1.key === ds2.key, "inner")
la meilleure alternative pour le moment semble créer un objet à côté de la classe case et donner ces fonctions pour me fournir le nom de colonne de droite comme une chaîne. Donc j'utiliserais la première ligne de code mais je mettrais une fonction au lieu d'une nom de colonne codé en dur. Mais qui ne se sentent pas assez élégant..
quelqu'un peut-il me conseiller sur d'autres options? Le but est d'avoir une abstraction à partir des noms de colonne réels et de travailler de préférence via les getters de la classe de cas.
J'utilise Spark 1.6.1 et Scala 2.10
1 réponses
Observation
Spark SQL ne peut optimiser jointure que si la condition jointure est basée sur l'opérateur d'égalité. Cela signifie que nous pouvons considérer equijoins et non equijoins séparément.
équi-jointure
Equijoin peut être implémenté d'une manière sûre en cartographiant à la fois Datasets
et "tuples" (clé, valeur), en effectuant des jointures basées sur des clés, et en remodelant le résultat:
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset
def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
(f: T => K, g: U => K)
(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
val ds1_ = ds1.map(x => (f(x), x))
val ds2_ = ds2.map(x => (g(x), x))
ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}
Non équi-jointure
peut être exprimé en utilisant les opérateurs de l'algèbre relationnelle comme r: θ s = σθ(R × S) et converti directement en code.
Spark 2.0
activer crossJoin
et utiliser joinWith
avec le prédicat trivialement égal:
spark.conf.set("spark.sql.crossJoin.enabled", true)
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean) = {
ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}
Étincelle 2.1
Utiliser crossJoin
la méthode:
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean)
(implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}
exemples
case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)
val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS
safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)
Notes
-
il convient de noter que ces méthodes sont qualitativement différentes d'une application directe
joinWith
et nécessitent des transformations coûteusesDeserializeToObject
/SerializeFromObject
(par rapport à ce directjoinWith
peut utiliser des opérations logiques sur les données).c'est similaire au comportement décrit dans Spark 2.0 Dataset vs DataFrame .
-
si vous n'êtes pas limité à L'API SQL Spark
frameless
fournit des extensions de sécurité de type intéressant pourDatasets
(à partir d'aujourd'hui ses supports seulement Spark 2.0):import frameless.TypedDataset val typedPoints1 = TypedDataset.create(points1) val typedPoints2 = TypedDataset.create(points2) typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
-
Dataset
L'API n'est pas stable en 1.6 donc je ne pense pas qu'il soit logique de l'utiliser là. -
bien sûr, ce dessin et les noms descriptifs ne sont pas nécessaires. Vous pouvez facilement utilisez la classe type pour ajouter implicitement ces méthodes à
Dataset
. il n'y a pas de conflit avec les signatures intégrées, donc les deux peuvent être appeléesjoinWith
.