你的位置:首页 > Java教程

[Java教程]ActiveMQ应用


0.下载地址

https://activemq.apache.org/download.html

 

1.解压并启动activemq服务(需根据系统的不同选择不同的启动文件)

/apache-activemq-5.13.1/bin/macosx/activemq start

 

2.登录activemq服务器进行查看

地址:http://localhost:8161/

点击[Manage ActiveMQ broker]登录查看详细数据,默认用户名密码admin/admin

 

 

3.创建eclipse项目

/apache-activemq-5.13.1/lib下倒入所需jar包

 

3.1 通用jms示例

public class Sender {  private static final int SEND_NUMBER=5;    public static void main(String[] args){    ConnectionFactory connectionFactory;    Connection connection =null;    Session session;    Destination destination;    MessageProducer producer;        connectionFactory=new ActiveMQConnectionFactory(        ActiveMQConnection.DEFAULT_USER,        ActiveMQConnection.DEFAULT_PASSWORD,        "tcp://localhost:61616");    try{      connection = connectionFactory.createConnection();      connection.start();            session=connection.createSession(Boolean.TRUE,          Session.AUTO_ACKNOWLEDGE);            destination=session.createQueue("JMeterQueue");            producer=session.createProducer(destination);      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      sendMessage(session,producer);      session.commit();    }catch(Exception e){      e.printStackTrace();    }finally{      try{        if(null!=connection){          connection.close();        }      }catch(Throwable ignore){              }    }  }    public static void sendMessage(Session session,MessageProducer producer) throws JMSException{    for(int i=1;i<SEND_NUMBER;i++){      TextMessage message=session.createTextMessage("ActiveMq send "+i);      System.out.println("ActiveMq send "+i);      producer.send(message);    }  }}

 

public class Receiver {  public static void main(String[] args){    ConnectionFactory connectionFactory ;    Connection connection=null;    Session session;    Destination destination;    MessageConsumer consumer;        connectionFactory = new ActiveMQConnectionFactory(        ActiveMQConnection.DEFAULT_USER,        ActiveMQConnection.DEFAULT_PASSWORD,        "tcp://localhost:61616");    try{      connection = connectionFactory.createConnection();      connection.start();      session=connection.createSession(Boolean.TRUE,           Session.AUTO_ACKNOWLEDGE);      destination=session.createQueue("JMeterQueue");      consumer=session.createConsumer(destination);      while(true){        TextMessage message=(TextMessage)consumer.receive(10000);              if(null !=message){          System.out.println("Message receive "+ message.getText());        }else{          break;        }      }      session.commit();      //session.commit 之后,Messages Enqueued 中的消息才会被被消费掉,Messages Dequeued 才会增加;      //如果不commit,Messages Dequeued会一直为0,每次启动receiver后都会受到所有未消费的消息    }catch (Exception e) {      e.printStackTrace();    } finally {      try {        if (null != connection)          connection.close();      } catch (Throwable ignore) {      }    }  }}

 

3.2 p2p示例

public class QueueSender {    // 发送次数  public static final int SEND_NUM = 5;  // tcp 地址  public static final String BROKER_URL = "tcp://localhost:61616";  // 目标,在ActiveMQ管理员控制台创建   public static final String DESTINATION = "mq.p2p.queue";    public static void run() throws Exception {    QueueConnection connection = null;    QueueSession session = null;    try {      // 创建链接工厂      QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);      // 通过工厂创建一个连接      connection = factory.createQueueConnection();      // 启动连接      connection.start();      // 创建一个session会话      session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);      // 创建一个消息队列      Queue queue = session.createQueue(DESTINATION);      // 创建消息发送者      javax.jms.QueueSender sender = session.createSender(queue);      // 设置持久化模式      sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      sendMessage(session, sender);      // 提交会话      session.commit();          } catch (Exception e) {      throw e;    } finally {      // 关闭释放资源      if (session != null) {        session.close();      }      if (connection != null) {        connection.close();      }    }  }    public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {    for (int i = 0; i < SEND_NUM; i++) {      String message = "发送消息第" + (i + 1) + "条";      Message msg=session.createTextMessage(message);      sender.send(msg);    }  }    public static void main(String[] args) throws Exception {    QueueSender.run();  }}

 

public class QueueReceiver {     // tcp 地址  public static final String BROKER_URL = "tcp://localhost:61616";  // 目标,在ActiveMQ管理员控制台创建   public static final String TARGET = "mq.p2p.queue";    public static void run() throws Exception {    QueueConnection connection = null;    QueueSession session = null;    try {      // 创建链接工厂      QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);      // 通过工厂创建一个连接      connection = factory.createQueueConnection();      // 启动连接      connection.start();      // 创建一个session会话      session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);      // 创建一个消息队列      Queue queue = session.createQueue(TARGET);      // 创建消息制作者      javax.jms.QueueReceiver receiver = session.createReceiver(queue);            receiver.setMessageListener(new MessageListener() {         public void onMessage(Message msg) {           if (msg != null) {            TextMessage map = (TextMessage) msg;            try {              System.out.println(map.getText());            } catch (JMSException e) {              e.printStackTrace();            }          }        }       });       // 休眠100ms再关闭      Thread.sleep(1000 * 20);             // 提交会话      session.commit();          } catch (Exception e) {      throw e;    } finally {      // 关闭释放资源      if (session != null) {        session.close();      }      if (connection != null) {        connection.close();      }    }  }    public static void main(String[] args) throws Exception {    QueueReceiver.run();  }}

 

3.3 订阅示例

public class TopicSender {    // 发送次数  public static final int SEND_NUM = 5;  // tcp 地址  public static final String BROKER_URL = "tcp://localhost:61616";  // 目标,在ActiveMQ管理员控制台创建  public static final String DESTINATION = "mq.topic";    public static void run() throws Exception {    TopicConnection connection = null;    TopicSession session = null;    try {      // 创建链接工厂      TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);      // 通过工厂创建一个连接      connection = factory.createTopicConnection();      // 启动连接      connection.start();      // 创建一个session会话      session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);      // 创建一个消息队列      Topic topic = session.createTopic(DESTINATION);      // 创建消息发送者      TopicPublisher publisher = session.createPublisher(topic);      // 设置持久化模式      publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      sendMessage(session, publisher);      // 提交会话      session.commit();          } catch (Exception e) {      throw e;    } finally {      // 关闭释放资源      if (session != null) {        session.close();      }      if (connection != null) {        connection.close();      }    }  }    public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {    for (int i = 0; i < SEND_NUM; i++) {      String message = "发送消息第" + (i + 1) + "条";      TextMessage msg =session.createTextMessage(message);      publisher.send(msg);    }  }    public static void main(String[] args) throws Exception {    TopicSender.run();  }}

 

public class TopicReceiver {     // tcp 地址  public static final String BROKER_URL = "tcp://localhost:61616";  // 目标,在ActiveMQ管理员控制台创建  public static final String TARGET = "mq.topic";    public static void run() throws Exception {       TopicConnection connection = null;    TopicSession session = null;    try {      // 创建链接工厂      TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);      // 通过工厂创建一个连接      connection = factory.createTopicConnection();      // 启动连接      connection.start();      // 创建一个session会话      session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);      // 创建一个消息队列      Topic topic = session.createTopic(TARGET);      // 创建消息制作者      TopicSubscriber subscriber = session.createSubscriber(topic);      subscriber.setMessageListener(new MessageListener() {         public void onMessage(Message msg) {           System.out.println(msg);        }       });       // 休眠100ms再关闭      Thread.sleep(1000 * 20);             // 提交会话      session.commit();          } catch (Exception e) {      throw e;    } finally {      // 关闭释放资源      if (session != null) {        session.close();      }      if (connection != null) {        connection.close();      }    }  }    public static void main(String[] args) throws Exception {    TopicReceiver.run();  }}