Rxjava2源码解析
1:用法:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { //Log.d(TAG, "ObservableEmitter"); //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName()); emitter.onNext(12); emitter.onNext(13); emitter.onNext(14); emitter.onNext(15); emitter.onComplete(); } }); Observer<Integer> observer = new Observer<Integer>() { private int i; private Disposable mDisposable; @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); mDisposable = d; } @Override public void onNext(Integer integer) { /*i++; if(i == 3){ mDisposable.dispose(); }*/ Log.d(TAG, "onNext" + integer); } @Override public void onError(Throwable e) { Log.d(TAG, "onError"); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; //observable.subscribe(observer); observable.doOnSubscribe(disposable -> { Log.d(TAG, "doOnSubscribe"); } ).doOnComplete(() -> { Log.d(TAG, "doOnComplete"); }).doOnNext((C) -> { Log.d(TAG, "doNext" + C); }).subscribe(observer);
2:Observable
首先看Observable:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { //Log.d(TAG, "ObservableEmitter"); //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName()); emitter.onNext(12); emitter.onNext(13); emitter.onNext(14); emitter.onNext(15); emitter.onComplete(); } });
看一下create:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
参数ObservableOnSubscribe是一个接口,里面只有一个函数subscribe:
public interface ObservableOnSubscribe<T> { /** * Called for each Observer that subscribes. * @param e the safe emitter instance, never null * @throws Exception on error */ void subscribe(ObservableEmitter<T> e) throws Exception; }
create需要返回的是一个Observable:
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
通过在这里可知反回的是一个Observable的继承类ObservableCreate:
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; }
。。。。。。。。。。
总结以上的几个类,可以归纳一下:
通过Observable.create返回一个Observable,具体是返回ObservableCreate,该类继承Observable,同时该类持有ObservableOnSubscribe,而ObservableOnSubscribe是一个接口,具体的实现是在:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { //Log.d(TAG, "ObservableEmitter"); //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName()); emitter.onNext(12); emitter.onNext(13); emitter.onNext(14); emitter.onNext(15); emitter.onComplete(); } });
Observable的解析暂时先到这里,我们先看如何和observer关联起来:
//observable.subscribe(observer); observable.doOnSubscribe(disposable -> { Log.d(TAG, "doOnSubscribe"); } ).doOnComplete(() -> { Log.d(TAG, "doOnComplete"); }).doOnNext((C) -> { Log.d(TAG, "doNext" + C); }).subscribe(observer);
进入subscribe:
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
可以看到这是Observable里面的一个方法,前面我们说到,create生成的是ObservableCreate,而该类继承Observable,所以我们现在就是在ObservableCreate的subscribe方法里面,看参数,传进来的是一个observer,observer也是一个interface,具体实现就是应用层的:
Observer<Integer> observer = new Observer<Integer>() { private int i; private Disposable mDisposable; @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); mDisposable = d; } @Override public void onNext(Integer integer) { /*i++; if(i == 3){ mDisposable.dispose(); }*/ Log.d(TAG, "onNext" + integer); } @Override public void onError(Throwable e) { Log.d(TAG, "onError"); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } };
到这里再来总结一下,通过subscribe,就是ObservableCreate这个类(该类继承Observable,同时该类持有ObservableOnSubscribe,而ObservableOnSubscribe是一个接口)执行subscribe,传参为observer,是一个实现为用户层定义的接口。接下来就是具体看一下subscribe这个函数是如何走的:
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
首先看:
subscribeActual(observer);
点进这个函数看到是一个Observable里面的一个abstract函数,那实现是在哪里呢,前面一直提到的,我们现在其实是处于ObservableCreate这个类,进入该类找到subscribeActual这个函数:
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
先看这句:
source.subscribe(parent);
先看看source是怎么来的:
final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; }
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { //Log.d(TAG, "ObservableEmitter"); //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName()); emitter.onNext(12); emitter.onNext(13); emitter.onNext(14); emitter.onNext(15); emitter.onComplete(); } });
一层层往上追溯可以看到sorce其实就是用户层实现的那个接口:
public interface ObservableOnSubscribe<T> { /** * Called for each Observer that subscribes. * @param e the safe emitter instance, never null * @throws Exception on error */ void subscribe(ObservableEmitter<T> e) throws Exception; }
会到:
source.subscribe(parent);
这里的parent是CreateEmitter,先看subscribe:
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { //Log.d(TAG, "ObservableEmitter"); //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName()); emitter.onNext(12); emitter.onNext(13); emitter.onNext(14); emitter.onNext(15); emitter.onComplete(); }
前面看到subscribe的参数是parent:
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
这里定义了一个CreateEmitter,持有observer:
implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } } else { RxJavaPlugins.onError(t); } } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } }
我们来看一下onNext是如何执行的,当执行到:
emitter.onNext(12);
的时候,就到了:
@Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } }
看这句:
observer.onNext(t);
我们前面讲到了,这里的observer,就是用户层自定义的:
Observer<Integer> observer = new Observer<Integer>() { private int i; private Disposable mDisposable; @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); mDisposable = d; } @Override public void onNext(Integer integer) { /*i++; if(i == 3){ mDisposable.dispose(); }*/ Log.d(TAG, "onNext" + integer); }
到这里,Observable里面的onNext和Observer的onNext就联系起来了,Observavble发送一个,Observer执行一个。onError和onComplete同理。