你的位置:首页 > ASP.net教程

[ASP.net教程]Redis分布式锁学习


最近研究了一下Redis分布锁,在github上搜了一下 
就拿star最多的 https://github.com/KidFashion/redlock-cs来参考。。

但是相比Redisson(java的实现)有以下几点不足:


1.当一个资源解锁的时候,释放锁之后,其他之前等待的锁不能自动重试申请锁。
2.如果设置了一个超时时间,但是确实执行时间超过了超时时间,那么锁会被自动释放,原来持锁的客户端再次解锁的时候会出现问题

  也要保证一旦一个客户端持锁,在客户端可用时不会被其他客户端解锁。
3.如果关键业务,可能需要重入场景,无法实现重入锁。
 

于是我仿照Redisson在c#的实现
测试一下:
TODO:接收事件去重新获取锁 


public class RedisLock{     #region Property    //KEYS[1] :需要加锁的key,这里需要是字符串类型。    //ARGV[1] :锁的超时时间,防止死锁    //ARGV[2] :锁的唯一标识    private const String LOCK_SCRIPT = @"                if (redis.call('exists', KEYS[1]) == 0) then                     redis.call('hset', KEYS[1], ARGV[2], 1);                    redis.call('pexpire', KEYS[1], ARGV[1]);                    return nil;                end;                if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then                     redis.call('hincrby', KEYS[1], ARGV[2], 1);                    redis.call('pexpire', KEYS[1], ARGV[1]);                    return nil;                end;                return redis.call('pttl', KEYS[1]);            ";     //– KEYS[1] :需要加锁的key,这里需要是字符串类型。    //– KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{” + getName() + “}”    //– ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。    //– ARGV[2] :锁的超时时间,防止死锁    //– ARGV[3] :锁的唯一标识    private const String UN_LOCK_SCRIPT = @"                if (redis.call('exists', KEYS[1]) == 0) then                     redis.call('publish', KEYS[2], ARGV[1]);                     return 1;                end;                if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then                     return nil;                end;                local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);                if (counter > 0) then                    redis.call('pexpire', KEYS[1], ARGV[2]);                     return 0;                else                    redis.call('del', KEYS[1]);                    redis.call('publish', KEYS[2], ARGV[1]);                    return 1;                end;                return nil;            ";    private const double ClockDriveFactor = 0.01;     /// <summary>    /// 默认的30秒过期时间    /// </summary>    private readonly TimeSpan LeaseTimeSpan = new TimeSpan(0, 0, 30);     private readonly ConcurrentDictionary<string, CancellationTokenSource> ExpirationRenewalMap =  new ConcurrentDictionary<string, CancellationTokenSource>();     private readonly ConnectionMultiplexer Server;     #endregion     #region Constructor     public RedisLock(ConnectionMultiplexer connection)    {        Server = connection;        Server.PreserveAsyncOrder = false;    }     #endregion     #region Public     /// <summary>    /// 加锁    /// </summary>    /// <param name="resource">锁名</param>    /// <param name="waitTimeSpan">如果没有锁成功,允许动重试申请锁的最大时长</param>    /// <param name="leaseTimeSpan">如果锁成功,对于锁(key)的过期时间</param>    /// <param name="lockObject">锁成功信息包装成对象返回</param>    /// <returns>true:成功</returns>    public bool TryLock(RedisKey resource, TimeSpan waitTimeSpan, TimeSpan leaseTimeSpan, out Lock lockObject)    {        lockObject = null;        try        {            var startTime = DateTime.Now;            var val = CreateUniqueLockId();            //申请锁,返回还剩余的锁过期时间            var ttl = tryAcquire(resource, val, leaseTimeSpan);            var drift = Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2);            var validity_time = waitTimeSpan - (DateTime.Now - startTime) - new TimeSpan(0, 0, 0, 0, drift);             // 如果为空,表示申请锁成功            if (ttl.IsNull)            {                lockObject = new Lock(resource, val, validity_time);                //开始一个调度程序                ScheduleExpirationRenewal(leaseTimeSpan, lockObject);                return true;            }            // 订阅监听redis消息            Subscriber(resource);             startTime = DateTime.Now;            while (true)            {                 // 再次尝试一次申请锁                ttl = tryAcquire(resource, val, leaseTimeSpan);                // 获得锁,返回                if (ttl.IsNull)                {                    lockObject = new Lock(resource, val, validity_time);                    ScheduleExpirationRenewal(leaseTimeSpan, lockObject);                    return true;                }                 drift = Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2);                validity_time = waitTimeSpan - (DateTime.Now - startTime) - new TimeSpan(0, 0, 0, 0, drift);                if (validity_time.TotalMilliseconds < 0)                {                    //说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。                    Console.WriteLine("已经超过了客户端设置的最大wait time,Thread ID:" + Thread.CurrentThread.ManagedThreadId);                    return false;                }            }        }        catch (Exception)        {            return false;        }        finally        {            // 无论是否获得锁,都要取消订阅解锁消息            UnSubscriber(resource);        }     }       /// <summary>    /// 解锁    /// </summary>    /// <param name="lockObject">锁成功的返回对象</param>    /// <returns></returns>    public RedisResult UnLock(Lock lockObject)    {        if (lockObject == null) return null;        CancelExpirationRenewal(lockObject);        RedisKey[] key = { lockObject.Resource, GetChannelName(lockObject.Resource) };        RedisValue[] values = { Thread.CurrentThread.ManagedThreadId, 10000, lockObject.Value };        return Server.GetDatabase().ScriptEvaluate(UN_LOCK_SCRIPT, key, values);    }     #endregion     #region Private    private void Subscriber(RedisKey resource)    {        Console.WriteLine("Thread ID:" + Thread.CurrentThread.ManagedThreadId + " 订阅广播");        var aa = Thread.CurrentThread.ManagedThreadId;        ISubscriber sub = Server.GetSubscriber();        sub.Subscribe(GetChannelName(resource), (channel, message) =>        {            Console.WriteLine("Thread ID:" + aa + ",收到广播:Thread ID:" + message + " 已解锁");        });     }     private void UnSubscriber(RedisKey resource)    {        ISubscriber sub = Server.GetSubscriber();        sub.Unsubscribe(GetChannelName(resource));    }     private string GetChannelName(RedisKey resource)    {        return "redisson_lock__channel__{" + resource.ToString() + "}";    }     private RedisResult tryAcquire(RedisKey resource, string value, TimeSpan? leaseTimeSpan)    {        if (leaseTimeSpan != null)        {            return LockInnerAsync(resource, leaseTimeSpan.Value, value);        }        else        {            return LockInnerAsync(resource, value);        }    }      private RedisResult LockInnerAsync(RedisKey resource, TimeSpan waitTime, string threadId)    {        RedisKey[] key = { resource };        RedisValue[] values = { waitTime.TotalMilliseconds, threadId };        return Server.GetDatabase().ScriptEvaluate(LOCK_SCRIPT, key, values);    }     private RedisResult LockInnerAsync(RedisKey resource, string threadId)    {        var task = LockInnerAsync(resource, this.LeaseTimeSpan, threadId);        return task;    }     protected static string CreateUniqueLockId()    {        return string.Concat(Guid.NewGuid().ToString(), Thread.CurrentThread.ManagedThreadId);    }     protected void setTimeOut(ElapsedEventHandler doWork, int time)     {        System.Timers.Timer timer = new System.Timers.Timer();        timer.Interval = time;        timer.Elapsed += delegate (object s, ElapsedEventArgs args) { timer.Stop(); };        timer.Elapsed += doWork;        timer.Start();    }     protected CancellationTokenSource TaskTimeOut(Func<Lock, bool> action, Lock lockObj, int time)    {        var timeoutCancellationTokenSource = new CancellationTokenSource();        var task = Task.Run(() =>        {            while (!timeoutCancellationTokenSource.IsCancellationRequested)            {                Thread.Sleep(time);                if (action(lockObj))                {                    Console.WriteLine("锁:" + lockObj.Value + " 重置过期时间");                }             }         }, timeoutCancellationTokenSource.Token);        return timeoutCancellationTokenSource;    }    private void ScheduleExpirationRenewal(TimeSpan leaseTimeSpan, Lock lockObject)    {        ScheduleExpirationRenewal((lockObj) => Server.GetDatabase().KeyExpire(lockObj.Resource, leaseTimeSpan), lockObject,            Convert.ToInt32(leaseTimeSpan.TotalMilliseconds) / 3);    }     private void ScheduleExpirationRenewal(Func<Lock, bool> action, Lock lockObj, int time)    {        // 保证任务不会被重复创建        if (ExpirationRenewalMap.ContainsKey(lockObj.Resource))        {            return;        }         var task = TaskTimeOut(action, lockObj, time);         //如果已经存在,停止任务,也是为了在极端的并发情况下,保证任务不会被重复创建        if (!ExpirationRenewalMap.TryAdd(lockObj.Resource, task))        {            task.Cancel();        }     }     private void CancelExpirationRenewal(Lock lockObj)    {        CancellationTokenSource task;        if (ExpirationRenewalMap.TryRemove(lockObj.Resource, out task))        {            if (task != null)            {                task.Cancel();            }        }     }     #endregion}