在本人的上一篇博客文章:Netty构建分布式消息队列(AvatarMQ)设计指南之架构篇 中,重点向大家介绍了AvatarMQ主要构成模块以及目前存在的优缺点。最后以一个生产者、消费者传递消息的例子,具体演示了AvatarMQ所具备的基本消息路由功能。而本文的写作目的,是想从 ...
在本人的上一篇博客文章:Netty构建分布式消息队列(AvatarMQ)设计指南之架构篇 中,重点向大家介绍了AvatarMQ主要构成模块以及目前存在的优缺点。最后以一个生产者、消费者传递消息的例子,具体演示了AvatarMQ所具备的基本消息路由功能。而本文的写作目的,是想从开发、设计的角度,简单的对如何使用Netty,构建分布式消息队列背后的技术细节、原理,进行一下简单的分析和说明。
首先,在一个企业级的架构应用中,究竟何时需引入消息队列呢?本人认为,最经常的情况,无非这几种:做业务解耦、事件消息广播、消息流控处理。其中,对于业务解耦是作为消息队列,要解决的一个首要问题。所谓业务解耦,就是说在一个业务流程处理上,只关注具体的流程,尽到通知的责任即可,不必等待消息处理的结果。
总得来看,企业级系统模块通信的方式通常情况下,无非两种。
同步方式:REST、RPC方式实现;异步方式:消息中间件(消息队列)方式实现。
同步方式的优点:可以基于http协议之上,无需中间件代理,系统架构相对而言比较简单。缺点是:客户端和服务端紧密耦合,并且要实时在线通信,否则会导致消息发送失败。
异步方式的优点:客户端和服务端互相解耦,双方可以不产生依赖。缺点是:由于引入了消息中间件,在编程的时候会增加难度系数。此外,消息中间件的可靠性、容错性、健壮性往往成为这类架构的决定性因素。
举一个本人工作中的例子向大家说明一下:移动业务中的产品订购中心,每当一个用户通过某些渠道(营业厅、自助终端等等)开通、订购了某个套餐之后,如果这些套餐涉及第三方平台派单的话,产品订购中心会向第三方平台发起订购请求操作。试想一下,如果遇到高峰受理时间段,由于业务受理量的激增,导致一些外围系统的响应速度降低(比如业务网关响应速度不及时、网络延时等等原因),最终用户开通一个套餐花在主流程的时间会延长很多,这个会造成极不好的用户体验,最终可能导致受理失败。在上述的场景里面,我们就可以很好的引入一个消息队列进行业务的解耦,具体来说,产品订购中心只要“通知”第三方平台,我们的套餐开通成功了,并不一定非要同步阻塞地等待其真正的开通处理完成。正因为如此,消息队列逐渐成为当下系统模块通信的主要方式手段。
当今在Java的消息队列通信领域,有很多主流的消息中间件,比如RabbitMQ、ActiveMQ、以及炙手可热Kafka。其中ActiveMQ是基于JMS的标准之上开发定制的一套消息队列系统,性能稳定,访问接口也非常友好,但是这类的消息队列在访问吞吐量上有所折扣;另外一个方面,比如Kafka这样,以高效吞吐量著称的消息队列系统,但是在稳定性和可靠性上,能力似乎还不够,因此更多的是用在服务日志传输、短消息推送等等对于可靠性不高的业务场景之中。总结起来,不管是ActiveMQ还是Kafka,其框架的背后涉及到很多异步网络通信、多get='_blank'>线程、高并发处理方面的专业技术知识。但本文的重点,也不在于介绍这些消息中间件背后的技术细节,而是想重点阐述一下,如何透过上述消息队列的基本原理,在必要的时候,开发定制一套符合自身业务要求的消息队列系统时,能够获得更加全面的视角去设计、考量这些问题。
因此本人用心开发实现了一个,基于Netty的消息队列系统:AvatarMQ。当然,在设计、实现AvatarMQ的时候,我会适当参考这些成熟消息中间件中用到的很多重要的思想理念。
当各位从github上面下载到AvatarMQ的源代码的时候,可以发现,其中的包结构如下所示:
现在对每个包的主要功能进行一下简要说明(下面省略前缀com.newlandframework.avatarmq)。
broker:消息中间件的服务器模块,主要负责消息的路由、负载均衡,对于生产者、消费者进行消息的应答回复处理(ACK),AvatarMQ中的中心节点,是连接生产者、消费者的桥梁纽带。
consumer:消息中间件中的消费者模块,负责接收生产者过来的消息,在设计的时候,会对消费者进行一个集群化管理,同一个集群标识的消费者,会构成一个大的消费者集群,作为一个整体,接收生产者投递过来的消息。此外,还提供消费者接收消息相关的API给客户端进行调用。
producer:消息中间件中的生产者模块,负责生产特定主题(Topic)的消息,传递给对此主题感兴趣的消费者,同时提供生产者生产消息的API接口,给客户端使用。
core:AvatarMQ中消息处理的核心模块,负责消息的内存存储、应答控制、对消息进行多线程任务分派处理。
model:主要定义了AvatarMQ中的数据模型对象,比如MessageType消息类型、MessageSource消息源头等等模型对象的定义。
msg:主要定义了具体的消息类型对应的结构模型,比如消费者订阅消息SubscribeMessage、消费者取消订阅消息UnSubscribeMessage,消息服务器应答给生产者的应答消息ProducerAckMessage、消息服务器应答给消费者的应答消息ConsumerAckMessage。
netty:主要封装了Netty网络通信相关的核心模块代码,比如订阅消息事件的路由分派策略、消息的编码、解码器等等。
serialize:利用Kryo这个优秀高效的对象序列化、反序列框架对消息对象进行序列化网络传输。
spring:Spring的容器管理类,负责把AvatarMQ中的消息服务器模块:Broker,进行容器化管理。这个包里面的AvatarMQServerStartup是整个AvatarMQ消息服务器的启动入口。
test:这个就不用多说了,就是针对AvatarMQ进行消息路由传递的测试demo。
AvatarMQ运行原理示意图:
首先是消息生产者客户端(AvatarMQ Producer)发送带有主题的消息给消息转发服务器(AvatarMQ Broker),消息转发服务器确认收到生产者的消息,发送ACK应答给生产者,然后把消息继续投递给消费者(AvatarMQ Consumer)。同时broker服务器接收来自消费者的订阅、取消订阅消息,并发送ACK应该给对应的消费者,整个消息系统就是这样周而复始的工作。
现在再来看一下,AvatarMQ中的核心模块的组成,如下图所示:
Producer Manage:消息的生产者,其主要代码在(com.newlandframework.avatarmq.producer)包之下,其主要代码模块关键部分简要说明如下:
package com.newlandframework.avatarmq.producer;import com.newlandframework.avatarmq.core.AvatarMQAction;import com.newlandframework.avatarmq.model.MessageSource;import com.newlandframework.avatarmq.model.MessageType;import com.newlandframework.avatarmq.model.RequestMessage;import com.newlandframework.avatarmq.model.ResponseMessage;import com.newlandframework.avatarmq.msg.Message;import com.newlandframework.avatarmq.msg.ProducerAckMessage;import com.newlandframework.avatarmq.netty.MessageProcessor;import java.util.concurrent.atomic.AtomicLong;/** * @filename:AvatarMQProducer.java * @description:AvatarMQProducer功能模块 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */public class AvatarMQProducer extends MessageProcessor implements AvatarMQAction { private boolean brokerConnect = false; private boolean running = false; private String brokerServerAddress; private String topic; private String defaultClusterId = "AvatarMQProducerClusters"; private String clusterId = ""; private AtomicLong msgId = new AtomicLong(0L); //连接消息转发服务器broker的ip地址,以及生产出来消息附带的主题信息 public AvatarMQProducer(String brokerServerAddress, String topic) { super(brokerServerAddress); this.brokerServerAddress = brokerServerAddress; this.topic = topic; } //没有连接上消息转发服务器broker就发送的话,直接应答失败 private ProducerAckMessage checkMode() { if (!brokerConnect) { ProducerAckMessage ack = new ProducerAckMessage(); ack.setStatus(ProducerAckMessage.FAIL); return ack; } return null; } //启动消息生产者 public void start() { super.getMessageConnectFactory().connect(); brokerConnect = true; running = true; } //连接消息转发服务器broker,设定生产者消息处理钩子,用于处理broker过来的消息应答 public void init() { ProducerHookMessageEvent hook = new ProducerHookMessageEvent(); hook.setBrokerConnect(brokerConnect); hook.setRunning(running); super.getMessageConnectFactory().setMessageHandle(new MessageProducerHandler(this, hook)); } //投递消息API public ProducerAckMessage delivery(Message message) { if (!running || !brokerConnect) { return checkMode(); } message.setTopic(topic); message.setTimeStamp(System.currentTimeMillis()); RequestMessage request = new RequestMessage(); request.setMsgId(String.valueOf(msgId.incrementAndGet())); request.setMsgParams(message); request.setMsgType(MessageType.AvatarMQMessage); request.setMsgSource(MessageSource.AvatarMQProducer); message.setMsgId(request.getMsgId()); ResponseMessage response = (ResponseMessage) sendAsynMessage(request); if (response == null) { ProducerAckMessage ack = new ProducerAckMessage(); ack.setStatus(ProducerAckMessage.FAIL); return ack; } ProducerAckMessage result = (ProducerAckMessage) response.getMsgParams(); return result; } //关闭消息生产者 public void shutdown() { if (running) { running = false; super.getMessageConnectFactory().close(); super.closeMessageConnectFactory(); } } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getClusterId() { return clusterId; } public void setClusterId(String clusterId) { this.clusterId = clusterId; }}
原标题:Netty构建分布式消息队列实现原理浅析
关键词:net
*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们:
admin#shaoqun.com
(#换成@)。