Comment gérer la pagination avec RxJava?

je cherche à convertir mon application android pour utiliser Rxjava pour les requêtes réseau. J'ai actuellement accès à un service de ce type:

getUsersByKeyword(String query, int limit, int offset)

d'après ce que j'ai compris, les Observables sont une interface "push" plutôt qu'une interface "pull". Alors, voici comment je comprends les choses:

  • l'application s'enregistre avec le service, obtenant Observable pour la requête
  • les résultats sont poussés à l'application
  • l'application traite des résultats
  • quand app veut plus de résultats ...?

C'est ici que les choses tombent en panne pour moi. Auparavant, je voudrais juste demander au webservice pour exactement ce que je veux, faire la requête à nouveau avec l'offset. Mais dans ce cas, cela impliquerait de créer un autre Observable et d'y souscrire, allant à l'encontre du but recherché.

Comment dois-je gérer la pagination dans mon application? (C'est une application android, mais je ne pense pas que ce soit pertinent).

17
demandé sur john the android guy 2014-10-12 05:41:54

4 réponses

c'était du hard rock!) Nous avons donc demande à réseau:

getUsersByKeyword(String query, int limit, int offset)

et cette requête renvoie par exemple

List< Result >

Si nous utilisons RetroFit pour la mise en réseau, cette requête sera:

Observable< List< Result >> getUsersByKeyword(String query, int limit, int offset)

En conséquence, nous voulons obtenir tout Result à partir du serveur.

Donc, il ressemblera à ceci

int page = 50;
int limit = page;
Observable
                .range(0, Integer.MAX_VALUE - 1)
                .concatMap(new Func1<Integer, Observable<List<Result>>>() {
                    @Override
                    public Observable<List<Result>> call(Integer integer) {
                        return getUsersByKeyword(query, integer * page, limit);
                    }
                })
                .takeWhile(new Func1<List<Result>, Boolean>() {
                    @Override
                    public Boolean call(List<Result> results) {
                        return !results.isEmpty();
                    }
                })
                .scan(new Func2< List<Result>, List<Result>, List<Result>>() {
                    @Override
                    public List<Result> call(List<Result> results, List< Result> results2) {
                        List<Result> list = new ArrayList<>();
                        list.addAll(results);
                        list.addAll(results2);
                        return list;
                    }
                })
                .last()
                .subscribe(new Subscriber<List<Result>>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(List<Results> results) {
                    }
                });

Code a été TESTÉ!

8
répondu xoxol_89 2015-08-13 11:41:49

Donc, si c'est une façon de pagination, voici un modèle que vous pourriez essayer. Ce code n'a pas été exécuté ou compilé, mais j'ai essayé à annoter pour expliquer ce qui se passe.

private static final int LIMIT = 50;

// Given: Returns a stream of dummy event objects telling us when
// to grab the next page. This could be from a click or wherever.
Observable<NextPageEvent> getNextPageEvents();  

// Given:
// The search query keywords. Each emission here means a new top-level
// request;
Observable<String> queries;

queries.switchMap((query) -> getNextPageEvents()
        // Ignore 'next page' pokes when unable to take them.
        .onBackPressureDrop()
        // Seed with the first page.
        .startWith(new NextPageEvent())
        // Increment the page number on each request.
        .scan(0, (page, event) -> page + 1) 
        // Request the page from the server.
        .concatMap((page) -> getUsersByKeyword(query, LIMIT, LIMIT * page)
                // Unroll Observable<List<User> into Observable<User>
                .concatMap((userList) -> Observable.from(userList))
                .retryWhen(/** Insert your favorite retry logic here. */))
        // Only process new page requests sequentially.
        .scheduleOn(Schedulers.trampoline())
        // Trampoline schedules on the 'current thread', so we make sure that's
        // a background IO thread.
        .scheduleOn(Schedulers.io());

cela devrait permettre au signal 'next page events' de déclencher un chargement des données de la page suivante à chaque fois, ainsi que de ne pas sauter des pages si elle rencontre une erreur en chargeant une. Il redémarre également complètement au niveau supérieur s'il reçoit une nouvelle requête de recherche. Si je (ou quelqu'un d'autre?) a des fois, j'aimerais pour vérifier mes hypothèses sur le trampoline et la contre-pression et s'assurer qu'il bloque toute tentative de récupérer prématurément la page suivante pendant qu'on charge.

5
répondu lopar 2015-12-22 10:12:13

j'ai fait ça et ce n'est pas si difficile.

l'approche consiste à modéliser chaque première requête (offset 0) dans un firstRequestsObservable. Pour le rendre facile, vous pouvez le faire comme un Publisubject où vous appelez onNext() pour alimenter la requête suivante, mais il y a des façons plus intelligentes de le faire (par exemple, si les requêtes sont faites quand un bouton est cliqué, alors le requestObservable est le click observable mappé par certains opérateurs).

une Fois que vous avez firstRequestsObservable à la place, vous pouvez faire responseObservable par flatMapping de firstRequestsObservable et ainsi de suite, pour faire de l'appel de service.

Voici maintenant le truc: faire un autre observable appelé subsequentRequestsObservable qui est mappé de responseObservable incrémenter le décalage (à cette fin, il est bon d'inclure, dans les données de réponse, le décalage de l'origine de la demande). Une fois que vous introduisez cet observable, vous devez maintenant changer la définition de responseObservable de sorte que cela dépend aussi de subsequentRequestsObservable. Vous obtenez alors une circulaire la dépendance comme ceci:

firstRequestsObservable -> responseObservable -> subsequentRequestsObservable -> responseObservable -> subsequentRequestsObservable -> ...

Pour briser ce cycle, vous voudrez probablement inclure un filter opérateur dans la définition de subsequentRequestsObservable, en filtrant les cas où le décalage dépasserait la limite du "total". La dépendance circulaire signifie aussi que vous besoin pour avoir l'un de ces Sujets, sinon il serait impossible de déclarer les observables. Je recommande responseObservable à ce Sujet.

donc, dans l'ensemble, vous initialisez tout d'abord la réponse observable en tant que sujet, puis vous déclarez que la première réponse est observable, puis vous déclarez que les demandes ultérieures sont observables à la suite du passage de la réponse observable par certains opérateurs. responseObservable peut alors être "alimenté" en utilisant onNext.

2
répondu André Staltz 2014-10-12 18:42:09

pour être clair, je suppose que vos questions est plus la façon d'appliquer RxJava dans votre application Android plutôt que sur le dos (bien qu'il soit possible d'appliquer aussi, mais pas aussi typique que le devant). Et je ne suis pas très sûr que cet exemple soit un cas d'utilisation typique pour l'utilisation du modèle de programmation fonctionnelle réactive (RFP) sauf pour les constructions de code propre.

chaque flux ci-dessous est un Observable. Personnellement, je voudrais le penser aussi stream comme il est facile de raisonner au sujet de la événement. Le flux de Pagination peut être représenté par 6 flux:

firstPageStream -f-------------------> // this is actually to produce very first event to obtain the first page result. The event can come from a touch in one of the screen that navigates to this list screen.
nextPageStream  -----n-------n-------> // this is the source of events coming from Next button 'touch' actions
prevPageStream  ---------p-------p---> // this is the source of events coming from Previous button 'touch' actions
requestStream   -r---r---r---r---r---> // this is to consume the signal from 3 streams above and spawn events (r) which create a query and pagination details i.e.: offset and limit
responseStream  -R---R---R---R---R---> // this is to take each r and invoke your web service getUsersByKeyword() then spawn the response (R)

les flux ci-dessus peuvent être représentés en pseudo code (style JS) ci-dessous (il serait assez facile de traduire vers RxJava ou d'autres langues)

firstPageStream = Observable.just(0); // offset=0
nextPageStream = Observable.fromEvent(nextButton, 'touch')
                           .map(function() {
                              offset = offset + limit;
                              return offset;
                            });
prevPageStream = Observable.fromEvent(prevButton, 'touch')
                           .map(function() {
                              offset = offset - limit; // TODO some check here
                              return offset;
                            });
requestStream = Observable.merge(firstPageStream, nextPageStream, prevPageStream)
                            .map(function(offsetValue) {
                               return {offset : offsetValue, limit: limit};
                            });
responseStream = requestStream.flatMap(function(pagination) {
                                return webservice.getUsersByKeyword(query, pagination.offset, pagination.limit); // assume this is async response
                              });
responseStream.subscribe(function(result) {
  // use result to render the display
});

PS1: Je n'ai pas testé le code ci-dessus. J'apprends la RFP alors j'essaie de penser de façon réactive en écrivant. Ouverts à toute suggestion.

PS2: je suis très influencé par https://gist.github.com/staltz/868e7e9bc2a7b8c1f754 pour la façon d'expliquer le flux de réactif.

0
répondu erolagnab 2015-02-13 14:02:46