你的位置:首页 > 操作系统

[操作系统]Android AsyncTask 深度理解、简单封装、任务队列分析、自定义线程池


前言:由于最近在做SDK的功能,需要设计线程池。看了很多资料不知道从何开始着手,突然发现了AsyncTask有对线程池的封装,so,就拿它开刀,本文将从AsyncTask的基本用法,到简单的封装,再到任务队列分析,最后自定义线程池。

 

1、概念

    Android 中的异步任务常用的一种方式是:Handler + Thread 组合来实现的。Thread 负责子线程的耗时操作,Handler 负责线程间的通信,用的最多的当属子线程和主线程通信。

    Android 为了简化操作,提供了 AsyncTask 类来实现异步任务,并且轻松实现子线程和主线程间的通信。

2、AsyncTask的简单封装

    三个参数代表的含义

  • Params:第一个参数是启动任务传进来的参数;
  • Progress:第二个参数是用来显示进度条的参数;
  • Result:第三个参数是后台执行后返回的参数的类型。
package com.app;import android.os.AsyncTask;/** * Created by ${zyj} on 2016/8/2. */public class MyTask<T> extends AsyncTask<T , Integer , T> {  private TaskListener taskListener ;  public MyTask(){  }  //执行预处理,它运行于UI线程,可以为后台任务做一些准备工作,比如绘制一个进度条控件。  @Override  protected void onPreExecute() {    if ( taskListener != null ){      taskListener.start();    }  }  //运行于UI线程,可以对后台任务的结果做出处理,结果就是doInBackground(Params...)的返回值。  @Override  protected void onPostExecute(T t) {    if ( taskListener != null ){      taskListener.result( t );    }  }  /**   * 更新子线程进度,运行于UI线程   * @param values   */  @Override  protected void onProgressUpdate(Integer... values) {;    if ( taskListener != null ){      taskListener.update( values[0] );    }  }  //运行与后台线程  @Override  protected T doInBackground(T... ts) {    if ( taskListener != null ){      return (T) taskListener.doInBackground( ts[0] ) ;    }    return null;  }  public MyTask setTaskListener(TaskListener taskListener ){    this.taskListener = taskListener ;    return this ;  }  /**   * 更新进度   * @param progress   */  public void updateProgress( int progress ){    publishProgress( progress );  }  public interface TaskListener<T>{    void start() ;    void update( int progress ) ;    T doInBackground( T t );    void result( T t );  }  /**   * 取消一个正在执行的任务   */  public void cancle(){    if ( !isCancelled() ){      cancel( true ) ;    }  }}

 

3、简单的异步任务使用

package com.app;import android.os.Bundle;import android.support.v7.app.AppCompatActivity;import android.util.Log;import wifi.app.wei.com.myapplication.R;public class MainActivity extends AppCompatActivity {  @Override  protected void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    setContentView(R.layout.activity_main);    new MyTask<String>().setTaskListener(new MyTask.TaskListener() {      @Override      public void start() {        Log.d( "task--" , "start 开始了, 运行在主线程" ) ;      }      @Override      public void update(int progress) {      }      @Override      public Object doInBackground(Object o) {        Log.d( "task--" , "doInBackground , 运行在子线程" ) ;        return null;      }      @Override      public void result(Object o) {        Log.d( "task--" , "result , 运行在主线程" ) ;      }    }).execute( "" ) ;  }}

  

 4、带有进度更新的异步任务使用

package com.app;import android.os.Bundle;import android.support.v7.app.AppCompatActivity;import android.util.Log;import android.widget.TextView;import wifi.app.wei.com.myapplication.R;public class MainActivity extends AppCompatActivity {  private TextView textView ;  private MyTask myTask ;  @Override  protected void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    setContentView(R.layout.activity_main);    textView = (TextView) findViewById( R.id.tv1 );    myTask = new MyTask<String>().setTaskListener(new MyTask.TaskListener() {      @Override      public void start() {        Log.d( "task--" , "start 开始了, 运行在主线程" ) ;        textView.setText( "任务开始了" );      }      @Override      public void update(int progress) {        textView.setText( "进度" + progress );      }      @Override      public Object doInBackground(Object o) {        Log.d( "task--" , "doInBackground , 运行在子线程" ) ;        for ( int i = 0 ; i < 100 ; i++ ){          try {            Thread.sleep( 100 ) ;            myTask.updateProgress( i ) ; //每隔100毫秒,更新一下进度          } catch (InterruptedException e) {            e.printStackTrace();          }        }        return "结束了";      }      @Override      public void result(Object o) {        Log.d( "task--" , "result , 运行在主线程" ) ;        textView.setText( "" + o );      }    }) ;    //开始执行任务    myTask.execute( "" ) ;  }}

  执行效果图

 

5、AsyncTask 任务执行应该注意的细节

  (1)、如果异步任务需要联网,则需要添加联网权限

             <uses-permission android:name="android.permission.INTERNET"/>

      (2)、AsyncTask实例必须在UI线程中创建,execute(Params…)方法必须在UI线程中调用。不用手动调用onPreExecute()。

     (3)、一个任务只能被执行一次

 

6、如何取消任务

     可以调用 myTask.cancle() ;  

     但是这个方法并没有真正的结束任务,只是设置了一个标志位,把当前线程中断了。

     取消任务实践

package com.app;import android.os.Bundle;import android.support.v7.app.AppCompatActivity;import android.util.Log;import android.view.View;import android.widget.TextView;import wifi.app.wei.com.myapplication.R;public class MainActivity extends AppCompatActivity {  private TextView textView ;  private MyTask myTask ;  @Override  protected void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    setContentView(R.layout.activity_main);    textView = (TextView) findViewById( R.id.tv1 );    textView.setOnClickListener(new View.OnClickListener() {      @Override      public void onClick(View v) {        //取消任务        myTask.cancle();      }    });    myTask = new MyTask<String>().setTaskListener(new MyTask.TaskListener() {      @Override      public void start() {        Log.d( "task--" , "start 开始了, 运行在主线程" ) ;        textView.setText( "任务开始了" );      }      @Override      public void update(int progress) {        textView.setText( "进度" + progress );      }      @Override      public Object doInBackground(Object o) {        Log.d( "task--" , "doInBackground , 运行在子线程" ) ;        for ( int i = 0 ; i < 100 ; i++ ){          try {            Thread.sleep( 100 ) ;            myTask.updateProgress( i ) ; //每隔100毫秒,更新一下进度          } catch (InterruptedException e) {            e.printStackTrace();          }        }        return "结束了";      }      @Override      public void result(Object o) {        Log.d( "task--" , "result , 运行在主线程" ) ;        textView.setText( "" + o );      }    }) ;    //开始执行任务    myTask.execute( "" ) ;  }}

  当点击textView时,调用了 myTask.cancle() ;方法后,Android studio 控制台抛出了异常

 

通过这里我们发现,AsyncTask 虽然提供了cancle( true )  方法来停止任务,但是这个方法只是中断了这个线程,但是并不能真正意思上的停止任务,这也是很多人说 AsyncTask 的弊端。极容易造成内存溢出的。

 

几种结束任务的间接实现方式:

1、判断标志位的办法:

我们要知道在java的线程中,没有办法停止一个正在运行中的线程。在Android的AsyncTask中也是一样的。如果必须要停止一个线程,我们可以采用这个线程中设置一个标志位,然后在线程run方法或AsyncTask的doInBackground方法中的关键步骤判断这个标志位以决定是否继续执行。然后在需要终止此线程的地方改变这个标志位以达到停止线程的目的。

2、合理的利用Exception

从外部调用AsyncTask的cancel方法并不能停止一个已经启动的AsyncTask。这个cancel方法的作用与线程的interrupt方法相似,调用了一个线程的interrupt方法之后线程仍然运行,但是如果该线程的run方法里面调用过sleep的或者wait方法后,处于sleep或wait状态,则sleep和wait立即结束并且抛出InterruptedException异常。AsyncTask的cancel方法也一样,如果在这个Task的doInBackground方法中调用了sleep或wait方法,当在UI线程中调用了这个Task实例的cancel方法之后,sleep或wait立即结束并且抛出InterruptedException异常,但是如果捕获该异常的代码后面还有其他代码,则这些代码还会继续执行。

3、可以在UI上做手脚

如果用户在后台线程正获取内容时做出了取消的行为,我们可以根据用户的这种行为在UI上立即做出反馈,此时,即使线程完成了数据的Loading,我们也不让数据显示出来,算是一种投机取巧的办法吧。

7、AsyncTask 串行处理任务 和 并行处理任务

     在上面的代码演示中,执行任务用的都是 myTask.execute() , 这个默认是串行执行任务的。比如同一时刻有两个任务要处理,AsyncTask 会先执行第一个任务,等第一个任务执行结束,然后才会执行第二个任务。

     在AsyncTask中还有一个并行处理任务的方法:executeOnExecutor( Executor exe , Params... params ) 。 

 

     下面是串行执行任务execute()的源码

     

 

  通过看源码,发现其实串行执行任务也是调用了并行的方法 executeOnExecutor () , 只不过启用了一个默认的 sDefaultExecutor (sDefaultExecutor 是一个串行的线程池)。

  有串行线程池,那么势必就有一个并行线程池 , 在AsyncTask里面源码里面定义了一个并行线程池: THREAD_POOL_EXECUTOR 。

  

       可以看到并行 THREAD_POOL_EXECUTOR 是通过 new ThreadPoolExecutor() 来创建的

  public ThreadPoolExecutor(int corePoolSize,               int maximumPoolSize,               long keepAliveTime,               TimeUnit unit,               BlockingQueue<Runnable> workQueue,               ThreadFactory threadFactory) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,       threadFactory, defaultHandler);  }

参数说明:

corePoolSize: 线程池维护线程的最少数量 
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略

 

我们知道,受限于硬件、内存和性能,我们不可能无限制的创建任意数量的线程,因为每一台机器允许的最大线程是一个有界值。也就是说ThreadPoolExecutor管理的线程数量是有界的。线程池就是用这些有限个数的线程,去执行提交的任务。然而对于多用户、高并发的应用来说,提交的任务数量非常巨大,一定会比允许的最大线程数多很多。为了解决这个问题,必须要引入排队机制,或者是在内存中,或者是在硬盘等容量很大的存储介质中。J.U.C提供的ThreadPoolExecutor只支持任务在内存中排队,通过BlockingQueue暂存还没有来得及执行的任务。

任务的管理是一件比较容易的事,复杂的是线程的管理,这会涉及线程数量、等待/唤醒、同步/锁、线程创建和死亡等问题。ThreadPoolExecutor与线程相关的几个成员变量是:keepAliveTime、allowCoreThreadTimeOut、poolSize、corePoolSize、maximumPoolSize,它们共同负责线程的创建和销毁。

corePoolSize:

线程池的基本大小,即在没有任务需要执行的时候线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。这里需要注意的是:在刚刚创建ThreadPoolExecutor的时候,线程并不会立即启动,而是要等到有任务提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程。再考虑到keepAliveTime和allowCoreThreadTimeOut超时参数的影响,所以没有任务需要执行的时候,线程池的大小不一定是corePoolSize。

maximumPoolSize:

线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务。这里值得一提的是largestPoolSize,该变量记录了线程池在整个生命周期中曾经出现的最大线程个数。为什么说是曾经呢?因为线程池创建之后,可以调用setMaximumPoolSize()改变运行的最大线程的数目。

poolSize:

线程池中当前线程的数量,当该值为0的时候,意味着没有任何线程,线程池会终止;同一时刻,poolSize不会超过maximumPoolSize。

keepAliveTime

当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。

     

了解了各个参数的含义之后,我们来看看 AsyncTask 中默认的并行线程队列 THREAD_POOL_EXECUTOR 各项的数值

private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();private static final int CORE_POOL_SIZE = CPU_COUNT + 1;private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;private static final int KEEP_ALIVE = 1;private static final ThreadFactory sThreadFactory = new ThreadFactory() {  private final AtomicInteger mCount = new AtomicInteger(1);  public Thread newThread(Runnable r) {    return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());  }};private static final BlockingQueue<Runnable> sPoolWorkQueue =      new LinkedBlockingQueue<Runnable>(128);

  • corePoolSize 为cup数加 1  ;       
  • maximumPoolSize 为cup数的2倍加1
  • 存活时间为1秒
  • 任务缓存队列为 LinkedBlockingQueue

   

     小测试:我手上的手机是联想 k50-t5 ,  在设置里面看到处理器为 8 核1.7GHZ , 运行 Runtime.getRuntime().availableProcessors(); 方法得到的值为:8 。

     另外我们也可以总结出:

  •  同一台手机上THREAD_POOL_EXECUTOR 的 corePoolSize 和 maximumPoolSize 的值是固定的。
  •  在不同的手机上,THREAD_POOL_EXECUTOR 的 corePoolSize 和 maximumPoolSize 的值是不同的。 这种动态设置的方法值得我们学习,在不同的设备上所使用的策略是不同的。但是也是方式也是有弊端的,任务并发数是由cpu的限定的,不可人为的修改。

 

总结:

    //开始执行 串行任务    myTask.execute( "" ) ;    或者    myTask.executeOnExecutor(AsyncTask.SERIAL_EXECUTOR , "" ) ;    //开始执行 并行任务    myTask.executeOnExecutor( AsyncTask.THREAD_POOL_EXECUTOR , "" ) ;

 

 8、自定义线程池

    上一部分我们已经明白了AsyncTask 的默认并行线程池 THREAD_POOL_EXECUTOR 是通过 new ThreadPoolExecutor() 来创建的 , 那么我们也可以自己定义一个线程池。

        首先来看 ThreadPoolExecutor 的构造函数

        

public ThreadPoolExecutor(int corePoolSize,               int maximumPoolSize,               long keepAliveTime,               TimeUnit unit,               BlockingQueue<Runnable> workQueue) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,        Executors.defaultThreadFactory(), defaultHandler);  }    public ThreadPoolExecutor(int corePoolSize,               int maximumPoolSize,               long keepAliveTime,               TimeUnit unit,               BlockingQueue<Runnable> workQueue,               ThreadFactory threadFactory) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,        threadFactory, defaultHandler);  }    public ThreadPoolExecutor(int corePoolSize,               int maximumPoolSize,               long keepAliveTime,               TimeUnit unit,               BlockingQueue<Runnable> workQueue,               RejectedExecutionHandler handler) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,        Executors.defaultThreadFactory(), handler);  }    public ThreadPoolExecutor(int corePoolSize,               int maximumPoolSize,               long keepAliveTime,               TimeUnit unit,               BlockingQueue<Runnable> workQueue,               ThreadFactory threadFactory,               RejectedExecutionHandler handler) {    if (corePoolSize < 0 ||        maximumPoolSize <= 0 ||        maximumPoolSize < corePoolSize ||        keepAliveTime < 0)      throw new IllegalArgumentException();    if (workQueue == null || threadFactory == null || handler == null)      throw new NullPointerException();    this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);    this.threadFactory = threadFactory;    this.handler = handler;  }

  通过看构造方法,发现 corePoolSize 、maximunPoolSize 、keepAliveTime 、unit 、workQueue 是必须要写的。

  分析最后一个构造

if (corePoolSize < 0 ||        maximumPoolSize <= 0 ||        maximumPoolSize < corePoolSize ||        keepAliveTime < 0)      throw new IllegalArgumentException();

  corePoolSize :最小值 0 

     maximunPoolSize :最小值 1

      corePoolSize 必须小于或者等于 maximunPoolSize 

 

    主要来看 workQueue , 这个是就是线程队列了。

    下面是AsyncTask并行线程池 THREAD_POOL_EXECUTOR 里面所使用的线程队列,128 代表线程队列的长度

  private static final BlockingQueue<Runnable> sPoolWorkQueue =      new LinkedBlockingQueue<Runnable>(128);

     下面给出一个完整的例子:

 

    //创建缓冲队列 队列长度:100    BlockingQueue<Runnable> sPoolWorkQueue =        new LinkedBlockingQueue<Runnable>(100);        //创建线程池 核心线程:5个  最大线程:10个  线程空闲存活时间:1秒    Executor executor = new ThreadPoolExecutor( 5 , 10 , 1 , TimeUnit.SECONDS ,        sPoolWorkQueue ) ;    //添加任务到缓冲队列    myTask1.executeOnExecutor( executor , "" ) ;

  

  

线程创建规则

一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
 1、  如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
 2、  如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
 3、  如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
 4、  如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
 5、  当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

 

线程池按以下行为执行任务

  1. 当线程数小于核心线程数时,创建线程。
  2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  3. 当线程数大于等于核心线程数,且任务队列已满
    1. 若线程数小于最大线程数,创建线程
    2. 若线程数等于最大线程数,抛出异常,拒绝任务

 

任务队列执行的逻辑:

    FIFO  先进先出