Spark spécifie plusieurs conditions de colonne pour la jointure de dataframe
comment donner plus de conditions de colonne en rejoignant deux dataframes. Par exemple je veux exécuter la commande suivante :
val Lead_all = Leads.join(Utm_Master,
Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")
je veux rejoindre seulement quand ces colonnes correspondent. Mais la syntaxe ci-dessus n'est pas valide car cols ne prend qu'une chaîne. Alors, comment puis-je obtenir ce que je veux.
7 réponses
Il y a une Étincelle colonne / Expression API join pour ce cas:
Leaddetails.join(
Utm_Master,
Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
&& Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
&& Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
&& Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
"left"
)
<=>
opérateur dans l'exemple signifie "test d'Égalité qui est sûr pour les valeurs null".
La principale différence avec de simples test D'égalité (===
) c'est que le premier est sûr à utiliser dans le cas où l'une des colonnes peuvent contenir de valeurs null.
à partir de la version 1.5.0 de Spark (qui n'est actuellement pas publiée), vous pouvez rejoindre sur plusieurs colonnes de DataFrame. Reportez-vous à SPARK-7990: ajouter des méthodes pour faciliter l'équi-jointure sur les touches de jointure multiple.
Python
Leads.join(
Utm_Master,
["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"],
"left_outer"
)
Scala
la question demandait une réponse Scala, mais je n'utilise pas Scala. Voici ma meilleure supposition....
Leads.join(
Utm_Master,
Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left_outer"
)
Une chose que vous pouvez faire est d'utiliser raw SQL:
case class Bar(x1: Int, y1: Int, z1: Int, v1: String)
case class Foo(x2: Int, y2: Int, z2: Int, v2: String)
val bar = sqlContext.createDataFrame(sc.parallelize(
Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") ::
Bar(3, 1, 2, "bar") :: Nil))
val foo = sqlContext.createDataFrame(sc.parallelize(
Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") ::
Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil))
foo.registerTempTable("foo")
bar.registerTempTable("bar")
sqlContext.sql(
"SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")
Pyspark vous pouvez simplement spécifier chaque état séparément:
val Lead_all = Leads.join(Utm_Master,
(Leaddetails.LeadSource == Utm_Master.LeadSource) &
(Leaddetails.Utm_Source == Utm_Master.Utm_Source) &
(Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) &
(Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))
assurez-vous D'utiliser correctement les opérateurs et les parenthèses.
Scala:
Leaddetails.join(
Utm_Master,
Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
&& Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
&& Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
&& Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
"left"
)
Pour le rendre insensible à la casse,
import org.apache.spark.sql.functions.{lower, upper}
utilisez lower(value)
dans la condition de la méthode join.
par exemple: dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))
===
les options me donnent des colonnes dupliquées. J'ai donc utiliser Seq
à la place.
val Lead_all = Leads.join(Utm_Master,
Seq("Utm_Source","Utm_Medium","Utm_Campaign"),"left")
bien sûr, cela ne fonctionne que lorsque les noms des colonnes de jointure sont les mêmes.
les supports SQL Spark se joignent sur des tuples de colonnes quand entre parenthèses, comme
... WHERE (list_of_columns1) = (list_of_columns2)
qui est une manière plus courte que de spécifier des expressions égales ( = ) pour chaque paire de colonnes combinées par un ensemble de "et"s.
Par exemple:
SELECT a,b,c
FROM tab1 t1
WHERE
NOT EXISTS
( SELECT 1
FROM t1_except_t2_df e
WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
)
au lieu de
SELECT a,b,c
FROM tab1 t1
WHERE
NOT EXISTS
( SELECT 1
FROM t1_except_t2_df e
WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c
)
ce qui est moins lisible surtout quand la liste des colonnes est grande et que vous voulez traiter des NULLs facilement.