星空网 > 软件开发 > 操作系统

RxJava 和 RxAndroid 五(线程调度)

本文将有几个例子说明,rxjava线程调度的正确使用姿势。

例1

 Observable        .create(new Observable.OnSubscribe<String>() {          @Override          public void call(Subscriber<? super String> subscriber) {            Logger.v( "rx_call" , Thread.currentThread().getName() );            subscriber.onNext( "dd");            subscriber.onCompleted();          }        })        .map(new Func1<String, String >() {          @Override          public String call(String s) {            Logger.v( "rx_map" , Thread.currentThread().getName() );            return s + "88";          }        })        .subscribe(new Action1<String>() {          @Override          public void call(String s) {            Logger.v( "rx_subscribe" , Thread.currentThread().getName() );          }        }) ;

  结果

/rx_call: main           -- 主线程
/rx_map: main        --  主线程
/rx_subscribe: main   -- 主线程

例2

  new Thread(new Runnable() {      @Override      public void run() {        Logger.v( "rx_newThread" , Thread.currentThread().getName() );        rx();      }    }).start(); void rx(){    Observable        .create(new Observable.OnSubscribe<String>() {          @Override          public void call(Subscriber<? super String> subscriber) {            Logger.v( "rx_call" , Thread.currentThread().getName() );            subscriber.onNext( "dd");            subscriber.onCompleted();          }        })        .map(new Func1<String, String >() {          @Override          public String call(String s) {            Logger.v( "rx_map" , Thread.currentThread().getName() );            return s + "88";          }        })        .subscribe(new Action1<String>() {          @Override          public void call(String s) {            Logger.v( "rx_subscribe" , Thread.currentThread().getName() );          }        }) ;  }

 

      结果

/rx_newThread: Thread-564   -- 子线程
/rx_call: Thread-564              -- 子线程
/rx_map: Thread-564            -- 子线程 
/rx_subscribe: Thread-564    -- 子线程

 

  • 通过例1和例2,说明,Rxjava默认运行在当前线程中。如果当前线程是子线程,则rxjava运行在子线程;同样,当前线程是主线程,则rxjava运行在主线程

 

例3

 Observable        .create(new Observable.OnSubscribe<String>() {          @Override          public void call(Subscriber<? super String> subscriber) {            Logger.v( "rx_call" , Thread.currentThread().getName() );            subscriber.onNext( "dd");            subscriber.onCompleted();          }        })        .subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .map(new Func1<String, String >() {          @Override          public String call(String s) {            Logger.v( "rx_map" , Thread.currentThread().getName() );            return s + "88";          }        })        .subscribe(new Action1<String>() {          @Override          public void call(String s) {            Logger.v( "rx_subscribe" , Thread.currentThread().getName() );          }        }) ;

  结果

/rx_call: RxCachedThreadScheduler-1    --io线程
/rx_map: main                                     --主线程
/rx_subscribe: main                              --主线程

 

例4

 Observable        .create(new Observable.OnSubscribe<String>() {          @Override          public void call(Subscriber<? super String> subscriber) {            Logger.v( "rx_call" , Thread.currentThread().getName() );            subscriber.onNext( "dd");            subscriber.onCompleted();          }        })        .map(new Func1<String, String >() {          @Override          public String call(String s) {            Logger.v( "rx_map" , Thread.currentThread().getName() );            return s + "88";          }        })        .subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .subscribe(new Action1<String>() {          @Override          public void call(String s) {            Logger.v( "rx_subscribe" , Thread.currentThread().getName() );          }        }) ; 

      结果

/rx_call: RxCachedThreadScheduler-1     --io线程
/rx_map: RxCachedThreadScheduler-1   --io线程
/rx_subscribe: main                              --主线程

   

  • 通过例3、例4 可以看出  .subscribeOn(Schedulers.io())  和 .observeOn(AndroidSchedulers.mainThread()) 写的位置不一样,造成的结果也不一样。从例4中可以看出 map() 操作符默认运行在事件产生的线程之中。事件消费只是在 subscribe() 里面。
  • 对于 create() , just() , from()   等                 --- 事件产生   

               map() , flapMap() , scan() , filter()  等    --  事件加工

              subscribe()                                          --  事件消费

  •   事件产生:默认运行在当前线程,可以由 subscribeOn()  自定义线程

         事件加工:默认跟事件产生的线程保持一致, 可以由 observeOn() 自定义线程

       事件消费:默认运行在当前线程,可以有observeOn() 自定义

 

例5  多次切换线程

 

Observable        .create(new Observable.OnSubscribe<String>() {          @Override          public void call(Subscriber<? super String> subscriber) {            Logger.v( "rx_call" , Thread.currentThread().getName() );            subscriber.onNext( "dd");            subscriber.onCompleted();          }        })        .observeOn( Schedulers.newThread() )  //新线程        .map(new Func1<String, String >() {          @Override          public String call(String s) {            Logger.v( "rx_map" , Thread.currentThread().getName() );            return s + "88";          }        })        .observeOn( Schedulers.io() )   //io线程        .filter(new Func1<String, Boolean>() {          @Override          public Boolean call(String s) {            Logger.v( "rx_filter" , Thread.currentThread().getName() );            return s != null ;          }        })        .subscribeOn(Schedulers.io())   //定义事件产生线程:io线程        .observeOn(AndroidSchedulers.mainThread())   //事件消费线程:主线程        .subscribe(new Action1<String>() {          @Override          public void call(String s) {            Logger.v( "rx_subscribe" , Thread.currentThread().getName() );          }        }) ;

  结果

/rx_call: RxCachedThreadScheduler-1           -- io 线程
/rx_map: RxNewThreadScheduler-1             -- new出来的线程
/rx_filter: RxCachedThreadScheduler-2        -- io线程
/rx_subscribe: main                                   -- 主线程

 




原标题:RxJava 和 RxAndroid 五(线程调度)

关键词:JAVA

*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们: admin#shaoqun.com (#换成@)。

9月,亚马逊新站点或将上线!:https://www.ikjzd.com/articles/103865
假货克星!亚马逊欧洲站喜提零容忍计划!:https://www.ikjzd.com/articles/103866
警惕!深圳海关公布4宗进出口商品不合格典型案例,你中招了?:https://www.ikjzd.com/articles/103867
巧妙避税,这家加拿大公司助力合理向美国销售商品!:https://www.ikjzd.com/articles/103869
最新!律所又发案,侵权卖家做好准备!:https://www.ikjzd.com/articles/103871
收藏!各大跨境电商平台排名规则解读!:https://www.ikjzd.com/articles/103874
宠物梳专利查询分析:https://www.kjdsnews.com/a/1842293.html
温州旧货市场有玻璃柜卖吗?:https://www.vstour.cn/a/411246.html
相关文章
我的浏览记录
最新相关资讯
海外公司注册 | 跨境电商服务平台 | 深圳旅行社 | 东南亚物流