你的位置:首页 > 软件开发 > Java > BlockingQueue 阻塞队列(ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue、PriorityBlockingQueue、SynchronousQueue)

BlockingQueue 阻塞队列(ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue、PriorityBlockingQueue、SynchronousQueue)

发布时间:2016-10-21 16:00:42
1 package com.thread.test.thread; 2 import java.util.Random; 3 import java.util.concurrent.*; 4 5 /** 6 * Created by windwant on 2016/5/26 ...
 package com.thread.test.thread;

import java.util.Random;
import java.util.concurrent.*;

/**
 * Small producer/consumer demonstrations for the JDK {@link BlockingQueue}
 * implementations: {@link ArrayBlockingQueue}, {@link LinkedBlockingQueue},
 * {@link DelayQueue}, {@link PriorityBlockingQueue} and {@link SynchronousQueue}.
 *
 * <p>Every {@code test*} method starts one producer and one consumer task on an
 * executor and then calls {@code shutdown()}. The tasks loop until their thread is
 * interrupted, so each demo keeps printing until the JVM is stopped.
 *
 * Created by windwant on 2016/5/26.
 */
public class MyBlockingQueue {

    /** Runs the {@link ArrayBlockingQueue} demo; swap the call to try another queue. */
    public static void main(String[] args) throws InterruptedException {
        testArrayBlockingQueue();
    }

    /**
     * ArrayBlockingQueue: bounded queue (capacity 5 here) with a fast producer
     * (1 s) and a slow consumer (5 s).
     *
     * <p>The queue is created with the capacity-only constructor, i.e. without the
     * fairness policy. Pass {@code true} as a second constructor argument to get
     * FIFO ordering of blocked producer/consumer threads.
     */
    public static void testArrayBlockingQueue() {
        BlockingQueue<String> abq = new ArrayBlockingQueue<String>(5);
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(new MyPro(abq, 1000));
        es.execute(new MyCus(abq, 5000));
        es.shutdown();
    }

    /**
     * LinkedBlockingQueue: optionally bounded FIFO queue based on linked nodes;
     * elements are inserted at the tail and retrieved from the head.
     *
     * <p>Linked queues typically have higher throughput than array-based queues
     * but less predictable performance in concurrent applications.
     */
    public static void testLinkedBlockingQueue() {
        BlockingQueue<String> abq = new LinkedBlockingQueue<String>(5);
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(new MyPro(abq, 20));
        es.execute(new MyCus(abq, 2000));
        es.shutdown();
    }

    /**
     * DelayQueue: unbounded blocking queue of {@link Delayed} elements; an element
     * can only be taken once its delay has expired.
     *
     * <ul>
     *   <li>The head of the queue is the element whose delay expired furthest in
     *       the past.</li>
     *   <li>If no delay has expired there is no head and {@code poll} returns
     *       {@code null}.</li>
     *   <li>An element is expired when {@code getDelay()} returns a value less than
     *       or equal to zero. Unexpired elements cannot be removed with
     *       {@code take} or {@code poll} but are otherwise treated as normal
     *       elements.</li>
     *   <li>{@code size()} counts both expired and unexpired elements.</li>
     *   <li>{@code null} elements are not permitted.</li>
     * </ul>
     */
    public static void testDelayQueue() throws InterruptedException {
        DelayQueue<MyDelayItem> dq = new DelayQueue<MyDelayItem>();
        ExecutorService es = Executors.newFixedThreadPool(5);
        es.execute(new MyDelayPro(dq, 1000));
        es.execute(new MyDelayCus(dq, 10000));
        es.shutdown();
    }

    /**
     * PriorityBlockingQueue: unbounded blocking queue that orders elements the
     * same way as {@link java.util.PriorityQueue} and supports blocking retrieval.
     *
     * <ul>
     *   <li>{@code null} elements are not permitted.</li>
     *   <li>Elements must be comparable.</li>
     *   <li>Iteration is supported, but the iterator does not guarantee priority
     *       order.</li>
     * </ul>
     */
    public static void testPriorityBlockingQueue() throws InterruptedException {
        PriorityBlockingQueue<MyPriorityItem> pbq = new PriorityBlockingQueue<MyPriorityItem>();
        ExecutorService es = Executors.newFixedThreadPool(5);
        es.execute(new MyPriorityBlockingQueuePro(pbq, 1000));
        es.execute(new MyPriorityBlockingQueueCus(pbq, 10000));
        es.shutdown();
    }

    /**
     * SynchronousQueue: blocking queue in which each insert must wait for a
     * matching remove by another thread, and vice versa.
     *
     * <ul>
     *   <li>The queue has no internal capacity, not even a capacity of one.</li>
     *   <li>{@code peek} is not useful: an element is only present while another
     *       thread is trying to remove it.</li>
     *   <li>An element cannot be inserted unless another thread is trying to
     *       remove it.</li>
     *   <li>Iteration yields nothing, because there is nothing to iterate.</li>
     *   <li>The head is the element the first queued inserting thread is trying to
     *       add; if there is no such thread, {@code poll} returns {@code null}.</li>
     *   <li>For collection methods the queue acts as an empty collection
     *       ({@code contains} returns {@code false}).</li>
     *   <li>{@code null} elements are not permitted.</li>
     * </ul>
     */
    public static void testSynchronousQueue() throws InterruptedException {
        SynchronousQueue<String> sq = new SynchronousQueue<String>();
        ExecutorService es = Executors.newFixedThreadPool(5);
        es.execute(new MySynchronousQueuePro(sq, 1000));
        es.execute(new MySynchronousQueueCus(sq, 2000));
        es.shutdown();
    }
}

/**
 * Test producer: every {@code period} milliseconds offers a random number
 * (0-99, as a string) to the queue and prints the queue when the offer succeeds.
 */
class MyPro implements Runnable {

    private final BlockingQueue<String> bq;

    private final int period;

    private final Random r = new Random();

    /**
     * @param bq     queue to produce into
     * @param period pause between offers, in milliseconds
     */
    MyPro(BlockingQueue<String> bq, int period) {
        this.bq = bq;
        this.period = period;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(period);
                String value = String.valueOf(r.nextInt(100));
                // offer() returns true if the element was inserted, false if the queue is full
                if (bq.offer(value)) {
                    System.out.println("pro make value: " + value + " queue : " + bq.toString());
                    System.out.println("******************************************************");
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the executor can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

/**
 * Test consumer: every {@code period} milliseconds takes one element from the
 * queue (blocking while it is empty) and prints the remaining queue.
 */
class MyCus implements Runnable {

    private final BlockingQueue<String> bq;

    private final int period;

    /**
     * @param bq     queue to consume from
     * @param period pause between takes, in milliseconds
     */
    MyCus(BlockingQueue<String> bq, int period) {
        this.bq = bq;
        this.period = period;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(period);
                // take() retrieves the head element, blocking while the queue is empty
                String value = bq.take();
                System.out.println("cus take value: " + value + " queue : " + bq.toString());
                System.out.println("======================================================");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the executor can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

/**
 * Delay queue element. It expires {@code liveTime} milliseconds after creation
 * and is ordered by its expiry instant, as {@link DelayQueue} requires
 * ({@code compareTo} must be consistent with {@code getDelay}).
 */
class MyDelayItem implements Delayed {

    /** Requested lifetime in milliseconds. */
    private final long liveTime;

    /** Expiry instant on the {@link System#nanoTime()} clock, in nanoseconds. */
    private final long removeTime;

    /**
     * @param liveTime   lifetime in milliseconds, counted from construction
     * @param removeTime ignored; the expiry instant is always derived from
     *                   {@code liveTime}. Kept so existing callers still compile.
     */
    MyDelayItem(long liveTime, long removeTime) {
        this.liveTime = liveTime;
        // System.nanoTime() is in nanoseconds, so the lifetime must be converted to
        // nanoseconds before the two are added.
        this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.nanoTime();
    }

    /**
     * Returns the remaining delay in the requested unit; zero or negative once
     * the item has expired.
     */
    @Override
    public long getDelay(TimeUnit unit) {
        // The difference is in nanoseconds; convert FROM nanoseconds to the caller's unit.
        return unit.convert(removeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /**
     * Orders items by expiry instant (earliest first). A {@code null} argument
     * sorts after this item; other {@link Delayed} implementations are compared
     * by remaining delay in milliseconds.
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == null) {
            return -1;
        }
        if (o == this) {
            return 0;
        }
        if (o instanceof MyDelayItem) {
            MyDelayItem other = (MyDelayItem) o;
            // Compare nanoTime values by their difference so the ordering stays
            // correct even if the clock value wraps around.
            return Long.signum(removeTime - other.removeTime);
        }
        long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        return Long.signum(diff);
    }

    @Override
    public String toString() {
        return "{livetime: " + String.valueOf(liveTime) + ", removetime: " + String.valueOf(removeTime) + "}";
    }
}

/**
 * Delay queue test producer: every {@code period} milliseconds adds an item with
 * a random 0-9 ms lifetime, unless the queue already holds more than 5 elements.
 */
class MyDelayPro implements Runnable {

    private final DelayQueue<MyDelayItem> dq;

    private final int period;

    private final Random r = new Random();

    /**
     * @param dq     queue to produce into
     * @param period pause between offers, in milliseconds
     */
    MyDelayPro(DelayQueue<MyDelayItem> dq, int period) {
        this.dq = dq;
        this.period = period;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(period);
                // Soft cap: the queue itself is unbounded, so the producer throttles itself.
                if (dq.size() > 5) {
                    continue;
                }
                MyDelayItem di = new MyDelayItem(r.nextInt(10), r.nextInt(10));
                dq.offer(di);
                System.out.println("delayqueue: add---" + di.toString() + "size: " + dq.size());
                System.out.println("*************************************");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the executor can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

/**
 * Delay queue test consumer: every {@code period} milliseconds takes one expired
 * item (blocking until one is available) and prints it and the remaining queue.
 */
class MyDelayCus implements Runnable {

    private final DelayQueue<MyDelayItem> dq;

    private final int period;

    /**
     * @param dq     queue to consume from
     * @param period pause between takes, in milliseconds
     */
    MyDelayCus(DelayQueue<MyDelayItem> dq, int period) {
        this.dq = dq;
        this.period = period;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(period);
                MyDelayItem di = dq.take();
                System.out.println("delayqueue: remove---" + di.toString());
                System.out.println("delayqueue: ---" + dq.toString());
                System.out.println("======================================");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the executor can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

/**
 * Priority queue element, ordered so that a larger {@code priority} number is
 * retrieved first.
 */
class MyPriorityItem implements Comparable<MyPriorityItem> {

    private final int priority;

    /** @param priority priority value; larger means more urgent */
    MyPriorityItem(int priority) {
        this.priority = priority;
    }

    /**
     * Larger number means higher priority, so the comparison is reversed with
     * respect to natural integer order.
     *
     * @param o the item to compare with; {@code null} sorts after this item
     * @return -1 if this item has the higher priority, 0 if equal, 1 otherwise
     */
    @Override
    public int compareTo(MyPriorityItem o) {
        if (o == null) {
            return -1;
        }
        if (o == this) {
            return 0;
        }
        if (priority > o.priority) {
            return -1;
        } else if (priority == o.priority) {
            return 0;
        } else {
            return 1;
        }
    }

    @Override
    public String toString() {
        return "{priority: " + String.valueOf(priority) + "}";
    }
}

/**
 * Priority queue test producer: every {@code period} milliseconds adds an item
 * with a random priority 0-9, unless the queue already holds more than 5 elements.
 */
class MyPriorityBlockingQueuePro implements Runnable {

    private final PriorityBlockingQueue<MyPriorityItem> pbq;

    private final int period;

    private final Random r = new Random();

    /**
     * @param pbq    queue to produce into
     * @param period pause between offers, in milliseconds
     */
    MyPriorityBlockingQueuePro(PriorityBlockingQueue<MyPriorityItem> pbq, int period) {
        this.pbq = pbq;
        this.period = period;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(period);
                // Soft cap: the queue itself is unbounded, so the producer throttles itself.
                if (pbq.size() > 5) {
                    continue;
                }
                MyPriorityItem pi = new MyPriorityItem(r.nextInt(10));
                pbq.offer(pi);
                System.out.println("PriorityBlockingQueue: add---" + pi.toString() + " size: " + pbq.size());
                System.out.println("PriorityBlockingQueue: " + pbq.toString());
                System.out.println("*************************************");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the executor can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

/**
 * Priority queue test consumer: every {@code period} milliseconds takes the
 * highest-priority item (blocking while the queue is empty) and prints it and
 * the remaining queue.
 */
class MyPriorityBlockingQueueCus implements Runnable {

    private final PriorityBlockingQueue<MyPriorityItem> pbq;

    private final int period;

    /**
     * @param pbq    queue to consume from
     * @param period pause between takes, in milliseconds
     */
    MyPriorityBlockingQueueCus(PriorityBlockingQueue<MyPriorityItem> pbq, int period) {
        this.pbq = pbq;
        this.period = period;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(period);
                MyPriorityItem di = pbq.take();
                System.out.println("PriorityBlockingQueue: remove---" + di.toString());
                System.out.println("PriorityBlockingQueue: ---" + pbq.toString());
                System.out.println("======================================");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the executor can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

/**
 * Synchronous queue test producer: every {@code period} milliseconds offers a
 * random number (0-99, as a string). The non-blocking {@code offer} only
 * succeeds if a consumer is already waiting to receive the element.
 */
class MySynchronousQueuePro implements Runnable {

    private final SynchronousQueue<String> sq;

    private final int period;

    private final Random r = new Random();

    /**
     * @param sq     queue to hand elements off through
     * @param period pause between offers, in milliseconds
     */
    MySynchronousQueuePro(SynchronousQueue<String> sq, int period) {
        this.sq = sq;
        this.period = period;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(period);
                String value = String.valueOf(r.nextInt(100));
                if (sq.offer(value)) {
                    System.out.println("pro make value: " + value + " synchronous :" + sq.toString());
                    System.out.println("******************************************************");
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the executor can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

/**
 * Synchronous queue test consumer: every {@code period} milliseconds blocks in
 * {@code take()} until a producer hands over an element, then prints it.
 */
class MySynchronousQueueCus implements Runnable {

    private final BlockingQueue<String> sq;

    private final int period;

    /**
     * @param sq     queue to receive elements from
     * @param period pause between takes, in milliseconds
     */
    MySynchronousQueueCus(BlockingQueue<String> sq, int period) {
        this.sq = sq;
        this.period = period;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(period);
                String value = sq.take();
                System.out.println("cus take value: " + value + " synchronous :" + sq.toString());
                System.out.println("======================================================");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the executor can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

原标题:BlockingQueue 阻塞队列(ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue、PriorityBlockingQueue、SynchronousQueue)

关键词:array

*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们: admin#shaoqun.com (#换成@)。

可能感兴趣文章

我的浏览记录