Comment faire fondre Spark DataFrame?

y a-t-il un équivalent de la fonction de fusion des Pandas dans Apache Spark dans PySpark ou au moins dans Scala?

j'étais en train d'exécuter un ensemble de données sample jusqu'à maintenant en python et maintenant je veux utiliser Spark pour l'ensemble des données.

Merci d'avance.

24
demandé sur user6910411 2017-01-16 08:42:05

2 réponses

il n'y a pas de fonction intégrée (si vous travaillez avec SQL et que le support De La Ruche est activé , vous pouvez utiliser stack fonction , mais elle n'est pas exposée dans Spark et n'a pas d'implémentation native) mais il est trivial de lancer la vôtre. Importations requises:

from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable 

exemple de mise en œuvre:

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

et certains essais (basés sur Pandas doctests ):

import pandas as pd

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6}})

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

Note: pour une utilisation avec les versions héritées de Python supprimer les annotations de type.

34
répondu user6910411 2017-09-21 00:18:48

est tombé sur cette question dans ma recherche pour une implémentation de melt dans Spark pour Scala.

postant mon port Scala au cas où quelqu'un trébucherait là-dessus.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame}
/** Extends the [[org.apache.spark.sql.DataFrame]] class
 *
 *  @param df the data frame to melt
 */
implicit class DataFrameFunctions(df: DataFrame) {

    /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
     * 
     *  melt is (kind of) the inverse of pivot
     *  melt is currently (02/2017) not implemented in spark
     *
     *  @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html)
     *  @see this is a scala adaptation of /q/how-to-melt-spark-dataframe-42055/"variable", value_name: String = "value") : DataFrame = {

        // Create array<struct<variable: str, value: ...>>
        val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*)

        // Add to the DataFrame and explode
        val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

        val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}

        return _tmp.select(cols: _*)

    }
}

comme je ne suis pas si avancé que ça en considérant Scala , je suis sûr qu'il y a place à amélioration.

Tous les commentaires sont les bienvenus.

12
répondu Ahue 2018-04-28 05:59:45