Mise en cache réactive du service HTTP

J'utilise RsJS 5 (5.0.1) pour mettre en cache en angle 2. Il fonctionne bien.

de la viande de La fonction de mise en cache est:

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .publishReplay(1, this.RECACHE_INTERVAL)
  .refCount().take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));

actualFn est le this.http.get('/some/resource').

Comme je l'ai dit, cela fonctionne parfaitement pour moi. Le cache est retourné de l'observable pour la durée de la RECACHE_INTERVAL. Si une demande est faite après cet intervalle, le actualFn() sera appelée.

Ce que j'essaie de comprendre, c'est quand l' RECACHE_INTERVAL expire et l' actualFn() est appelle - comment retourner la dernière valeur. Il y a un espace de temps entre le moment où le RECACHE_INTERVAL expire et l' actualFn() est rejouée que l'observable ne renvoie pas de valeur. Je voudrais me débarrasser de cet écart dans le temps et toujours retourner la dernière valeur.

je pourrais utiliser un effet secondaire et de stocker la dernière bonne valeur appeler .next(lastValue) en attendant le retour de la réponse HTTP, mais cela semble naïf. Je voudrais utiliser une méthode "RxJS", une solution de fonction pure-si possible.

20
demandé sur НЛО 2017-02-16 20:08:29

3 réponses

presque n'importe quelle logique compliquée devient rapidement hors de contrôle si vous utilisez simple rxjs. Je voudrais plutôt mettre en œuvre personnalisée cache opérateur à partir de zéro, vous pouvez utiliser ce gist comme un exemple.

3
répondu kemsky 2017-02-18 23:25:41

réponse mise à jour:

Si voulez toujours utiliser la valeur précédente alors qu'une nouvelle demande est faite alors peut mettre un autre sujet dans la chaîne qui maintient la valeur la plus récente.

vous pouvez alors répéter la valeur pour qu'il soit possible de dire si elle provient du cache ou non. L'abonné peut alors filtrer les valeurs mises en cache s'il n'est pas intéressé par de ceux-ci.

// Take values while they pass the predicate, then return one more
// i.e also return the first value which returned false
const takeWhileInclusive = predicate => src =>
  src
  .flatMap(v => Observable.from([v, v]))
  .takeWhile((v, index) =>
     index % 2 === 0 ? true : predicate(v, index)
  )
  .filter((v, index) => index % 2 !== 1);

// Source observable will still push its values into the subject
// even after the subscriber unsubscribes
const keepHot = subject => src =>
  Observable.create(subscriber => {
    src.subscribe(subject);

    return subject.subscribe(subscriber);
  });

const cachedRequest = request
   // Subjects below only store the most recent value
   // so make sure most recent is marked as 'fromCache'
  .flatMap(v => Observable.from([
     {fromCache: false, value: v},
     {fromCache: true, value: v}
   ]))
   // Never complete subject
  .concat(Observable.never())
   // backup cache while new request is in progress
  .let(keepHot(new ReplaySubject(1)))
   // main cache with expiry time
  .let(keepHot(new ReplaySubject(1, this.RECACHE_INTERVAL)))
  .publish()
  .refCount()
  .let(takeWhileInclusive(v => v.fromCache));

  // Cache will be re-filled by request when there is another subscription after RECACHE_INTERVAL
  // Subscribers will get the most recent cached value first then an updated value

https://acutmore.jsbin.com/kekevib/8/edit?js, console

réponse Originale à cette question:

au lieu de définir une taille de fenêtre sur le replaySubject - vous pourriez changer la source observable à répéter après un délai.

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .repeatWhen(_ => _.delay(this.RECACHE_INTERVAL))
  .publishReplay(1)
  .refCount()
  .take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));

repeatWhen l'opérateur nécessite RxJs-beta12 ou plus https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#500-beta12-2016-09-09

2
répondu Ashley 2017-02-23 20:35:54

voir cette démo:https://jsbin.com/todude/10/edit?js, console

notez que j'essaie d'obtenir des résultats en cache à 1200ms quand le cas est invalidé et puis à 1300ms lorsque la requête précédente est toujours en attente (il faut 200ms). Les deux résultats sont reçus comme il se doit.

cela se produit parce que lorsque vous vous abonnez et le publishReplay() ne contient aucune valeur valide, il n'émet rien et ne remplit pas immédiatement (grâce à take(1)) il doit donc s'abonner à sa source qui fait les requêtes HTTP (cela se produit en fait dans refCount()).

alors le second abonné ne recevra rien aussi bien et sera ajouté au tableau des observateurs en publishReplay(). Il ne s'abonnera plus car il est déjà abonné à sa source (refCount()) et attend une réponse.

donc la situation que vous décrivez ne devrait pas se produire je pense. Éventuellement faire une démo qui montre votre problème.

EDIT:

Émettant invalidé élément et produits frais

l'exemple suivant montre un peu de fonctionnalité différente de l'exemple lié. Si la réponse mise en cache est invalidée, elle sera émise de toute façon et alors elle reçoit aussi la nouvelle valeur. Cela signifie que l'abonné reçoit un ou deux valeurs:

  • 1 valeur: La valeur mise en cache
  • 2 valeurs: les Invalides une valeur mise en cache, puis une nouvelle valeur qui sera mise en cache à partir de maintenant.

Le code pourrait ressembler à l'exemple suivant:

let counter = 1;
const RECACHE_INTERVAL = 1000;

function mockDataFetch() {
  return Observable.of(counter++)
    .delay(200);
}

let source = Observable.defer(() => {
  const now = (new Date()).getTime();

  return mockDataFetch()
    .map(response => {
      return {
        'timestamp': now,
        'response': response,
      };
    });
});

let updateRequest = source
  .publishReplay(1)
  .refCount()
  .concatMap(value => {
    if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) {
      return Observable.from([value.response, null]);
    } else {
      return Observable.of(value.response);
    }
  })
  .takeWhile(value => value);


setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);

Voir la démonstration en ligne: https://jsbin.com/ketemi/2/edit?js console

ceci affiche pour la console la sortie suivante:

Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1200: 2
Response 1300: 2
Response 1500: 2
Response 3500: 2
Response 3500: 3

Avis 1200 et 1300 a reçu en premier la valeur ancienne mise en cache 1 immédiatement puis un autre de valeur avec les frais 2 valeur.

Sur l'autre main 1500 reçu que la nouvelle valeur car 2 est déjà en cache et est valide.

Le plus déroutant chose est probablement pourquoi suis-je à l'aide de concatMap().takeWhile(). C'est parce que j'ai besoin de m'assurer que la nouvelle réponse (et non la réponse invalidée) est la dernière valeur avant d'envoyer une notification complète et il n'y a probablement pas d'opérateur pour cela (ni first() ni takeWhile() sont applicables à cet usage).

Émettant seulement l'article courant sans attendre rafraîchir

encore un autre cas d'utilisation pourrait être lorsque nous voulons émettre uniquement la valeur mise en cache sans attendre une nouvelle réponse de la requête HTTP.

let counter = 1;
const RECACHE_INTERVAL = 1000;

function mockDataFetch() {
  return Observable.of(counter++)
    .delay(200);
}

let source = Observable.defer(() => {
  const now = (new Date()).getTime();

  return mockDataFetch()
    .map(response => {
      return {
        'timestamp': now,
        'response': response,
      };
    });
});

let updateRequest = source
  .publishReplay(1)
  .refCount()
  .concatMap((value, i) => {
    if (i === 0) {
      if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { // is cached item valid?
        return Observable.from([value.response, null]);
      } else {
        return Observable.of(value.response);
      }
    }
    return Observable.of(null);
  })
  .takeWhile(value => value);


setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3800:", val)), 3800);

Voir la démonstration en ligne: https://jsbin.com/kebapu/2/edit?js console

cet exemple imprime vers la console:

Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1500: 2
Response 3500: 2
Response 3800: 3

Notez que les deux 1200 et 1300 recevoir 1 parce que c'est la valeur mise en cache même si c'est valide maintenant. Le premier appel à 1200 lance simplement une nouvelle requête HTTP sans attendre sa réponse et n'émet que la valeur mise en cache. Puis à 1500 la valeur fraîche est mise en cache donc elle est juste réémit. La même chose s'applique à 3500 et 3800.

notez que l'abonné à 1200 recevra le next notification immédiate mais la complete la notification ne sera envoyée qu'après la fin de la requête HTTP. Nous devons attendre parce que si nous avons envoyé complete juste après next il ferait la chaîne pour disposer de ses disposables ce qui devrait également annuler la requête HTTP (ce que nous ne voulons absolument pas faire).

2
répondu martin 2017-09-23 17:11:43