Spark Context Textfile: charger plusieurs fichiers

j'ai besoin de traiter plusieurs fichiers dispersés sur différents annuaires. Je voudrais charger tout cela dans un RDD simple et puis effectuer la carte/réduire sur elle. Je vois que SparkContext est capable de charger plusieurs fichiers à partir d'un seul répertoire en utilisant des caractères génériques. Je ne sais pas comment charger des fichiers à partir de plusieurs dossiers.

L'extrait de code suivant échoue:

for fileEntry in files:
    fileName = basePath + "/" + fileEntry
    lines = sc.textFile(fileName)
    if retval == None:
        retval = lines
    else:
        retval = sc.union(retval, lines)

Cela échoue sur la troisième boucle avec le message d'erreur suivant:

retval = sc.union(retval, lines)
TypeError: union() takes exactly 2 arguments (3 given)

ce Qui est bizarre étant donné que je ne fournis que deux arguments. Tous les conseils sont appréciés.

22
demandé sur Mike Müller 2014-05-01 01:00:17

4 réponses

pourquoi pas cette phrase à la place?

sc.union([sc.textFile(basepath + "/" + f) for f in files])

En Scala SparkContext.union() a deux variantes, l'une qui prend les arguments vararg, et l'autre qui prend une liste. Seul le second existe en Python (puisque Python n'a pas de polymorphisme).

UPDATE

Vous pouvez utiliser un seul textFile appeler pour lire plusieurs fichiers.

sc.textFile(','.join(files))
40
répondu Daniel Darabos 2017-03-17 17:19:45

Je résous des problèmes similaires en utilisant le Joker.

par exemple, j'ai trouvé certains caractères dans les fichiers que je veux charger dans spark,

dir

subdir1/folder1/x.txt

subdir2/folder2/y.txt

vous pouvez utiliser la phrase suivante

sc.textFile("dir/*/*/*.txt")

pour charger tous les fichiers relatifs.

Le Joker ' * ' ne fonctionne que dans un répertoire de niveau unique, ce qui n'est pas récursif.

13
répondu fibonacci 2015-06-17 09:33:36

Vous pouvez utiliser la fonction suivante de SparkContext:

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

lire un répertoire de fichiers texte à partir de HDFS, un système de fichiers local (disponible sur tous les noeuds), ou N'importe quel URI de système de fichiers soutenu par Hadoop. Chaque fichier est lu comme un seul enregistrement et retourné dans une paire clé-valeur, où la clé est le chemin d'accès de chaque fichier, la valeur est le contenu de chaque fichier.

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

2
répondu Neil 2015-12-12 03:30:00

Vous pouvez utiliser ce

d'Abord, Vous pouvez obtenir un Tampon/Liste de S3 Chemins :

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

passez maintenant cet objet List au morceau de code suivant, note : sc est un objet de SQLContext

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

Maintenant vous avez un RDD Final unifié i.e. df

en Option, Et Vous pouvez également repartitionner en un seul BigRDD

val files = sc.textFile(filename, 1).repartition(1)

Repartitionnement fonctionne toujours :D

1
répondu Murtaza Kanchwala 2015-08-20 11:57:33