Comment puis-je lire un grand fichier CSV avec la classe Scala Stream?

Comment lire un gros fichier CSV (> 1 Go) avec un flux Scala? Avez-vous un exemple de code? Ou utiliseriez-vous une autre façon de lire un grand fichier CSV sans le charger en mémoire?

35
demandé sur Jan Willem Tulp 2010-11-23 13:23:04

3 réponses

utilisez juste Source.fromFile(...).getLines comme vous l'avez déjà dit.

qui renvoie un itérateur, qui est déjà paresseux (vous utiliseriez stream comme une collection paresseuse où vous vouliez que les valeurs précédemment récupérées soient mémoizées, de sorte que vous pouvez les lire à nouveau)

si vous avez des problèmes de mémoire, alors le problème réside dans ce que vous faites après getLines. Toute opération comme toList , qui impose une collecte stricte, provoquera problème.

65
répondu Kevin Wright 2010-11-23 11:05:33

j'espère que vous ne voulez pas dire collection.immutable.Stream de Scala avec Stream. C'est pas ce que vous voulez. Stream est paresseux, mais fait de la mémorisation.

Je ne sais pas ce que vous prévoyez de faire, mais juste la lecture du fichier ligne-par-ligne devrait fonctionner très bien sans utiliser de grandes quantités de mémoire.

getLines devrait évaluer paresseusement et ne devrait pas se planter (tant que votre fichier n'a pas plus de 232 lignes, afaik). Si c'est le cas, demandez sur # scala ou déposer un ticket de bug (ou faire les deux).

11
répondu soc 2010-11-23 11:03:23

si vous cherchez à traiter le gros Fichier ligne par ligne tout en évitant d'exiger que le contenu du fichier entier soit chargé en mémoire en une seule fois, vous pouvez utiliser le Iterator retourné par scala.io.Source .

j'ai une petite fonction, tryProcessSource (contenant deux sous-fonctions), que j'utilise pour exactement ces types de cas d'utilisation. La fonction prend jusqu'à quatre paramètres, dont seul le premier est obligatoire. Les autres paramètres ont la valeur par défaut les valeurs fournies.

Voici le profil de la fonction (de pleine fonction de la mise en œuvre est en bas):

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues),
): Try[List[List[String]]] = {
  ???
}

le premier paramètre, file: File , est requis. Et c'est juste n'importe quelle instance valide de java.io.File qui pointe vers un fichier texte orienté vers la ligne, comme un CSV.

le second paramètre, parseLine: (Int, String) => Option[List[String]] , est facultatif. Et si, il doit être une fonction attend deux paramètres d'entrée; index: Int , unparsedLine: String . Et ensuite retourner un Option[List[String]] . La fonction peut retourner un Some enveloppé List[String] composé des valeurs de colonne valides. Ou il peut retourner un None qui indique que tout le processus de streaming est en train d'avorter. Si ce paramètre n'est pas fourni, une valeur par défaut de (index, line) => Some(List(line)) est fournie. Par défaut, la ligne entière est retournée sous la forme d'une seule valeur String .

le troisième paramètre, filterLine: (Int, List[String]) => Option[Boolean] , est facultatif. Et si, il doit être une fonction attend deux paramètres d'entrée; index: Int , parsedValues: List[String] . Et ensuite retourner un Option[Boolean] . La fonction peut renvoyer un Some enveloppé Boolean indiquant si cette ligne particulière doit être incluse dans la sortie. Ou il peut retourner un None qui indique que tout le processus de streaming est en train d'avorter. Si ce paramètre n'est pas fourni, une valeur par défaut de (index, values) => Some(true) est fournie. Par défaut, toutes les lignes sont incluses.

Le quatrième et dernier paramètre, retainValues: (Int, List[String]) => Option[List[String]] , est facultative. Et si, il doit être une fonction attend deux paramètres d'entrée; index: Int , parsedValues: List[String] . Et ensuite retourner un Option[List[String]] . La fonction peut retourner un Some enveloppé List[String] consistant en un sous-ensemble et/ou une modification des valeurs de colonne existantes. Ou il peut retourner un None qui indique la totalité de la diffusion processus de l'abandon précoce. Si ce paramètre n'est pas fourni, une valeur par défaut de (index, values) => Some(values) est fournie. Par défaut, il en résulte les valeurs interprétées par le second paramètre, parseLine .

Envisager un fichier avec le contenu suivant (4 lignes):

street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240

le profil d'appel suivant...

val tryLinesDefaults =
  tryProcessSource(new File("path/to/file.csv"))

...résultats dans cette sortie pour tryLinesDefaults (le contenu non modifié de le dossier):

Success(
  List(
    List("street,street2,city,state,zip"),
    List("100 Main Str,,Irving,TX,75039"),
    List("231 Park Ave,,Irving,TX,75039"),
    List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
  )
)

le profil d'appel suivant...

val tryLinesParseOnly =
  tryProcessSource(
      new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
  )

...résultats dans cette sortie pour tryLinesParseOnly (chaque ligne divisée en valeurs de colonne individuelles):

Success(
  List(
    List("street","street2","city","state","zip"),
    List("100 Main Str","","Irving,TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
    List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
  )
)

le profil d'appel suivant...

val tryLinesIrvingTxNoHeader =
  tryProcessSource(
      new File("C:/Users/Jim/Desktop/test.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
    , filterLine =
        (index, parsedValues) =>
          Some(
            (index != 0) && //skip header line
            (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
            (parsedValues(3).toLowerCase == "Tx".toLowerCase)
          )
  )

...résultats dans cette sortie pour tryLinesIrvingTxNoHeader (chaque ligne divisée dans la colonne individuelle valeurs, aucun en-tête et seulement les deux lignes dans Irving, Tx):

Success(
  List(
    List("100 Main Str","","Irving,TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
  )
)

Voici la totalité tryProcessSource mise en œuvre de la fonction:

import scala.io.Source
import scala.util.Try

import java.io.File

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
  def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
    try {Try(transfer(source))} finally {source.close()}
  def recursive(
    remaining: Iterator[(String, Int)],
    accumulator: List[List[String]],
    isEarlyAbort: Boolean =
      false
  ): List[List[String]] = {
    if (isEarlyAbort || !remaining.hasNext)
      accumulator
    else {
      val (line, index) =
        remaining.next
      parseLine(index, line) match {
        case Some(values) =>
          filterLine(index, values) match {
            case Some(keep) =>
              if (keep)
                retainValues(index, values) match {
                  case Some(valuesNew) =>
                    recursive(remaining, valuesNew :: accumulator) //capture values
                  case None =>
                    recursive(remaining, accumulator, isEarlyAbort = true) //early abort
                }
              else
                recursive(remaining, accumulator) //discard row
            case None =>
              recursive(remaining, accumulator, isEarlyAbort = true) //early abort
          }
        case None =>
          recursive(remaining, accumulator, isEarlyAbort = true) //early abort
      }
    }
  }
  Try(Source.fromFile(file)).flatMap(
    bufferedSource =>
      usingSource(bufferedSource) {
        source =>
          recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
      }
  )
}

bien que cette solution soit relativement succincte, il m'a fallu beaucoup de temps et de nombreux passages de remaniement avant que je puisse finalement me rendre ici. S'il vous plaît, faites-moi savoir si vous voyez des moyens de l'améliorer.


mise à JOUR: Je viens de poser la question ci-dessous comme c'est propre question de débordement des piles . Et il maintenant a une réponse fixant l'erreur mentionnée ci-dessous.

j'ai eu l'idée d'essayer de le rendre encore plus générique en changeant le paramètre retainValues en transformLine avec la nouvelle définition de la fonction générique ci-dessous. Cependant, je continue à obtenir l'erreur de surbrillance dans IntelliJ " Expression de type Some [List [String]] ne se conforme pas à expected type Option[A] " et n'a pas été en mesure de comprendre comment changer la valeur par défaut afin que l'erreur disparaisse.

def tryProcessSource2[A <: AnyRef](
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  transformLine: (Int, List[String]) => Option[A] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
  ???
}

Toute l'aide sur la façon de faire ce travail serait grandement apprécié.

3
répondu chaotic3quilibrium 2017-05-23 12:02:56