Comment s'arrêter et reprendre Observable.intervalle émettant des tiques

ceci émettra une tique toutes les 5 secondes.

Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
            .subscribe(tick -> Log.d(TAG, "tick = "+tick));

Pour l'arrêter, vous pouvez utiliser

Schedulers.shutdown();

mais alors tous les ordonnanceurs s'arrêtent et il n'est pas possible de reprendre le tic-tac plus tard. Comment puis-je arrêter et reprendre le prononçant "gracieusement"`?

23
demandé sur Kirill Rakhman 2016-02-16 00:04:50

5 réponses

Voici une solution possible:

class TickHandler {

    private AtomicLong lastTick = new AtomicLong(0L);
    private Subscription subscription;

    void resume() {
        System.out.println("resumed");
        subscription = Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                                 .map(tick -> lastTick.getAndIncrement())
                                 .subscribe(tick -> System.out.println("tick = " + tick));
    }

    void stop() {
        if (subscription != null && !subscription.isUnsubscribed()) {
            System.out.println("stopped");
            subscription.unsubscribe();
        }
    }
}
30
répondu AndroidEx 2016-02-15 21:23:06
val switch = new java.util.concurrent.atomic.AtomicBoolean(true)
val tick = new java.util.concurrent.atomic.AtomicLong(0L)

val suspendableObservable = 
  Observable.
    interval(5 seconds).
    takeWhile(_ => switch.get()).
    repeat.
    map(_ => tick.incrementAndGet())

Vous pouvez définir switchfalse pour suspendre le tic-tac et true pour la reprendre.

7
répondu Sheng 2016-05-18 02:25:47

il y a quelque temps, je cherchais aussi des solutions de type "timer" de RX, mais aucune d'entre elles n'a répondu à mes attentes. Donc là, vous pouvez trouver ma solution:

AtomicLong elapsedTime = new AtomicLong();
AtomicBoolean resumed = new AtomicBoolean();
AtomicBoolean stopped = new AtomicBoolean();

public Flowable<Long> startTimer() { //Create and starts timper
    resumed.set(true);
    stopped.set(false);
    return Flowable.interval(1, TimeUnit.SECONDS)
            .takeWhile(tick -> !stopped.get())
            .filter(tick -> resumed.get())
            .map(tick -> elapsedTime.addAndGet(1000));
}

public void pauseTimer() {
    resumed.set(false);
}

public void resumeTimer() {
    resumed.set(true);
}

public void stopTimer() {
    stopped.set(true);
}

public void addToTimer(int seconds) {
    elapsedTime.addAndGet(seconds * 1000);
}
4
répondu Artur Szymański 2017-07-24 06:06:48

Voici une autre façon de le faire, je pense.

Lorsque vous vérifiez le code source, vous trouverez intervalle() utiliser la classe OnSubscribeTimerPeriodically. Le code de la clé ci-dessous.

@Override
public void call(final Subscriber<? super Long> child) {
    final Worker worker = scheduler.createWorker();
    child.add(worker);
    worker.schedulePeriodically(new Action0() {
        long counter;
        @Override
        public void call() {
            try {
                child.onNext(counter++);
            } catch (Throwable e) {
                try {
                    worker.unsubscribe();
                } finally {
                    Exceptions.throwOrReport(e, child);
                }
            }
        }

    }, initialDelay, period, unit);
}

ainsi, vous verrez, si vous voulez caneler la boucle, Que diriez-vous de lancer une nouvelle exception dans onNext (). Exemple de code ci-dessous.

Observable.interval(1000, TimeUnit.MILLISECONDS)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.i("abc", "onNext");
                    if (aLong == 5) throw new NullPointerException();
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    Log.i("abc", "onError");
                }
            }, new Action0() {
                @Override
                public void call() {
                    Log.i("abc", "onCompleted");
                }
            });

Ensuite, vous verrez ceci:

08-08 11:10:46.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:47.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:48.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:49.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:50.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.018 28146-28181/net.bingyan.test I/abc: onError             
2
répondu Chris Wong 2016-08-08 03:18:53

Désolé, c'est en RxJS au lieu de RxJava, mais le concept sera le même. J'ai adapté cette de apprendre-rxjs.io et c'est sur qu'ici codepen.

L'idée est que vous commencez avec deux flux d'événements click, startClick$ et stopClick$. Chaque clic se produisant sur le stopClick$ stream obtenir mappé à un vide observables, et clique sur startClick$ chacune est mappée à la interval$ stream. Les deux flux résultants obtiennent merge-d, ainsi que dans un observables-de-observables. En d'autres termes, un nouvel observable de l'un des deux types sera émis par merge chaque fois qu'il y a un clic. La résultante observable va passer par switchMap, qui commence à écouter ce nouvel observable et cesse d'écouter ce qu'il écoutait auparavant. Switchmap va également commencer à fusionner les valeurs de ce nouvel observable sur son flux existant.

Après le passage, scan seule la valeur "incrémentielle" émise par interval$, et il ne voit aucune valeur quand "stop" a été cliqué.

Et jusqu'au premier clic, startWith commencera à émettre des valeurs à partir de $interval, juste pour faire bouger les choses:

const start = 0;
const increment = 1;
const delay = 1000;
const stopButton = document.getElementById('stop');
const startButton = document.getElementById('start');
const startClick$ = Rx.Observable.fromEvent(startButton, 'click');
const stopClick$ = Rx.Observable.fromEvent(stopButton, 'click');
const interval$ = Rx.Observable.interval(delay).mapTo(increment);
const setCounter = newValue => document.getElementById("counter").innerHTML = newValue;
setCounter(start);

const timer$ = Rx.Observable

    // a "stop" click will emit an empty observable,
    // and a "start" click will emit the interval$ observable.  
    // These two streams are merged into one observable.
    .merge(stopClick$.mapTo(Rx.Observable.empty()), 
           startClick$.mapTo(interval$))

    // until the first click occurs, merge will emit nothing, so 
    // use the interval$ to start the counter in the meantime
    .startWith(interval$)

    // whenever a new observable starts, stop listening to the previous
    // one and start emitting values from the new one
    .switchMap(val => val)

    // add the increment emitted by the interval$ stream to the accumulator
    .scan((acc, curr) => curr + acc, start)

    // start the observable and send results to the DIV
    .subscribe((x) => setCounter(x));

et voici le HTML

<html>
<body>
  <div id="counter"></div>
  <button id="start">
    Start
  </button>
  <button id="stop">
    Stop
  </button>
</body>
</html>
1
répondu mikebridge 2017-06-25 04:26:19