Comment faire fondre Spark DataFrame?
y a-t-il un équivalent de la fonction de fusion des Pandas dans Apache Spark dans PySpark ou au moins dans Scala?
j'étais en train d'exécuter un ensemble de données sample jusqu'à maintenant en python et maintenant je veux utiliser Spark pour l'ensemble des données.
Merci d'avance.
2 réponses
il n'y a pas de fonction intégrée (si vous travaillez avec SQL et que le support De La Ruche est activé , vous pouvez utiliser stack
fonction , mais elle n'est pas exposée dans Spark et n'a pas d'implémentation native) mais il est trivial de lancer la vôtre. Importations requises:
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable
exemple de mise en œuvre:
def melt(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str="variable", value_name: str="value") -> DataFrame:
"""Convert :class:`DataFrame` from wide to long format."""
# Create array<struct<variable: str, value: ...>>
_vars_and_vals = array(*(
struct(lit(c).alias(var_name), col(c).alias(value_name))
for c in value_vars))
# Add to the DataFrame and explode
_tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
cols = id_vars + [
col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
return _tmp.select(*cols)
et certains essais (basés sur Pandas doctests ):
import pandas as pd
pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
'B': {0: 1, 1: 3, 2: 5},
'C': {0: 2, 1: 4, 2: 6}})
pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
A variable value
0 a B 1
1 b B 3
2 c B 5
3 a C 2
4 b C 4
5 c C 6
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
+---+--------+-----+
| A|variable|value|
+---+--------+-----+
| a| B| 1|
| a| C| 2|
| b| B| 3|
| b| C| 4|
| c| B| 5|
| c| C| 6|
+---+--------+-----+
Note: pour une utilisation avec les versions héritées de Python supprimer les annotations de type.
est tombé sur cette question dans ma recherche pour une implémentation de melt
dans Spark pour Scala.
postant mon port Scala au cas où quelqu'un trébucherait là-dessus.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame}
/** Extends the [[org.apache.spark.sql.DataFrame]] class
*
* @param df the data frame to melt
*/
implicit class DataFrameFunctions(df: DataFrame) {
/** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
*
* melt is (kind of) the inverse of pivot
* melt is currently (02/2017) not implemented in spark
*
* @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html)
* @see this is a scala adaptation of /q/how-to-melt-spark-dataframe-42055/"variable", value_name: String = "value") : DataFrame = {
// Create array<struct<variable: str, value: ...>>
val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*)
// Add to the DataFrame and explode
val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}
return _tmp.select(cols: _*)
}
}
comme je ne suis pas si avancé que ça en considérant Scala
, je suis sûr qu'il y a place à amélioration.
Tous les commentaires sont les bienvenus.