Utilisation de RxJava et Okhttp
je veux demander à une url en utilisant okhttp dans un autre thread (comme le thread IO) et obtenir Response
dans le fil principal Android, mais je ne sais pas comment créer un Observable
.
4 réponses
tout d'Abord ajouter RxAndroid
à vos dépendances, puis créez votre Observable
comme ceci:
Subscription subscription = Observable.create(new Observable.OnSubscribe<Response>() {
OkHttpClient client = new OkHttpClient();
@Override
public void call(Subscriber<? super Response> subscriber) {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
if (response.isSuccessful()) {
if(!subscriber.isUnsubscribed()){
subscriber.onNext(response);
}
subscriber.onCompleted();
} else if (!response.isSuccessful() && !subscriber.isUnsubscribed()) {
subscriber.onError(new Exception("error"));
}
} catch (IOException e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Response>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Response response) {
}
});
il demandera votre url dans un autre thread (thread io) et l'observera sur le thread principal android.
Et enfin, lorsque vous quittez l'écran utiliser subsribtion.unsubscribe()
pour éviter une fuite de mémoire.
quand vous utilisez Observable.create
, vous devriez écrire beaucoup de code boilerplate, aussi vous devez gérer l'Abonnement par vos propres. Une meilleure alternative est d'utiliser différer.
La forme de la doc:
ne pas créer Observables jusqu'à ce que l'observateur s'abonne, et de créer une nouvelle Observables pour chaque observateur
L'opérateur Defer attend jusqu'à ce qu'un observateur s'y abonne, et puis il génère un Observable, typiquement avec une usine Observable fonction. Il le fait à nouveau pour chaque abonné, donc bien que chaque l'abonné peut penser qu'il souscrit à la même Observable, en fait chaque abonné obtient son propre séquence.
Alors que Marcin Koziński mentionné, vous avez juste besoin de faire ceci:
final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
@Override public Observable<Response> call() {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
return Observable.just(response);
} catch (IOException e) {
return Observable.error(e);
}
}
});
Il est plus facile et plus sûr à utiliser Observable.defer()
au lieu de Observable.create()
:
final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
@Override public Observable<Response> call() {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
return Observable.just(response);
} catch (IOException e) {
return Observable.error(e);
}
}
});
de cette façon, la désinscription et la contre-pression sont gérées pour vous. Ici un grand poste par Dan Lewcreate()
et defer()
.
Si vous souhaitez aller de l' Observable.create()
route, alors il devrait regarder de plus comme dans cette bibliothèqueisUnsubscribed()
appels dispersés partout. Et je crois que ça ne supporte toujours pas la contre-pression.
je me rends compte que ce post est un peu vieux, mais il ya une nouvelle et plus pratique de le faire maintenant
Observable.fromCallable {
client.newCall(Request.Builder().url("your url").build()).execute()
}
Plus d'info: https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/
je suis arrivé en retard à la discussion, mais, si pour une raison quelconque le code nécessaire pour le streaming le corps de la réponse, puis defer
ou fromCallable
ne pas le faire. Au lieu de cela on peut utiliser le using
opérateur.
Single.using(() -> okHttpClient.newCall(okRequest).execute(), // 1
response -> { // 2
...
return Single.just((Consumer<OutputStream>) fileOutput -> {
try (InputStream upstreamResponseStream = response.body().byteStream();
OutputStream fileOutput = responseBodyOutput) {
ByteStreams.copy(upstreamResponseStream, output);
}
});
},
Response::close, // 3
false) // 4
.subscribeOn(Schedulers.io()) // 5
.subscribe(copier -> copier.accept(...), // 6
throwable -> ...); // 7
- La première lambda exécute la réponse après au moment de la souscription.
- la seconde lambda crée le type observable, ici avec
Single.just(...)
- La troisième lambda dispose de la réponse.
defer
on pourrait utiliser le essayez avec les ressources style. - mettez le
eager
basculer àfalse
de faire appeler l'éliminateur après l'événement terminal, c'est-à-dire après exécution du consommateur abonné. - bien entendu, la chose se produire sur un autre pool de threads
- Voici la lambda qui consommera le corps de réponse. Sans
eager
false
, le code va soulever une IOException avec la raison 'closed' parce que la réponse sera déjà fermée avant d'entrer ceci lambda. onError
lambda devrait traiter les exceptions, particulièrement lesIOException
qui ne peut plus être pris avec l'using
opérateur comme c'était possible avec un try/catch avecdefer
.