你的位置:首页 > Java教程

[Java教程]ActiveMQ Apollo之MQTT


Apollo是apache旗下的基金项目,它是以Apache ActiveMQ5.x为基础,采用全新的线程和消息调度架构重新实现的消息中间件,针对多核处理器进行了优化处理,它的速度更快、更可靠、更易于维护。apollo与ActiveQQ一样支持多协议:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介绍MQTT协议的使用。 
关于ActiveMQ5请参考:http://activemq.apache.org,本文只介绍Apollo在windows下安装和应用,Apollo的详细文档请参考官网:http://activemq.apache.org/apollo/documentation/user-manual.html.

 

Apollo的下载和安装

 

1.下载并安装

进入http://activemq.apache.org/apollo/download.html,下载windows版本的压缩包,并解压到自己工作目录(如:E:\apache-apollo-1.7),并创建环境变量APOLLO_HOME=E:\apache-apollo-1.7。如果操作是系统是Windows Vista或更高版本,则需要安装Microsoft Visual C++ 2010 Redistributable (64位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=14632;32位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=5555)。

 

2.创建broker实例并启动服务

进入E:\apache-apollo-1.7之下的bin目录,打开cmd窗口,执行命令:apollo create E:\apollo_broker,命令执行成功后,在E盘下会有apollo_broker目录,在其下有个bin目录,其中有两个文件:apollo-broker.cmd和apollo-broker-service.exe,第一个是通过cmd命令启动apollo服务的,第二个是创建window服务的。 
cmd命令启动:apollo-broker run,启动成功可以在浏览器中查看运行情况(http://127.0.0.1:61680/,默认用户名/密码:admin/password); 
windows服务启动:执行apollo-broker-service.exe,创建windows服务,就可以以windows服务的方式启动apollo服务。

 

3.MQTT协议的应用

MQTT协议有众多客户端实现,相关请参考:http://activemq.apache.org/apollo/versions/1.7/website/documentation/mqtt-manual.html。 
本文采用eclipse的paho客户端实现(https://eclipse.org/paho/)。

a.javascript客户端:https://eclipse.org/paho/clients/js/

将javascript客户端项目下载下来,并在其项目根目录下执行mvn命令,进行编译,生成target目录,其下生成mqttws31.js、mqttws31-min.js两个js文件,将其拷贝到自己项目相关目录下,并在页面中引用,即可实现javascript客户端的消息订阅和发布,demo代码如下: 
var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId"); 
// 61623是ws连接的默认端口,可以在apollo中间件中进行配置(关于apollo的配置请参考:http://activemq.apache.org/apollo/documentation/user-manual.html) 
// set callback handlers 
client.onConnectionLost = onConnectionLost; 
client.onMessageArrived = onMessageArrived; 
// connect the client 
client.connect({userName:'admin',password:'password',onSuccess:onConnect}); 
// called when the client connects 
function onConnect() { // 连接成功后的处理 
// Once a connection has been made, make a subscription and send a message. 
console.log("onConnect"); 
client.subscribe("/topic/event"); // 订阅消息的主题 
var message = new Paho.MQTT.Message("Hello,this is a test"); 
message.destinationName = "/topic/event"; 
client.send(message); // 发送消息 

// called when the client loses its connection 
function onConnectionLost(responseObject) { // 连接丢失后的处理 
if (responseObject.errorCode !== 0) { 
console.log("onConnectionLost:"+responseObject.errorMessage); 


// called when a message arrives 
function onMessageArrived(message) { // 消息接收成功后的处理 
console.log("onMessageArrived:"+message.payloadString); 
}

b. java客户端实现

paho目前只支持J2SE和安卓,下载地址:https://eclipse.org/paho/clients/java/,我们采用maven方式。 
maven库地址: 
https://repo.eclipse.org/content/repositories/paho-releases/ - Official Releases 
https://repo.eclipse.org/content/repositories/paho-snapshots/ - Nightly Snapshots 
maven dependency: 
<dependency> 
<groupId>org.eclipse.paho</groupId> 
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> 
<version>1.0.1</version> 
</dependency>
 
说明:版本为1.0.0或0.9.0时,其jar包根本加载不进来,最后搜到1.0.1版本才可以正常使用。 
java端实现: 
public interface IMessage { 
String getHost(); 
Integer getPort(); 
Integer getQos(); 
String getTopic(); 
String getClientId(); 
String getContent(); 
byte[] getContentBytes(); 

 


Map<String,Object> getOption(); 
Object getSender(); 
Date getSendTime(); 
}
 
public final class MessageProcessingCenter { 
protected static Logger logger=LoggerFactory.getLogger(MessageProcessingCenter.class); 
protected static final String BROKER_PREFIX="tcp://"; 
protected static final String BROKER_HOST="localhost"; 
protected static final int PORT=61613; 
protected static final int QOS=2; 
protected static final String TOPIC="/topic/event"; 
protected static final String CLIENT_ID="clientId"; 
protected static final String MQ_USER="admin"; 
protected static final String MQ_PASSWORD="password"; 
public static void send(IMessage message){ 
String topic= StringUtils.isEmpty(message.getTopic())?TOPIC: message.getTopic(); 
int qos=null == message.getQos()?QOS: message.getQos(); 
String broker=BROKER_PREFIX+ (StringUtils.isEmpty(message.getHost())?BROKER_HOST:message.getHost()); 
int port=null == message.getPort()?PORT:message.getPort(); 
broker+=":"+port; 
String clientId = StringUtils.isEmpty(message.getClientId())?CLIENT_ID:message.getClientId(); 
Map<String,Object> opts=message.getOption(); 
String user=MQ_USER; 
String password=MQ_PASSWORD; 
if(null != opts){ 
if(null != opts.get("userName")){ 
user=opts.get("userName").toString(); 

if(null != opts.get("password")){ 
password=opts.get("password").toString(); 


MemoryPersistence persistence = new MemoryPersistence(); 
try { 
MqttClient sampleClient = new MqttClient(broker, clientId, persistence); 
MqttConnectOptions connOpts = new MqttConnectOptions(); 
connOpts.setUserName(user); 
connOpts.setPassword(password.toCharArray()); 
connOpts.setCleanSession(true); 
sampleClient.connect(connOpts); 
MqttMessage mqm = new MqttMessage(message.getContentBytes()); 
mqm.setQos(qos); 
sampleClient.publish(topic, mqm); 
sampleClient.disconnect(); 
} catch(MqttException me) { 
logger.info("********************* send message exception :"); 
logger.info("********************* reason : " + me.getReasonCode()); 
logger.info("********************* msg : " + me.getMessage()); 
logger.info("********************* loc : " + me.getLocalizedMessage()); 
logger.info("********************* cause : " + me.getCause()); 
logger.info("********************* excep : " + me); 
me.printStackTrace(); 


public static void send(Set<IMessage> set){ 
for(IMessage message:set){ 
send(message); 


}

 

小结

至此,MQTT协议已部署完毕,java端可以发布消息,而javascript端则可以订阅并接收到java端发布的信息。 
本文只是依照官网手册而实现的简单应用,讲解不一定十分准确,有什么不对的地方还请多多指点,更详细的应用请参考官网文档: 
apollo:http://activemq.apache.org/apollo/documentation/user-manual.html 
eclipse paho:https://eclipse.org/paho/