你的位置:首页 > ASP.net教程

[ASP.net教程]SignalR与ActiveMQ结合构建实时通信


一、概述

本教程主要阐释了如何利用SignalR与消息队列的结合,实现不同客户端的交互

  • SignalR如何和消息队列交互(暂使用ActiveMQ消息队列)
  • SignalR寄宿在web中和其他SignalR、控制台客户端交互。
  • SignalR单独寄宿在控制台中和其他SignalR、控制台客户端交互。

下面屏幕截图展示了各个客户端通过ActiveMQ相互通信

  1、SignalR寄宿在web:

  2、SignalR寄宿在控制台中,web客户端调用SignalR,读者自行测试。

 工程目录:

 

一、创建项目

  1、创建生产者项目,该项目要是通过控制台输入消息,发送到消息队列

    创建控制台应用程序命名为ActiveMQNetProcucer,然后用包管理器安装ActiveMQ的.Net客户端

    Install-Package Apache.NMS.ActiveMQ

    主要代码如下:

 1 using Apache.NMS; 2 using Apache.NMS.ActiveMQ; 3 using System; 4 using System.Collections.Generic; 5 using System.Linq; 6 using System.Text; 7 using System.Threading.Tasks; 8 namespace ActiveMQNet 9 {10   class Program11   {12     static IConnectionFactory _factory = null;13     static IConnection _connection = null;14     static ITextMessage _message = null;15 16     static void Main(string[] args)17     {18       //创建工厂19       _factory = new ConnectionFactory("tcp://127.0.0.1:61616/");20 21       try22       {23         //创建连接24         using (_connection = _factory.CreateConnection())25         {26           //创建会话27           using (ISession session = _connection.CreateSession())28           {29             //创建一个主题30             IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");31 32             //创建生产者33             IMessageProducer producer = session.CreateProducer(destination);34 35             Console.WriteLine("Please enter any key to continue! ");36             Console.ReadKey();37             Console.WriteLine("Sending: ");38 39             //创建一个文本消息40             _message = producer.CreateTextMessage("Hello AcitveMQ....");41 42             //发送消息43             producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);44             while (true)45             {46               var msg = Console.ReadLine();47               _message = producer.CreateTextMessage(msg);48               producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);49             }50            51           }52         }53 54       }55       catch (Exception ex)56       {57         Console.WriteLine(ex.ToString());58       }59 60       Console.ReadLine();61 62     }63   }64 }

View Code

   2、创建消费者项目,该项目主要是订阅消息队列中的消息  

    创建控制台应用程序命名为ActiveMQNetCustomer,然后用包管理器安装ActiveMQ的.Net客户端

    Install-Package Apache.NMS.ActiveMQ

    主要代码:

 1 using Apache.NMS; 2 using Apache.NMS.ActiveMQ; 3 using System; 4 using System.Collections.Generic; 5 using System.Linq; 6 using System.Text; 7 using System.Threading.Tasks; 8  9 namespace ActiveMQNetCustomer10 {11   class Program12   {13     static IConnectionFactory _factory = null;14 15     static void Main(string[] args)16     {17       try18       {19         //创建连接工厂20         _factory = new ConnectionFactory("tcp://127.0.0.1:61616/");21         //创建连接22         using (IConnection conn = _factory.CreateConnection())23         {24           //设置客户端ID25          // conn.ClientId = "Customer";26           conn.Start();27           //创建会话28           using (ISession session = conn.CreateSession())29           {30             //创建主题31             var topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");32 33             //创建消费者34             IMessageConsumer consumer = session.CreateDurableConsumer(topic, "Customer", null, false);35 36             //注册监听事件37             consumer.Listener += new MessageListener(consumer_Listener);38 39             //这句代码非常重要,40             //这里没有read方法,Session会话会被关闭,那么消费者将监听不到生产者的消息41             Console.Read();42           }43 44           //关闭连接45           conn.Stop();46           conn.Close();47         }48 49       }50       catch (Exception ex)51       {52         Console.Write(ex.ToString());53       }54 55     }56 57     /// <summary>58     /// 消费监听事件59     /// </summary>60     /// <param name="message"></param>61     static void consumer_Listener(IMessage message)62     {63       ITextMessage msg = (ITextMessage)message;64       Console.WriteLine("Receive: " + msg.Text);65     }66   }67 }

View Code

   3、创建包装ActiveMQ生产者和消费者项目,供SignalR.ActiveMQ.WebHost项目使用,来发布消息和订阅消息

    创建类库项目Signalr.ActiveMQ,然后用包管理器安装ActiveMQ的.Net客户端

    Install-Package Apache.NMS.ActiveMQ

    主要代码;

    生产者类:创建单实例生产者对象调用Send发放,发送消息到ActiveMQ消息队列    

using Apache.NMS;using Apache.NMS.ActiveMQ;using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace Signalr.ActiveMQ{ public class Procucer  {    private IMessageProducer producer;    private static Procucer instance=null;    private Procucer(string customerId,string address)    {      instance = this;      //创建工厂      IConnectionFactory _factory = new ConnectionFactory("tcp://127.0.0.1:61616/");      try      {        //创建连接        IConnection _connection = _factory.CreateConnection();        {          //创建会话          ISession session = _connection.CreateSession();          {            //创建一个主题            IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");            //创建生产者            producer = session.CreateProducer(destination);            Console.WriteLine("Please enter any key to continue! ");           // Console.ReadKey();            Console.WriteLine("Sending: ");                     }        }      }      catch (Exception ex)      {        Console.WriteLine(ex.ToString());      }      //Console.ReadLine();    }    public static Procucer GetInstance(string customerId="",string address= "tcp://127.0.0.1:61616/")    {      if (instance == null)        instance = new Procucer(customerId, address);      return instance;    }    public void Send(string msg)    {      //创建一个文本消息      ITextMessage _message = producer.CreateTextMessage(msg);      //发送消息      producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);    }  }}

View Code

 

     消费者类:启用单独的线程监听消息队列中的消息,当监听到消息后 广播给所有的 SinglaR客户端,其中静态属性Clients保存了所有的SinglaR客户端,当SinglaR客户端连接或者断开的时候会更新Clients属性详细代码在SignalR.ActiveMQ.WebHost中的 MyHub文件中。为了阻止当前线程退出调用了 System.Threading.Thread.CurrentThread.Join();阻塞当前线程,避免当web中方法执行完毕后对象被回收,起不到监听消息队列的作用。

using Apache.NMS;using Apache.NMS.ActiveMQ;using Microsoft.AspNet.SignalR.Hubs;using System;using System.Collections.Generic;using System.Diagnostics;using System.Linq;using System.Web;namespace SignalR.ActiveMQ{  public class Customer  {    private static object lockObj = new object();    private static IHubCallerConnectionContext<dynamic> _clients;    public static IHubCallerConnectionContext<dynamic> Clients    {      get { return _clients; }      set      {        lock (lockObj)        {          _clients = value;        }      }    }    public static void Run(string cutomerId="",string address= "tcp://127.0.0.1:61616/")    {      System.Threading.Thread t = new System.Threading.Thread(() =>      {        try        {          //创建连接工厂          IConnectionFactory _factory = new ConnectionFactory(address);          //创建连接          using (IConnection conn = _factory.CreateConnection())          {            //设置客户端ID            conn.ClientId = cutomerId;            conn.Start();            //创建会话            using (ISession session = conn.CreateSession())            {              //创建主题              var topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");              //创建消费者              IMessageConsumer consumer = session.CreateDurableConsumer(topic, "Customer", null, false);              //注册监听事件              consumer.Listener += new MessageListener(consumer_Listener);              //阻塞当前线程,监听消息              System.Threading.Thread.CurrentThread.Join();            }            //关闭连接            conn.Stop();            conn.Close();          }        }        catch (Exception ex)        {          Debug.WriteLine(ex.ToString());          Console.WriteLine(ex.ToString());        }              });      t.Start();    }    static void consumer_Listener(IMessage message)    {      ITextMessage msg = (ITextMessage)message;      if (Clients != null)      {        Clients.All.broadcastMessage(msg.Text);      }      Debug.WriteLine("Receive: " + msg.Text);      Console.WriteLine("Receive: " + msg.Text);    }  }}

View Code

   4、创建web自宿主的SignalR项目,该项目既发布消息,也订阅消息

    创建MVC项目SignalR.ActiveMQ.WebHost,然后用包管理器安装ActiveMQ的.Net客户端

    Install-Package Apache.NMS.ActiveMQ

    创建SignalR的hub:当有客户端连接或者断开的时候更新Customer.Clients 静态属性,保存所有的SignalR客户端。

    web端通过调用代理的Send方法发送消息到消息队列。

using System;using System.Collections.Generic;using System.Linq;using System.Web;using Microsoft.AspNet.SignalR;using Signalr.ActiveMQ;using System.Threading.Tasks;namespace SignalR.ActiveMQ.Sample.Signal.Class{    public class chatHub : Hub  {    public void Send(string clientName, string message)    {      Procucer.GetInstance().Send(message);          }    public override Task OnConnected()    {      Customer.Clients = this.Clients;      return base.OnConnected();    }    public override Task OnDisconnected(bool stopCalled)    {      Customer.Clients = this.Clients;      return base.OnDisconnected(stopCalled);    }  }}

View Code

    Startup类中启动消费者监听线程,调用的项目Signalr.ActiveMQ中的Customer.Run()方法:

using Microsoft.AspNet.SignalR;using Microsoft.Owin;using Owin;using SignalR.ActiveMQ;[assembly: OwinStartupAttribute(typeof(SignalR.ActiveMQ.Sample.Startup))]namespace SignalR.ActiveMQ.Sample{  public partial class Startup  {    public void Configuration(IAppBuilder app)    {           app.MapSignalR();           Customer.Run();//启动消费者监听线程    }  }}

View Code

 二、启动顺序:

1、启动ActiveMQ程序 可参考  http://www.cnblogs.com/xwdreamer/archive/2012/02/21/2360818.html

2、启动ActiveMQNetProcucer项目

3、ActiveMQNetCustomer项目

4、启动SignalR.ActiveMQ.WebHost,开多个浏览器窗口,模拟多个SignalR客户端 

 三、SignalR宿主和web客户端分离两个项目 

Signalr.ActiveMQ.SelfHost 用控制台寄宿SignalR提供的服务供Signalr.ActiveMQ.Web使用

Signalr.ActiveMQ.Web 通过chart.html调用Signalr.ActiveMQ.SelfHost的服务 

Signalr.ActiveMQ.SelfHost 和SignalR.ActiveMQ.WebHost不能同时启动,现在两个项目绑定到了同一个端口。

四、测试

  在生产者窗口中输入消息回车,观察其他客户端的变化

     在Singlar的web客户端发送消息,观察其他客户端的变化

源代码:https://github.com/zhaoyingju/SignalrActiveMQ.git