Créer une nouvelle colonne avec la fonction dans Spark Dataframe
J'essaie de comprendre la nouvelle API dataframe dans Spark. cela semble être un bon pas en avant, mais avoir du mal à faire quelque chose qui devrait être assez simple. J'ai un dataframe avec 2 colonnes, "ID" et "Montant". Comme exemple générique, disons que je veux retourner une nouvelle colonne appelée "code" qui renvoie un code basé sur la valeur de "Amt". Je peux écrire un functiin quelque chose comme ceci:
def coder(myAmt:Integer):String {
if (myAmt > 100) "Little"
else "Big"
}
Quand j'essaie de l'utiliser comme ceci:
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
myDF.withColumn("Code", coder(myDF("Amt")))
Je reçois une incompatibilité de type erreurs
found : org.apache.spark.sql.Column
required: Integer
J'ai essayé de changer le type d'entrée sur ma fonction en org.Apache.étincelle.SQL.Colonne mais je commence alors à obtenir des wrrors avec la compilation de la fonction car il veut un booléen dans l'instruction if.
Est-ce que je fais ça mal? Y a-t-il une meilleure/une autre façon de le faire que d'utiliser withColumn?
Merci pour votre aide.
3 réponses
Disons que vous avez une colonne " Amt " dans votre schéma:
import org.apache.spark.sql.functions._
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))
Je pense que withColumn est la bonne façon d'ajouter une colonne
, Nous devrions éviter de définir udf
fonctions, autant que possible, en raison de sa surcharge de serialization
et deserialization
de colonnes.
Vous pouvez obtenir la solution simple when
étincelle fonction comme ci-dessous
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))
Une autre façon de le faire: Vous pouvez créer n'importe quelle fonction mais selon l'erreur ci-dessus, vous devez définir la fonction comme une variable
Exemple:
val coder = udf((myAmt:Integer) => {
if (myAmt > 100) "Little"
else "Big"
})
Maintenant, cette déclaration fonctionne parfaitement:
myDF.withColumn("Code", coder(myDF("Amt")))