Comment fonctionne le modèle de disrupteur de LMAX?
j'essaie de comprendre le disruptor pattern . J'ai regardé la vidéo D'InfoQ et j'ai essayé de lire leur journal. Je comprends qu'il y a un tampon annulaire impliqué, qu'il est initialisé comme un tableau extrêmement grand pour profiter de la localisation de cache, éliminer l'allocation de nouvelle mémoire.
on dirait qu'il y a un ou plusieurs entiers atomiques qui gardent la trace des positions. Chaque "événement" semble obtenir un id unique et sa position dans le l'anneau est trouvée par trouver son module par rapport à la taille de l'anneau, etc., etc.
malheureusement, je n'ai pas un sens intuitif de la façon dont cela fonctionne. J'ai fait de nombreuses applications commerciales et étudié le acteur modèle , examiné SEDA, etc.
dans leur présentation, ils ont mentionné que ce modèle est essentiellement la façon dont les routeurs fonctionnent; cependant, je n'ai pas trouvé de bonnes descriptions de la façon dont les routeurs fonctionnent non plus.
y a-t-il de bons indices pour une meilleure explication?
5 réponses
Le Google Code du projet n' référence à un document technique sur la mise en œuvre de l'anneau de la mémoire tampon, mais il est un peu sec, universitaire et difficile pour quelqu'un qui veut apprendre comment il fonctionne. Cependant, il ya quelques billets de blog qui ont commencé à expliquer les internes d'une manière plus lisible. Il y a une explication de tampon de bague qui est le cœur du modèle de disrupteur, une description des barrières de consommation (la partie liée à la lecture du disrupteur) et certains informations sur la manipulation de plusieurs producteurs disponibles.
la description la plus simple du Disrupteur est: c'est une façon d'envoyer des messages entre les threads de la manière la plus efficace possible. Il peut être utilisé comme une alternative à une file d'attente, mais il partage également un certain nombre de fonctionnalités avec SEDA et les acteurs.
comparé aux Files d'attente:
le disrupteur fournit la capacité de passer un message sur un autre fil, en le réveillant si nécessaire (semblable à une queue bloquante). Toutefois, il y a trois différences distinctes.
- l'utilisateur du Disrupteur définit comment les messages sont stockés en étendant la classe D'entrée et en fournissant une usine pour faire la préallocation. Cela permet la réutilisation de la mémoire (copie) ou l'Entrée peut contenir une référence à un autre objet.
- mettre des messages dans le disrupteur est un processus en 2 phases, d'abord un slot est réclamé dans le tampon d'anneau, qui fournit à l'utilisateur l'entrée qui peut être remplie avec les données appropriées. Ensuite, l'entrée doit être engagée, cette approche en 2 phases est nécessaire pour permettre l'utilisation souple de la mémoire mentionnée ci-dessus. C'est le commit qui rend le message visible aux fils de consommation.
- il est de la responsabilité du consommateur de tenir compte des messages qui ont été consommées par le tampon annulaire. Déplacer cette responsabilité loin du tampon de bague lui-même a aidé à réduire la quantité de contestation d'écriture que chaque fil maintient son propre compteur.
comparé aux acteurs
le modèle acteur est plus proche du Disrupteur que la plupart des autres modèles de programmation, surtout si vous utilisez les classes BatchConsumer/BatchHandler qui sont fournies. Ces classes cachent toutes les complexité de la conservation des numéros de séquence consommés et de fournir un ensemble de rappels simples lorsque des événements importants se produisent. Cependant, il ya quelques différences subtiles.
- Le Perturbateur utilise un 1 fil - 1 modèle de consommation, où les Acteurs utilisent un N:M modèle c'est à dire que vous pouvez avoir autant d'acteurs que vous voulez et ils seront distribués à l'échelle d'un fixe nombre de threads (généralement 1 par cœur).
- L'interface BatchHandler fournit un rappel supplémentaire (et très important)
onEndOfBatch()
. Cela permet aux consommateurs lents, par exemple ceux qui font de l'E/S pour grouper les événements afin d'améliorer le débit. Il est possible de faire batching dans d'autres cadres D'acteurs, mais comme presque tous les autres cadres ne fournissent pas un rappel à la fin du lot, vous devez utiliser un temps d'arrêt pour déterminer la fin du lot, résultant en une faible latence.
comparé à SEDA
LMAX construit le modèle de disrupteur pour remplacer une approche basée SEDA.
- la principale amélioration qu'il a apportée par rapport à la SEDA était la capacité de travailler en parallèle. Pour ce faire, le disrupteur prend en charge le multi-casting des mêmes messages (dans le même ordre) à plusieurs consommateurs. Cela évite la nécessité d'étapes de fourche dans le pipeline.
- nous permettons également aux consommateurs d'attendre les résultats des autres consommateurs sans avoir à mettre un autre de files d'attente stade de entre eux. Un consommateur peut simplement regarder le numéro de séquence d'un consommateur qu'il est tributaire. Cela évite la nécessité d'étapes de jointure dans le pipeline.
comparé aux barrières mémoire
une autre façon de penser à ce sujet est comme une barrière de mémoire structurée et ordonnée. Où la barrière du producteur forme la barrière de l'écriture et la barrière du consommateur est la barrière de la lecture.
nous aimerions D'abord comprendre le modèle de programmation qu'il offre.
il y a un ou plusieurs écrivains. Il y a un ou plusieurs lecteurs. Il y a une ligne d'entrées, totalement ordonnées de l'ancien au nouveau (de gauche à droite). Les rédacteurs peuvent ajouter de nouvelles entrées à droite. Chaque lecteur lit les entrées de gauche à droite. Les lecteurs ne peuvent pas lire les écrivains du passé, évidemment.
il n'y a pas de notion de suppression d'entrée. J'utilise "lecteur" au lieu de "consommateurs" pour éviter l'image des entrées de consommation. Cependant nous comprenons que les entrées à gauche du dernier lecteur deviennent inutiles.
généralement les lecteurs peuvent lire simultanément et indépendamment. Cependant, nous pouvons déclarer des dépendances parmi les lecteurs. Les dépendances du lecteur peuvent être un graphique acyclique arbitraire. Si le lecteur B dépend de lecteur, un lecteur de B ne peut pas lire au-delà de lecteur A.
la dépendance du Lecteur se produit parce que le lecteur a peut annoter une entrée, et le lecteur B dépend de cette annotation. Par exemple, A fait un certain calcul sur une entrée, et stocke le résultat dans le champ a
dans l'entrée. A puis passer à autre chose, Et maintenant B peut lire l'entrée, et la valeur de a
a stocké. Si le lecteur C ne dépend pas de A, C ne doit pas essayer de lire a
.
il s'agit en effet d'un modèle de programmation intéressant. Quelle que soit la performance, le modèle seul peut bénéficier de nombreuses applications.
de bien sûr, le principal objectif de LMAX est la performance. Il utilise un anneau pré-alloué d'entrées. L'anneau est assez grand, mais il est limité de sorte que le système ne sera pas chargé au-delà de la capacité de conception. Si l'anneau est plein, l'écrivain attendra que les lecteurs les plus lents avancent et fassent de la place.
les objets D'entrée sont pré-alloués et vivent pour toujours, afin de réduire les coûts de collecte des ordures. Nous n'insérons pas de nouveaux objets d'entrée ou supprimons de vieux objets d'entrée, à la place, un écrivain demande un pré-existant entrée, remplir ses champs et prévenir les lecteurs. Cette action apparente en deux phases est en fait simplement une action atomique
setNewEntry(EntryPopulator);
interface EntryPopulator{ void populate(Entry existingEntry); }
pré-allocation des entrées signifie également les entrées adjacentes (très probablement) localisez dans les cellules mémoire adjacentes, et parce que les lecteurs lisent les entrées séquentiellement, ceci est important d'utiliser les caches CPU.
et beaucoup d'efforts pour éviter le verrouillage, CAS, même la barrière de mémoire (par exemple, utilisez une variable de séquence non volatile s'il n'y a qu'un seul écrivain)
pour les développeurs de lecteurs: les différents lecteurs annotant devraient écrire dans des champs différents, pour éviter d'écrire discorde. (En fait, ils devraient écrire sur différentes lignes de cache.) Un lecteur annotant ne doit pas toucher quoi que ce soit que d'autres lecteurs non dépendants peuvent lire. C'est pourquoi je dis ces lecteurs annoter entrées, au lieu de modifier entrées.
Martin Fowler a écrit un article sur LMAX et le modèle de disrupteur, L'Architecture LMAX , qui peut clarifier davantage.
j'ai effectivement pris le temps d'étudier la source, par pure curiosité, et l'idée derrière cela est assez simple. La version la plus récente au moment de la rédaction de ce post est 3.2.1.
il y a une mémoire tampon stockant les événements pré-alloués qui retiendra les données pour les consommateurs à lire.
le buffer est soutenu par un tableau de drapeaux (tableau entier) de sa longueur qui décrit la disponibilité des fentes de buffer (voir plus loin pour plus de détails). Le tableau est accédé comme un java#AtomicIntegerArray, donc pour le but de cette explication vous pouvez aussi bien supposer qu'il est un.
Il peut y avoir n'importe quel nombre de producteurs. Quand le producteur veut écrire dans le tampon, un long numéro est généré (comme dans l'appel à un#getand incrément atomiclong, le disrupteur utilise en fait sa propre implémentation, mais il fonctionne de la même manière). Appelons cela produit depuis longtemps un producerCallId. De la même manière, un consumerCallId est généré lorsqu'un consommateur termine la lecture d'une fente à partir d'un tampon. Le plus récent consumerCallId est accessible.
(S'il y a beaucoup de consommateurs, l'appel avec le plus bas id est choisi.)
ces codes sont ensuite comparés, et si la différence entre les deux est moindre que le côté tampon, le producteur est autorisé à écrire.
(si le producerCallId est plus grand que le consumerCallId + bufferSize récent, cela signifie que le buffer est complète, et le producteur est obligé de bus-attendre jusqu'à ce qu'une place devient disponible.)
le producteur se voit alors attribuer le slot dans le buffer sur la base de son Callide (qui est un bufferSize prducercallid modulo, mais puisque le bufferSize est toujours une puissance de 2 (limite imposée sur la création de buffer), l'opération actuelle utilisée est producerCallId & (bufferSize - 1)). Il est alors libre de modifier l'événement dans cette fente.
(L'algorithme est un peu plus compliqué, impliquant la mise en cache de consumerId récent dans une référence atomique séparée, pour une optimisation.)
Lorsque l'événement a été modifié, le changement est "publié". Lors de la publication de la fente respective dans le tableau de drapeau est rempli avec le drapeau mis à jour. La valeur de drapeau est le nombre de la boucle (producerCallId divisé par bufferSize).
de la même manière, il peut y avoir n'importe quel nombre de consommateur. Chaque fois qu'un consommateur veut accéder au tampon, un consumerCallId est généré (en fonction de la façon dont les consommateurs ont été ajoutés au disrupteur, l'atome utilisé dans la génération d'id peut être partagé ou séparé pour chacun d'eux). Ce consumercallide est ensuite comparé au plus récent producentcallide, et s'il est moindre des deux, le lecteur est autorisé à progresser.
(de même si le producerCallId est même au consumerCallId, cela signifie que le tampon est vide et le consommateur est obligé d'attendre. La manière d'attendre est définie par une stratégie D'attente lors de la création du disrupteur.)
pour les consommateurs individuels (ceux qui ont leur propre générateur d'id), la prochaine chose vérifiée est la capacité de consommer par lot. Les fentes dans le tampon sont examinées dans l'ordre de celle du consumerCallId (l'indice est déterminé de la même manière que pour les producteurs), à celle du producerCallId récent.
ils sont examinés dans une boucle en comparant la valeur de drapeau écrite dans le tableau de drapeau, par rapport à une valeur de drapeau générée pour le consumerCallId. Si les drapeaux correspondent, cela signifie que les producteurs qui remplissent les fentes ont engagé leurs changements. Si ce n'est pas le cas, la boucle est rompue, et le changeId le plus élevé est retourné. Les fentes de ConsumerCallId à reçu dans changeId peuvent être consommées en fournée.
si un groupe de consommateurs lisent ensemble( ceux avec le générateur d'id partagé), chacun un seul prend un Callide simple, et seule la fente pour ce Callide simple est vérifiée et retournée.
à Partir de cet article :
le modèle de disrupteur est une file d'attente de batching soutenue par une circulaire tableau (c.-à-d. le tampon de cycle) rempli de transfert préaffecté les objets qui utilisent des barrières de mémoire pour synchroniser les producteurs et les consommateurs à travers les séquences.
de la Mémoire-les obstacles sont un peu difficile à expliquer et Trisha du blog a fait la meilleure tentative à mon avis avec ce post: http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast.html
mais si vous ne voulez pas plonger dans les détails de bas niveau, vous pouvez juste savoir que les barrières de mémoire en Java sont mis en œuvre par le mot-clé volatile
ou par le java.util.concurrent.AtomicLong
. Les séquences de disrupteurs sont AtomicLong
s et sont communiquées de part et d'autre parmi les producteurs et les consommateurs par des barrières mémoire au lieu de serrures.
je trouve qu'il est plus facile de comprendre un concept à travers le code, donc le code ci-dessous est un simple helloworld de CoralQueue , qui est une implémentation de modèle de disrupteur faite par les blocs Coraux avec lesquels je suis affilié. Dans le code ci-dessous, vous pouvez voir comment le modèle de disrupteur met en œuvre batching et comment le ring-buffer (i.e. circular array) permet une communication sans ordures entre deux fils:
package com.coralblocks.coralqueue.sample.queue;
import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;
public class Sample {
public static void main(String[] args) throws InterruptedException {
final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);
Thread consumer = new Thread() {
@Override
public void run() {
boolean running = true;
while(running) {
long avail;
while((avail = queue.availableToPoll()) == 0); // busy spin
for(int i = 0; i < avail; i++) {
MutableLong ml = queue.poll();
if (ml.get() == -1) {
running = false;
} else {
System.out.println(ml.get());
}
}
queue.donePolling();
}
}
};
consumer.start();
MutableLong ml;
for(int i = 0; i < 10; i++) {
while((ml = queue.nextToDispatch()) == null); // busy spin
ml.set(System.nanoTime());
queue.flush();
}
// send a message to stop consumer...
while((ml = queue.nextToDispatch()) == null); // busy spin
ml.set(-1);
queue.flush();
consumer.join(); // wait for the consumer thread to die...
}
}