使用redis作为消息队列的用法(redisson中的看门狗机制总结)

1:普通的Redis分布式锁的缺陷 我们在网上看到的redis分布式锁的工具方法,大都满足互斥、防止死锁的特性,有些工具方法会满足可重入特性。如果只满足上述3种特性会有哪些隐患呢?redis分布式锁无法自动续期,比如,一个锁设置了1分钟超时释放,如果拿到这个锁的线程在一分钟内没有执行完毕,那么这个锁就会被其他线程拿到,可能会导致严重的线上问题,我已经在秒杀系统故障排查文章中,看到好多因为这个缺陷导致的超卖了。

使用redis作为消息队列的用法(redisson中的看门狗机制总结)(1)

redisson 锁的加锁机制如上图所示,线程去获取锁,获取成功则执行lua脚本,保存数据到redis数据库。如果获取失败: 一直通过while循环尝试获取锁(可自定义等待时间,超时后返回失败),获取成功后,执行lua脚本,保存数据到redis数据库。Redisson提供的分布式锁是支持锁自动续期的,也就是说,如果线程仍旧没有执行完,那么redisson会自动给redis中的目标key延长超时时间,这在Redisson中称之为 Watch Dog 机制。同时 redisson 还有公平锁、读写锁的实现。

使用redis作为消息队列的用法(redisson中的看门狗机制总结)(2)

public void test() throws Exception{ Rlock lock = redissonClient.getLock("guodong"); // 拿锁失败时会不停的重试 // 具有Watch Dog 自动延期机制 默认续30s 每隔30/3=10 秒续到30s lock.lock(); // 尝试拿锁10s后停止重试,返回false 具有Watch Dog 自动延期机制 默认续30s boolean res1 = Lock.tryLock(10, TimeUnit.SECONDS); // 没有Watch Dog ,10s后自动释放 lock.lock(10, TimeUnit.SECONDS); // 尝试拿锁100s后停止重试,返回false 没有Watch Dog ,10s后自动释放 boolean res2 = lock.tryLock(100, 10, TimeUnit.SECONDS); Thread.sleep(40000L); lock.unlock(); }

使用redis作为消息队列的用法(redisson中的看门狗机制总结)(3)

2.Wath Dog的自动延期机制 如果拿到分布式锁的节点宕机,且这个锁正好处于锁住的状态时,会出现锁死的状态,为了避免这种情况的发生,锁都会设置一个过期时间。这样也存在一个问题,假如一个线程拿到了锁设置了30s超时,在30s后这个线程还没有执行完毕,锁超时释放了,就会导致问题,Redisson给出了自己的答案,就是 watch dog 自动延期机制。Redisson提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期,也就是说,如果一个拿到锁的线程一直没有完成逻辑,那么看门狗会帮助线程不断的延长锁超时时间,锁不会因为超时而被释放。默认情况下,看门狗的续期时间是30s,也可以通过修改Config.lockWatchdogTimeout来另行指定。另外Redisson 还提供了可以指定leaseTime参数的加锁方法来指定加锁的时间。超过这个时间后锁便自动解开了,不会延长锁的有效期。3.源码解读 其实要想对一个框架深刻的了解,主要还是多看源码,目前的Redisson的源码版本基于:3.16.4,同时需要注意的是:
  • watchDog 只有在未显示指定加锁时间(leaseTime)时才会生效。(这点很重要)
  • lockWatchdogTimeout设定的时间不要太小 ,比如我之前设置的是 100毫秒,由于网络直接导致加锁完后,watchdog去延期时,这个key在redis中已经被删除了。

在调用lock方法时,会最终调用到tryAcquireAsync。调用链为:lock()->tryAcquire->tryAcquireAsync,详细解释如下:

使用redis作为消息队列的用法(redisson中的看门狗机制总结)(4)

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; //如果指定了加锁时间,会直接去加锁 if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.eval_LONG); } else { //没有指定加锁时间 会先进行加锁,并且默认时间就是 LockWatchdogTimeout的时间 //这个是异步操作 返回RFuture 类似netty中的future ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } //这里也是类似netty Future 的addListener,在future内容执行完成后执行 ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { // leaseTime不为-1时,不会自动延期 if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { //这里是定时执行 当前锁自动延期的动作,leaseTime为-1时,才会自动延期 scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }

使用redis作为消息队列的用法(redisson中的看门狗机制总结)(5)

scheduleExpirationRenewal 中会调用renewExpiration。 这里我们可以看到是,启用了一个timeout定时,去执行延期动作

使用redis作为消息队列的用法(redisson中的看门狗机制总结)(6)

private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " getRawName() " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { //如果 没有报错,就再次定时延期 // reschedule itself renewExpiration(); } else { cancelExpirationRenewal(null); } }); } // 这里我们可以看到定时任务 是 lockWatchdogTimeout 的1/3时间去执行 renewExpirationAsync }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }

使用redis作为消息队列的用法(redisson中的看门狗机制总结)(7)

最终 scheduleExpirationRenewal会调用到 renewExpirationAsync,执行下面这段 lua脚本。他主要判断就是 这个锁是否在redis中存在,如果存在就进行 pexpire 延期。

使用redis作为消息队列的用法(redisson中的看门狗机制总结)(8)

protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " "redis.call('pexpire', KEYS[1], ARGV[1]); " "return 1; " "end; " "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }

使用redis作为消息队列的用法(redisson中的看门狗机制总结)(9)

4.结论

  • watch dog 在当前节点存活时每10s给分布式锁的key续期 30s;
  • watch dog 机制启动,且代码中没有释放锁操作时,watch dog 会不断地给锁续期;
  • 如果程序释放锁操作时因为异常没有被执行,那么锁无法被释放,所以释放锁操作一定要放到 finally {} 中;
  • 要使 watchLog机制生效 ,lock时 不要设置 过期时间
  • watchlog的延时时间 可以由 lockWatchdogTimeout指定默认延时时间,但是不要设置太小。如100
  • watchdog 会每 lockWatchdogTimeout/3时间,去延时。
  • watchdog 通过 类似netty的 Future功能来实现异步延时
  • watchdog 最终还是通过 lua脚本来进行延时

原文链接:https://www.cnblogs.com/jelly12345/p/14699492.html

使用redis作为消息队列的用法(redisson中的看门狗机制总结)(10)

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页