你的位置:首页 > Java教程

[Java教程]JMS发布/订阅消息传送例子


基于上篇文章"基于Tomcat + JNDI + ActiveMQ实现JMS的点对点消息传送"很容易就可以编写一个发布/订阅消息传送例子,相关环境准备与该篇文章基本类似,主要的区别如下。

在Tomcat中配置JNDI

配置连接工厂和话题

  <Resource name="topic/connectionFactory" auth="Container"    type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory"    factory="org.apache.activemq.jndi.JNDIReferenceFactory"    brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&amp;maxReconnectAttempts=5"    brokerName="LocalActiveMQBroker" useEmbeddedBroker="false" />      <Resource name="topic/topic0"     auth="Container"    type="org.apache.activemq.command.ActiveMQTopic" description="My Topic" factory="org.apache.activemq.jndi.JNDIReferenceFactory"    physicalName="TestTopic" />

在Web工厂中编写代码

新建一个发布者Servlet

package pubSub;import java.io.IOException;import java.io.PrintWriter;import javax.naming.InitialContext;import javax.servlet.ServletException;import javax.servlet.annotation.WebServlet;import javax.servlet.http.HttpServlet;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import javax.jms.Topic;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.TopicPublisher;import javax.jms.DeliveryMode;import javax.jms.TopicSession;import javax.jms.TopicConnection;import javax.jms.TopicConnectionFactory;/** * Servlet implementation class JMSTest */@WebServlet("/Publish")public class Publisher extends HttpServlet {  private static final long serialVersionUID = 1L;  /**   * @see HttpServlet#HttpServlet()   */  public Publisher() {    super();    // TODO Auto-generated constructor stub  }  /**   * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse   *   response)   */  protected void doGet(HttpServletRequest request,      HttpServletResponse response) throws ServletException, IOException {    PrintWriter out = response.getWriter();    try {      // get the initial context      InitialContext ctx = new InitialContext();      // lookup the topic object      Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0");      // lookup the topic connection factory      TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx          .lookup("java:comp/env/topic/connectionFactory");      // create a topic connection      TopicConnection topicConn = connFactory.createTopicConnection();      // create a topic session      TopicSession topicSession = topicConn.createTopicSession(false,          Session.AUTO_ACKNOWLEDGE);      // create a topic publisher      TopicPublisher topicPublisher = topicSession.createPublisher(topic);      topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      // create the "Hello World" message      TextMessage message = topicSession.createTextMessage();      message.setText("Hello World");      // publish the messages      topicPublisher.publish(message);      // print what we did      out.write("Message published: " + message.getText());      // close the topic connection      topicConn.close();    } catch (Exception e) {      // TODO Auto-generated catch block      e.printStackTrace();    }  }  /**   * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse   *   response)   */  protected void doPost(HttpServletRequest request,      HttpServletResponse response) throws ServletException, IOException {    // TODO Auto-generated method stub  }}

新建一个订阅者Servlet

package pubSub;import java.io.IOException;import java.io.PrintWriter;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.TopicConnection;import javax.jms.TopicConnectionFactory;import javax.jms.TopicSession;import javax.jms.TopicSubscriber;import javax.naming.InitialContext;import javax.servlet.ServletException;import javax.servlet.annotation.WebServlet;import javax.servlet.http.HttpServlet;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;/** * Servlet implementation class Receive */@WebServlet("/Subscribe")public class Subscriber extends HttpServlet {  private static final long serialVersionUID = 1L;  /**   * @see HttpServlet#HttpServlet()   */  public Subscriber() {    super();    // TODO Auto-generated constructor stub  }  /**   * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse   *   response)   */  protected void doGet(HttpServletRequest request,      HttpServletResponse response) throws ServletException, IOException {    PrintWriter out = response.getWriter();    try {      // get the initial context      InitialContext ctx = new InitialContext();      // lookup the topic object      Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0");      // lookup the topic connection factory      TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx          .lookup("java:comp/env/topic/connectionFactory");      // create a topic connection      TopicConnection topicConn = connFactory.createTopicConnection();      // create a topic session      TopicSession topicSession = topicConn.createTopicSession(false,          Session.AUTO_ACKNOWLEDGE);      // create a topic subscriber      TopicSubscriber topicSubscriber = topicSession          .createSubscriber(topic);      // start the connection      topicConn.start();      // receive the message      TextMessage message = (TextMessage) topicSubscriber.receive();      // print the message      out.write("Message received: " + message.getText());      // close the topic connection      topicConn.close();    } catch (Exception e) {      // TODO Auto-generated catch block      e.printStackTrace();    }  }  /**   * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse   *   response)   */  protected void doPost(HttpServletRequest request,      HttpServletResponse response) throws ServletException, IOException {    // TODO Auto-generated method stub  }}

运行Web工程,分别打开多个标签访问订阅servlet,然后访问发布servlet,结果如下:

在订阅者订阅消息的时候,一开始没接收到消息,一旦发布者发布消息后,订阅者马上收到消息。

 

代码参考自:http://howtodoinjava.com/jms/jms-publish-subscribe-message-example/