你的位置:首页 > 操作系统

[操作系统]RxJava漫谈


RxJava在github上的地址:https://github.com/ReactiveX/RxJava

RxAndroid在github上的地址:https://github.com/ReactiveX/RxAndroid

 

本文主要介绍RxAndroid的使用,如果对于RxJava还不熟悉的可以先看一下RxJava的介绍文章。

Android的程序是用Java书写的,Android也有一些自己的线程模型,例如AsyncTask和Handler等。RxJava正是结合了前面的这几项,在此基础上推出了RxAndroid。下面介绍使用。

首先,我们在项目中引入RxAndroid,主要是在gradle脚本中引入下面两句话即可。

dependencies {  compile fileTree(dir: 'libs', include: ['*.jar'])  testCompile 'junit:junit:4.12'  compile 'com.android.support:appcompat-v7:23.1.1'  compile 'com.android.support:design:23.1.1'  //引入RxAndroid----begin  compile 'io.reactivex:rxandroid:1.1.0'  compile 'io.reactivex:rxjava:1.1.0'  //引入RxAndroid----end}

这样就可以在Android代码中使用RxAndroid了,下面的例子展示了一段使用RxAndroid书写的代码:

Observable.just("one", "two", "three", "four", "five")        .subscribeOn(Schedulers.newThread())        .observeOn(AndroidSchedulers.mainThread())        .subscribe(/* an Observer */);

这一段代码是在RxAndroid的官网上写的一段示例的代码。这里可以看出来,与RxJava相比较,其主要增加了如下的几个java文件:

  AndroidSchedulers  BuildConfig  HandlerScheduler  MainThreadSubscription  RxAndroidPlugins  RxAndroidSchedulersHook

下面分别对这几个文件的源码看一下:

1.AndroidSchedulers类的源码

public final class AndroidSchedulers {  private AndroidSchedulers() {    throw new AssertionError("No instances");  }  private static class MainThreadSchedulerHolder {    static final Scheduler MAIN_THREAD_SCHEDULER =        new HandlerScheduler(new Handler(Looper.getMainLooper()));  }  public static Scheduler mainThread() {    Scheduler scheduler =        RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();    return scheduler != null ? scheduler : MainThreadSchedulerHolder.MAIN_THREAD_SCHEDULER;  }}

这个类主要提供了MainThread调度器,在RxAndroid中需要在主线程中处理的食物都需要用到这个类的mainThread。

2.BuildConfig类的源码:

public final class BuildConfig { public static final boolean DEBUG = false; public static final String APPLICATION_ID = "rx.android"; public static final String BUILD_TYPE = "release"; public static final String FLAVOR = ""; public static final int VERSION_CODE = -1; public static final String VERSION_NAME = "";}

主要是一些常量配置。

3.HandlerScheduler类的源码:

public final class HandlerScheduler extends Scheduler {  public static HandlerScheduler from(Handler handler) { //从一个Handler中创建一个Scheduler    if (handler == null) throw new NullPointerException("handler == null");    return new HandlerScheduler(handler);  }  private final Handler handler;  HandlerScheduler(Handler handler) {    this.handler = handler;  }  @Override  public Worker createWorker() {//覆盖Scheduler的createWorker函数,创建基于Handler的Worker    return new HandlerWorker(handler);  }  static class HandlerWorker extends Worker {    private final Handler handler;    private final CompositeSubscription compositeSubscription = new CompositeSubscription();    HandlerWorker(Handler handler) {      this.handler = handler;    }    @Override    public void unsubscribe() {      compositeSubscription.unsubscribe();    }    @Override    public boolean isUnsubscribed() {      return compositeSubscription.isUnsubscribed();    }    @Override    public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {//覆盖Worker的调度函数schedule      if (compositeSubscription.isUnsubscribed()) {        return Subscriptions.unsubscribed();      }      action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action);      final ScheduledAction scheduledAction = new ScheduledAction(action);      scheduledAction.addParent(compositeSubscription);      compositeSubscription.add(scheduledAction);      handler.postDelayed(scheduledAction, unit.toMillis(delayTime));//使用Handler处理这个调度动作ScheduleAction      scheduledAction.add(Subscriptions.create(new Action0() {        @Override        public void call() {          handler.removeCallbacks(scheduledAction);//这句话保证当调度动作被取消的时候,能够及时把这个action从Handler中移除        }      }));      return scheduledAction;    }    @Override    public Subscription schedule(final Action0 action) {      return schedule(action, 0, TimeUnit.MILLISECONDS);    }  }}

HandlerScheduler 类就是使用Handler作为处理核心的Scheduler类。

4.MainThreadSubscription类的源码:

public abstract class MainThreadSubscription implements Subscription { public static void verifyMainThread() { //静态方法,判断当前线程是否是主线程  if (Looper.myLooper() != Looper.getMainLooper()) {   throw new IllegalStateException(     "Expected to be called on the main thread but was " + Thread.currentThread().getName());  } } private final AtomicBoolean unsubscribed = new AtomicBoolean(); @Override public final boolean isUnsubscribed() {  return unsubscribed.get(); } @Override public final void unsubscribe() {//主线程的取消订阅  if (unsubscribed.compareAndSet(false, true)) {   if (Looper.myLooper() == Looper.getMainLooper()) {//如果是主线程直接进行    onUnsubscribe();   } else {    AndroidSchedulers.mainThread().createWorker().schedule(new Action0() {//如果不是主线程,就创建创建一个Action放到主线程中去执行     @Override public void call() {      onUnsubscribe();     }    });   }  } } protected abstract void onUnsubscribe();}

MainThreadSubscription类主要在意的就是unsubscribe的执行线程,这里采取一切方式保证其在主线程中执行。

5.RxAndroidPlugins类的源码:

public final class RxAndroidPlugins {//这个类的主要作用就是维护了一个RxAndroidSchedulersHook  private static final RxAndroidPlugins INSTANCE = new RxAndroidPlugins();  public static RxAndroidPlugins getInstance() {    return INSTANCE;  }  private final AtomicReference<RxAndroidSchedulersHook> schedulersHook =      new AtomicReference<RxAndroidSchedulersHook>();  RxAndroidPlugins() {  }  @Beta  public void reset() {    schedulersHook.set(null);  }  public RxAndroidSchedulersHook getSchedulersHook() {    if (schedulersHook.get() == null) {      schedulersHook.compareAndSet(null, RxAndroidSchedulersHook.getDefaultInstance());//如果原来是null,就设置一个RxAndroidSchedulersHook      // We don't return from here but call get() again in case of thread-race so the winner will      // always get returned.    }    return schedulersHook.get();  }  public void registerSchedulersHook(RxAndroidSchedulersHook impl) {    if (!schedulersHook.compareAndSet(null, impl)) {//如果原来的RxAndroidSchedulerHook是空,则直接持有,否则抛出异常      throw new IllegalStateException(          "Another strategy was already registered: " + schedulersHook.get());    }  }}

可以看出RxAndroidSchedulerHook必须在使用前注册,一旦使用就不能再注册了。

6.RxAndroidSchedulersHook类的源码:

public class RxAndroidSchedulersHook {  private static final RxAndroidSchedulersHook DEFAULT_INSTANCE = new RxAndroidSchedulersHook();  public static RxAndroidSchedulersHook getDefaultInstance() {   return DEFAULT_INSTANCE;  }  public Scheduler getMainThreadScheduler() {    return null;  }  public Action0 onSchedule(Action0 action) {    return action;  }}

这是RxAndroid提供的一个默认的RxAndroidSchedulerHook类,程序员也可以自己定义一个这样的类注册到RxAndroidPlugins中,但是必须在使用RxAndroidPlugins之前注册。

自定义的RxAndroidSchedulerHook类可以覆盖onSchedule函数,在这里进行一些处理,例如日志记录等。

 

以上只是说明了RxAndroid与RxJava中不一样的地方,并没有尝试说明RxJava是什么,在阅读本文之前,读者应该先弄明白这个问题。

现在再来看之前的两个例子就很明白了:

public class ReactiveFragment extends Fragment {//在UI线程中的例子  @Override  public void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    Observable.just("one", "two", "three", "four", "five")        .subscribeOn(Schedulers.newThread())        .observeOn(AndroidSchedulers.mainThread())        .subscribe(/* an Observer */);  }

new Thread(new Runnable() {//在其他线程中的例子  @Override  public void run() {    final Handler handler = new Handler(); //绑定到这个线程的Handler    Observable.just("one", "two", "three", "four", "five")        .subscribeOn(Schedulers.newThread())        .observeOn(HandlerScheduler.from(handler))        .subscribe(/* an Observer */)    // perform work, ...  }}, "custom-thread-1").start();