java架构师必备高可用架构(高级架构师之路---RxJava原理分析)

一、什么是RXJAVARXJAVA是一个,用来支持我们需求里需要异步操作的地方它比起handler等异步操作的实现方式来说,显得更为简洁把整个操作整合成一条流水线,从上游到下游都能够看的清,我来为大家科普一下关于java架构师必备高可用架构?下面希望有你要的答案,我们一起来看看吧!

java架构师必备高可用架构(高级架构师之路---RxJava原理分析)

java架构师必备高可用架构

一、什么是RXJAVA

RXJAVA是一个,用来支持我们需求里需要异步操作的地方。它比起handler等异步操作的实现方式来说,显得更为简洁。把整个操作整合成一条流水线,从上游到下游都能够看的清。

二、RXJAVA的原理

RXJAVA的实现,是一种扩展式的观察者模式

RXJAVA中有四种概念。observable(被观察者),Observer(观察者),subscribe(订阅),事件。Observable和Observer通过subscribe来实现订阅关系。与传统的观察者模式不同,除了onNext事件外,Rxjava还提供了onCompleted和onError。当不再有onNext事件发送时,将以onCompleted事件作为结束。当处理过程中出现异常时,会触发onError,同时队列自动终止,不允许再有事件发出。onCompleted和onError在一个序列中有且只有一个,二者互斥,只能出现一个

subscribeOn和observeOn

subscribeOn调用可以将之前的操作加如线程池,从而保证运行于子线程中,observeOn会使后边的执行运行于主线程,这里的之前和后边均是指的代码结构上的前后。

subscribeOn

经过分析可知道,当subscribeOn调用的时候,会创建一个ObservableSubscribeOn对象返回,与此同时,上一级产生的对象会被保存在当前对象的source变量中,并且,将创建出一个线程池,先看线程池的创建,这里直接以io线程为例
Schedulers.io(Schedulers.io())

public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); }

其中的IO是在Schedulers类加载的时候就创建出来的,从这个结构可以看出,IO就是IoScheduler对象RxJavaPlugins.initIoScheduler方法接收一个Callable线程,返回callable.call,也就是call方法中返回的就是这个函数的返回值(Callable是另一种开启线程的方式,这个线程有返回值,当返回值获取到之前,会阻塞当前线程)

IO = RxJavaPlugins.initIoscheduler(new Callable<Scheduler>() { @Override public Scheduler call() throws Exception { return IoHolder.DEFAULT; } }); static final class IoHolder { static final Scheduler DEFAULT = new IoScheduler(); }

那么IoScheduler是什么?当IoScheduler创建的时候

public IoScheduler() { this.pool = new AtomicReference<CachedworkerPool>(NONE); start(); } @Override public void start() { CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT); if (!pool.compareAndSet(NONE, update)) { update.shutdown(); } }

NONE是IoScheduler中创建的一个线程池,所以IoScheduler其实就是一个封装好了的线程池对象

static final CachedWorkerPool NONE; static NONE = new CachedWorkerPool(0, null); } CachedWorkerPool(long keepAliveTime, TimeUnit unit) { this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L; this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>(); this.allWorkers = new CompositeDisposable(); ScheduledExecutorService evictor = null; Future<?> task = null; if (unit != null) { evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS); } evictorService = evictor; evictorTask = task; }

Schedulers.io(Schedulers.io())的调用,执行了两个动作,第一,保存上一级的对象,第二创建线程池

observeOn(AndroidSchedulers.mainThread())

接下来来看主线程的切换,调用observeOn方法,创建ObservableObserveOn对象,同样保存上一级产生的对象到source中,这里指的就是subscribeOn返回的对象ObservableSubscribeOn,并且保存传入的Scheduler--AndroidSchedulers.mainThread()

@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }

进入AndroidSchedulers.mainThread(),与上边同样的写法,最后返回HandlerScheduler

public static Scheduler mainThread() { return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); } private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable<Scheduler>() { @Override public Scheduler call() throws Exception { return MainHolder.DEFAULT; } }); private static final class MainHolder { //可以猜测这个HandlerScheduler是一个通过对Handler进行封装 //运行于主线程的线程,可以看到Looper.getMainLooper()传入了一个主线程的 //looper对象,事实上也是如此 static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper())); } }

所以,很类似,observeOn(AndroidSchedulers.mainThread())同样是做了两件事,保存source和Scheduler,那么两种线程是如何进行调度的,其实看到这里,还没有进入正题,真正的逻辑其实在subscribe方法上。

subscribe

以subscribe(new Observer<String>())为例说明(new Consumer最终源码也是相同的),调用subscribe方法后会来到Observable的抽象方法subscribeActual中,所以我们要到当前Observable实现类中找这个方法,按照上边程序调用的顺序,此时,调用subscribe方法的对象是observeOn方法产生的ObservableObserveOn,进入这个类,找到subscribeActual方法

@Override protected void subscribeActual(Observer<? super T> observer) { //这个scheduler是指AndroidSchedulers.mainThread(),也就是HandlerScheduler if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { //创建一个worker Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }

进入HandlerScheduler找到createWorker方法,这里创建了一个HandlerWorker对象,看到这里大概也可以猜测一下,HandlerWorker中的schedule方法将会是一个关键传入的handler是主线程中的handler,明显是要通过消息机制发送到主线程执行,问题的关键,在于是怎么发送到主线程执行的,schedule方法的具体执行我们暂且不看,按照程序执行顺序继续往下走

@Override public Worker createWorker() { return new HandlerWorker(handler); } private static final class HandlerWorker extends Worker { private final Handler handler; private volatile boolean disposed; HandlerWorker(Handler handler) { this.handler = handler; } @Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { ...... ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. handler.sendMessageDelayed(message, unit.toMillis(delay)); ...... return scheduled; } ...... }

在创建了worker之后,调用方法subscribe,source很明显是ObservableObserveOn对象创建的时候所保存的上一级的调用subscribeOn方法产生的ObservableSubscribeOn对象,通过这个对象调用subscribe方法,又会进入到ObservableSubscribeOn的subscribeActual方法。observer指的是我们代码中传入的observer(subscribe时new的那个),这里对observer封装了一层,以ObserveOnObserver的形式传入到ObservableSubscribeOn的subscribeActual方法中,向上层传递了一级,可以参考08.RxJava运作流程源码分析中提供的流程图

//source指ObservableSubscribeOn对象 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

来到ObservableSubscribeOn的subscribeActual

@Override //参数s指的时对observer封装了一层之后的ObserveOnObserver(new ObserveOnObserver(new Observer )) public void subscribeActual(final Observer<? super T> s) { //对ObserveOnObserver对象进行一次封装 //此时Observer已经被封装了两层 //(new SubscribeOnObserver(new ObserveOnObserver(new Observer))) final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //调用ObserveOnObserver对象的onSubscribe s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }

看看ObserveOnObserver的onSubscribe方法

@Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; //注意这里这个判断这次是不会满足的,也就是这里的代码不会走 if (s instanceof QueueDisposable) { @SuppressWarnings("unchecked") QueueDisposable<T> qd = (QueueDisposable<T>) s; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); //同步,如果是要同步执行,就是指如果设置了在主线程执行,那么 //就执行schedule(),往下看可以发现是使用我我们创建的worker //发送到主线程执行 if (m == QueueDisposable.SYNC) { sourceMode = m; queue = qd; done = true; //actual指的就是我们传入的最原始的那个observer actual.onSubscribe(this); schedule(); return; } //异步,如果是异步执行,直接在当前线程执行,当前线程也就是子线程 if (m == QueueDisposable.ASYNC) { sourceMode = m; queue = qd; actual.onSubscribe(this); return; } queue = new SpscLinkedArrayQueue<T>(bufferSize); //actual是我们new的那个Observer,所以这里直接回调了onSubscribe方法 actual.onSubscribe(this); } }

scheduler就是Schedulers.io()得到的就是IoSchedule对象,在上边分析subscribeOn方法时我们已经知道这个对象是一个线程池调用scheduleDirect方法就是将SubscribeTask这个Runnable放进了线程池执行,并且是在子线程中

@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }

createWorker()是个抽象类,在IoSchedule中找到重写的方法

@Override public Worker createWorker() { return new EventLoopWorker(pool.get()); }

所以这样一来也就是说new SubscribeTask(parent))这个Runnable被放入了线程池执行,这时候会调用它的run方法,这样就又回到了调用上一级产生对象的subscribe方法中去了,不同的是此时subscribe已经是在线程池中执行了(子线程)

@Override public void run() { source.subscribe(parent); }

就这样一级一级的往上调用,下一个会走到ObservableMap的subscribeActual方法,最后走到ObservableJust的subscribeActual,s.onSubscribe(sd)方法并没有执行什么东西,onSubscribe在之前已经被调用了,重点在 sd.run()

@Override protected void subscribeActual(Observer<? super T> s) { ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value); s.onSubscribe(sd); sd.run(); }

终于在这里要看到onNext onComplete方法的执行了

@Override public void run() { if (get() == START && compareAndSet(START, ON_NEXT)) { //observer 是 new MapObserver(new SubscribeOnObserver(new ObserveOnObserver(new Observer(){......}))) observer.onNext(value); if (get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); } } }

此时一层一层的调用到这里,observer对象已经是经过层层封装包裹的observer了(new MapObserver(new SubscribeOnObserver(new ObserveOnObserver(new Observer(){......})))),所以调用observer.onNext会首先执行MapObserver中的onNext,不管用户调用了几次map操作符,都会一个一个的通过回调onNext方法执行完成(如果有多个map方法被调用,当执行完一个apply方法后,后边的actual.onNext就会进入下一个MapObserver中的onNext方法),当执行到最后一个onNext方法的时候,此时这个actual表示的就是SubscribeOnObserver对象了,也就会去执行它里边的onNext

@Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { actual.onNext(null); return; } U v; try { //执行apply方法,也就是map操作符中的回调方法 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } actual.onNext(v); }

SubscribeOnObserver中的onNext,这里的actual指的是ObserveOnObserver,所以又要去执行它的onNext

@Override public void onNext(T t) { actual.onNext(t); }

ObserveOnObserver中的onNext

@Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { //这个worker是AndroidScheduler.mainThread得到的一个运行于主线程的封装类 HandlerWorker worker.schedule(this); } }

在分析observeOn方法的时候我们已经知道这个worker是AndroidScheduler.mainThread得到的一个运行于主线程的封装类 HandlerWorker ,worker.schedule(this)传入的是一个Runnable,也就是会在主线程中执行这个Runnable,我们找到重写的run方法。终于找到onNext和onComplete的最终执行的地方了,并且我们知道,这两个方法是在主线程执行的

@Override public void run() { if (outputFused) { drainFused(); } else { //会执行这个,上边那个先不管 drainNormal(); } } void drainNormal() { int missed = 1; final SimpleQueue<T> q = queue; final Observer<? super T> a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } }

到这里,RxJava线程调度的实现方式基本上我们已经了解了。
这里可以插一个题外话,通常我们使用handler发送的消息都是在handleMessage方法中执行,但是这里我们无论如何找不到这个方法的实现,那么handler是如何处理消息的?

@Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { ...... ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. handler.sendMessageDelayed(message, unit.toMillis(delay)); ...... return scheduled; }

可以看到这里Message message = Message.obtain(handler, scheduled),看一下obtain方法的源码会发现传入的第二个参数是一个callback,保存到了message的成员变量m.callback中,当handler调用sendMessageDelayed会将消息加入主线程的消息队列(因为handler就是主线程的handler),我们知道应用启动就会初始化一个主线程的handler一个looper和messageQueue(对消息机制不理解的可以看另一篇15.源码阅读(安卓消息机制)),调用looper.loop开启一个无限循环不断的从主线程消息队列中取消息,我们看看它是如何取的

public static void loop() { for (;;) { Message msg = queue.next(); // might block ...... msg.target.dispatchMessage(msg); } }

无限循环中取到message后会执行发送这个Message的handler中的dispatchMessage方法,这时候会判断callback也就是我们上边那个传入的,如果它不能与null,就执行handleCallback,执行callback的run方法,找到这里终于找到为什么没有handlerMessage仍然可以处理消息了

public void dispatchMessage(Message msg) { if (msg.callback != null) { handleCallback(msg); } else { if (mCallback != null) { if (mCallback.handleMessage(msg)) { return; } } handleMessage(msg); } } private static void handleCallback(Message message) { message.callback.run(); }

传入的callback是哪个,就是 Message message = Message.obtain(handler, scheduled)中的schedule,schedule是哪个ScheduledRunnable ,也就是说执行的是ScheduledRunnable 的run方法,delegate就是ScheduledRunnable 中传入的那个runnable,追溯上去,这个runnable就是worker.schedule(this)中的this,所以可以找到重写的run方法

@Override public void run() { try { delegate.run(); } catch (Throwable t) { RxJavaPlugins.onError(t); } }


专注于技术热点大数据,人工智能,JAVA、Python、 C 、GO、Javascript等语言最新前言技术,及业务痛点问题分析,请关注【编程我最懂】共同交流学习。

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页