RxJava 介绍

 

As shown below. (shown 是 show 的过去分词)

ReactiveX是一套异步编程模型,可以使用对偶性来推导。对偶性即导致相同的物理结果,而表面上不同的理论之间的对应。

Reactive Revolution

ReactiveX is more than an API, it’s an idea and a breakthrough in programming. It has inspired several other APIs, frameworks, and even programming languages.


public interface Iterable<T> {
    Iterator<T> getIterator(void);
}

public interface Iterator<T> {
    boolean getHasNext(void);

    T|error getNext(void);
}


public interface Iterable<T> {
    void setIterator(Iterator<T>);
}

public interface Iterable<T> {
    void setHasNext(boolean);
    void setNext(T|error);
}


public interface Iterable<T> {
    void setHasNext(false);
    void setNext(T);
    void setError(error);
}

public interface Observable<T> {
    void subscribe(Observer<? super T> observer);
}

public interface Observer<T> {
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

每个操作符类都是一个继承了 Observable 的实现类,然后又有一个内部类实现了 Observer 接口。

在 RxJava 中,每个操作符内部实现了一整套基于 Push 的接口体系,从而可以在整个调用链中发挥承上启下的作用。

实现异步

线程操作符会将上游传入的数据缓存起来,然后开启一个线程,在线程中传递出去。


public class ObserverOn<T> extends Observable<T> {

    // 指向上游传入的 Observable
    private Observable observable;

    private Scheduler scheduler;

    public ObserverOn(Observable observable, Scheduler scheduler) {
        this.observable = observable;
        this.scheduler = scheduler;
    }

    /**
     * 当真正的订阅发生时,这个方法会被调用,传入下游的 Observer。
     */
    @Override
    protected void subscribeActual(Observer observer) {
        // 先获得一个线程操作类,Work 封装了具体的线程
        Scheduler.Worker worker = scheduler.createWorker();
        // 创建一个内部类
        ObserverOnInner inner = ObserverOnInner(observer, worker);
        // 将整个内部类传递给上游
        // 上游的 Observable 获得内部类的引用
        // 当上游有数据时,会将数据传递给整个内部类。
        observable.subscribe(inner);
    }

    static final class ObserverOnInner<T> implements Observer<T>, Runnable, Disposable {
        // 持有下游的 observer
        private Observer<T> actual;

        // 切换的线程
        private Scheduler.Worker curWork;

        // 缓存的队列
        private Queue<T> cache;

        private boolean hasCalled = false;
        private boolean cancelled = false;
        private boolean done = false;
        private Throwable error = null;

        public ObserverOnInner(Observer<T> observer, Scheduler.Worker worker) {
            this.curWork = worker;
            actual = observer;
            cache = new ConcurrentLinkQueue<>();
        }

        @Override
        public void onSubscribe(Disposable d) {
            actual.onSubscribe(this);
        }

        /**
         * 上游的 Observable 有数据时,会转入数据进来。
         */
        @Override
        public void onNext(T value) {
            // 因为需要切换线程,因此暂时把数据缓存起来
            cache.offer(value)
            // 启动新线程
            schedule();
        }

        @Override
        public void onError(Throwable e) {
            // 上游发生错误时的处理
            error = e;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            // 完成的处理
            done = true;
            schedule();
        }

        void schedule() {
            // 这个方法只需要调用一次,因为只需要拉起一次线程
            if (!hasCalled) {
                hasCalled = true;
                // curWork 背后的线程会执行这个 runnable
                curWork.schedle(this);
            }
        }

        // 新线程启动后,会执行这个
        @Override
        public void run() {
            // 此时,代码已经在新得线程环境中执行了。
            for(;;) {
                if (cancelled) {
                    return;
                }
                // onError(), onComplete() 方法的处理
                if (done) {
                    if (error != null) {
                        actual.onError(error);
                        return;
                    }
                    actual.onComplete();
                    return;
                }
                T t = cache.poll();
                if (t != null) {
                    // 将数据传递给下游的 Observer
                    actual.onNext(t);
                }
            }
        }
        // 这个接口用于取消订阅
        @Override
        public void dispose() {
            cancelled = true;
        }
    }
}

EOF


Power by TeXt.