Comment puis-je lire un grand fichier CSV avec la classe Scala Stream?
Comment lire un gros fichier CSV (> 1 Go) avec un flux Scala? Avez-vous un exemple de code? Ou utiliseriez-vous une autre façon de lire un grand fichier CSV sans le charger en mémoire?
3 réponses
utilisez juste Source.fromFile(...).getLines
comme vous l'avez déjà dit.
qui renvoie un itérateur, qui est déjà paresseux (vous utiliseriez stream comme une collection paresseuse où vous vouliez que les valeurs précédemment récupérées soient mémoizées, de sorte que vous pouvez les lire à nouveau)
si vous avez des problèmes de mémoire, alors le problème réside dans ce que vous faites après getLines. Toute opération comme toList
, qui impose une collecte stricte, provoquera problème.
j'espère que vous ne voulez pas dire collection.immutable.Stream
de Scala avec Stream. C'est pas ce que vous voulez. Stream est paresseux, mais fait de la mémorisation.
Je ne sais pas ce que vous prévoyez de faire, mais juste la lecture du fichier ligne-par-ligne devrait fonctionner très bien sans utiliser de grandes quantités de mémoire.
getLines
devrait évaluer paresseusement et ne devrait pas se planter (tant que votre fichier n'a pas plus de 232 lignes, afaik). Si c'est le cas, demandez sur # scala ou déposer un ticket de bug (ou faire les deux).
si vous cherchez à traiter le gros Fichier ligne par ligne tout en évitant d'exiger que le contenu du fichier entier soit chargé en mémoire en une seule fois, vous pouvez utiliser le Iterator
retourné par scala.io.Source
.
j'ai une petite fonction, tryProcessSource
(contenant deux sous-fonctions), que j'utilise pour exactement ces types de cas d'utilisation. La fonction prend jusqu'à quatre paramètres, dont seul le premier est obligatoire. Les autres paramètres ont la valeur par défaut les valeurs fournies.
Voici le profil de la fonction (de pleine fonction de la mise en œuvre est en bas):
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues),
): Try[List[List[String]]] = {
???
}
le premier paramètre, file: File
, est requis. Et c'est juste n'importe quelle instance valide de java.io.File
qui pointe vers un fichier texte orienté vers la ligne, comme un CSV.
le second paramètre, parseLine: (Int, String) => Option[List[String]]
, est facultatif. Et si, il doit être une fonction attend deux paramètres d'entrée; index: Int
, unparsedLine: String
. Et ensuite retourner un Option[List[String]]
. La fonction peut retourner un Some
enveloppé List[String]
composé des valeurs de colonne valides. Ou il peut retourner un None
qui indique que tout le processus de streaming est en train d'avorter. Si ce paramètre n'est pas fourni, une valeur par défaut de (index, line) => Some(List(line))
est fournie. Par défaut, la ligne entière est retournée sous la forme d'une seule valeur String
.
le troisième paramètre, filterLine: (Int, List[String]) => Option[Boolean]
, est facultatif. Et si, il doit être une fonction attend deux paramètres d'entrée; index: Int
, parsedValues: List[String]
. Et ensuite retourner un Option[Boolean]
. La fonction peut renvoyer un Some
enveloppé Boolean
indiquant si cette ligne particulière doit être incluse dans la sortie. Ou il peut retourner un None
qui indique que tout le processus de streaming est en train d'avorter. Si ce paramètre n'est pas fourni, une valeur par défaut de (index, values) => Some(true)
est fournie. Par défaut, toutes les lignes sont incluses.
Le quatrième et dernier paramètre, retainValues: (Int, List[String]) => Option[List[String]]
, est facultative. Et si, il doit être une fonction attend deux paramètres d'entrée; index: Int
, parsedValues: List[String]
. Et ensuite retourner un Option[List[String]]
. La fonction peut retourner un Some
enveloppé List[String]
consistant en un sous-ensemble et/ou une modification des valeurs de colonne existantes. Ou il peut retourner un None
qui indique la totalité de la diffusion processus de l'abandon précoce. Si ce paramètre n'est pas fourni, une valeur par défaut de (index, values) => Some(values)
est fournie. Par défaut, il en résulte les valeurs interprétées par le second paramètre, parseLine
.
Envisager un fichier avec le contenu suivant (4 lignes):
street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240
le profil d'appel suivant...
val tryLinesDefaults =
tryProcessSource(new File("path/to/file.csv"))
...résultats dans cette sortie pour tryLinesDefaults
(le contenu non modifié de le dossier):
Success(
List(
List("street,street2,city,state,zip"),
List("100 Main Str,,Irving,TX,75039"),
List("231 Park Ave,,Irving,TX,75039"),
List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
)
)
le profil d'appel suivant...
val tryLinesParseOnly =
tryProcessSource(
new File("path/to/file.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
)
...résultats dans cette sortie pour tryLinesParseOnly
(chaque ligne divisée en valeurs de colonne individuelles):
Success(
List(
List("street","street2","city","state","zip"),
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
)
)
le profil d'appel suivant...
val tryLinesIrvingTxNoHeader =
tryProcessSource(
new File("C:/Users/Jim/Desktop/test.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
, filterLine =
(index, parsedValues) =>
Some(
(index != 0) && //skip header line
(parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
(parsedValues(3).toLowerCase == "Tx".toLowerCase)
)
)
...résultats dans cette sortie pour tryLinesIrvingTxNoHeader
(chaque ligne divisée dans la colonne individuelle valeurs, aucun en-tête et seulement les deux lignes dans Irving, Tx):
Success(
List(
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
)
)
Voici la totalité tryProcessSource
mise en œuvre de la fonction:
import scala.io.Source
import scala.util.Try
import java.io.File
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
try {Try(transfer(source))} finally {source.close()}
def recursive(
remaining: Iterator[(String, Int)],
accumulator: List[List[String]],
isEarlyAbort: Boolean =
false
): List[List[String]] = {
if (isEarlyAbort || !remaining.hasNext)
accumulator
else {
val (line, index) =
remaining.next
parseLine(index, line) match {
case Some(values) =>
filterLine(index, values) match {
case Some(keep) =>
if (keep)
retainValues(index, values) match {
case Some(valuesNew) =>
recursive(remaining, valuesNew :: accumulator) //capture values
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
else
recursive(remaining, accumulator) //discard row
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
}
}
Try(Source.fromFile(file)).flatMap(
bufferedSource =>
usingSource(bufferedSource) {
source =>
recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
}
)
}
bien que cette solution soit relativement succincte, il m'a fallu beaucoup de temps et de nombreux passages de remaniement avant que je puisse finalement me rendre ici. S'il vous plaît, faites-moi savoir si vous voyez des moyens de l'améliorer.
mise à JOUR: Je viens de poser la question ci-dessous comme c'est propre question de débordement des piles . Et il maintenant a une réponse fixant l'erreur mentionnée ci-dessous.
j'ai eu l'idée d'essayer de le rendre encore plus générique en changeant le paramètre retainValues
en transformLine
avec la nouvelle définition de la fonction générique ci-dessous. Cependant, je continue à obtenir l'erreur de surbrillance dans IntelliJ " Expression de type Some [List [String]] ne se conforme pas à expected type Option[A] " et n'a pas été en mesure de comprendre comment changer la valeur par défaut afin que l'erreur disparaisse.
def tryProcessSource2[A <: AnyRef](
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
transformLine: (Int, List[String]) => Option[A] =
(index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
???
}
Toute l'aide sur la façon de faire ce travail serait grandement apprécié.