Spark RDD - comment fonctionnent-ils
J'ai un petit programme Scala qui fonctionne bien sur un seul nœud. Cependant, je le redimensionne pour qu'il fonctionne sur plusieurs nœuds. C'est ma première tentative de ce genre. J'essaie juste de comprendre comment les RDDs fonctionnent dans Spark, donc cette question est basée sur la théorie et peut ne pas être correcte à 100%.
Disons que je crée un RDD:
val rdd = sc.textFile(file)
Maintenant, une fois que je l'ai fait, cela signifie que le fichier file
est maintenant partitionné entre les nœuds (en supposant que tous les nœuds ont accès au fichier chemin d'accès)?
Deuxièmement, je veux compter le nombre d'objets dans le RDD (assez simple), cependant, j'ai besoin d'utiliser ce nombre dans un calcul qui doit être appliqué aux objets dans le RDD-un exemple de pseudocode:
rdd.map(x => x / rdd.size)
Disons qu'il y a 100 objets dans rdd
, et disons qu'il y a 10 nœuds, donc un nombre de 10 objets par nœud (en supposant que c'est ainsi que fonctionne le concept RDD), maintenant quand j'appelle la méthode est-ce que chaque nœud va effectuer le calcul avec rdd.size
comme 10
ou 100
? Parce que, dans l'ensemble, le RDD est de taille 100
mais localement sur chaque nœud, il est seulement 10
. Suis-je tenu de faire une variable de diffusion avant de faire le calcul? Cette question est liée à la question ci-dessous.
Enfin, si je fais une transformation vers le RDD, par exemple rdd.map(_.split("-"))
, et que je voulais le nouveau size
du RDD, dois-je effectuer une action sur le RDD, comme count()
, donc toutes les informations sont renvoyées au nœud du pilote?
2 réponses
Habituellement, le fichier (ou des parties du fichier, s'il est trop gros) est répliqué sur N nœuds du cluster (par défaut N=3 sur HDFS). Ce n'est pas une intention de diviser chaque fichier entre tous les nœuds disponibles.
Cependant, pour vous (IE le client) travailler avec le fichier en utilisant Spark devrait être transparent - vous ne devriez pas voir de différence dans rdd.size
, peu importe le nombre de nœuds qu'il est divisé et/ou répliqué. Il existe des méthodes (au moins, dans Hadoop) pour savoir sur quels nœuds (parties des) fichier peut être situé au moment. Cependant, dans les cas simples, vous n'aurez probablement pas besoin d'utiliser cette fonctionnalité.
UPDATE: un article décrivant les internes de RDD: https://cs.stanford.edu/ ~ matei / papers / 2012 / nsdi_spark. pdf
val rdd = sc.textFile(file)
Cela signifie-t-il que le fichier est maintenant partitionné sur les nœuds?
Le fichier reste partout où il était. Les éléments du résultat RDD[String]
sont les lignes du fichier. Le RDD est partitionné pour correspondre au partitionnement naturel du système de fichiers sous-jacent. Le nombre de partitions ne dépend pas du nombre de nœuds.
, Il est important de comprendre que lorsque cette ligne est exécutée, elle ne lire le fichier(s). Le RDD est un objet paresseux et ne fera quelque chose quand il le doit. C'est génial car cela évite l'utilisation inutile de la mémoire.
Par exemple, si vous écrivez val errors = rdd.filter(line => line.startsWith("error"))
, rien ne se passe toujours. Si vous écrivez alors val errorCount = errors.count
Maintenant votre séquence d'opérations devra être exécutée car le résultat de count
est un entier. Ce que chaque noyau de travail (thread d'exécuteur) fera en parallèle, c'est lire un fichier (ou un morceau de fichier), parcourir ses lignes et compter les lignes commençant par "erreur". Mise en mémoire tampon et GC mis à part, une seule ligne par cœur sera en mémoire à la fois. Cela permet de travailler avec de très grandes données sans utiliser beaucoup de mémoire.
Je veux compter le nombre d'objets dans le RDD, cependant, j'ai besoin d'utiliser ce nombre dans un calcul qui doit être appliqué aux objets dans le RDD - un exemple de pseudocode:
rdd.map(x => x / rdd.size)
Il n'y a pas de méthode rdd.size
. Il y a rdd.count
, qui compte le nombre d'éléments dans le RDD. {[9] } ne fonctionnera pas. Le code va essayer pour envoyer la variable rdd
à tous les travailleurs et échouera avec un NotSerializableException
. Ce que vous pouvez faire est:
val count = rdd.count
val normalized = rdd.map(x => x / count)
Cela fonctionne, parce que count
est un Int
et peut être sérialisé.
Si je fais une transformation vers le RDD, par exemple
rdd.map(_.split("-"))
, et que je voulais la nouvelle taille du RDD, dois-je effectuer une action sur le RDD, telle quecount()
, donc toutes les informations sont renvoyées au nœud du pilote?
map
ne modifie pas le nombre d'éléments. Je ne sais pas ce que vous dire par "taille". Mais oui, vous devez effectuer une action, telle que count
pour obtenir quelque chose du RDD. Vous voyez, aucun travail n'est effectué jusqu'à ce que vous effectuiez une action. (Lorsque vous effectuez count
, seul le nombre par partition sera renvoyé au pilote, bien sûr, pas "toutes les informations".)