MongoDB Java API slow reading peformance

nous lisons à partir d'un MongoDB local tous les documents d'une collection et la performance n'est pas très brillante.

nous avons besoin de vider toutes les données, ne vous inquiétez pas pourquoi, faites confiance que c'est vraiment nécessaire et il n'y a pas de solution possible.

nous avons 4mio documents qui ressemblent à :

{
    "_id":"4d094f58c96767d7a0099d49",
    "exchange":"NASDAQ",
    "stock_symbol":"AACC",
    "date":"2008-03-07",
    "open":8.4,
    "high":8.75,
    "low":8.08,
    "close":8.55,
    "volume":275800,
    "adj close":8.55
}

Et nous utilisons cela pour l'instant trivial code à lire:

MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("localhost");
MongoCollection<Document> collection = database.getCollection("test");

MutableInt count = new MutableInt();
long start = System.currentTimeMillis();
collection.find().forEach((Block<Document>) document -> count.increment() /* actually something more complicated */ );
long start = System.currentTimeMillis();

nous lisons toute la collection à 16 secondes (ligne 250k / sec), ce qui est vraiment pas du tout impressionnant avec de petits documents. Gardez à l'esprit que nous voulons charger 800mio rows. Aucun agrégat, réduction de carte ou similaire ne sont possibles.

est-ce aussi rapide que MongoDB obtient ou y a-t-il d'autres façons de charger des documents plus rapidement (autres techniques, déplacement de Linux, plus de RAM, paramètres...)?

23
demandé sur nullpointer 2018-08-30 19:09:22

5 réponses

Vous ne précisez pas votre cas d'utilisation, il est donc très difficile de vous dire comment régler votre requête. (I.e.: qui voudrait charger 800mil row à la fois juste pour le compte?).

compte tenu de votre schéma, je pense que vos données sont presque en lecture seule et votre tâche est liée à l'agrégation de données.

votre travail actuel est juste lire les données, (très probablement votre pilote Lira en lot), puis arrêter, puis effectuer quelques calculs (enfer oui, un enveloppeur int est utilisé pour augmenter le temps de traitement plus), puis répéter. Ce n'est pas une bonne approche. La DB n'est pas magiquement rapide si vous n'y accédez pas de la bonne manière.

Si le calcul n'est pas trop complexe, je vous conseille d'utiliser le cadre d'agrégation au lieu de charger tout dans votre RAM.

quelque chose que vous devriez considérer pour améliorer votre agrégation:

  1. divisez votre ensemble de données en ensembles plus petits. (Par exemple: cloison par date, partition par exchange...). Ajouter l'index au support cette partition et opérer l'agrégation sur la partition puis combiner le résultat (typique divide-n-conquer approche)
  2. Projet uniquement les champs nécessaires
  3. filtrer les documents inutiles (si possible)
  4. Autoriser diskusage si vous ne pouvez pas effectuer votre agrégation sur la mémoire (si vous atteignez la limite de 100 Mo par pipiline).
  5. utilisez le pipeline intégré pour accélérer votre calcul (par exemple:$count pour ton exemple)

Si votre calcul est trop complexe que vous ne pouvez pas exprimer avec l'agrégation de cadre, puis utilisez mapReduce. Il opère sur l' mongod le processus et les données n'ont pas besoin de transférer sur le réseau vers votre mémoire.

mise à Jour

Donc regarder comme si vous voulez faire un traitement OLAP, et vous coincé à l'ETL étape.

vous n'avez pas besoin et devez éviter de charger L'ensemble des données OLTP à OLAP à chaque fois. Il suffit de charger les nouvelles modifications dans votre entrepôt de données. Alors tout d'abord le chargement/déchargement des données prend plus de temps est normal et acceptable.

pour le premier chargement, vous devriez considérer les points suivants:

  1. Divide-N-Conquer, encore une fois, brise vos données à un plus petit ensemble de données (avec prédicate comme date / exchange / stock label...)
  2. faites des calculs en parallèle, puis combinez votre résultat (vous devez partitionner votre ensemble de données correctement)
  3. Faire le calcul sur le lot au lieu de traitement forEach: charger les données partition puis calculer au lieu de calculer un par un.
12
répondu Mạnh Quyết Nguyễn 2018-09-04 09:03:10

collection.find().forEach((Block<Document>) document -> count.increment());

cette ligne peut s'ajouter beaucoup de temps puisque vous itérez plus de 250K d'enregistrements en mémoire.

À vérifier rapidement si c'est le cas, vous pouvez essayer ce -

long start1 = System.currentTimeMillis();
List<Document> documents = collection.find();
System.out.println(System.currentTimeMillis() - start1);

long start2 = System.currentTimeMillis();
documents.forEach((Block<Document>) document -> count.increment());
System.out.println(System.currentTimeMillis() - start2);

Cela vous aidera à comprendre comment beaucoup de temps il prend pour obtenir les documents de la base de données et combien de temps l'itération.

3
répondu RishikeshDhokare 2018-08-30 16:16:30

ce que je pense que je devrais faire dans votre cas était une solution simple et simultanément un moyen efficace est de maximiser le débit global en utilisant parallelCollectionScan

permet aux applications d'utiliser plusieurs curseurs parallèles lors de la lecture de tous les documents d'une collection, augmentant ainsi le débit. Le la commande parallelCollectionScan renvoie un document qui contient tableau des informations du curseur.

Chaque curseur permet d'accéder à le retour d'une partie de documents d'une collection. Itérer chaque curseur renvoie chaque document dans la collection. Les curseurs ne contiennent pas les résultats des commande de base de données. Le résultat de la commande database identifie les curseurs, mais ne contient pas ou qui constituent des curseurs.

Un exemple simple parallelcollectionsscan devrait être somethink comme celui-ci

 MongoClient mongoClient = MongoClients.create();
 MongoDatabase database = mongoClient.getDatabase("localhost");
 Document commandResult = database.runCommand(new Document("parallelCollectionScan", "collectionName").append("numCursors", 3));
2
répondu xXxpRoGrAmmErxXx 2018-09-04 08:17:29

tout d'abord, comme @Xtreme-biker l'a commenté, les performances dépendent fortement de votre matériel. Plus précisément, mon premier conseil serait de vérifier si vous utilisez une machine virtuelle ou un hôte natif. Dans mon cas avec un VM CentOS sur un i7 avec un lecteur SDD je peux lire 123 000 docs par seconde mais exactement le même code qui tourne sur L'hôte Windows sur le même lecteur lit jusqu'à 387 000 docs par seconde.

ensuite, supposons que vous ayez vraiment besoin de lire toute la collection. Ce est de dire que vous devez effectuer un balayage complet. Et supposons que vous ne pouvez pas changer la configuration de votre serveur MongoDB mais seulement optimiser votre code.

alors tout se résume à quoi

collection.find().forEach((Block<Document>) document -> count.increment());

en fait si.

un défilement rapide de la collection MongoCollection.find() montre qu'en fait cela:

ReadPreference readPref = ReadPreference.primary();
ReadConcern concern = ReadConcern.DEFAULT;
MongoNamespace ns = new MongoNamespace(databaseName,collectionName);
Decoder<Document> codec = new DocumentCodec();
FindOperation<Document> fop = new FindOperation<Document>(ns,codec);
ReadWriteBinding readBinding = new ClusterBinding(getCluster(), readPref, concern);
QueryBatchCursor<Document> cursor = (QueryBatchCursor<Document>) fop.execute(readBinding);
AtomicInteger count = new AtomicInteger(0);
try (MongoBatchCursorAdapter<Document> cursorAdapter = new MongoBatchCursorAdapter<Document>(cursor)) {
    while (cursorAdapter.hasNext()) {
        Document doc = cursorAdapter.next();
        count.incrementAndGet();
    }
}

voici le FindOperation.execute() est assez rapide (moins de 10ms) et la plupart du temps est passé à l'intérieur de la boucle while, et spécifiquement à l'intérieur de la méthode privée QueryBatchCursor.getMore()

getMore() appelle DefaultServerConnection.command() et son temps est consommé essentiellement en deux opérations: 1) recherche des données de chaîne de caractères sur le serveur et 2) conversion des données string en BsonDocument.

il s'avère que Mongo est tout à fait intelligent en ce qui concerne le nombre de voyages aller-retour réseau qu'il fera pour obtenir un grand ensemble de résultats. Il récupérera d'abord 100 résultats avec une commande firstBatch, puis de plus grands lots avec nextBatch étant la taille du lot en fonction de la taille de la collecte jusqu'à une limite.

donc, sous le bois quelque chose comme ça va arriver pour aller chercher la première fournée.

ReadPreference readPref = ReadPreference.primary();
ReadConcern concern = ReadConcern.DEFAULT;
MongoNamespace ns = new MongoNamespace(databaseName,collectionName);
FieldNameValidator noOpValidator = new NoOpFieldNameValidator();
DocumentCodec payloadDecoder = new DocumentCodec();
Constructor<CodecProvider> providerConstructor = (Constructor<CodecProvider>) Class.forName("com.mongodb.operation.CommandResultCodecProvider").getDeclaredConstructor(Decoder.class, List.class);
providerConstructor.setAccessible(true);
CodecProvider firstBatchProvider = providerConstructor.newInstance(payloadDecoder, Collections.singletonList("firstBatch"));
CodecProvider nextBatchProvider = providerConstructor.newInstance(payloadDecoder, Collections.singletonList("nextBatch"));
Codec<BsonDocument> firstBatchCodec = fromProviders(Collections.singletonList(firstBatchProvider)).get(BsonDocument.class);
Codec<BsonDocument> nextBatchCodec = fromProviders(Collections.singletonList(nextBatchProvider)).get(BsonDocument.class);
ReadWriteBinding readBinding = new ClusterBinding(getCluster(), readPref, concern);
BsonDocument find = new BsonDocument("find", new BsonString(collectionName));
Connection conn = readBinding.getReadConnectionSource().getConnection();

BsonDocument results = conn.command(databaseName,find,noOpValidator,readPref,firstBatchCodec,readBinding.getReadConnectionSource().getSessionContext(), true, null, null);
BsonDocument cursor = results.getDocument("cursor");
long cursorId = cursor.getInt64("id").longValue();

BsonArray firstBatch = cursor.getArray("firstBatch");

Puis cursorId est utilisé pour extraire chaque lot suivant.

à mon avis, le "problème" avec l'implémentation du pilote est que la chaîne de caractères vers JSON decoder est injectée mais pas le JsonReader-sur lequel repose la méthode decode (). C'est de cette façon jusqu'à com.mongodb.internal.connection.InternalStreamConnection où vous êtes déjà près de la communication socket.

par conséquent, je pense qu'il n'y a presque rien que vous pourriez faire pour améliorer sur MongoCollection.find() sauf si vous allez aussi profond que InternalStreamConnection.sendAndReceiveAsync()

vous ne pouvez pas réduire le nombre de voyages aller-retour et vous ne pouvez pas changer la façon dont la réponse est convertie en BsonDocument. Pas sans court-circuiter le conducteur et écrire à votre propre client, ce qui, je pense, est une bonne idée.

P. D. si vous voulez essayer certains des code ci-dessus, vous aurez besoin de la méthode getCluster() qui nécessite un hack sale dans mongo-java-pilote.

private Cluster getCluster() {
    Field cluster, delegate;
    Cluster mongoCluster = null;
    try {
        delegate = mongoClient.getClass().getDeclaredField("delegate");
        delegate.setAccessible(true);
        Object clientDelegate = delegate.get(mongoClient);
        cluster = clientDelegate.getClass().getDeclaredField("cluster");
        cluster.setAccessible(true);
        mongoCluster = (Cluster) cluster.get(clientDelegate);
    } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
        System.err.println(e.getClass().getName()+" "+e.getMessage());
    }
    return mongoCluster;
}
1
répondu Serg M Ten 2018-09-10 06:16:48

d'après mes calculs, vous traitez environ 50 MiB/s (250k ligne/sec * 0.2 KiB/ligne). C'est entrer dans la commande de disque et le territoire de goulot d'étranglement de réseau. Quel type de stockage utilise MongoDB? Quelle bande passante avez-vous entre le client et le serveur MongoDB? Avez-vous essayé de co-localiser le serveur et le client sur un réseau à grande vitesse (>= 10 Gib/s) avec une latence minimale (< 1,0 ms)? Gardez à l'esprit que si vous utilisez un fournisseur de cloud computing comme AWS ou GCP, ils vont avoir les goulots d'étranglement de la virtualisation qui s'ajoutent aux goulots d'étranglement physiques.

vous avez demandé des paramètres qui pourraient aider. Vous pouvez essayer de changer les paramètres de compression sur le connexion et collection (options sont "none"snappy et zlib). Même si ni l'améliorer snappy, voir la différence que le réglage fait (ou ne fait pas) pourrait aider à comprendre quelle partie du système est le plus stressé.

Java n'a pas de bon performance pour le nombre crunching comparé à C++ ou Python, donc vous pourriez envisager de réécrire cette opération particulière dans l'un de ces langages et de l'intégrer à votre code Java. Je suggère que vous fassiez un essai de juste boucler les données en Python et de les comparer à la même chose en Java.

0
répondu Old Pro 2018-09-09 22:49:18