Spark / Scala: remplissage en avant avec la dernière observation

En Utilisant Spark 1.4.0, Scala 2.10

J'ai essayé de trouver un moyen de transférer les valeurs null avec la dernière observation connue, mais je ne vois pas un moyen facile. Je pense que c'est une chose assez commune à faire, mais je ne trouve pas d'exemple montrant comment faire cela.

Je vois des fonctions pour transférer remplir NaN avec une valeur, ou des fonctions lag / lead pour remplir ou décaler les données par un décalage, mais rien pour ramasser la dernière valeur connue.

En regardant en ligne, je vois beaucoup de Questions-Réponses sur le même chose dans R, mais pas dans Spark / Scala.

Je pensais à mapper sur une plage de dates, filtrer les Nan des résultats et choisir le dernier élément mais je suppose que je suis confus au sujet de la syntaxe.

En utilisant des DataFrames, j'essaie quelque chose comme

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)

Mais ça ne m'amène nulle part.

La partie filtre ne fonctionne pas; la fonction map renvoie une séquence de spark.SQL.Les colonnes, mais la fonction de filtre attend pour retourner un Booléen, donc j'ai besoin d'obtenir une valeur de la Colonne à tester mais il semble seulement y avoir des méthodes de colonne qui renvoient une colonne.

Y a-t-il un moyen de le faire plus "simplement" sur Spark?

Merci pour votre contribution

Modifier:

Exemple simple exemple d'entrée:

2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...

Résultats escomptés:

2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22

NOTE: 1) j'ai beaucoup de colonnes, dont beaucoup ont ce modèle de données manquant, mais pas à la même date/heure. Si j'en ai besoin je ferai la transformation d'une colonne à un temps.

Modifier :

Suite à la réponse de @zero323, j'ai essayé de cette façon:

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.orderBy($"Date").rdd


    def notMissing(row: Row): Boolean = { !row.isNullAt(1) }

    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
   case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap

    val toCarryBd = sc.broadcast(toCarry)

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }

    val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}

La variable broadcast se termine par une liste de valeurs sans null. C'est un progrès mais je ne peux toujours pas faire fonctionner la cartographie. mais je n'obtiens rien, car l'index i dans le ne correspond pas aux données d'origine, il correspond au sous-ensemble sans null.

Qu'est-ce qui me manque ici?

EDIT et solution (comme déduit de la réponse de @zero323):

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))

Voir la réponse de zero323 ci-dessous pour plus d'options si vous utilisez des RDDs au lieu de DataFrames. La solution ci-dessus peut ne pas être la plus efficace mais fonctionne pour moi. Si vous cherchez à optimiser, consultez la solution RDD.

22
demandé sur MrE 2015-11-10 04:24:21

1 réponses

Réponse initiale (hypothèse d'une seule série temporelle):

Essayez D'abord d'éviter les fonctions de fenêtre si vous ne pouvez pas fournir la clause PARTITION BY. Il déplace les données vers une seule partition, donc la plupart du temps, ce n'est tout simplement pas faisable.

Ce que vous pouvez faire est de combler les lacunes sur RDD utiliser mapPartitionsWithIndex. Puisque vous n'avez pas fourni d'exemple de données ou de sortie attendue, considérez ceci comme un pseudocode pas un vrai programme Scala:

  • Permet D'abord de commander DataFrame par date et de convertir en RDD

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    
    val rows: RDD[Row] = df.orderBy($"Date").rdd
    
  • Suivant permet de trouver la dernière observation non nulle par partition

    def notMissing(row: Row): Boolean = ???
    
    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
      .mapPartitionsWithIndex{ case (i, iter) => 
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap
    
  • Et convertissez ceci Map en diffusion

    val toCarryBd = sc.broadcast(toCarry)
    
  • Enfin mapper sur les partitions une fois de plus combler les lacunes:

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      // If it is the beginning of partition and value is missing
      // extract value to fill from toCarryBd.value
      // Remember to correct for empty / only missing partitions
      // otherwise take last not-null from the current partition
    }
    
    val imputed: RDD[Row] = rows
      .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) } 
    
  • Enfin, convertissez en DataFrame

Modifier (partitionné / série temporelle par données de groupe):

Le diable est dans les détails. Si vos données sont partitionnées après tout alors un ensemble le problème peut être résolu en utilisant groupBy. Supposons que vous partitionnez simplement par colonne " v " de type T et Date est un horodatage entier:

def fill(iter: List[Row]): List[Row] = {
  // Just go row by row and fill with last non-empty value
  ???
}

val groupedAndSorted = df.rdd
  .groupBy(_.getAs[T]("k"))
  .mapValues(_.toList.sortBy(_.getAs[Int]("Date")))

val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)

val dfFilled = sqlContext.createDataFrame(rows, df.schema)

De Cette façon, vous pouvez remplir toutes les colonnes en même temps.

Cela peut-il être fait avec des DataFrames au lieu de convertir des va-et-vient en RDD?

Cela dépend, bien qu'il soit peu probable d'être efficace. Si l'écart maximum est relativement faible, vous pouvez faire quelque chose comme ceci:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{WindowSpec, Window}
import org.apache.spark.sql.Column

val maxGap: Int = ???  // Maximum gap between observations
val columnsToFill: List[String] = ???  // List of columns to fill
val suffix: String = "_" // To disambiguate between original and imputed 

// Take lag 1 to maxGap and coalesce
def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
  // Generate lag values between 1 and maxGap
  val lags = (1 to maxGap).map(lag(col(c), _)over(w))
  // Add current, coalesce and set alias
  coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
}


// For each column you want to fill nulls apply makeCoalesce
val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))


// Finally select
val dfImputed = df.select($"*" :: lags: _*)

Il peut être facilement ajusté à utilisez un écart maximum différent par colonne.

Un moyen plus simple d'obtenir un résultat similaire dans la dernière version de Spark est d'utiliser last avec ignoreNulls:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"k").orderBy($"Date")
  .rowsBetween(Window.unboundedPreceding, -1)

df.withColumn("value", coalesce($"value", last($"value", true).over(w)))

Bien qu'il soit possible de supprimer la clause partitionBy et d'appliquer cette méthode globalement, cela coûterait prohibitif avec de grands ensembles de données.

15
répondu zero323 2017-08-08 15:24:53