Moyenne Mobile Apache Spark

j'ai un énorme fichier en HDFS avec des points de données de séries chronologiques (Yahoo Stock prices).

je veux trouver la moyenne mobile de la série chronologique Comment dois-je m'y prendre pour écrire le travail D'Apache Spark pour le faire .

32
demandé sur Ahmed Shabib 2014-05-01 09:03:41

3 réponses

Vous pouvez utiliser la fonction coulissante de MLLIB qui fait probablement la même chose que la réponse de Daniel. Vous aurez à trier les données par l'heure avant d'utiliser la fonction coulissante.

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()
26
répondu Arvind 2015-03-04 19:26:45

moyenne mobile est un problème délicat pour Spark, et tout système distribué. Lorsque les données sont réparties sur plusieurs machines, il y aura des fenêtres de temps qui croisent les partitions. Nous devons dupliquer les données au début des partitions, de sorte que le calcul de la moyenne mobile par partition donne une couverture complète.

Voici une façon de faire ceci en Spark. Les données de l'exemple:

val ts = sc.parallelize(0 to 100, 10)
val window = 3

un simple partitionneur qui place chaque ligne dans la partition que nous spécifions par la clé:

class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  def getPartition(key: Any) = key.asInstanceOf[Int]
}

Créer les données avec le premier window - 1 lignes copiées sur la partition précédente:

val partitioned = ts.mapPartitionsWithIndex((i, p) => {
  val overlap = p.take(window - 1).toArray
  val spill = overlap.iterator.map((i - 1, _))
  val keep = (overlap.iterator ++ p).map((i, _))
  if (i == 0) keep else keep ++ spill
}).partitionBy(new StraightPartitioner(ts.partitions.length)).values

Simplement calculer la moyenne mobile sur chaque partition:

val movingAverage = partitioned.mapPartitions(p => {
  val sorted = p.toSeq.sorted
  val olds = sorted.iterator
  val news = sorted.iterator
  var sum = news.take(window - 1).sum
  (olds zip news).map({ case (o, n) => {
    sum += n
    val v = sum
    sum -= o
    v
  }})
})

en raison des segments en double, il n'y aura pas de lacunes dans la couverture.

scala> movingAverage.collect.sameElements(3 to 297 by 3)
res0: Boolean = true
19
répondu Daniel Darabos 2014-05-02 20:35:31

Spark 1.4 introduit les fonctions de fenêtrage, ce qui signifie que vous pouvez faire la moyenne mobile comme suit ajuster le fenêtrage avec les rangs entre:

val schema = Seq("id", "cykle", "value")
 val data = Seq(
        (1, 1, 1),
        (1, 2, 11),
        (1, 3, 1),
        (1, 4, 11),
        (1, 5, 1),
        (1, 6, 11),
        (2, 1, 1),
        (2, 2, 11),
        (2, 3, 1),
        (2, 4, 11),
        (2, 5, 1),
        (2, 6, 11)
      )

val dft = sc.parallelize(data).toDF(schema: _*)

dft.select('*).show

// PARTITION BY id  ORDER BY cykle ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING (5)
val w = Window.partitionBy("id").orderBy("cykle").rowsBetween(-2, 2)

val x = dft.select($"id",$"cykle",avg($"value").over(w))
x.show

Sortie (zeppelin):

schema: Seq[String] = List(id, cykle, value)
data: Seq[(Int, Int, Int)] = List((1,1,1), (1,2,11), (1,3,1), (1,4,11), (1,5,1), (1,6,11), (2,1,1), (2,2,11), (2,3,1), (2,4,11), (2,5,1), (2,6,11))
dft: org.apache.spark.sql.DataFrame = [id: int, cykle: int, value: int]
+---+-----+-----+
| id|cykle|value|
+---+-----+-----+
|  1|    1|    1|
|  1|    2|   11|
|  1|    3|    1|
|  1|    4|   11|
|  1|    5|    1|
|  1|    6|   11|
|  2|    1|    1|
|  2|    2|   11|
|  2|    3|    1|
|  2|    4|   11|
|  2|    5|    1|
|  2|    6|   11|
+---+-----+-----+
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@55cd666f
x: org.apache.spark.sql.DataFrame = [id: int, cykle: int, 'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING: double]
+---+-----+-------------------------------------------------------------------------+
| id|cykle|'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING|
+---+-----+-------------------------------------------------------------------------+
|  1|    1|                                                        4.333333333333333|
|  1|    2|                                                                      6.0|
|  1|    3|                                                                      5.0|
|  1|    4|                                                                      7.0|
|  1|    5|                                                                      6.0|
|  1|    6|                                                        7.666666666666667|
|  2|    1|                                                        4.333333333333333|
|  2|    2|                                                                      6.0|
|  2|    3|                                                                      5.0|
|  2|    4|                                                                      7.0|
|  2|    5|                                                                      6.0|
|  2|    6|                                                        7.666666666666667|
+---+-----+————————————————————————————————————+
14
répondu oluies 2016-02-29 15:12:41