observeOn
observeOn用来切换事件流下游所在的线程;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24Observable.create<Int> {
it.onNext(1)
it.onNext(2)
it.onNext(3)
it.onComplete()
}.map {
it.toString()
}.flatMap {
Observable.just(it)
}.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object :Observer<String>{
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
}
override fun onError(e: Throwable) {
}
})
流程
1 | @CheckReturnValue |
与其他的操作符一样的套路,关键逻辑在ObservableObserveOn中:
1 | public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { |
在subscribeActual方法中,主要是创建一个我们在observeOn时指定线程的worker,然后调用上游操作
符Observable的subscribe方法,看上去和普通操作符订阅流程没什么区别,这是因为observeOn指定的
线程是作用在数据发送流程中的,和订阅流程没任何关系;
要让线程作用在数据发送流程,关键要靠ObserveOnObserver;
ObserveOnObserver
包装了Observer,并在指定的线程中回调下游Observer的onXXX,让数据在指定的线程中发送;
1 | static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> |
ObserveOnObserver是如何实现线程的切换的?
在数据发送的过程中,上游的Observer回调了ObserveOnObserver的onNext方法,将发送数据传递给它,
此时在ObserveOnObserver的onNext方法中会通过worker(由我们调用observeOn方法传递的Scheduler
进行创建),在指定的线程中回调下游的onNext方法,这样就实现了下游的线程切换;
因为observeOn是作用在数据的发送流程上,所以每次调用立即生效,并且都能让后面发送流程切换到对应
的线程中(实际就是让下游的onNext切换到对应线程中回调),所以每次调用observeOn都是有效的!
ObserveOnObserver的onSubscribe的处理方式?
ObserveOnObserver也实现了onSubscribe方法,在这个方法中,将上游传递过来的Disposable拦截下
来,并保存为成员变量,并将自身作为新的Disposable传递给下游的Observer。
实际上,大部分实例操作符的Observer包装类都是这么做的!
ObserveOnObserver实现数据发送状态同步的原理?
ObserveOnObserver继承了AtomicInteger,该类提供原子的get和set操作,同步模式下,实现状态同
步流程如下:
1、当上游通过onNext发送数据过来时,先将数据入队,然后调用schedule方法,检测是否开启线程发送数
据;
2、每次执行schedule方法时,都会调用AtomicInteger的getAndIncrement方法,让AtomicInteger
的value加1,然后返回旧值,如果旧值是0,说明之前队列为空,并且没有处于发送数据的状态,可以开始
发送数据。否则说明正处于发送数据状态中,因为数据已经入队了,所以不需要管,数据会自动被出队发送;
3、每次发送一个数据,都会调用AtomicInteger的addAndGet方法,这个方法同样是原子操作,每次调用
让AtomicInteger的value加-1,表示队列已经取出并发送了一个数据,当value为0时,表示队列数据发
送完毕,此时再调用schedule方法中的getAndIncrement方法,返回的就是0;
总结:ObserveOnObserver通过原子操作AtomicInteger的value,来实现数据发送状态的同步,当value
大于0时,说明当前正处于数据发送状态,只需要将数据入队即可,worker开启的线程会不断的从队列中取出
数据,发送出去;否则value等于0,表明没有处于数据发送状态,则会通过worker schedule一个线程出
来,从队列中取出数据进行发送;
需要注意的是,这种方式同样适用于异步的模式。因为异步模式下,并没有真正发送数据,而是发送null!
在异步模式下,只需要循环检测AtomicInteger的value是否为0,如果为0,则退出发送数据,否则一直
循环发送null。
ObserveOnObserver的两种数据发送模式?异步模式下对数据的处理方式?
ObserveOnObserver实现了QueueDisposable接口,该接口提供两种数据发送模式:SYNC(同步模式),
ASYNC(异步模式);
默认情况下,ObserveOnObserver是同步的,并且是通过队列来实现同步。但是,我们可以通过设置方法:1
2
3
4
5
6
7
8@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
传入ASYNC参数,让ObserveOnObserver进行异步的数据发送;
在异步的情况下,Worker线程会执行run方法,并调用drainFused(),进行数据的异步发送,而在异步情况
下,ObserveOnObserver并没有发送上游传递过来的数据,而是发送空数据null!
并且当上游也是处于异步模式的情况下,传递数据过来时,ObserveOnObserver根本就没有接收数据,而是
直接抛弃了:1
2
3
4
5
6
7
8
9
10
11
12@Override
public void onNext(T t) {
if (done) {
return;
}
// sourceMode在onSubscribe方法执行时,会记录上游的fusion mode
// 在fusion mode是非异步模式才将数据入队,否则直接忽略
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
个人猜测,异步模式下,ObserveOnObserver主要目的已经不是发送数据,而是发送信号通知。
ObserveOnObserver dispose时的特点?
1 | @Override |
ObserveOnObserver继承自AtomicInteger,不像其他继承自AtomicReference
包装类,ObserveOnObserver无法原子的set和get自身,所在dispose自身时,不是通过为自身设置一个
DISPOSED常量来表示,而是通过设置cancelled,这个volatile修饰的变量来表示自身的状态;