Comment écouter les modifications d'une collection MongoDB?

je crée une sorte de système de file d'attente de travail avec MongoDB comme Data store. Comment puis-je "écouter" les inserts à une collection MongoDB avant de frayer travailleurs pour traiter le travail? Ai-je besoin de sonder toutes les quelques secondes pour voir s'il y a des changements par rapport à la dernière fois, ou y a-t-il un moyen pour mon script d'attendre que les inserts se produisent? C'est un projet PHP sur lequel je travaille, mais n'hésitez pas à répondre en Ruby ou en langage agnostique.

161
demandé sur Andrew 2012-03-14 00:09:59

9 réponses

ce à quoi vous pensez ressemble beaucoup à des déclencheurs. MongoDB n'a pas de support pour les déclencheurs, cependant certaines personnes ont "roulé leur propre" en utilisant quelques trucs. La clé ici est la oplog.

lorsque vous exécutez MongoDB dans un jeu de répliques, toutes les actions MongoDB sont enregistrées dans un journal des opérations (appelé oplog). Le oplog est fondamentalement juste une liste des modifications apportées aux données. Les jeux de répliques fonctionnent en écoutant les changements sur cet oplog et puis appliquer les modifications locales.

ça vous dit quelque chose?

Je ne peux pas détailler l'ensemble du processus ici, il s'agit de plusieurs pages de documentation, mais les outils dont vous avez besoin sont disponibles.

D'abord quelques notes sur l'oplog - brève description - Layout de la local collection (qui contient l'oplog)

You va également vouloir tirer parti tailable cursors . Ceux-ci vous fourniront un moyen d'écouter les changements au lieu de les rechercher. Notez que la réplication utilise des curseurs personnalisables, il s'agit donc d'une fonctionnalité supportée.

103
répondu Gates VP 2017-03-19 05:33:05

MongoDB a ce qu'on appelle capped collections et tailable cursors qui permet à MongoDB de pousser des données aux auditeurs.

Un capped collection est essentiellement une collection qui est d'une taille fixe et ne permet que des insertions. Voici à quoi ça ressemblerait d'en créer un:

db.createCollection("messages", { capped: true, size: 100000000 })

MongoDB Tailable curseurs ( original post par Jonathan H. Salaire )

Ruby

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python (by Robert Stewart)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl (par Max )

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

Ressources Supplémentaires:

Ruby/Node.js tutoriel qui vous guide à travers la création d'une application qui écoute les inserts dans une collection MongoDB capped.

un article parlant plus en détail des curseurs modulables.

PHP, Ruby, Python, Perl et des exemples d'utilisation de tailable curseurs.

89
répondu Andrew 2016-02-05 23:33:21

depuis MongoDB 3.6 il y aura une nouvelle API de notifications appelée Change Streams que vous pouvez utiliser pour cela. Voir ce billet de blog pour un exemple . Exemple:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])
28
répondu Mitar 2018-01-12 16:04:23

découvrez ce: Changement de Flux

10 Janvier 2018 - Version 3.6

*EDIT: j'ai écrit un article sur la façon de faire ce https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


c'est nouveau dans mongodb 3.6 https://docs.mongodb.com/manual/release-notes/3.6 / 2018/01/10

$ mongod --version
db version v3.6.2

afin d'utiliser changeStreams la base de données doit être un "1519190920 de la Réplication" Set

plus d'informations sur les jeux de réplication: https://docs.mongodb.com/manual/replication/

votre base de données sera une " autonome " par défaut.

comment convertir Un autonome en une réplique: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


le suivant exemple est une application pratique pour la façon dont vous pourriez utiliser ceci.

* Spécifiquement pour le Nœud.

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

Liens utiles:

https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set

https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example

http://plusnconsulting.com/post/MongoDB-Change-Streams

16
répondu Rio Weber 2018-03-28 19:56:59

MongoDB version 3.6 inclut maintenant des flux de changement qui est essentiellement une API au-dessus de L'OpLog permettant des cas d'utilisation de type trigger/notification.

Voici un lien vers un exemple Java: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams /

un exemple NodeJS pourrait ressembler à quelque chose comme:

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });
13
répondu Robert Walters 2017-12-17 12:35:04

alternativement, vous pouvez utiliser la méthode Mongo FindAndUpdate standard, et dans le callback, lancer un événement EventEmitter (dans le noeud) lorsque le callback est lancé.

toute autre partie de l'application ou de l'architecture écoutant cet événement sera informée de la mise à jour, et toute donnée pertinente envoyée là aussi. C'est un moyen très simple d'obtenir des notifications de Mongo.

3
répondu Alex 2015-08-03 12:12:28

il y a un exemple de travail en java qui peut être trouvé ici .

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();

la clé est options de requête donnée ici.

vous pouvez aussi changer la requête de find, si vous n'avez pas besoin de charger toutes les données à chaque fois.

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
1
répondu Maleen Abewardana 2016-08-24 13:08:47

en fait, au lieu de regarder la sortie, pourquoi vous ne recevez pas d'avis quand quelque chose de nouveau est inséré en utilisant middle-ware qui a été fourni par schéma Mangoose

vous pouvez attraper l'événement d'insérer un nouveau document et faire quelque chose après cette insertion fait

1
répondu Duong Nguyen 2016-11-08 04:09:57

beaucoup de ces réponses ne vous donneront que de nouveaux enregistrements et non des mises à jour et/ou sont extrêmement inefficaces

la seule façon fiable et performante de le faire est de créer un curseur adaptable sur le db Local: oplog.rs collection pour obtenir tous les changements à MongoDB et en faire ce que vous voulez. (MongoDB fait même cela en interne plus ou moins pour soutenir la réplication!)

explication de ce que contient l'oplog: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

exemple de Noeud.bibliothèque js qui fournit une API autour de ce qui est disponible pour être fait avec l'oplog: https://github.com/cayasso/mongo-oplog

1
répondu John Culviner 2017-05-25 19:26:01