你的位置:首页 > Java教程

[Java教程]Disruptor 分析


通过分析如下代码,大致了解Disruptor的原理

 1 public static void main(String[] args)throws Exception{ 2     EventFactory<LongEvent> eventFactory = new LongEventFactory(); 3     int ringBufferSize = 1024; 4     ExecutorService executors = Executors.newCachedThreadPool(); 5     final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executors,ProducerType.SINGLE, new BlockingWaitStrategy());   6     //final Disruptor<IntEvent> disruptor = new Disruptor<IntEvent>(eventFactory, ringBufferSize, executors,ProducerType.MULTI, new BlockingWaitStrategy()); //多生产者 7     Consumer consumer1 = new Consumer(); 8     Consumer consumer2 = new Consumer(); 9     EventHandlerGroup<LongEvent> firstLevel = disruptor.handleEventsWith(consumer1,consumer2);10     Consumer consumer21 = new Consumer();11     Consumer consumer22 = new Consumer();12     EventHandlerGroup<LongEvent> secondLevel = firstLevel.then(consumer21,consumer22);13     disruptor.start();  //启动disruptor,consumer开始等待,消费数据14     Producer producer = new Producer(disruptor);15     16     //启动生产者17     new Thread(producer).start();18   }

 1. 第2行代码  EventFactory<LongEvent> eventFactory = new LongEventFactory();

 数据工厂类构造单个数据,disruptor使用此工厂类预分配数据。

 2. 第5行代码 final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executors,ProducerType.SINGLE, new BlockingWaitStrategy());

 预分配数据,构建RingBuffer,指定生产者类型(单生产者、多生产者)、消费者执行的线程池、生产者等待可发布数据空间和消费者等待可消费数据的策略。

 

 3. 第9行代码 EventHandlerGroup<LongEvent> firstLevel = disruptor.handleEventsWith(consumer1,consumer2);

   每个消费者Handler都会被封装为一个Processor,其可消费序号由其sequence barrier决定。

 4. 第12行代码 EventHandlerGroup<LongEvent> secondLevel = firstLevel.then(consumer21,consumer22);

 5. 第13行代码 disruptor.start(); //启动disruptor,consumer开始等待,消费数据

 6. 第14-17行代码,创建启动生产者,发布数据