你的位置:首页 > 操作系统

[操作系统]RxJava 和 RxAndroid 五(线程调度)

本文将有几个例子说明,rxjava线程调度的正确使用姿势。

例1

 Observable        .create(new Observable.OnSubscribe<String>() {          @Override          public void call(Subscriber<? super String> subscriber) {            Logger.v( "rx_call" , Thread.currentThread().getName() );            subscriber.onNext( "dd");            subscriber.onCompleted();          }        })        .map(new Func1<String, String >() {          @Override          public String call(String s) {            Logger.v( "rx_map" , Thread.currentThread().getName() );            return s + "88";          }        })        .subscribe(new Action1<String>() {          @Override          public void call(String s) {            Logger.v( "rx_subscribe" , Thread.currentThread().getName() );          }        }) ;

  结果

/rx_call: main           -- 主线程
/rx_map: main        --  主线程
/rx_subscribe: main   -- 主线程

例2

  new Thread(new Runnable() {      @Override      public void run() {        Logger.v( "rx_newThread" , Thread.currentThread().getName() );        rx();      }    }).start(); void rx(){    Observable        .create(new Observable.OnSubscribe<String>() {          @Override          public void call(Subscriber<? super String> subscriber) {            Logger.v( "rx_call" , Thread.currentThread().getName() );            subscriber.onNext( "dd");            subscriber.onCompleted();          }        })        .map(new Func1<String, String >() {          @Override          public String call(String s) {            Logger.v( "rx_map" , Thread.currentThread().getName() );            return s + "88";          }        })        .subscribe(new Action1<String>() {          @Override          public void call(String s) {            Logger.v( "rx_subscribe" , Thread.currentThread().getName() );          }        }) ;  }

 

      结果

/rx_newThread: Thread-564   -- 子线程
/rx_call: Thread-564              -- 子线程
/rx_map: Thread-564            -- 子线程 
/rx_subscribe: Thread-564    -- 子线程

 

  • 通过例1和例2,说明,Rxjava默认运行在当前线程中。如果当前线程是子线程,则rxjava运行在子线程;同样,当前线程是主线程,则rxjava运行在主线程

 

例3

 Observable        .create(new Observable.OnSubscribe<String>() {          @Override          public void call(Subscriber<? super String> subscriber) {            Logger.v( "rx_call" , Thread.currentThread().getName() );            subscriber.onNext( "dd");            subscriber.onCompleted();          }        })        .subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .map(new Func1<String, String >() {          @Override          public String call(String s) {            Logger.v( "rx_map" , Thread.currentThread().getName() );            return s + "88";          }        })        .subscribe(new Action1<String>() {          @Override          public void call(String s) {            Logger.v( "rx_subscribe" , Thread.currentThread().getName() );          }        }) ;

  结果

/rx_call: RxCachedThreadScheduler-1    --io线程
/rx_map: main                                     --主线程
/rx_subscribe: main                              --主线程

 

例4

 Observable        .create(new Observable.OnSubscribe<String>() {          @Override          public void call(Subscriber<? super String> subscriber) {            Logger.v( "rx_call" , Thread.currentThread().getName() );            subscriber.onNext( "dd");            subscriber.onCompleted();          }        })        .map(new Func1<String, String >() {          @Override          public String call(String s) {            Logger.v( "rx_map" , Thread.currentThread().getName() );            return s + "88";          }        })        .subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .subscribe(new Action1<String>() {          @Override          public void call(String s) {            Logger.v( "rx_subscribe" , Thread.currentThread().getName() );          }        }) ; 

      结果

/rx_call: RxCachedThreadScheduler-1     --io线程
/rx_map: RxCachedThreadScheduler-1   --io线程
/rx_subscribe: main                              --主线程

   

  • 通过例3、例4 可以看出  .subscribeOn(Schedulers.io())  和 .observeOn(AndroidSchedulers.mainThread()) 写的位置不一样,造成的结果也不一样。从例4中可以看出 map() 操作符默认运行在事件产生的线程之中。事件消费只是在 subscribe() 里面。
  • 对于 create() , just() , from()   等                 --- 事件产生   

               map() , flapMap() , scan() , filter()  等    --  事件加工

              subscribe()                                          --  事件消费

  •   事件产生:默认运行在当前线程,可以由 subscribeOn()  自定义线程

         事件加工:默认跟事件产生的线程保持一致, 可以由 observeOn() 自定义线程

       事件消费:默认运行在当前线程,可以有observeOn() 自定义

 

例5  多次切换线程

 

Observable        .create(new Observable.OnSubscribe<String>() {          @Override          public void call(Subscriber<? super String> subscriber) {            Logger.v( "rx_call" , Thread.currentThread().getName() );            subscriber.onNext( "dd");            subscriber.onCompleted();          }        })        .observeOn( Schedulers.newThread() )  //新线程        .map(new Func1<String, String >() {          @Override          public String call(String s) {            Logger.v( "rx_map" , Thread.currentThread().getName() );            return s + "88";          }        })        .observeOn( Schedulers.io() )   //io线程        .filter(new Func1<String, Boolean>() {          @Override          public Boolean call(String s) {            Logger.v( "rx_filter" , Thread.currentThread().getName() );            return s != null ;          }        })        .subscribeOn(Schedulers.io())   //定义事件产生线程:io线程        .observeOn(AndroidSchedulers.mainThread())   //事件消费线程:主线程        .subscribe(new Action1<String>() {          @Override          public void call(String s) {            Logger.v( "rx_subscribe" , Thread.currentThread().getName() );          }        }) ;

  结果

/rx_call: RxCachedThreadScheduler-1           -- io 线程
/rx_map: RxNewThreadScheduler-1             -- new出来的线程
/rx_filter: RxCachedThreadScheduler-2        -- io线程
/rx_subscribe: main                                   -- 主线程