Effectuer une jointure dactylographiée dans Scala avec les ensembles de données Spark

j'aime les ensembles de données Spark car ils me donnent des erreurs d'analyse et des erreurs de syntaxe au moment de la compilation et me permettent également de travailler avec getters au lieu de noms/nombres codés en dur. La plupart des calculs peuvent être effectués avec les API de haut niveau de Dataset. Par exemple, il est beaucoup plus simple d'effectuer des opérations agg, select, sum, avg, map, filter ou groupBy en accédant à un ensemble d'objets dactylographiés que d'utiliser les champs de données des lignes RDD.

cependant la jointure opération est absent de ceci, je lis que je peux faire une jointure comme ceci

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")

mais ce n'est pas ce que je veux car je préférerais le faire via l'interface de classe de cas, donc quelque chose de plus comme ça

ds1.joinWith(ds2, ds1.key === ds2.key, "inner")

la meilleure alternative pour le moment semble créer un objet à côté de la classe case et donner ces fonctions pour me fournir le nom de colonne de droite comme une chaîne. Donc j'utiliserais la première ligne de code mais je mettrais une fonction au lieu d'une nom de colonne codé en dur. Mais qui ne se sentent pas assez élégant..

quelqu'un peut-il me conseiller sur d'autres options? Le but est d'avoir une abstraction à partir des noms de colonne réels et de travailler de préférence via les getters de la classe de cas.

J'utilise Spark 1.6.1 et Scala 2.10

20
demandé sur Community 2016-11-15 11:30:04

1 réponses

Observation

Spark SQL ne peut optimiser jointure que si la condition jointure est basée sur l'opérateur d'égalité. Cela signifie que nous pouvons considérer equijoins et non equijoins séparément.

équi-jointure

Equijoin peut être implémenté d'une manière sûre en cartographiant à la fois Datasets et "tuples" (clé, valeur), en effectuant des jointures basées sur des clés, et en remodelant le résultat:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset

def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
    (f: T => K, g: U => K)
    (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
  val ds1_ = ds1.map(x => (f(x), x))
  val ds2_ = ds2.map(x => (g(x), x))
  ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}

Non équi-jointure

peut être exprimé en utilisant les opérateurs de l'algèbre relationnelle comme r: θ s = σθ(R × S) et converti directement en code.

Spark 2.0

activer crossJoin et utiliser joinWith avec le prédicat trivialement égal:

spark.conf.set("spark.sql.crossJoin.enabled", true)

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
                         (p: (T, U) => Boolean) = {
  ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}

Étincelle 2.1

Utiliser crossJoin la méthode:

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
    (p: (T, U) => Boolean)
    (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
  ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}

exemples

case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)

val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
  LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS

safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)

Notes

  • il convient de noter que ces méthodes sont qualitativement différentes d'une application directe joinWith et nécessitent des transformations coûteuses DeserializeToObject / SerializeFromObject (par rapport à ce direct joinWith peut utiliser des opérations logiques sur les données).

    c'est similaire au comportement décrit dans Spark 2.0 Dataset vs DataFrame .

  • si vous n'êtes pas limité à L'API SQL Spark frameless fournit des extensions de sécurité de type intéressant pour Datasets (à partir d'aujourd'hui ses supports seulement Spark 2.0):

    import frameless.TypedDataset
    
    val typedPoints1 = TypedDataset.create(points1)
    val typedPoints2 = TypedDataset.create(points2)
    
    typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
    
  • Dataset L'API n'est pas stable en 1.6 donc je ne pense pas qu'il soit logique de l'utiliser là.

  • bien sûr, ce dessin et les noms descriptifs ne sont pas nécessaires. Vous pouvez facilement utilisez la classe type pour ajouter implicitement ces méthodes à Dataset . il n'y a pas de conflit avec les signatures intégrées, donc les deux peuvent être appelées joinWith .

23
répondu user6910411 2017-05-23 12:10:05