Créer une nouvelle colonne avec la fonction dans Spark Dataframe

J'essaie de comprendre la nouvelle API dataframe dans Spark. cela semble être un bon pas en avant, mais avoir du mal à faire quelque chose qui devrait être assez simple. J'ai un dataframe avec 2 colonnes, "ID" et "Montant". Comme exemple générique, disons que je veux retourner une nouvelle colonne appelée "code" qui renvoie un code basé sur la valeur de "Amt". Je peux écrire un functiin quelque chose comme ceci:

def coder(myAmt:Integer):String {
  if (myAmt > 100) "Little"
  else "Big"
}

Quand j'essaie de l'utiliser comme ceci:

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", coder(myDF("Amt")))

Je reçois une incompatibilité de type erreurs

found   : org.apache.spark.sql.Column
required: Integer

J'ai essayé de changer le type d'entrée sur ma fonction en org.Apache.étincelle.SQL.Colonne mais je commence alors à obtenir des wrrors avec la compilation de la fonction car il veut un booléen dans l'instruction if.

Est-ce que je fais ça mal? Y a-t-il une meilleure/une autre façon de le faire que d'utiliser withColumn?

Merci pour votre aide.

30
demandé sur J Calbreath 2015-05-13 18:44:10

3 réponses

Disons que vous avez une colonne " Amt " dans votre schéma:

import org.apache.spark.sql.functions._
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))

Je pense que withColumn est la bonne façon d'ajouter une colonne

47
répondu Yijie Shen 2015-05-13 16:44:40

, Nous devrions éviter de définir udf fonctions, autant que possible, en raison de sa surcharge de serialization et deserialization de colonnes.

Vous pouvez obtenir la solution simple when étincelle fonction comme ci-dessous

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))
7
répondu Ramesh Maharjan 2017-06-17 18:19:03

Une autre façon de le faire: Vous pouvez créer n'importe quelle fonction mais selon l'erreur ci-dessus, vous devez définir la fonction comme une variable

Exemple:

val coder = udf((myAmt:Integer) => {
  if (myAmt > 100) "Little"
  else "Big"
})

Maintenant, cette déclaration fonctionne parfaitement:

myDF.withColumn("Code", coder(myDF("Amt")))
1
répondu imran 2018-03-30 17:17:57