Comment diffuser les résultats de la requête MongoDB avec nodejs?

J'ai cherché un exemple de la façon dont je peux diffuser le résultat d'une requête MongoDB à un client nodejs. Toutes les solutions que j'ai trouvées jusqu'à présent semblent lire le résultat de la requête à la fois, puis renvoyer le résultat au serveur.

Au Lieu de cela, je voudrais (évidemment) fournir un rappel à la méthode de requête et demander à MongoDB de l'appeler lorsque le morceau suivant du jeu de résultats est disponible.

J'ai regardé Mangouste-devrais-je probablement utiliser un autre pilote?

Jan

25
demandé sur Jan Algermissen 2011-09-10 19:13:18

5 réponses

Le Streaming dans Mongoose est devenu disponible dans la version 2.4.0 qui est apparue trois mois Après avoir posté cette question:

Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);

Des exemples plus élaborés peuvent être trouvés sur leur page de documentation .

25
répondu nab 2012-08-15 20:04:31

node-mongodb-driver (la couche sous-jacente que chaque client mongoDB utilise dans nodejs) sauf l'API cursor que d'autres ont mentionnée a une belle API stream(#458). Malheureusement, je ne l'ai pas trouvé documenté ailleurs.

Mise à Jour: il y a des docs aussi ici.

, Il peut être utilisé comme ceci:

var stream = collection.find().stream()
stream.on('error', function (err) {
  console.error(err)
})
stream.on('data', function (doc) {
  console.log(doc)
})

Il implémente réellement L'interface ReadableStream, donc il a tous les goodies (pause/resume etc)

25
répondu Dan Milon 2014-11-21 09:51:11

mongoose n'est pas vraiment "pilote", c'est en fait un wrapper ORM autour du pilote MongoDB (node-mongodb-native).

Pour faire ce que vous faites, jetez un oeil à la méthode .find et .each du pilote. Voici un code des exemples:

// Find all records. find() returns a cursor
collection.find(function(err, cursor) {
  sys.puts("Printing docs from Cursor Each")
  cursor.each(function(err, doc) {
    if(doc != null) sys.puts("Doc from Each " + sys.inspect(doc));
  })                    
});

Pour diffuser les résultats, vous remplacez essentiellement ce sys.puts par votre fonction "stream". Je ne sais pas comment vous prévoyez de diffuser les résultats. Je pense que vous pouvez faire response.write() + response.flush(), mais vous pouvez aussi commander socket.io.

10
répondu Gates VP 2012-03-22 23:37:09

Voici la solution que j'ai trouvée (veuillez me corriger si c'est la mauvaise façon de le faire): (Excusez aussi le mauvais codage-trop tard pour moi maintenant pour faire semblant)

var sys = require('sys')
var http = require("http");

var Db = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Db,
  Connection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Connection,
  Collection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Collection,
  Server = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Server;

var db = new Db('test', new Server('localhost',Connection.DEFAULT_PORT , {}));

var products;

db.open(function (error, client) {
  if (error) throw error;
  products = new Collection(client, 'products');
});

function ProductReader(collection) {
        this.collection = collection;
}

ProductReader.prototype = new process.EventEmitter();

ProductReader.prototype.do = function() {
        var self = this;

        this.collection.find(function(err, cursor) {
                if (err) {
                        self.emit('e1');
                        return;

                }
                sys.puts("Printing docs from Cursor Each");

                self.emit('start');
                cursor.each(function(err, doc) {
                        if (!err) {
                                self.emit('e2');
                                self.emit('end');
                                return;
                        }

                        if(doc != null) {
                                sys.puts("doc:" + doc.name);
                                self.emit('doc',doc);
                        } else {
                                self.emit('end');
                        }
                })
        });
};
http.createServer(function(req,res){
        pr = new ProductReader(products);
        pr.on('e1',function(){
                sys.puts("E1");
                res.writeHead(400,{"Content-Type": "text/plain"});
                res.write("e1 occurred\n");
                res.end();
        });
        pr.on('e2',function(){
                sys.puts("E2");
                res.write("ERROR\n");
        });

        pr.on('start',function(){
                sys.puts("START");
                res.writeHead(200,{"Content-Type": "text/plain"});
                res.write("<products>\n");
        });

        pr.on('doc',function(doc){
                sys.puts("A DOCUMENT" + doc.name);
                res.write("<product><name>" + doc.name + "</name></product>\n");
        });

        pr.on('end',function(){
                sys.puts("END");
                res.write("</products>");
                res.end();
        });

        pr.do();

  }).listen(8000);
2
répondu Jan Algermissen 2011-09-11 20:56:01

J'ai étudié moi-même les flux mongodb, alors que je n'ai pas toute la réponse que vous cherchez, j'en ai une partie. vous pouvez configurer un socket.io flux

Cela utilise javascript socket.io et socket. io-streaming disponible à NPM aussi mongodb pour la base de données parce que l'utilisation d'une base de données vieille de 40 ans qui a des problèmes est incorrecte, il est temps de moderniser de plus, la base de données de 40 ans est SQL et SQL ne fait pas de flux à ma connaissance

Donc, même si vous avez seulement posé des questions sur les données en cours du serveur au client, je veux aussi obtenir le client au serveur dans ma réponse parce que je ne peux jamais le trouver n'importe où quand je recherche et je voulais configurer un endroit avec les éléments send et receive via stream afin que tout le monde puisse le maîtriser rapidement.

Côté Client envoi de données au serveur via streaming

stream = ss.createStream();
blobstream=ss.createBlobReadStream(data);
blobstream.pipe(stream);
ss(socket).emit('data.stream',stream,{},function(err,successful_db_insert_id){
 //if you get back the id it went into the db and everything worked
});

Serveur recevant le flux du côté client, puis répondant une fois terminé

ss(socket).on('data.stream.out',function(stream,o,c){
 buffer=[];
 stream.on('data',function(chunk){buffer.push(chunk);});
 stream.on('end',function(){
  buffer=Buffer.concat(buffer);
  db.insert(buffer,function(err,res){
   res=insertedId[0];
   c(null,res);
  });
 });
});

/ / C'est l'autre moitié de cela la récupération des données et le streaming vers le client

Côté Client demandant et recevant des données de flux à partir du serveur

stream=ss.createStream();
binarystring='';
stream.on('data',function(chunk){ 
 for(var I=0;i<chunk.length;i++){
  binarystring+=String.fromCharCode(chunk[i]); 
 }
});
stream.on('end',function(){ data=window.btoa(binarystring); c(null,data); });
ss(socket).emit('data.stream.get,stream,o,c);

Réponse côté serveur à la demande de diffusion de données

ss(socket).on('data.stream.get',function(stream,o,c){
 stream.on('end',function(){
  c(null,true);
 });
 db.find().stream().pipe(stream);
});

Le tout dernier il y a le seul où je suis en train de le sortir de mes fesses parce que je ne l'ai pas encore essayé, mais cela devrait fonctionner. En fait, je fais quelque chose de similaire, mais j'écris le fichier sur le disque dur, puis j'utilise fs.createReadStream pour le diffuser au client. Donc, je ne sais pas si 100% mais d'après ce que j'ai lu, il devrait être, Je te rappellerai une fois que je l'aurai testé.

P. S. quelqu'un veut m'embêter à propos de ma façon familière de parler, je suis canadien, et j'aime dire " eh "viens à moi avec tes câlins et frappe bros / sis': D

0
répondu postJS 2018-07-09 14:59:55