Quelle est la signification de "niveau de localité" sur le cluster Spark
Quelle est la signification du titre "niveau de localité" et des données d'état 5 local --> processus local --> nœud local --> rack local --> Tout?
1 réponses
Le niveau de la localité pour autant que je sache indique quel type d'accès aux données a été effectué. Lorsqu'un nœud termine tout son travail et que son processeur devient inactif, Spark peut décider de démarrer une autre tâche en attente nécessitant l'obtention de données à partir d'autres endroits. Donc, idéalement, toutes vos tâches devraient être locales car elles sont associées à une latence d'accès aux données plus faible.
Vous pouvez configurer le temps d'attente avant de passer à d'autres niveaux de localité en utilisant:
spark.locality.wait
Plus d'informations sur le les paramètres peuvent être trouvés dans les documents de Configuration Spark
En ce qui concerne les différents niveaux PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ou tout, je pense que les méthodes findTask et findSpeculativeTask dans org.Apache.étincelle.planificateur.TaskSetManager illustre comment Spark choisit les tâches en fonction de leur niveau de localité. Il vérifiera d'abord les tâches PROCESS_LOCAL qui vont être lancées dans le même processus exécuteur. Si non, il va vérifier pour Les tâches NODE_LOCAL qui peuvent être dans d'autres exécuteurs dans le même nœud ou doivent être récupérées à partir de systèmes comme HDFS, mis en cache, etc. RACK_LOCAL signifie que les données sont dans un autre nœud et doivent donc être transférées avant l'exécution. Et enfin, tout est juste pour prendre n'importe quelle tâche en attente qui peut s'exécuter dans le nœud actuel.
/**
* Dequeue a pending task for a given node and return its index and locality level.
* Only search for tasks matching the given locality constraint.
*/
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL))
}
}
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL))
}
}
// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY))
}
}
// Finally, if all else has failed, find a speculative task
findSpeculativeTask(execId, host, locality)
}