Spark Context Textfile: charger plusieurs fichiers
j'ai besoin de traiter plusieurs fichiers dispersés sur différents annuaires. Je voudrais charger tout cela dans un RDD simple et puis effectuer la carte/réduire sur elle. Je vois que SparkContext est capable de charger plusieurs fichiers à partir d'un seul répertoire en utilisant des caractères génériques. Je ne sais pas comment charger des fichiers à partir de plusieurs dossiers.
L'extrait de code suivant échoue:
for fileEntry in files:
fileName = basePath + "/" + fileEntry
lines = sc.textFile(fileName)
if retval == None:
retval = lines
else:
retval = sc.union(retval, lines)
Cela échoue sur la troisième boucle avec le message d'erreur suivant:
retval = sc.union(retval, lines)
TypeError: union() takes exactly 2 arguments (3 given)
ce Qui est bizarre étant donné que je ne fournis que deux arguments. Tous les conseils sont appréciés.
4 réponses
pourquoi pas cette phrase à la place?
sc.union([sc.textFile(basepath + "/" + f) for f in files])
En Scala SparkContext.union()
a deux variantes, l'une qui prend les arguments vararg, et l'autre qui prend une liste. Seul le second existe en Python (puisque Python n'a pas de polymorphisme).
UPDATE
Vous pouvez utiliser un seul textFile
appeler pour lire plusieurs fichiers.
sc.textFile(','.join(files))
Je résous des problèmes similaires en utilisant le Joker.
par exemple, j'ai trouvé certains caractères dans les fichiers que je veux charger dans spark,
dir
subdir1/folder1/x.txt
subdir2/folder2/y.txt
vous pouvez utiliser la phrase suivante
sc.textFile("dir/*/*/*.txt")
pour charger tous les fichiers relatifs.
Le Joker ' * ' ne fonctionne que dans un répertoire de niveau unique, ce qui n'est pas récursif.
Vous pouvez utiliser la fonction suivante de SparkContext:
wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
lire un répertoire de fichiers texte à partir de HDFS, un système de fichiers local (disponible sur tous les noeuds), ou N'importe quel URI de système de fichiers soutenu par Hadoop. Chaque fichier est lu comme un seul enregistrement et retourné dans une paire clé-valeur, où la clé est le chemin d'accès de chaque fichier, la valeur est le contenu de chaque fichier.
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext
Vous pouvez utiliser ce
d'Abord, Vous pouvez obtenir un Tampon/Liste de S3 Chemins :
import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest
def listFiles(s3_bucket:String, base_prefix : String) = {
var files = new ArrayList[String]
//S3 Client and List Object Request
var s3Client = new AmazonS3Client();
var objectListing: ObjectListing = null;
var listObjectsRequest = new ListObjectsRequest();
//Your S3 Bucket
listObjectsRequest.setBucketName(s3_bucket)
//Your Folder path or Prefix
listObjectsRequest.setPrefix(base_prefix)
//Adding s3:// to the paths and adding to a list
do {
objectListing = s3Client.listObjects(listObjectsRequest);
for (objectSummary <- objectListing.getObjectSummaries().asScala) {
files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
}
listObjectsRequest.setMarker(objectListing.getNextMarker());
} while (objectListing.isTruncated());
//Removing Base Directory Name
files.remove(0)
//Creating a Scala List for same
files.asScala
}
passez maintenant cet objet List au morceau de code suivant, note : sc est un objet de SQLContext
var df: DataFrame = null;
for (file <- files) {
val fileDf= sc.textFile(file)
if (df!= null) {
df= df.unionAll(fileDf)
} else {
df= fileDf
}
}
Maintenant vous avez un RDD Final unifié i.e. df
en Option, Et Vous pouvez également repartitionner en un seul BigRDD
val files = sc.textFile(filename, 1).repartition(1)
Repartitionnement fonctionne toujours :D