你的位置:首页 > 软件开发 > Java > ActiveMQ(一)初步接触

ActiveMQ(一)初步接触

发布时间:2017-04-03 00:00:31
声明 转载请注明出处! Reprint please indicate the source!http://www.hiknowledge.top/?p=86&preview=true什么是JMSJMS即Java消息服务(Java Message Service)应用 ...

声明 转载请注明出处! Reprint please indicate the source!

http://www.hiknowledge.top/?p=86&preview=true

什么是JMS

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

引用自百度百科

什么是ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

引用自百度百科

官方主页

权威书籍

ActiveMQ in Action

环境配置

Maven依赖

<dependency>  <groupId>org.apache.activemq</groupId>  <artifactId>activemq-all</artifactId>  <version>5.14.4</version></dependency>

启动broker

要运行起来demo得先,启动broker。我是在虚拟机上测试的。ip:192.168.235.100

进入到 执行:

activemq start

测试

访问页面 http://192.168.235.100:8161/

ActiveMQ(一)初步接触

默认用户名/密码:admin/admin

demo

点对点模式

JMSProducer.java

import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * Created with IntelliJ IDEA.<br> * Description: JMS ActiveMQ Demo测试 消息生产者<br> * 运行前,需要打开本地的activemq。 * 如果需要更改broker地址,要提前运行相应的broker。 * User: jahen<br> * Date: 2017-04-02<br> * Time: 11:06<br> */public class JMSProducer {  private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接  private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码//  private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址 为 failover://tcp://localhost:61616  private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定连接地址 (my VM)  private static final int SENDNUM = 10; // 发送的消息数量  public static void main(String[] args) {    ConnectionFactory connectionFactory; // 连接工程,生产Connection    Connection connection = null; // 连接    Session session; // 会话 接受或者发送消息的线程    Destination destination; // 消息的目的地    MessageProducer messageProducer; // 消息生产者    // 实例化连接工厂    connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);    // 创建连接    try {      connection = connectionFactory.createConnection();      connection.start(); // 启动连接      session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 提交事务,自动确认      destination = session.createQueue("FirstQueue"); // 创建消息队列      messageProducer = session.createProducer(destination); // 创建消息发送者      sendMessage(session, messageProducer); // 发送消息      session.commit(); // 提交事务    } catch (JMSException e) {      e.printStackTrace();    } finally {      if (connection!=null)        try {          connection.close();        } catch (JMSException e) {          e.printStackTrace();        }    }  }  /**   * 发送消息   * @param session 会话   * @param messageProducer 消息生产者   */  private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {    for (int i=0; i<JMSProducer.SENDNUM; i++) {      TextMessage message = session.createTextMessage("ActiveMQ 发送的消息 "+i);      System.out.println("发送消息: ActiveMQ 发送的消息 "+i);      messageProducer.send(message);    }  }}

运行一下JMSProudcer,生产10条消息。

JMSConsumer.java

import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.ActiveMQMessageConsumer;import javax.jms.*;/** * Created with IntelliJ IDEA.<br> * Description: 消息消费者1-点对点模式<br> *   实现方式1 循环检测<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 13:44<br> */public class JMSConsumer {  private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接  private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码  //  private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址 为 failover://tcp://localhost:61616  private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定连接地址 (my VM)  public static void main(String args[]) {    ConnectionFactory connectionFactory; // 连接工程,生产Connection    Connection connection = null; // 连接    Session session; // 会话 接受或者发送消息的线程    Destination destination; // 消息的目的地    MessageConsumer messageConsumer; // 消息消费者    // 实例化连接工厂    connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);    try {      connection = connectionFactory.createConnection();      connection.start();      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消费消息不需要事务,自动确认      destination = session.createQueue("FirstQueue"); // 创建消息队列      messageConsumer = session.createConsumer(destination); // 创建消息消费者      while (true) {        TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);// 设置延时为100s        if (textMessage!=null) { // 接收到消息          System.out.println("接收的消息:"+textMessage.getText());        }else {          break;        }      }    } catch (JMSException e) {      e.printStackTrace();    }  }}

运行一下JMSConsumer,消费10条消息。

ActiveMQ(一)初步接触

这种方式消费消息,通过循环检查,显然是不高明的。

下面,通过设置监听的方式,实现消息消费。

再次运行一下JMSProudcer,生产10条消息。

ActiveMQ(一)初步接触

首先实现一下**

Listenr.java

import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * Created with IntelliJ IDEA.<br> * Description: 消息监听者<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 14:30<br> */public class Listener implements MessageListener {  @Override  public void onMessage(Message message) {    try {      System.out.println("收到消息:" + ((TextMessage)message).getText());    } catch (JMSException e) {      e.printStackTrace();    }  }}

JMSConsumer2.java

import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * Created with IntelliJ IDEA.<br> * Description: 消息消费者2-点对点模式<br> *   实现方式2 设置监听<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 13:44<br> */public class JMSConsumer2 {  private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接  private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码  //  private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址 为 failover://tcp://localhost:61616  private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定连接地址 (my VM)  public static void main(String args[]) {    ConnectionFactory connectionFactory; // 连接工程,生产Connection    Connection connection = null; // 连接    Session session; // 会话 接受或者发送消息的线程    Destination destination; // 消息的目的地    MessageConsumer messageConsumer; // 消息消费者    // 实例化连接工厂    connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);    try {      connection = connectionFactory.createConnection();      connection.start();      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消费消息不需要事务,自动确认      destination = session.createQueue("FirstQueue"); // 创建消息队列      messageConsumer = session.createConsumer(destination); // 创建消息消费者      messageConsumer.setMessageListener(new Listener());// 注册消息监听    } catch (JMSException e) {      e.printStackTrace();    }  }}

运行一下JMSConsumer2,新产生的消息被消费了。

ActiveMQ(一)初步接触

消息发布/订阅模式

注意:发布/订阅要先运行订阅,再运行发布才能收到消息。

JMSConsumer.java

package com.jahentao.activemq;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * Created with IntelliJ IDEA.<br> * Description: 消息消费者-发布订阅模式 消息订阅者<br> *   实现方式 设置监听<br> *   消息订阅者1<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 13:44<br> */public class JMSConsumer {  private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接  private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码  //  private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址 为 failover://tcp://localhost:61616  private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定连接地址 (my VM)  public static void main(String args[]) {    ConnectionFactory connectionFactory; // 连接工程,生产Connection    Connection connection = null; // 连接    Session session; // 会话 接受或者发送消息的线程    Destination destination; // 消息的目的地    MessageConsumer messageConsumer; // 消息消费者    // 实例化连接工厂    connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);    try {      connection = connectionFactory.createConnection();      connection.start();      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消费消息不需要事务,自动确认//      destination = session.createQueue("FirstQueue"); // 创建消息队列      destination = session.createTopic("FirstTopic"); // 创建消息订阅者      messageConsumer = session.createConsumer(destination); // 创建消息消费者      messageConsumer.setMessageListener(new Listener());// 注册消息监听    } catch (JMSException e) {      e.printStackTrace();    }  }}

Listener.java

package com.jahentao.activemq;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * Created with IntelliJ IDEA.<br> * Description: 订阅者1消息**<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 14:46:52<br> */public class Listener implements MessageListener {  @Override  public void onMessage(Message message) {    try {      System.out.println("订阅者一 收到消息:" + ((TextMessage)message).getText());    } catch (JMSException e) {      e.printStackTrace();    }  }}

JMSConsumer2.java

package com.jahentao.activemq;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * Created with IntelliJ IDEA.<br> * Description: 消息消费者-发布订阅模式 消息订阅者<br> *   实现方式 设置监听<br> *   消息订阅者2<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 13:44<br> */public class JMSConsumer2 {  private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接  private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码  //  private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址 为 failover://tcp://localhost:61616  private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定连接地址 (my VM)  public static void main(String args[]) {    ConnectionFactory connectionFactory; // 连接工程,生产Connection    Connection connection = null; // 连接    Session session; // 会话 接受或者发送消息的线程    Destination destination; // 消息的目的地    MessageConsumer messageConsumer; // 消息消费者    // 实例化连接工厂    connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);    try {      connection = connectionFactory.createConnection();      connection.start();      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消费消息不需要事务,自动确认//      destination = session.createQueue("FirstQueue"); // 创建消息队列      destination = session.createTopic("FirstTopic"); // 创建消息订阅者      messageConsumer = session.createConsumer(destination); // 创建消息消费者      messageConsumer.setMessageListener(new Listener2());// 注册消息监听    } catch (JMSException e) {      e.printStackTrace();    }  }}

Listener2.java

package com.jahentao.activemq;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * Created with IntelliJ IDEA.<br> * Description: 订阅者2消息**<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 14:46:52<br> */public class Listener2 implements MessageListener {  @Override  public void onMessage(Message message) {    try {      System.out.println("订阅者二 收到消息:" + ((TextMessage)message).getText());    } catch (JMSException e) {      e.printStackTrace();    }  }}

首先,分别运行JMSConsumer、JMSConsumer2进行订阅。

ActiveMQ(一)初步接触

JMSProducer.java

package com.jahentao.activemq;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * Created with IntelliJ IDEA.<br> * Description: JMS ActiveMQ Demo测试 发布订阅模式 消息发布者<br> * 运行前,需要打开本地的activemq。 * 如果需要更改broker地址,要提前运行相应的broker。 * User: jahen<br> * Date: 2017-04-02<br> * Time: 14:42:59<br> */public class JMSProducer {  private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接  private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码//  private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址 为 failover://tcp://localhost:61616  private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定连接地址 (my VM)  private static final int SENDNUM = 10; // 发送的消息数量  public static void main(String[] args) {    ConnectionFactory connectionFactory; // 连接工程,生产Connection    Connection connection = null; // 连接    Session session; // 会话 接受或者发送消息的线程    Destination destination; // 消息的目的地    MessageProducer messageProducer; // 消息生产者    // 实例化连接工厂    connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);    // 创建连接    try {      connection = connectionFactory.createConnection();      connection.start(); // 启动连接      session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 提交事务,自动确认//      destination = session.createQueue("FirstQueue"); // 创建消息队列      destination = session.createTopic("FirstTopic"); // 创建主题      messageProducer = session.createProducer(destination); // 创建消息发送者      sendMessage(session, messageProducer); // 发送消息      session.commit(); // 提交事务    } catch (JMSException e) {      e.printStackTrace();    } finally {      if (connection!=null)        try {          connection.close();        } catch (JMSException e) {          e.printStackTrace();        }    }  }  /**   * 发送消息   * @param session 会话   * @param messageProducer 消息生产者   */  private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {    for (int i = 0; i< JMSProducer.SENDNUM; i++) {      TextMessage message = session.createTextMessage("ActiveMQ 发送的消息 "+i);      System.out.println("发送消息: ActiveMQ 发送的消息 "+i);      messageProducer.send(message);    }  }}

然后运行JMSProducer。

ActiveMQ(一)初步接触

参考

java1234上发布的教程"一头扎进ActiveMQ"

这里学习的源码,托管在码云上


 

海外公司注册、海外银行开户、跨境平台代入驻、VAT、EPR等知识和在线办理:https://www.xlkjsw.com

原标题:ActiveMQ(一)初步接触

关键词:

*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们: admin#shaoqun.com (#换成@)。

可能感兴趣文章

我的浏览记录