Spark spécifie plusieurs conditions de colonne pour la jointure de dataframe

comment donner plus de conditions de colonne en rejoignant deux dataframes. Par exemple je veux exécuter la commande suivante :

val Lead_all = Leads.join(Utm_Master,  
    Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
    Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")

je veux rejoindre seulement quand ces colonnes correspondent. Mais la syntaxe ci-dessus n'est pas valide car cols ne prend qu'une chaîne. Alors, comment puis-je obtenir ce que je veux.

34
demandé sur zero323 2015-07-06 10:35:53

7 réponses

Il y a une Étincelle colonne / Expression API join pour ce cas:

Leaddetails.join(
    Utm_Master, 
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
        && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
        && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
        && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
    "left"
)

<=> opérateur dans l'exemple signifie "test d'Égalité qui est sûr pour les valeurs null".

La principale différence avec de simples test D'égalité (===) c'est que le premier est sûr à utiliser dans le cas où l'une des colonnes peuvent contenir de valeurs null.

63
répondu rchukh 2016-09-21 07:49:06

à partir de la version 1.5.0 de Spark (qui n'est actuellement pas publiée), vous pouvez rejoindre sur plusieurs colonnes de DataFrame. Reportez-vous à SPARK-7990: ajouter des méthodes pour faciliter l'équi-jointure sur les touches de jointure multiple.

Python

Leads.join(
    Utm_Master, 
    ["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"],
    "left_outer"
)

Scala

la question demandait une réponse Scala, mais je n'utilise pas Scala. Voici ma meilleure supposition....

Leads.join(
    Utm_Master,
    Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
    "left_outer"
)
10
répondu dnlbrky 2015-08-08 02:59:11

Une chose que vous pouvez faire est d'utiliser raw SQL:

case class Bar(x1: Int, y1: Int, z1: Int, v1: String)
case class Foo(x2: Int, y2: Int, z2: Int, v2: String)

val bar = sqlContext.createDataFrame(sc.parallelize(
    Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") ::
    Bar(3, 1, 2, "bar") :: Nil))

val foo = sqlContext.createDataFrame(sc.parallelize(
    Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") ::
    Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil))

foo.registerTempTable("foo")
bar.registerTempTable("bar")

sqlContext.sql(
    "SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")
6
répondu zero323 2016-09-21 07:59:51

Pyspark vous pouvez simplement spécifier chaque état séparément:

val Lead_all = Leads.join(Utm_Master,  
    (Leaddetails.LeadSource == Utm_Master.LeadSource) &
    (Leaddetails.Utm_Source == Utm_Master.Utm_Source) &
    (Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) &
    (Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))

assurez-vous D'utiliser correctement les opérateurs et les parenthèses.

5
répondu Patricia F. 2017-05-03 11:57:43

Scala:

Leaddetails.join(
    Utm_Master, 
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
        && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
        && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
        && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
    "left"
)

Pour le rendre insensible à la casse,

import org.apache.spark.sql.functions.{lower, upper}

utilisez lower(value) dans la condition de la méthode join.

par exemple: dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))

3
répondu Ani Menon 2016-09-21 08:06:52

=== les options me donnent des colonnes dupliquées. J'ai donc utiliser Seq à la place.

val Lead_all = Leads.join(Utm_Master,
    Seq("Utm_Source","Utm_Medium","Utm_Campaign"),"left")

bien sûr, cela ne fonctionne que lorsque les noms des colonnes de jointure sont les mêmes.

1
répondu Climbs_lika_Spyder 2018-04-13 17:09:09

les supports SQL Spark se joignent sur des tuples de colonnes quand entre parenthèses, comme

... WHERE (list_of_columns1) = (list_of_columns2)

qui est une manière plus courte que de spécifier des expressions égales ( = ) pour chaque paire de colonnes combinées par un ensemble de "et"s.

Par exemple:

SELECT a,b,c
FROM    tab1 t1
WHERE 
   NOT EXISTS
   (    SELECT 1
        FROM    t1_except_t2_df e
        WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
   )

au lieu de

SELECT a,b,c
FROM    tab1 t1
WHERE 
   NOT EXISTS
   (    SELECT 1
        FROM    t1_except_t2_df e
        WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c
   )

ce qui est moins lisible surtout quand la liste des colonnes est grande et que vous voulez traiter des NULLs facilement.

0
répondu Tagar 2017-09-06 04:28:55