Spark: ajouter une colonne à dataframe sous condition
je suis en train de prendre mes données d'entrée:
A B C
--------------
4 blah 2
2 3
56 foo 3
Et d'ajouter une colonne à la fin, selon si B est vide ou pas:
A B C D
--------------------
4 blah 2 1
2 3 0
56 foo 3 1
je peux le faire facilement en enregistrant le datagramme d'entrée comme une table de température, puis en tapant une requête SQL.
mais j'aimerais vraiment savoir comment faire avec les méthodes Scala et ne pas avoir à taper une requête SQL dans Scala.
j'ai essayé <!-Mais je n'arrive pas à faire ce que je veux.
3 réponses
withColumn
avec la fonction when
comme suit:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // for `toDF` and $""
import org.apache.spark.sql.functions._ // for `when`
val df = sc.parallelize(Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5)))
.toDF("A", "B", "C")
val newDf = df.withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1))
newDf.show()
spectacles
+---+----+---+---+
| A| B| C| D|
+---+----+---+---+
| 4|blah| 2| 1|
| 2| | 3| 0|
| 56| foo| 3| 1|
|100|null| 5| 0|
+---+----+---+---+
j'ai ajouté (100, null, 5)
ligne pour tester le isNull
cas.
j'ai essayé ce code avec Spark 1.6.0
mais commentée dans le code de when
, il fonctionne sur les versions après 1.4.0
.
ma faute, j'avais manqué une partie de la question.
Meilleur, le plus propre est d'utiliser un UDF
.
Explication dans le code.
// create some example data...BY DataFrame
// note, third record has an empty string
case class Stuff(a:String,b:Int)
val d= sc.parallelize(Seq( ("a",1),("b",2),
("",3) ,("d",4)).map { x => Stuff(x._1,x._2) }).toDF
// now the good stuff.
import org.apache.spark.sql.functions.udf
// function that returns 0 is string empty
val func = udf( (s:String) => if(s.isEmpty) 0 else 1 )
// create new dataframe with added column named "notempty"
val r = d.select( $"a", $"b", func($"a").as("notempty") )
scala> r.show
+---+---+--------+
| a| b|notempty|
+---+---+--------+
| a| 1| 1111|
| b| 2| 1111|
| | 3| 0|
| d| 4| 1111|
+---+---+--------+
Comment faire quelque chose comme cela?
val newDF = df.filter($"B" === "").take(1) match {
case Array() => df
case _ => df.withColumn("D", $"B" === "")
}
en utilisant take(1)
doit avoir un minimum de frapper