Utilisation de RxJava et Okhttp

je veux demander à une url en utilisant okhttp dans un autre thread (comme le thread IO) et obtenir Response dans le fil principal Android, mais je ne sais pas comment créer un Observable.

18
demandé sur Marcin Koziński 2015-09-21 08:27:16

4 réponses

tout d'Abord ajouter RxAndroid à vos dépendances, puis créez votre Observable comme ceci:

 Subscription subscription =   Observable.create(new Observable.OnSubscribe<Response>() {
        OkHttpClient client = new OkHttpClient();
          @Override
          public void call(Subscriber<? super Response> subscriber) {
            try {
              Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
              if (response.isSuccessful()) {
                  if(!subscriber.isUnsubscribed()){
                     subscriber.onNext(response);
                  }
                  subscriber.onCompleted();
              } else if (!response.isSuccessful() && !subscriber.isUnsubscribed()) {
                  subscriber.onError(new Exception("error"));
                }
            } catch (IOException e) {
              if (!subscriber.isUnsubscribed()) {
                  subscriber.onError(e);
              }
            }
          }
        })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Response>() {
              @Override
              public void onCompleted() {

              }

              @Override
              public void onError(Throwable e) {

              }

              @Override
              public void onNext(Response response) {

              }
            });

il demandera votre url dans un autre thread (thread io) et l'observera sur le thread principal android.

Et enfin, lorsque vous quittez l'écran utiliser subsribtion.unsubscribe() pour éviter une fuite de mémoire.

quand vous utilisez Observable.create, vous devriez écrire beaucoup de code boilerplate, aussi vous devez gérer l'Abonnement par vos propres. Une meilleure alternative est d'utiliser différer. La forme de la doc:

ne pas créer Observables jusqu'à ce que l'observateur s'abonne, et de créer une nouvelle Observables pour chaque observateur

L'opérateur Defer attend jusqu'à ce qu'un observateur s'y abonne, et puis il génère un Observable, typiquement avec une usine Observable fonction. Il le fait à nouveau pour chaque abonné, donc bien que chaque l'abonné peut penser qu'il souscrit à la même Observable, en fait chaque abonné obtient son propre séquence.

Alors que Marcin Koziński mentionné, vous avez juste besoin de faire ceci:

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});
22
répondu Saeed Masoumi 2017-06-26 10:09:23

Il est plus facile et plus sûr à utiliser Observable.defer() au lieu de Observable.create():

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});

de cette façon, la désinscription et la contre-pression sont gérées pour vous. Ici un grand poste par Dan Lewcreate() et defer().

Si vous souhaitez aller de l' Observable.create() route, alors il devrait regarder de plus comme dans cette bibliothèqueisUnsubscribed() appels dispersés partout. Et je crois que ça ne supporte toujours pas la contre-pression.

16
répondu Marcin Koziński 2016-07-06 08:51:35

je me rends compte que ce post est un peu vieux, mais il ya une nouvelle et plus pratique de le faire maintenant

Observable.fromCallable {
        client.newCall(Request.Builder().url("your url").build()).execute()
    }

Plus d'info: https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/

7
répondu FRR 2017-10-16 19:16:05

je suis arrivé en retard à la discussion, mais, si pour une raison quelconque le code nécessaire pour le streaming le corps de la réponse, puis defer ou fromCallable ne pas le faire. Au lieu de cela on peut utiliser le using opérateur.

Single.using(() -> okHttpClient.newCall(okRequest).execute(), // 1
             response -> { // 2
                 ...

                 return Single.just((Consumer<OutputStream>) fileOutput -> {
                     try (InputStream upstreamResponseStream = response.body().byteStream();
                          OutputStream fileOutput = responseBodyOutput) {
                         ByteStreams.copy(upstreamResponseStream, output);
                     }
                 });
             },
             Response::close, // 3
             false) // 4
      .subscribeOn(Schedulers.io()) // 5
      .subscribe(copier -> copier.accept(...), // 6
                 throwable -> ...); // 7
  1. La première lambda exécute la réponse après au moment de la souscription.
  2. la seconde lambda crée le type observable, ici avec Single.just(...)
  3. La troisième lambda dispose de la réponse. defer on pourrait utiliser le essayez avec les ressources style.
  4. mettez le eager basculer à false de faire appeler l'éliminateur après l'événement terminal, c'est-à-dire après exécution du consommateur abonné.
  5. bien entendu, la chose se produire sur un autre pool de threads
  6. Voici la lambda qui consommera le corps de réponse. Sans eagerfalse, le code va soulever une IOException avec la raison 'closed' parce que la réponse sera déjà fermée avant d'entrer ceci lambda.
  7. onError lambda devrait traiter les exceptions, particulièrement les IOException qui ne peut plus être pris avec l' using opérateur comme c'était possible avec un try/catch avec defer.
0
répondu Brice 2018-08-30 16:27:26