当前位置: 首页 > news >正文

Redis中的分布式锁(步步为营)

分布式锁

概述

分布式锁指的是,所有服务中的所有线程都去获取同一把锁,但只有一个线程可以成功的获得锁,其他没有获得锁的线程必须全部等待,直到持有锁的线程释放锁。

分布式锁是可以跨越多个实例,多个进程的锁

分布式锁具备的条件:

  • 互斥性:任意时刻,只能有一个客户端持有锁
  • 锁超时释放:持有锁超时,可以释放,防止死锁
  • 可重入性:一个线程获取了锁之后,可以再次对其请求加锁
  • 高可用、高性能:加锁和解锁开销要尽可能低,同时保证高可用
  • 安全性:锁只能被持有该锁的服务(或应用)释放。
  • 容错性:在持有锁的服务崩溃时,锁仍能得到释放,避免死锁。

分布式锁实现方案

分布式锁都是通过第三方组件来实现的,目前比较流行的分布式锁的解决方案有:

  1. 数据库,通过数据库可以实现分布式锁,但是在高并发的情况下对数据库压力较大,所以很少使用。
  2. Redis,借助Redis也可以实现分布式锁,而且Redis的Java客户端种类很多,使用的方法也不尽相同。
  3. Zookeeper,Zookeeper也可以实现分布式锁,同样Zookeeper也存在多个Java客户端,使用方法也不相同

Redis实现分布式锁

SETNX

基本方案:Redis提供了setXX指令来实现分布式锁

highlighter-

格式: setnx key value
将key 的值设为value ,当且仅当key不存在。
若给定的 key已经存在,则SETNX不做任何动作。

设置分布式锁后,能保证并发安全,但上述代码还存在问题,如果执行过程中出现异常,程序就直接抛出异常退出,导致锁没有释放造成最终死锁的问题。(即使将锁放在finally中释放,但是假如是执行到中途系统宕机,锁还是没有被成功的释放掉,依然会出现死锁现象)

设置超时时间

highlighter- SQL

SET lock_key unique_value NX PX 10000

但是,即使设置了超时时间后,还存在问题。

假设有多个线程,假设设置锁的过期时间10s,线程1上锁后执行业务逻辑的时长超过十秒,锁到期释放锁,线程2就可以获得锁执行,此时线程1执行完删除锁,删除的就是线程2持有的锁,线程3又可以获取锁,线程2执行完删除锁,删除的是线程3的锁,如此往后,这样就会出问题。

让线程只删除自己的锁

解决办法就是让线程只能删除自己的锁,即给每个线程上的锁添加唯一标识(这里UUID实现,基本不会出现重复),删除锁时判断这个标识:

但上述红框中由于判定和释放锁不是原子的,极端情况下,可能判定可以释放锁,在执行删除锁操作前刚好时间到了,其他线程获取锁执行,前者线程删除锁删除的依然是别的线程的锁,所以要让删除锁具有原子性,可以利用redis事务或lua脚本实现原子操作判断+删除

Redis的单条命令操作是原子性的,但是多条命令操作并不是原子性的,因此Lua脚本实现的就是令Redis的多条命令也实现原子操作

redis事务不是原子操作的,详情请看 Redis的事务

但是,可以利用Redis的事务和watch实现的乐观锁 来监视锁的状态

java

    @RequestMapping(" /deduct_stock")public String deductStock() {String REDIS_LOCK = "good_lock";// 每个人进来先要进行加锁,key值为"good_lock"String value = UUID.randomUUID().toString().replace("-","");try{// 为key加一个过期时间Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);// 加锁失败if(!flag){return "抢锁失败!";}System.out.println( value+ " 抢锁成功");String result = template.opsForValue().get("goods:001");int total = result == null ? 0 : Integer.parseInt(result);if (total > 0) {// 如果在此处需要调用其他微服务,处理时间较长。。。int realTotal = total - 1;template.opsForValue().set("goods:001", String.valueOf(realTotal));System.out.println("购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8002");return "购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8002";} else {System.out.println("购买商品失败,服务端口为8002");}return "购买商品失败,服务端口为8002";}finally {// 谁加的锁,谁才能删除// 也可以使用redis事务// https://redis.io/commands/set// 使用Lua脚本,进行锁的删除Jedis jedis = null;try{jedis = RedisUtils.getJedis();String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +"then " +"return redis.call('del',KEYS[1]) " +"else " +"   return 0 " +"end";Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));if("1".equals(eval.toString())){System.out.println("-----del redis lock ok....");}else{System.out.println("-----del redis lock error ....");}}catch (Exception e){}finally {if(null != jedis){jedis.close();}}// redis事务
//            while(true){
//                template.watch(REDIS_LOCK);
//                if(template.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
//                    template.setEnableTransactionSupport(true);
//                    template.multi();
//                    template.delete(REDIS_LOCK);
//                    List<Object> list = template.exec();
//                    if(list == null){
//                        continue;
//                    }
//                }
//                template.unwatch();
//                break;
//            }}}
}

尽管这样,还是会有问题,锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁的释放,但其实此时业务还在执行中,还是应该将业务执行结束之后再释放锁。

续时

因此可以设定,任务不完成,锁就不释放。

可以维护一个定时线程池 ScheduledExecutorService,每隔 2s 去扫描加入队列中的 Task,判断失效时间是否快到了,如果快到了,则给锁续上时间。

那如何判断是否快到失效时间了呢?可以用以下公式:【失效时间】<= 【当前时间】+【失效间隔(三分之一超时)】

java

// 扫描的任务队列
private static ConcurrentLinkedQueue<RedisLockDefinitionHolder> holderList = new ConcurrentLinkedQueue();
/*** 线程池,维护keyAliveTime*/
private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
{// 两秒执行一次「续时」操作SCHEDULER.scheduleAtFixedRate(() -> {// 这里记得加 try-catch,否者报错后定时任务将不会再执行=-=Iterator<RedisLockDefinitionHolder> iterator = holderList.iterator();while (iterator.hasNext()) {RedisLockDefinitionHolder holder = iterator.next();// 判空if (holder == null) {iterator.remove();continue;}// 判断 key 是否还有效,无效的话进行移除if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {iterator.remove();continue;}// 超时重试次数,超过时给线程设定中断if (holder.getCurrentCount() > holder.getTryCount()) {holder.getCurrentTread().interrupt();iterator.remove();continue;}// 判断是否进入最后三分之一时间long curTime = System.currentTimeMillis();boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()) <= curTime;if (shouldExtend) {holder.setLastModifyTime(curTime);redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);log.info("businessKey : [" + holder.getBusinessKey() + "], try count : " + holder.getCurrentCount());holder.setCurrentCount(holder.getCurrentCount() + 1);}}}, 0, 2, TimeUnit.SECONDS);
}

Redisson

使用Redis + lua方式可能存在的问题

  1. 不可重入性。同一个线程无法多次获取同一把锁
  2. 不可重试。获取锁只尝试一次就返回false,没有重试机制
  3. 超时释放。锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁的释放,存在安全隐患
  4. 主从一致性。如果Redis是主从集群,主从同步存在延迟,当主机宕机时,从成为了主,但可能存在从此时还未完成同步,因此从上就没有锁标识,此时会出现线程安全问题。

RLock是Redisson分布式锁的最核心接口,继承了concurrent包的Lock接口和自己的RLockAsync接口,RLockAsync的返回值都是RFuture,是Redisson执行异步实现的核心逻辑,也是Netty发挥的主要阵地。

RLock如何加锁解锁,实现可重入性?

从RLock进入,找到RedissonLock类,找到tryLock 方法再继续找到tryAcquireOnceAsync 方法,这是加锁的主要代码(版本不一此处实现有差别,和最新3.15.x有一定出入,但是核心逻辑依然未变。此处以3.13.6为例)

java

// waitTime 等待时间,多久时间内都会在这尝试获取锁
// leaseTime 加锁时是否设置过期时间
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1L) {return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {if (ttlRemaining) {this.scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}}

此处出现leaseTime时间判断的2个分支,实际上就是加锁时是否设置过期时间,未设置过期时间(-1)时则会有watchDog 的锁续约 (下文),一个注册了加锁事件的续约任务。我们先来看有过期时间tryLockInnerAsync 部分

evalWriteAsync方法是eval命令执行lua的入口

java

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {this.internalLockLeaseTime = unit.toMillis(leaseTime);return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}

eval命令执行Lua脚本的地方,此处将Lua脚本展开

lua

-- 不存在该key时
if (redis.call('exists', KEYS[1]) == 0) then -- 新增该锁并且hash中该线程id对应的count置1redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 设置过期时间redis.call('pexpire', KEYS[1], ARGV[1]); return nil; 
end; -- 存在该key 并且 hash中线程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 线程重入次数++redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; 
end; 
return redis.call('pttl', KEYS[1]);

lua

// keyName
KEYS[1] = Collections.singletonList(this.getName())
// leaseTime
ARGV[1] = this.internalLockLeaseTime
// uuid+threadId组合的唯一值
ARGV[2] = this.getLockName(threadId)

总共3个参数完成了一段逻辑:

  1. 判断该锁是否已经有对应hash表存在,
    • 没有对应的hash表:则set该hash表中一个entry的key为锁名称,value为1,之后设置该hash表失效时间为leaseTime
    • 存在对应的hash表:则将该lockName的value执行+1操作,也就是计算进入次数,再设置失效时间leaseTime
  2. 最后返回这把锁的ttl剩余时间

再看看RLock如何解锁?

看unlock方法,同样查找方法名,一路到unlockInnerAsync

java

protected RFuture<Boolean> unlockInnerAsync(long threadId) {return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
}

将lua脚本展开

lua

-- 不存在key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;
end;
-- 存在,计数器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then -- 过期时间重设redis.call('pexpire', KEYS[1], ARGV[2]); return 0; 
else-- 删除并发布解锁消息redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1;
end; 
return nil;

该Lua KEYS有2个Arrays.asList(getName(), getChannelName())

lua

name 锁名称
channelName,用于pubSub发布消息的channel名称

ARGV变量有三个LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)

lua

LockPubSub.UNLOCK_MESSAGE,channel发送消息的类别,此处解锁为0
internalLockLeaseTime,watchDog配置的超时时间,默认为30s
lockName 这里的lockName指的是uuid和threadId组合的唯一值

具体执行步骤如下:

  1. 如果该锁不存在则返回nil;
  2. 如果该锁存在则将其线程的hash key计数器-1,
  3. 计数器counter>0,重置下失效时间,返回0;否则,删除该锁,发布解锁消息unlockMessage,返回1;

加锁解锁流程总结如下:

总的来说就是通过Hash类型来存储锁的次数:

RLock的锁重试问题

需要分析的是锁重试的,所以,在使用lock.tryLock()方法的时候,不能用无参的。

java

public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {return this.tryLock(waitTime, -1L, unit);
}

在调用tryAcquire方法后,返回了一个Long的ttl

java

 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl == null) {return true;} else {time -= System.currentTimeMillis() - current;if (time <= 0L) {this.acquireFailed(waitTime, unit, threadId);return false;} else {//省略

继续跟着代码进去查看,最后会发现,调用tryLockInnerAsync方法。这个方法就是获取锁的Lua脚本的。

java

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {this.internalLockLeaseTime = unit.toMillis(leaseTime);return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}

这个lua脚本上面提到了。就是 判断,如果获取到锁,返回一个nil.也就是null。如果没有获取到,就调用 pttl,name。其实就是获取当前name锁的剩余有效期。

获取到ttl。如果返回null说获取锁成功,直接返回true.如果返回的不是null,说明需要进行重试操作了。主要是根据时间进行判断的。经过一系列判断后,do,while是真正执行重试相关逻辑的。如下:

java

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);//如果返回null,说明获取到了锁,直接返回if (ttl == null) {return true;} else {//当前时间与进入方法时的时间进行比较//System.currentTimeMillis() - current表示前面获取锁消耗时间time -= System.currentTimeMillis() - current;time是重试锁的等待时间,if (time <= 0L) {//剩余等待时间,如果剩余等待时间<=0,设置获取锁失败。this.acquireFailed(waitTime, unit, threadId);return false;} else {//再次获取当前时间current = System.currentTimeMillis();//刚刚尝试完获取锁失败,如果继续立即尝试一般是获取不到锁的,因此这里选择订阅的方式//订阅当前锁,在unlock释放锁的时候有个:redis.call('publish', KEYS[2], ARGV[1]); 所以这里就订阅了RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);//进行等待RFuture的结果,等多久?等time的时间if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {//time时间过完了还没有等到锁释放的通知if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {//如果等待超时,就取消订阅this.unsubscribe(subscribeFuture, threadId);}});}this.acquireFailed(waitTime, unit, threadId);//返回获取锁失败return false;} else {//到这里表示在tme时间内获得了释放锁的通知boolean var16;try {//检查之前订阅等待的消耗时间time -= System.currentTimeMillis() - current;if (time <= 0L) {//当前的剩余等待时间this.acquireFailed(waitTime, unit, threadId);boolean var20 = false;return var20;}//这里开始进行重试相关逻辑。主要就是当前时间和进入方法时候的时间进行比较do {long currentTime = System.currentTimeMillis();//这里就是第一次重试ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl == null) {//null表示获取锁失败var16 = true;return var16;}//再试一次time -= System.currentTimeMillis() - currentTime;if (time <= 0L) {this.acquireFailed(waitTime, unit, threadId);var16 = false;return var16;}currentTime = System.currentTimeMillis();if (ttl >= 0L && ttl < time) { //也不是一直试,等别人释放((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;} while(time > 0L);//时间还充足,继续等待//时间到期了,还没获取到锁,返回失败this.acquireFailed(waitTime, unit, threadId);var16 = false;} finally {this.unsubscribe(subscribeFuture, threadId);}return var16;}}}
}

主要是do while机制进行锁重试的,while会检查时间是否还充足会继续循环。当然这个循环不是直接while(true)的盲等机制,而是利用信号量和订阅的方式实现的,会等别人释放锁,再进行尝试,这种方式对cpu友好

Redisson的超时续约

跟随tryLock代码,在RedissonLock类中的tryAcquireOnceAsync方法中,会看到如下代码:

java

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1L) {//设置了锁过期时间return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {//leaseTime = -1时,即没有设置了锁过期时间RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),//,默认30秒TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);//ttlRemainingFuture完成以后ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {//没有抛异常if (ttlRemaining) {//获取锁成功this.scheduleExpirationRenewal(threadId);//自动更新续期时间的任务调度}}});return ttlRemainingFuture;}
}
  1. 在使用trylock的时候,如果设置了锁过期时间,就不会执行续命相关逻辑了。
  2. 其中默认的watchdogTimeout时间是30秒。

java

private void scheduleExpirationRenewal(long threadId) {RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();//获取一个entry,将entry放到map里,getEntryName()就是当前锁名称。//放到map里,即一个锁对应一个entryRedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);if (oldEntry != null) {//表示重入的,第二次放oldEntry.addThreadId(threadId);} else {//表示第一次放entry.addThreadId(threadId);this.renewExpiration();//第一次放,进行续约}}

看门狗机制:在获取锁成功以后,开启一个定时任务,每隔一段时间就会去重置锁的超时时间,以确保锁是在程序执行完unlock手动释放的,不会发生因为业务阻塞,key超时而自动释放的情况

到期续约方法:

java

private void renewExpiration() {RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (ee != null) {       //Timeout定时任务,或者叫周期任务Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());if (ent != null) {Long threadId = ent.getFirstThreadId();if (threadId != null) {//执行续命的操作RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);} else {if (res) {RedissonLock.this.renewExpiration();//再次调用}}});}}}//刷新周期, this.internalLockLeaseTime / 3L, 默认释放时间是30秒,除以3就是每10秒更新一次//续命时间为1/3的过期时间,设置续命单位是秒},this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); ee.setTimeout(task);}
}

查看renewExpirationAsync方法源码,其调用了Lua脚本执行续命操作的。

java

protected RFuture<Boolean> renewExpirationAsync(long threadId) {return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}

pexpire重置锁的有效期。

总体逻辑如下:

  1. 开启一个任务,10秒钟后执行
  2. 开始的这个任务中重置有效期。假设设置的是默认30秒,则重置为30秒
  3. 更新后又重复步骤1、2

那么什么时候取消这个续约的任务呢?在释放锁unlock时

java

 public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise();RFuture<Boolean> future = this.unlockInnerAsync(threadId);future.onComplete((opStatus, e) -> {//取消这个任务this.cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);} else if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);result.tryFailure(cause);} else {result.trySuccess((Object)null);}});return result;
}

multilock解决主从一致性问题

如果Redis是主从集群,主从同步存在延迟,当主机宕机时,从成为了主,但可能存在从此时还未完成同步,因此从上就没有锁标识,此时会出现并发安全问题。

因此redisson提出来了MutiLock锁,使用这把锁就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

使用multilock()方法。必须在所有的节点都获取锁成功,才算成功。 缺点是运维成本高,实现复杂。

java

@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient2 redissonClient2;
@Resource
private RedissonClient3 redissonClient3;RLock lock = redissonClient.getMultilock(lock1,lock2,lock3)

总结Redisson

Redisson分布式锁解决前三个问题原理

总结Redisson分布式锁原理:

  • 可重入:利用hash结构记录线程id和重入次数
  • 可重试:利用信号量和PubSub功能来实现等待、唤醒,获取锁失败的重试机制
  • 超时续约:利用watchDog,开启一个定时任务,每隔一段时间(releaseTime/3),重置超时时间。
  • 使用multilock: 多个独立的redis节点,必须在所有节点都获取重入锁,才算获取成功;

redLock

不管是redLock,还是redissonLock,两者底层都是通过相同的lua脚本来加锁、释放锁的,所以,两者只是外部形态的不同,底层是一样的。redLock是继承了redissonMultiLock,大部分的逻辑,都是在redissonMultiLock中去实现的,所以源码部分,大部分都是RedissonMultiLock

原理

  • redLock的使用,需要有奇数台独立部署的Redis节点
  • 在加锁的时候,会分别去N台节点上加锁,如果半数以上的节点加锁成功,就认为当前线程加锁成功

相关文章:

Redis中的分布式锁(步步为营)

分布式锁 概述 分布式锁指的是&#xff0c;所有服务中的所有线程都去获取同一把锁&#xff0c;但只有一个线程可以成功的获得锁&#xff0c;其他没有获得锁的线程必须全部等待&#xff0c;直到持有锁的线程释放锁。 分布式锁是可以跨越多个实例&#xff0c;多个进程的锁 分布…...

Linux下的三种 IO 复用

目录 一、Select 1、函数 API 2、使用限制 3、使用 Demo 二、Poll 三、epoll 0、 实现原理 1、函数 API 2、简单代码模板 3、LT/ET 使用过程 &#xff08;1&#xff09;LT 水平触发 &#xff08;2&#xff09;ET边沿触发 4、使用 Demo 四、参考链接 一、Select 在…...

微服务即时通讯系统的实现(服务端)----(2)

目录 1. 语音识别子服务的实现1.1 功能设计1.2 模块划分1.3 模块功能示意图1.4 接口的实现 2. 文件存储子服务的实现2.1 功能设计2.2 模块划分2.3 模块功能示意图2.4 接口的实现 3. 用户管理子服务的实现3.1 功能设计3.2 模块划分3.3 功能模块示意图3.4 数据管理3.4.1 关系数据…...

数据库原理-期末复习基础知识第二弹

1、数据的逻辑独立性是指 外模式/模式映像 当模式改变的时候&#xff0c;由数据库管理员对各个外模式/模式的映像做出相应改变&#xff0c;使外模式保持不变。由于应用程序是按照外模式进行编写的&#xff0c;故应用程序不必修改&#xff0c;保证了数据与程序的逻辑独立性。 …...

智能云在线编辑网站(完结篇)

开始及初步计划 1.前端tiptip编辑器框架vue3 2.后端Pythonflaskmysql 3.大模型调用&#xff1a;飞桨系列&#xff08;ppasr&#xff09; 前言&#xff1a;以此篇谨记从软件杯到天津生成式ai答辩过程及结束。 『如蚍蜉见青天&#xff0c;双肩难挑日月』&#xff0c;感叹世事多…...

多源传感器构建机器人的Gazebo模型

构建包含GNSS、IMU、LiDAR、Camera传感器的Gazebo模型涉及多个步骤&#xff0c;包括设置工作环境、创建URDF文件、安装必要的Gazebo插件和依赖项。以下是一个详细的步骤指南&#xff0c;帮助你开始构建这个Gazebo模型。 1. 设置工作环境 首先&#xff0c;确保你已经安装了ROS…...

linux中top 命令返回数据解释

当您在 Linux 终端中运行 top 命令时,它会显示一个动态更新的系统状态视图,其中包括许多有关系统性能的数据。下面是对 top 命令返回数据的详细解释: 标题栏 top - 22:46:12 up 2 days, 3:14, 1 user, load average: 0.05, 0.07, 0.09 22:46:12:当前时间。up 2 days, 3:14…...

【Vue3】【Naive UI】<NDropdown>标签

【Vue3】【Naive UI】 标签 基本设置自定义渲染交互事件其他属性 【VUE3】【Naive UI】&#xff1c;NCard&#xff1e; 标签 【VUE3】【Naive UI】&#xff1c;n-button&#xff1e; 标签 【VUE3】【Naive UI】&#xff1c;a&#xff1e; 标签 【VUE3】【Naive UI】&#xff1c…...

ADS学习笔记 7. 超外差接收机设计

基于ADS2023 update2 更多ADS学习笔记&#xff1a;ADS学习笔记 1. 功率放大器设计ADS学习笔记 2. 低噪声放大器设计ADS学习笔记 3. 功分器设计ADS学习笔记 4. 微带分支定向耦合器设计ADS学习笔记 5. 微带天线设计ADS学习笔记 6. 射频发射机设计 目录 -1、射频接收机性能指标…...

新型大语言模型的预训练与后训练范式,阿里Qwen

前言&#xff1a;大型语言模型&#xff08;LLMs&#xff09;的发展历程可以说是非常长&#xff0c;从早期的GPT模型一路走到了今天这些复杂的、公开权重的大型语言模型。最初&#xff0c;LLM的训练过程只关注预训练&#xff0c;但后来逐步扩展到了包括预训练和后训练在内的完整…...

k8s 1.28 二进制安装与部署

第一步 &#xff1a;配置Linux服务器 #借助梯子工具 192.168.196.100 1C8G kube-apiserver、kube-controller-manager、kube-scheduler、etcd、kubectl、haproxy、keepalived 192.168.196.101 1C8G kube-apiserver、kube-controller-manager、kube-scheduler、etcd、kubectl、…...

Ubuntu 常用解压与压缩命令

.zip文件 unzip FileName.zip # 解压 zip DirName.zip DirName # 将DirName本身压缩 zip -r DirName.zip DirName # 压缩&#xff0c;递归处理&#xff0c;将指定目录下的所有文件和子目录一起压缩 zip DirName.zip DirName 行为&#xff1a; 只压缩 DirName 目录本身&#xff…...

使用ECharts创建带百分比标注的环形图

在数据可视化领域&#xff0c;环形图是一种非常有效的图表类型&#xff0c;它能够清晰地展示各部分与整体的关系。今天&#xff0c;我们将通过ECharts来创建一个带百分比标注的环形图&#xff0c;并详细解释如何实现这一效果。 1. 数据准备 首先&#xff0c;我们定义了一些基础…...

lvs虚拟服务器之LVS-NAT模式

一.集群 二.LVS:虚拟服务器:工作在传输层,解决高并发 三.LVS-NAT一.集群1.概念:集群就是一组计算机集群核心&#xff1a;任务调度集群目的提高性能,降低成本,提高可扩展性,增强可靠性集群分类HA&#xff1a;高可用集群&#xff08;High Availability Cluster&#xff09;:避免单…...

虚拟机添加硬盘驱动,Windows 系统添加 VirtIO 驱动(Windows ISO 安装镜像添加驱动)

为什么要在 ISO 镜像里添加驱动&#xff1f;而不是在进系统以后装驱动&#xff1f;一切都是形势所迫。如果你也是爱折腾的人&#xff0c;那么这也会成为一个有用的方案之一。   最近&#xff0c;因为在给公司研究部署 OpenStack&#xff0c;在制作初始镜像&#xff08;也就是…...

20241128解决Ubuntu20.04安装libesd0-dev异常的问题

20241128解决Ubuntu20.04安装libesd0-dev异常的问题 2024/11/28 16:36 缘起&#xff1a;中科创达的高通CM6125开发板的Android10的编译环境需要。 安装异常&#xff1a;rootrootrootroot-X99-Turbo:~$ rootrootrootroot-X99-Turbo:~$ sudo apt-get install libesd0-dev Readi…...

Linux命令进阶·如何切换root以及回退、sudo命令、用户/用户组管理,以及解决创建用户不显示问题和Ubuntu不显示用户名只显示“$“符号问题

目录 1. root用户&#xff08;超级管理员&#xff09; 1.1 用于账户切换的系统命令——su 1.2 退回上一个用户命令——exit 1.3 普通命令临时授权root身份执行——sudo 1.3.1 为普通用户配置sudo认证 2. 用户/用户组管理 2.1 用户组管理 2.2 用户管理 2.2.1 …...

基于链表的基础笔试/面试题

1. 反转链表 问题描述&#xff1a;反转一个单向链表。 示例&#xff1a; 输入&#xff1a;1 → 2 → 3 → 4 → 5 输出&#xff1a;5 → 4 → 3 → 2 → 1 class ListNode {int val;ListNode next;ListNode(int x) {val x;} }public class LinkedList {public ListNode …...

文件比较和文件流

文件比较和文件流 一、文本比较工具 diff1.基本用法1.1输出格式 2.常用选项 二、文件流1.文件的打开模式2.文件流的分类ifstreamofstreamfstrem区别 3.文件流的函数1. 构造函数2. is_open 用于判断文件是否打开3. open4. getline5. close6. get()7. read8. write9. put10. gcou…...

unity如何让一个物体拥有按钮功能

在 Unity 中&#xff0c;要让一个物体&#xff08;例如一个 3D 模型、UI 元素或其他对象&#xff09;变成一个按钮&#xff0c;你需要为它添加交互功能。这通常意味着让物体能够响应点击事件&#xff0c;像 UI 按钮那样触发某些行为。对于 3D 物体&#xff0c;可以通过 射线检测…...

【RISC-V CPU Debug 专栏 1 -- RISC-V debug 规范】

文章目录 RISC-V Debug调试用例支持的功能限制和不包括的内容RISC-V 调试架构的主要组件用户与调试主机调试翻译器调试传输硬件调试传输模块(DTM)调试模块(DM)调试功能触发模块版本介绍RISC-V Debug RISC-V 调试规范为 RISC-V 处理器提供了一套标准化的调试接口和功能,旨…...

【论文阅读】Federated learning backdoor attack detection with persistence diagram

目的&#xff1a;检测联邦学习环境下&#xff0c;上传上来的模型是不是恶意的。 1、将一个模型转换为|L|个PD,&#xff08;其中|L|为层数&#xff09; 如何将每一层转换成一个PD&#xff1f; 为了评估第&#x1d457;层的激活值&#xff0c;我们需要&#x1d450;个输入来获…...

IDEA Maven 打包找不到程序包错误或找不到符号,报错“程序包不存在“

参考文章&#xff1a;https://blog.csdn.net/yueeryuanyi/article/details/14211090 问题&#xff1a;IDEA Maven 打包找不到程序包错误或找不到符号,报错“程序包不存在“编译都没问题 解决思路 – >【清除缓存】 1. 强制刷新Maven缓存 选择 Maven 标签&#xff0c;Exe…...

MySQL数据库做题笔记

题目链接https://leetcode.cn/problems/invalid-tweets-ii/description/https://leetcode.cn/problems/invalid-tweets-ii/description/ # Write your MySQL query statement below SELECT tweet_id FROM Tweets where LENGTH(content)>140 OR (length(content)-length(rep…...

100个python经典面试题详解(新版)

应老粉要求,每晚加餐一个最新面试题 包括Python面试中常见的问题,涵盖列表、元组、字符串插值、比较操作符、装饰器、类与对象、函数调用方式、数据结构操作、序列化、数据处理函数等多个方面。 旨在帮助数据科学家和软件工程师准备面试或提升Python技能。 7、Python面试题…...

Leetcode3232:判断是否可以赢得数字游戏

题目描述&#xff1a; 给你一个 正整数 数组 nums。 Alice 和 Bob 正在玩游戏。在游戏中&#xff0c;Alice 可以从 nums 中选择所有个位数 或 所有两位数&#xff0c;剩余的数字归 Bob 所有。如果 Alice 所选数字之和 严格大于 Bob 的数字之和&#xff0c;则 Alice 获胜。 如…...

Python 爬虫实战基于 Class 的天气查询与反爬虫练习

需求&#xff1a; 要实现一个简单的天气查询爬虫&#xff0c;使用 requests 库来获取网页内容&#xff0c;使用 BeautifulSoup 来解析网页并提取天气信息。以下是一个基本示例&#xff0c;展示了如何抓取天气信息并输出当天的温度和天气状况。 以下是使用 class 类方式实现带有…...

C语言——库函数

常用的函数 https://cplusplus.com/reference/ 没事儿多看看 1 数学函数 #include <math.h> #include <stdio.h> int main() {printf("%lf\n", sqrt(4));//开平方根——>double类型printf("%lf\n", pow(2, 10));//求几次方的——>do…...

软件测试丨Pytest 第三方插件与 Hook 函数

Pytest不仅是一个用于编写简单和复杂测试的框架&#xff0c;还有大量的第三方插件以及灵活的Hook函数供我们使用&#xff0c;这些功能大大增强了其在软件测试中的应用。通过使用Pytest&#xff0c;测试开发变得简便、安全、高效&#xff0c;同时也能帮助我们更快地修复Bug&…...

[ACTF2020 新生赛]BackupFile--详细解析

信息搜集 让我们寻找源文件&#xff0c;目录扫描&#xff1a; 找到了/index.php.bak文件&#xff0c;也就是index.php的备份文件。 后缀名是.bak的文件是备份文件&#xff0c;是文件格式的扩展名。 我们访问这个路径&#xff0c;就会直接下载该备份文件。 我们把.bak后缀删掉…...

ElasticSearch的学习

介绍 ElasticSearch&#xff08;简称ES&#xff09;是一个开源的分布式搜索和数据分析引擎&#xff0c;是用Java开发并且是当前最流行的开源的企业级搜索引擎&#xff0c;能够达到近实时搜索&#xff0c;它专门设计用于处理大规模的文本数据和实现高性能的全文检索。 Elastic…...

机器学习6-梯度下降法

梯度下降法 目的 梯度下降法(Gradient Descent)是一个算法&#xff0c;但不是像多元线性回归那样是一个具体做回归任务的算法&#xff0c;而是一个非常通用的优化算法来帮助一些机器学习算法求解出最优解的&#xff0c;所谓的通用就是很多机器学习算法都是用它&#xff0c;甚…...

算法之旅:LeetCode 拓扑排序由简入繁完全攻略

前言 欢迎来到我的算法探索博客&#xff0c;在这里&#xff0c;我将通过解析精选的LeetCode题目&#xff0c;与您分享深刻的解题思路、多元化的解决方案以及宝贵的实战经验&#xff0c;旨在帮助每一位读者提升编程技能&#xff0c;领略算法之美。 &#x1f449;更多高频有趣Lee…...

vue3项目中使用星火API

在node环境epxress中使用讯飞ai接口进行二次封装&#xff0c;通过ai对话回复提取&#xff0c;获得ai提取的文章摘要 本文章只是简单使用&#xff0c;更复杂功能比如调用星火API制作对话机器人可以查看文档&#xff0c;对于初次使用星火AI接口或许有帮助 讯飞星火大模型API-大模…...

蓝桥杯第 23 场 小白入门赛

一、前言 好久没打蓝桥杯官网上的比赛了&#xff0c;回来感受一下&#xff0c;这难度区分度还是挺大的 二、题目总览 三、具体题目 3.1 1. 三体时间【算法赛】 思路 额...签到题 我的代码 // Problem: 1. 三体时间【算法赛】 // Contest: Lanqiao - 第 23 场 小白入门赛 …...

Cause: java.sql.SQLException: No value specified for parameter 4

问题 执行更新sql时报错&#xff0c;异常栈如下 org.springframework.jdbc.BadSqlGrammarException: ### Error updating database. Cause: java.sql.SQLException: No value specified for parameter 4 ### The error may exist in com/my/mapper/MyMapper.java (best gue…...

第五课 Unity资源导入工作流效率优化(AssetGraph工具)

上期我们学习了简单的animation动画的优化&#xff0c;接下来我们继续资源导入效率的优化 工程目录 首先我们来学习一下工程目录结构及用途 Asset文件夹&#xff1a;用来储存和重用的项目资产 Library文件夹&#xff1a;用来储存项目内部资产数据信息的目录 Packages文件夹…...

create-vue创建vue3项目

create-vue是Vue官方新的脚手架工具 前提条件&#xff1a; 已安装16.0或更高版本的Node.js &#xff08;node -v查看&#xff09; 创建一个Vue应用 npm init vuelatest 这一指令会帮我们安装并执行create-vue cd vue-project npm install —— 安装依赖 npm run dev...

27 基于51单片机的方向盘模拟系统

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于STC89C52单片机&#xff0c;采用两个MPX4115压力传感器作为两路压力到位开关电路&#xff0c; 采用滑动变阻器连接数模转换器模拟重力加速度传感器电路&#xff1b; 一个按键控制LED灯的点亮与…...

HarmonyOS

UIAbility UIAbility 组件是一种包含UI的应用组件&#xff0c;主要用于和用户交互 设计理念&#xff1a;原生支持应用组件的跨端迁移和多端协同、支持多设备和多窗口的形态 UIAbility组件是系统调度的基本单位&#xff0c;为应用提供绘制界面的窗口。 /** 为使应用能够正常使用…...

字符串处理(二)

第1题 篮球比赛 查看测评数据信息 学校举行篮球比赛&#xff0c;请设计一个计分系统统计KIN、WIN两队分数&#xff0c;并输出分数和结果&#xff01; 如果平分就输出‘GOOD’&#xff0c;否则输出获胜队名&#xff01; 输入格式 输入数据共n1行&#xff0c; 第1行n&#xf…...

达梦数据库文件故障的恢复方法

目录 1、概述 1.1 概述 1.2 环境介绍 2、使用备份集的恢复方法 2.1 实验准备 2.2 误删除“用户表空间数据文件” 2.3 误删除SYSTEM.DBF 2.4 误删除ROLL.DBF 2.5 REDO日志文件 3、无备份集的恢复方法 3.1 误删除“表空间数据文件” 3.2误删除控制文件 3.3 误删除RO…...

Redis(5):哨兵

一、作用和架构 1. 作用 在介绍哨兵之前&#xff0c;首先从宏观角度回顾一下Redis实现高可用相关的技术。它们包括&#xff1a;持久化、复制、哨兵和集群&#xff0c;其主要作用和解决的问题是&#xff1a; 1&#xff09;持久化&#xff1a;持久化是最简单的高可用方法(有时甚…...

准确--在 AlmaLinux 9.2 上快速搭建 FTP 服务器

FTP 服务器配置与验证完整步骤 以下内容是针对在 192.168.6.101 配置 FTP 服务器&#xff0c;端口为 59999 的完整详细操作步骤&#xff0c;包括配置与验证。每个步骤都附有详细注释。 配置 FTP 服务器 1. 安装 vsftpd 根据系统类型&#xff0c;执行以下命令安装 FTP 服务&a…...

Monitor 显示器软件开发设计入门二

基础篇--显示驱动方案输出接口介绍 写在前面&#xff1a;首先申明&#xff0c;这篇文章是写给那些初入显示器软件行业的入门者&#xff0c;或是对显示器没有基本知识的小白人员。如您是行业大咖大神&#xff0c;可以绕行&#xff0c;可看后期进阶文章。 上篇介绍了输入接口及相…...

MySQL 数据库学习教程一:开启数据库探索之旅

在当今数字化时代&#xff0c;数据已然成为企业和组织最为宝贵的资产之一。而数据库管理系统则是存储、管理和操作这些数据的核心工具。MySQL 作为一款广泛应用的开源关系型数据库管理系统&#xff0c;以其可靠性、高性能和易用性而备受青睐。如果你渴望踏入数据库领域&#xf…...

课程答疑微信小程序设计与实现

私信我获取源码和万字论文&#xff0c;制作不易&#xff0c;感谢点赞支持。 课程答疑微信小程序设计与实现 摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了课程答疑微信小程序设计与实现的开发全过程。通过分析…...

基于yolov8、yolov5的铝材缺陷检测识别系统(含UI界面、训练好的模型、Python代码、数据集)

摘要&#xff1a;铝材缺陷检测在现代工业生产和质量管理中具有重要意义&#xff0c;不仅能帮助企业实时监控铝材质量&#xff0c;还为智能化生产系统提供了可靠的数据支撑。本文介绍了一款基于YOLOv8、YOLOv5等深度学习框架的铝材缺陷检测模型&#xff0c;该模型使用了大量包含…...

docker 僵尸进程问题

docker僵尸进程 子进程结束后&#xff0c;父进程没有回收该进程资源&#xff08;父进程可能没有wait&#xff09;&#xff0c;子进程残留资源存放与内核中&#xff0c;就变为僵尸进程(zombie) 场景分析&#xff1a;python脚本A中执行B应用&#xff0c;将A部署在docker中&#…...

Web 毕设篇-适合小白、初级入门练手的 Spring Boot Web 毕业设计项目:电影院后台管理系统(前后端源码 + 数据库 sql 脚本)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 项目介绍 2.0 用户登录功能 3.0 用户管理功能 4.0 影院管理功能 5.0 电影管理功能 6.0 影厅管理功能 7.0 电影排片管理功能 8.0 用户评论管理功能 9.0 用户购票功…...