你的位置:首页 > ASP.net教程

[ASP.net教程]Netty构建分布式消息队列(AvatarMQ)设计指南之架构篇


  目前业界流行的分布式消息队列系统(或者可以叫做消息中间件)种类繁多,比如,基于Erlang的RabbitMQ、基于Java的ActiveMQ/Apache Kafka、基于C/C++的ZeroMQ等等,都能进行大批量的消息路由转发。它们的共同特点是,都有一个消息中转路由节点,按照消息队列里面的专业术语,这个角色应该是broker。整个消息系统通过这个broker节点,进行从消息生产者Producer到消费者Consumer的消息路由。当然了,生产者和消费者可以是多对多的关系。消息路由的时候,可以根据关键字(专业的术语叫topic),进行关键字精确匹配、模糊匹配、广播方式的消息路由。

  简单来说,一个极简的分布式消息队列系统主要的构成模块有:

  Broker:简单来说就是消息队列服务器实体。

  Producer:消息的生产者,主要用来发送消息给消费者。

  Consumer:消息的消费者,主要用来接收生产者的消息。

  Routing Key:路由关键字(Topic),主要用来控制生产者和消费者之间的发送与接收消息的对应关系。

  Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

  到此为止,我们明白了一个分布式消息队列系统的主要构成模块,现在本人就通过Netty,这个优秀的Java NIO网络通讯框架,构建一个支持上述应用场景的分布式消息队列系统,本人把其命名为AvatarMQ。后续我会基于这个开源项目,连载出基于Netty构建分布式消息队列系统系列相关的文章,阐明主要的设计思路、组织结构、模块划分依据、类图结构等等。为了说明方便,后续本文中,如果没有特殊说明,有涉及基于Netty构建的分布式消息队列系统,就是指代AvatarMQ。由于整个开源项目涉及的代码量比较多,所以希望大家在本人编写系列博客文章的基础上,耐心地理解、分析其中的代码模块,相信一定不会让您失望,收获一定多多!

  AvatarMQ基于Netty,所以首先,你要能清楚的理解Netty是什么?它能做什么?此外AvatarMQ还大量使用了Java多线程的相关类库。所以希望在此之前,大家能回忆复习一下,这样理解起来会更加得心应手、事半功倍。

  AvatarMQ是基于Netty构建的分布式消息队列系统,支持多个生产者和多个消费者之间的消息路由、传递。主要特性如下:

  • AvatarMQ基于Java语言进行编写,网络通讯依赖Netty。
  • 生产者和消费者的关系可以是一对多、多对一、多对多的关系。
  • 若干个消费者可以组成消费者集群,生产者可以向这个消费者集群投递消息。
  • 消费者集群对于有共同关注点的消费者支持消息的负载均衡策略。
  • 支持动态新增、删除生产者、消费者。
  • 目前仅仅支持关键字的精确匹配路由,后续会逐渐完善。
  • 消息队列服务器Broker基于Netty的主从事件线程池模型开发设计。
  • 网络消息序列化采用Kryo进行消息的网络序列化传输。
  • Broker的消息派发、负载均衡、应答处理(ACK)基于异步多线程模型进行开发设计。
  • Broker消息的投递,目前支持严格的消息顺序。其中Broker还支持消息的缓冲派发,即Broker会缓存一定数量的消息之后,再批量分配给对此消息感兴趣的消费者。

  整个AvatarMQ依赖的jar包请参考:https://github.com/tang-jie/AvatarMQ/blob/master/nbproject/project.properties。

  另外,值得注意的是:AvatarMQ的Netty是基于4.0版本的(下载地址:http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2),消息序列化Kryo基于kryo-3.0.3(下载地址:https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-3.0.3),请大家自行去官网下载使用。

  现在,我们先来看下整合AvatarMQ项目的软件架构图:

  

  从上述图例中,我们可以很清楚的看到:生产者和消费者之间是通过Broker进行消息的路由和转发,同时Broker还负责应答生产者和接收消费者的处理应答。

  在了解了,整个AvatarMQ的组织架构之后,我们再来实际运行一下AvatarMQ!

  首先,先启动一下Broker服务器(对应代码:https://github.com/tang-jie/AvatarMQ/blob/master/src/com/newlandframework/avatarmq/spring/AvatarMQServerStartup.java)

  如果一切正常,终端控制台会打印如下输出:

  

  接着,我们就来实际验证一下AvatarMQ的消息推送功能。

  1、生产者发送1条消息给关注这条消息的消费者。我们先启动消费者,再启动生产者。其中消费者1的测试代码(AvatarMQConsumer1.java)为(对应github下载路径:https://github.com/tang-jie/AvatarMQ/blob/master/src/com/newlandframework/avatarmq/test/AvatarMQConsumer1.java):

package com.newlandframework.avatarmq.test;import com.newlandframework.avatarmq.consumer.AvatarMQConsumer;import com.newlandframework.avatarmq.consumer.ProducerMessageHook;import com.newlandframework.avatarmq.msg.ConsumerAckMessage;import com.newlandframework.avatarmq.msg.Message;/** * @filename:AvatarMQConsumer1.java * @description:AvatarMQConsumer1功能模块 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */public class AvatarMQConsumer1 {  private static ProducerMessageHook hook = new ProducerMessageHook() {    public ConsumerAckMessage hookMessage(Message message) {      System.out.printf("AvatarMQConsumer1 收到消息编号:%s,消息内容:%s\n", message.getMsgId(), new String(message.getBody()));      ConsumerAckMessage result = new ConsumerAckMessage();      result.setStatus(ConsumerAckMessage.SUCCESS);      return result;    }  };  public static void main(String[] args) {    AvatarMQConsumer consumer = new AvatarMQConsumer("127.0.0.1:18888", "AvatarMQ-Topic-1", hook);    consumer.init();    consumer.setClusterId("AvatarMQCluster");    consumer.receiveMode();    consumer.start();  }}

  生产者1的测试代码(AvatarMQProducer1.java)为(对应github下载路径:https://github.com/tang-jie/AvatarMQ/blob/master/src/com/newlandframework/avatarmq/test/AvatarMQProducer1.java),下面代码的含义是发送1条消息,给关注“AvatarMQ-Topic-1”主题的消费者:

package com.newlandframework.avatarmq.test;import com.newlandframework.avatarmq.msg.Message;import com.newlandframework.avatarmq.msg.ProducerAckMessage;import com.newlandframework.avatarmq.producer.AvatarMQProducer;import org.apache.commons.lang3.StringUtils;/** * @filename:AvatarMQProducer1.java * @description:AvatarMQProducer1功能模块 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */public class AvatarMQProducer1 {  public static void main(String[] args) throws InterruptedException {    AvatarMQProducer producer = new AvatarMQProducer("127.0.0.1:18888", "AvatarMQ-Topic-1");    producer.setClusterId("AvatarMQCluster");    producer.init();    producer.start();    System.out.println(StringUtils.center("AvatarMQProducer1 消息发送开始", 50, "*"));    for (int i = 0; i < 1; i++) {      Message message = new Message();      String str = "Hello AvatarMQ From Producer1[" + i + "]";      message.setBody(str.getBytes());      ProducerAckMessage result = producer.delivery(message);      if (result.getStatus() == (ProducerAckMessage.SUCCESS)) {        System.out.printf("AvatarMQProducer1 发送消息编号:%s\n", result.getMsgId());      }      Thread.sleep(100);    }    producer.shutdown();    System.out.println(StringUtils.center("AvatarMQProducer1 消息发送完毕", 50, "*"));  }}

  首先我们先来启动消费者,如果一切正常,控制台输出结果为:

  这个时候我们再运行生产者,发送一条消息给消费者。启动生产者之后,控制台输出结果如下:

  那现在,我们切回去看下消费者是否收到生产者的消息了呢?

  非常正确,我们的消费者果然收到了生产者发送过来的消息。

 

  2、生产者发送1条消息给不关注这条消息的消费者。首先说明的是,代码样例还是基于上述的AvatarMQConsumer1.java、AvatarMQProducer1.java。只不过这次是生产者发送的主题改成:“AvatarMQ-Topic-Test”,消费者关注的主题改成“AvatarMQ-Topic-1”。然后依次启动消费者、生产者。下面是实际的运行情况:

  生产者成功发送消息:

  那按照要求,消费者应该无法收到生产者的这条消息,实际情况是不是这样呢?事实胜于雄辩,看如下截图所示:

  消费者依然处理启动监听状态,说明完全符合我们的预期。

 

  3、生产者发送N条消息(这里是发送100条消息)给一个消费者集群(有2个消费者组成,并且这2个消费者关注的消息主题topic是相同的)我们先启动2个消费者,再启动生产者。消费者代码参考:

package com.newlandframework.avatarmq.test;import com.newlandframework.avatarmq.consumer.AvatarMQConsumer;import com.newlandframework.avatarmq.consumer.ProducerMessageHook;import com.newlandframework.avatarmq.msg.ConsumerAckMessage;import com.newlandframework.avatarmq.msg.Message;/** * @filename:AvatarMQConsumer2.java * @description:AvatarMQConsumer2功能模块 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */public class AvatarMQConsumer2 {  private static ProducerMessageHook hook = new ProducerMessageHook() {    public ConsumerAckMessage hookMessage(Message message) {      System.out.printf("AvatarMQConsumer2 收到消息编号:%s,消息内容:%s\n", message.getMsgId(), new String(message.getBody()));      ConsumerAckMessage result = new ConsumerAckMessage();      result.setStatus(ConsumerAckMessage.SUCCESS);      return result;    }  };  public static void main(String[] args) {    AvatarMQConsumer consumer = new AvatarMQConsumer("127.0.0.1:18888", "AvatarMQ-Topic-2", hook);    consumer.init();    consumer.setClusterId("AvatarMQCluster2");    consumer.receiveMode();    consumer.start();  }}

  生产者代码参考(目的是发送100条消息)给消费者集群。

package com.newlandframework.avatarmq.test;import com.newlandframework.avatarmq.msg.Message;import com.newlandframework.avatarmq.msg.ProducerAckMessage;import com.newlandframework.avatarmq.producer.AvatarMQProducer;import org.apache.commons.lang3.StringUtils;/** * @filename:AvatarMQProducer2.java * @description:AvatarMQProducer2功能模块 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */public class AvatarMQProducer2 {  public static void main(String[] args) throws InterruptedException {    AvatarMQProducer producer = new AvatarMQProducer("127.0.0.1:18888", "AvatarMQ-Topic-2");    producer.setClusterId("AvatarMQCluster2");    producer.init();    producer.start();    System.out.println(StringUtils.center("AvatarMQProducer2 消息发送开始", 50, "*"));    for (int i = 0; i < 100; i++) {      Message message = new Message();      String str = "Hello AvatarMQ From Producer2[" + i + "]";      message.setBody(str.getBytes());      ProducerAckMessage result = producer.delivery(message);      if (result.getStatus() == (ProducerAckMessage.SUCCESS)) {        System.out.printf("AvatarMQProducer2 发送消息编号:%s\n", result.getMsgId());      }      Thread.sleep(100);    }    producer.shutdown();    System.out.println(StringUtils.center("AvatarMQProducer2 消息发送完毕", 50, "*"));  }}

  我们依次启动消费者AvatarMQConsumer2两次,这个时候终端控制台依次输出:

  这个时候我们再启动生产者,运行截图如下:

  说明生产者发送了100条消息出去,看下我们消费者1接收的情况:

  继续看下我们的消费者2,消息接收的情况,截图如下:

  最终统计一下,消费者1,接收的消息编号都是奇数,一共50个。消费者2,接收到的消息编号都是偶数,一共50个。两个消费者接收的消息总数加起来,刚好等于生产者发送的消息总数100个,完全符合我们的预期!另外消费者1、消费者2都收到了来自生产者的消息,说明Broker进行了消息的路由传递。

  4、多个生产者和多个消费者的消息传递,以及动态新增、删除生产者、消费者,就交给大家自行测试了,由于篇幅有限,在此本人就不一一阐述。

 

  到目前为止,相信大家对于AvatarMQ所具备的基本功能,有了一个大致的印象。当然,AvatarMQ还有一些美中不足,比如:

  • 不支持消息的刷盘存储,可能由于系统Crash,造成消息的丢失。后续需要接入一个存储系统(基于Java NIO),保证消息的持久序列化。
  • AvatarMQ的生产者、消费者模块,要进一步支持,断网重连Broker的功能,确保在Broker重启的情况下,把在途的消息继续发送、接收完毕。
  • Broker单点的问题,根据高可用性集群HA(High Available)的标准,Broker也要有主节点和从节点机制。在主节点宕机的情况,从节点要能灰度过渡,不至于Broker主节点宕机,整个AvatarMQ消息系统陷入瘫痪状态。
  • 消息应答失败,还未支持重试功能。
  • 当然还有一些未知的bug,有待发现和修复。
  • AvatarMQ的处理性能,未经历过生产系统实际检验,暂时无法保证其安全和可靠性。

  由于代码编写、测试等等工作,都是本人利用工作之余的时间完成,时间点上比较仓促。加上本人的技术水平有限,难免有说的不对及写得不好的地方,或者其中应该有更好的解决方案。欢迎广大同行、爱好者在线下进行学习交流,有什么宝贵的建议和观点,恳请批评指正,不吝赐教。虽然AvatarMQ和业界主流、久经考验的消息队列系统,在处理性能、可靠性上,肯定还有不小的差距。但是可以基于此,加深对分布式消息队列的理解,做到知其然知其所以然,何乐而不为?

  最后,本人后续会逐渐推出,基于Netty构建的分布式消息队列系统(AvatarMQ),架构设计、原理分析的详解连载文章,敬请期待!

  PS:目前AvatarMQ已经开源,整个项目托管到github,对应的网址为:https://github.com/tang-jie/AvatarMQ,欢迎有兴趣的同行朋友、爱好者关注下载,如果觉得还不错,可以点击Star一下。当然,你还可以点击推荐本文,也算是对我辛苦付出的一点支持和回报,谢谢大家!