Mise à jour d'une colonne dataframe dans spark
En regardant la nouvelle api spark dataframe, on ne sait pas s'il est possible de modifier les colonnes dataframe.
Comment pourrais-je changer une valeur dans la ligne x
colonne y
d'un dataframe?
Dans pandas
ce serait df.ix[x,y] = new_value
Edit: en consolidant ce qui a été dit ci-dessous, vous ne pouvez pas modifier le dataframe existant car il est immuable, mais vous pouvez renvoyer un nouveau dataframe avec les modifications souhaitées.
Si vous voulez simplement remplacer une valeur dans une colonne condition, comme np.where
:
from pyspark.sql import functions as F
update_func = (F.when(F.col('update_col') == replace_val, new_value)
.otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
Si vous souhaitez effectuer une opération sur une colonne et créer une nouvelle colonne qui est ajoutée au dataframe:
import pyspark.sql.functions as F
import pyspark.sql.types as T
def my_func(col):
do stuff to column here
return transformed_value
# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn('new_column_name', my_udf('update_col'))
Si vous voulez que la nouvelle colonne ait le même nom que l'ancienne colonne, vous pouvez ajouter l'étape supplémentaire:
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
4 réponses
Bien que vous ne puissiez pas modifier une colonne en tant que telle, Vous pouvez opérer sur une colonne et renvoyer un nouveau DataFrame reflétant cette modification. Pour cela, vous devez d'abord créer un UserDefinedFunction
implémentant l'opération à appliquer, puis appliquer sélectivement cette fonction à la colonne ciblée uniquement. En Python:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
new_df
a désormais le même schéma que old_df
(en supposant que old_df.target_column
de type StringType
, aussi), mais toutes les valeurs dans la colonne target_column
sera new_value
.
Généralement lors de la mise à jour d'une colonne, nous voulons mapper une ancienne valeur à une nouvelle valeur. Voici un moyen de le faire dans pyspark sans UDF:
# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
F.when(df[update_col]==old_value,new_value).
otherwise(df[update_col])).
DataFrames
sont basés sur RDDs. Les RDD sont des structures immuables et ne permettent pas la mise à jour des éléments sur site. Pour modifier les valeurs, vous devrez créer un nouveau DataFrame en transformant l'original en utilisant les opérations DSL ou RDD de type SQL comme map
.
Une plate-forme de diapositives fortement recommandée: Introduction de DataFrames dans Spark pour la science des données à grande échelle.
Comme maasg dit que vous pouvez créer un nouveau DataFrame à partir du résultat d'une cartographie appliquée à l'ancien DataFrame. Un exemple pour un DataFrame donné df
avec deux lignes:
val newDf = sqlContext.createDataFrame(df.map(row =>
Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)
Notez que si les types des colonnes changent, vous devez lui donner un schéma correct au lieu de df.schema
. Découvrez l'api de org.apache.spark.sql.Row
pour les méthodes disponibles: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html
[Update] ou en utilisant UDF dans Scala:
import org.apache.spark.sql.functions._
val toLong = udf[Long, String] (_.toLong)
val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")
Et si le nom de la colonne doit rester le même, vous pouvez le renommer:
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")