Mise à jour d'une colonne dataframe dans spark

En regardant la nouvelle api spark dataframe, on ne sait pas s'il est possible de modifier les colonnes dataframe.

Comment pourrais-je changer une valeur dans la ligne x colonne y d'un dataframe?

Dans pandas ce serait df.ix[x,y] = new_value

Edit: en consolidant ce qui a été dit ci-dessous, vous ne pouvez pas modifier le dataframe existant car il est immuable, mais vous pouvez renvoyer un nouveau dataframe avec les modifications souhaitées.

Si vous voulez simplement remplacer une valeur dans une colonne condition, comme np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

Si vous souhaitez effectuer une opération sur une colonne et créer une nouvelle colonne qui est ajoutée au dataframe:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

Si vous voulez que la nouvelle colonne ait le même nom que l'ancienne colonne, vous pouvez ajouter l'étape supplémentaire:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
46
demandé sur Luke 2015-03-18 00:19:04

4 réponses

Bien que vous ne puissiez pas modifier une colonne en tant que telle, Vous pouvez opérer sur une colonne et renvoyer un nouveau DataFrame reflétant cette modification. Pour cela, vous devez d'abord créer un UserDefinedFunction implémentant l'opération à appliquer, puis appliquer sélectivement cette fonction à la colonne ciblée uniquement. En Python:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df a désormais le même schéma que old_df (en supposant que old_df.target_column de type StringType, aussi), mais toutes les valeurs dans la colonne target_column sera new_value.

52
répondu karlson 2017-02-21 22:02:49

Généralement lors de la mise à jour d'une colonne, nous voulons mapper une ancienne valeur à une nouvelle valeur. Voici un moyen de le faire dans pyspark sans UDF:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).
29
répondu Paul 2015-12-21 22:23:26

DataFrames sont basés sur RDDs. Les RDD sont des structures immuables et ne permettent pas la mise à jour des éléments sur site. Pour modifier les valeurs, vous devrez créer un nouveau DataFrame en transformant l'original en utilisant les opérations DSL ou RDD de type SQL comme map.

Une plate-forme de diapositives fortement recommandée: Introduction de DataFrames dans Spark pour la science des données à grande échelle.

12
répondu maasg 2016-02-24 21:56:18

Comme maasg dit que vous pouvez créer un nouveau DataFrame à partir du résultat d'une cartographie appliquée à l'ancien DataFrame. Un exemple pour un DataFrame donné df avec deux lignes:

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

Notez que si les types des colonnes changent, vous devez lui donner un schéma correct au lieu de df.schema. Découvrez l'api de org.apache.spark.sql.Row pour les méthodes disponibles: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Update] ou en utilisant UDF dans Scala:

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

Et si le nom de la colonne doit rester le même, vous pouvez le renommer:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
11
répondu radek1st 2017-05-23 11:33:15