Comment puis-je itérer les RDD dans apache spark (scala)

j'utilise la commande suivante pour remplir un RDD avec un tas de tableaux contenant 2 chaînes ["filename", "content"].

maintenant je veux itérer sur chacune de ces occurrences pour faire quelque chose avec chaque nom de fichier et le contenu.

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")

il semble que je ne trouve aucune documentation sur la façon de faire ceci cependant.

alors ce que je veux c'est ceci:

foreach occurrence-in-the-rdd{
   //do stuff with the array found on loccation n of the RDD
} 
19
demandé sur Shubham Mittal 2014-09-18 18:00:14

5 réponses

les opérations fondamentales dans Spark sont map et filter.

val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }

txtRDD va maintenant contenir uniquement les fichiers qui ont l'extension ".txt"

Et si vous voulez word count ces fichiers, vous pouvez dire

//split the documents into words in one long list
val words = txtRDD flatMap { case (id,text) => text.split("\s+") }
// give each word a count of 1
val wordT = words map (x => (x,1))  
//sum up the counts for each word
val wordCount = wordsT reduceByKey((a, b) => a + b)

vous voulez utiliser mapPartitions lorsque vous avez une initialisation coûteuse que vous devez effectuer -- par exemple, si vous voulez faire la reconnaissance D'entité nommée avec une bibliothèque comme les outils Stanford coreNLP.

Maître map, filter,flatMap et reduce, et vous êtes en bonne voie de maîtriser Spark.

7
répondu David 2014-09-26 20:12:47

vous appelez diverses méthodes sur le RDD qui acceptent les fonctions comme paramètres.

// set up an example -- an RDD of arrays
val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
val sc = new SparkContext(sparkConf)
val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
val testRDD = sc.parallelize(testData, 2)

// Print the RDD of arrays.
testRDD.collect().foreach(a => println(a.size))

// Use map() to create an RDD with the array sizes.
val countRDD = testRDD.map(a => a.size)

// Print the elements of this new RDD.
countRDD.collect().foreach(a => println(a))

// Use filter() to create an RDD with just the longer arrays.
val bigRDD = testRDD.filter(a => a.size > 3)

// Print each remaining array.
bigRDD.collect().foreach(a => {
    a.foreach(e => print(e + " "))
    println()
  })
}

notez que les fonctions que vous écrivez acceptent un seul élément RDD comme entrée, et renvoient des données d'un type uniforme, donc vous créez un RDD de ce dernier type. Par exemple, countRDD est un RDD[Int] tandis que bigRDD encore RDD[Array[Int]].

Il sera probablement tentant à un certain point pour écrire un foreach qui modifie certaines autres données, mais il faut résister pour les raisons décrites dans cette question et réponse.

Edit: N'essayez pas d'imprimer de grandes RDD s

plusieurs lecteurs ont demandé à utiliser collect() et println() voir leurs résultats, comme dans l'exemple ci-dessus. Bien sûr, cela ne fonctionne que si vous utilisez un mode interactif comme le Spark REPL (read-eval-print-loop. Il vaut mieux appeler collect() sur le RDD pour obtenir un tableau séquentiel pour l'impression ordonnée. Mais collect() peut ramener trop de données et dans tous les cas trop peut être imprimé. Voici quelques autres manières d'obtenir un aperçu de votre RDD s si elles sont grandes:

  1. RDD.take(): Cela vous donne un contrôle précis sur le nombre d'éléments, vous obtenez, mais pas d'où ils viennent-défini comme le "premier" qui est un concept traitées par diverses autres questions et réponses ici.

    // take() returns an Array so no need to collect()
    myHugeRDD.take(20).foreach(a => println(a))
    
  2. RDD.sample(): cela vous permet de contrôler (grossièrement) la fraction de résultats que vous obtenez, si l'échantillonnage utilise le remplacement, et même en option la graine de nombre aléatoire.

    // sample() does return an RDD so you may still want to collect()
    myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
    
  3. RDD.takeSample(): c'est un hybride: en utilisant un échantillonnage aléatoire que vous pouvez contrôler, mais à la fois vous permettant de spécifier le nombre exact de résultats et de retourner un Array.

    // takeSample() returns an Array so no need to collect() 
    myHugeRDD.takeSample(true, 20).foreach(a => println(a))
    
  4. RDD.count(): parfois, la meilleure idée vient du nombre d'éléments avec lesquels vous avez fini -- je le fais souvent en premier.

    println(myHugeRDD.count())       
    
25
répondu Spiro Michaylov 2017-08-15 15:35:15

j'essaierais d'utiliser une fonction de mappage de partition. Le code ci-dessous montre comment un ensemble de données RDD peut être traité dans une boucle de sorte que chaque entrée passe par la même fonction. J'ai bien peur de ne pas être au courant de Scala, donc tout ce que j'ai à offrir est java code. Cependant, il ne devrait pas être très difficile de le traduire en scala.

JavaRDD<String> res = file.mapPartitions(new FlatMapFunction <Iterator<String> ,String>(){ 
      @Override
      public Iterable<String> call(Iterator <String> t) throws Exception {  

          ArrayList<String[]> tmpRes = new ArrayList <>();
          String[] fillData = new String[2];

          fillData[0] = "filename";
          fillData[1] = "content";

          while(t.hasNext()){
               tmpRes.add(fillData);  
          }

          return Arrays.asList(tmpRes);
      }

}).cache();
2
répondu Mikel Urkia 2014-09-18 15:16:43

ce wholeTextFiles retour est une Paire de RDD:

def wholeTextFiles(chemin d'accès: String, minPartitions: Int): CA[(String, String)]

lire un répertoire de fichiers texte à partir de HDFS, un système de fichiers local (disponible sur tous les noeuds), ou N'importe quel URI de système de fichiers soutenu par Hadoop. Chaque fichier est lu comme un seul enregistrement et retourné dans une paire clé-valeur, où la clé est le chemin d'accès de chaque fichier, la valeur est le contenu de chaque fichier.

Voici un exemple de lire les fichiers à un chemin local puis d'imprimer chaque nom de fichier et le contenu.

val conf = new SparkConf().setAppName("scala-test").setMaster("local")
val sc = new SparkContext(conf)
sc.wholeTextFiles("file:///Users/leon/Documents/test/")
  .collect
  .foreach(t => println(t._1 + ":" + t._2));

le résultat:

file:/Users/leon/Documents/test/1.txt:{"name":"tom","age":12}

file:/Users/leon/Documents/test/2.txt:{"name":"john","age":22}

file:/Users/leon/Documents/test/3.txt:{"name":"leon","age":18}

ou convertir la paire RDD en RDD d'abord

sc.wholeTextFiles("file:///Users/leon/Documents/test/")
  .map(t => t._2)
  .collect
  .foreach { x => println(x)}

le résultat:

{"name":"tom","age":12}

{"name":"john","age":22}

{"name":"leon","age":18}

Et je pense que wholeTextFiles est plus conforme pour les petits fichiers.

1
répondu leon 2016-09-05 05:31:52
for (element <- YourRDD)
{
// do what you want with element in each iteration, and if you want the index of element,  simply use a counter variable in this loop begining from 0 

Println (élément._1) / / ceci imprimera tous les noms de fichier }

-1
répondu AliSafari186 2018-06-15 08:31:12