你的位置:首页 > 数据库

[数据库]Redis学习笔记~分布式的Pub/Sub模式


回到目录

redis的客户端有很多,这次用它的pub/sub发布与订阅我选择了StackExchange.Redis,发布与订阅大家应该很清楚了,首先一个订阅者,订阅一个服务,服务执行一些处理程序(可能是写个日志,插入个数据,发个email)然后当另一个项目的某个业务发布这个服务后,被订阅的程序将会被执行,这个听起来很有意思,redis有好的实现了这个pub/sub功能。

看一下结构图

 

Sub订阅(消息消费者)

对于订阅方,这里类似于一个服务,它会长期运行着,被启动后动态订阅一些服务进来,以便以后再被其它服务调用

       //sub a function in A-project      PubSubManager.Instance.Subscribe("UserLog", (msg) =>      {        //订阅者处理自己的业务逻辑,这相关于队列服务要干的事        Console.WriteLine(msg);      });

Pub发布(消息生产者)

而对于其它项目,如A网站,它可能在被在POST动作后发布这个UserLog的服务,这时上面的订阅方将会消费它(消费者模式),或者服务代码被执行!

     [HttpPost]    public ActionResult Index(string user)    {      PubSubManager.Instance.Publish("UserLog", user + "这个用户提交表单了");      return View();    }

对于一直运行的服务,将会收到来自不同项目的消息,而它只负责消费它!

封装了一些标准的pub/sub方法,它可以有多种实现方法,本例使用redis这个中间件

   /// <summary>  /// 发布订阅的接口规则  /// </summary>  public interface IPubSub  {    /// <summary>    /// 发布,有顺序,对象源是字符串    /// </summary>    /// <param name="channel"></param>    /// <param name="value"></param>    void Publish(string channel, string value);    /// <summary>    /// 订阅,对象源是字符串    /// </summary>    /// <param name="channel"></param>    /// <param name="action"></param>    void Subscribe(string channel, Action<string> action);    /// <summary>    /// 异步发布,无顺序,对象源是字符串    /// </summary>    /// <param name="channel"></param>    /// <param name="value"></param>    void PublishAsync(string channel, string value);    /// <summary>    /// 异步订阅,无顺序,对象源是字符串    /// </summary>    /// <param name="channel"></param>    /// <param name="action"></param>    void SubscribeAsync(string channel, Action<string> action);    /// <summary>    /// 发布,有顺序,对象源是Byte[]    /// </summary>    /// <param name="channel"></param>    /// <param name="value"></param>    void PublishByte(string channel, byte[] value);    /// <summary>    /// 订阅,对象源是Byte[]    /// </summary>    /// <param name="channel"></param>    /// <param name="action"></param>    void SubscribeByte(string channel, Action<byte[]> action);    /// <summary>    /// 异步发布,有顺序,对象源是Byte[]    /// </summary>    /// <param name="channel"></param>    /// <param name="value"></param>    void PublishByteAsync(string channel, byte[] value);    /// <summary>    /// 异步订阅,对象源是Byte[]    /// </summary>    /// <param name="channel"></param>    /// <param name="action"></param>    void SubscribeByteAsync(string channel, Action<byte[]> action);    /// <summary>    /// 发布,有顺序,对象源是泛型对象    /// </summary>    /// <typeparam name="T"></typeparam>    /// <param name="channel"></param>    /// <param name="value"></param>    void Publish<T>(string channel, T value);    /// <summary>    /// 订阅,对象源是泛型对象    /// </summary>    /// <typeparam name="T"></typeparam>    /// <param name="channel"></param>    /// <param name="action"></param>    void Subscribe<T>(string channel, Action<T> action);    /// <summary>    /// 异步发布,有顺序,对象源是泛型对象    /// </summary>    /// <typeparam name="T"></typeparam>    /// <param name="channel"></param>    /// <param name="value"></param>    void PublishAsync<T>(string channel, T value);    /// <summary>    /// 异步订阅,对象源是泛型对象    /// </summary>    /// <typeparam name="T"></typeparam>    /// <param name="channel"></param>    /// <param name="action"></param>    void SubscribeAsync<T>(string channel, Action<T> action);    /// <summary>    /// 取消指定订阅    /// </summary>    /// <param name="channel"></param>    void UnSubscribe(string channel);    /// <summary>    /// 取消所有订阅    /// </summary>    /// <param name="channel"></param>    void UnSubscribeAll();  }

 

回到目录