Inclure des valeurs null dans une jointure Apache Spark

Je voudrais inclure des valeurs null dans une jointure Apache Spark. Spark n'inclut pas les lignes avec null par défaut.

Voici le comportement D'étincelle par défaut.

val numbersDf = Seq(
  ("123"),
  ("456"),
  (null),
  ("")
).toDF("numbers")

val lettersDf = Seq(
  ("123", "abc"),
  ("456", "def"),
  (null, "zzz"),
  ("", "hhh")
).toDF("numbers", "letters")

val joinedDf = numbersDf.join(lettersDf, Seq("numbers"))

Voici la sortie de joinedDf.show():

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
+-------+-------+

C'est la sortie que je voudrais:

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
|   null|    zzz|
+-------+-------+
25
demandé sur Community 2017-01-18 23:21:34

2 réponses

Spark fournit un opérateur d'égalité NULL sécurisé spécial:

numbersDf
  .join(lettersDf, numbersDf("numbers") <=> lettersDf("numbers"))
  .drop(lettersDf("numbers"))
+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|   null|    zzz|
|       |    hhh|
+-------+-------+

Veillez à ne pas l'utiliser avec Spark 1.5 ou une version antérieure. Avant l'Étincelle de 1,6 qu'il fallait un produit Cartésien (ÉTINCELLE-11111 - Rapide null-safe rejoindre).

Dans Étincelle 2.3.0, ou plus tard, vous pouvez utiliser Column.eqNullSafe dans PySpark:

numbers_df = sc.parallelize([
    ("123", ), ("456", ), (None, ), ("", )
]).toDF(["numbers"])

letters_df = sc.parallelize([
    ("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")
]).toDF(["numbers", "letters"])

numbers_df.join(letters_df, numbers_df.numbers.eqNullSafe(letters_df.numbers))
+-------+-------+-------+
|numbers|numbers|letters|
+-------+-------+-------+
|    456|    456|    def|
|   null|   null|    zzz|
|       |       |    hhh|
|    123|    123|    abc|
+-------+-------+-------+

Et %<=>% dans SparkR:

numbers_df <- createDataFrame(data.frame(numbers = c("123", "456", NA, "")))
letters_df <- createDataFrame(data.frame(
  numbers = c("123", "456", NA, ""),
  letters = c("abc", "def", "zzz", "hhh")
))

head(join(numbers_df, letters_df, numbers_df$numbers %<=>% letters_df$numbers))
  numbers numbers letters
1     456     456     def
2    <NA>    <NA>     zzz
3                     hhh
4     123     123     abc

Avec SQL (Spark 2.2.0+) vous pouvez utiliser IS NOT DISTINCT FROM:

SELECT * FROM numbers JOIN letters 
ON numbers.numbers IS NOT DISTINCT FROM letters.numbers

Ceci peut également être utilisé avec L'API DataFrame:

numbersDf.alias("numbers")
  .join(lettersDf.alias("letters"))
  .where("numbers.numbers IS NOT DISTINCT FROM letters.numbers")
39
répondu user6910411 2018-09-24 22:22:12
val numbers2 = numbersDf.withColumnRenamed("numbers","num1") //rename columns so that we can disambiguate them in the join
val letters2 = lettersDf.withColumnRenamed("numbers","num2")
val joinedDf = numbers2.join(letters2, $"num1" === $"num2" || ($"num1".isNull &&  $"num2".isNull) ,"outer")
joinedDf.select("num1","letters").withColumnRenamed("num1","numbers").show  //rename the columns back to the original names
6
répondu jasonS 2017-01-18 21:15:22