Redis实战-Redisson-分布式锁
1. 简介
随着技术的快速发展,业务系统规模的不断扩大,分布式系统越来越普及。一个应用往往会部署到多台机器上,在一些业务场景中,为了保证数据的一致性,要求在同一时刻,同一任务只在一个节点上运行,保证同一个方法同一时刻只能被一个线程执行。这时候分布式锁就运用而生了。
分布式锁有很多的解决方案。常见的有:
-
基于数据库的:悲观锁,乐观锁。
-
基于zookeeper的分布式锁。
-
本章中讲的基于redis的分布式锁。
2. 超卖
下单减库存是互联网项目中必不可少的环节。然而,如果我么考虑不得当,将会带来很多问题。比如最不能忍受的:超卖
如下代码,一个初始化库存的方法和一个购买图书的方法,我们没有做任何的并发处理,查看下最终结果。
package com.ldx.redisson.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Objects;
/**
* redis 实现分布式锁
*
* @author ludangxin
* @date 2021/8/15
*/
@Slf4j
@RestController
@RequestMapping("redis")
public class RedisLockTestController {
@Resource
private StringRedisTemplate stringRedisTemplate;
// 商品key
private static final String KEY = "book";
// 库存数量
private static final Long STOCK = 50L;
/**
* 初始化
*/
@GetMapping("init")
public String init() {
stringRedisTemplate.opsForValue().set(KEY, String.valueOf(STOCK));
return "初始化成功~";
}
/**
* 购买图书
*/
@GetMapping("buy")
public String buy() {
// 获取到当前库存
String buyBefore = stringRedisTemplate.opsForValue().get(KEY);
if(Objects.isNull(buyBefore)) {
log.error("未找到\"{}\"的库存信息~", KEY);
return "暂未上架~";
}
long buyBeforeL = Long.parseLong(buyBefore);
if(buyBeforeL > 0) {
// 对库存进行-1操作
Long buyAfter = stringRedisTemplate.opsForValue().decrement(KEY);
log.info("剩余图书==={}", buyAfter);
return "购买成功~";
}
else {
log.info("库存不足~");
return "库存不足~";
}
}
}
启动测试:
这里我们使用jemter
来进行并发请求。配置如下:
线程组配置:
请求配置:
请求结果:
只复制了部分日志
通过日志很明显的看到,即使在业务代码中判断了库存 > 0
但还是超卖了。
......
2021-08-15 21:01:22.614 INFO 66913 --- [io-8080-exec-30] c.l.r.c.RedisLockTestController : 库存不足~
2021-08-15 21:01:22.614 INFO 66913 --- [io-8080-exec-99] c.l.r.c.RedisLockTestController : 剩余图书===-42
2021-08-15 21:01:22.614 INFO 66913 --- [io-8080-exec-29] c.l.r.c.RedisLockTestController : 库存不足~
2021-08-15 21:01:22.622 INFO 66913 --- [io-8080-exec-89] c.l.r.c.RedisLockTestController : 剩余图书===-40
2021-08-15 21:01:22.622 INFO 66913 --- [io-8080-exec-90] c.l.r.c.RedisLockTestController : 剩余图书===-35
2021-08-15 21:01:22.622 INFO 66913 --- [o-8080-exec-135] c.l.r.c.RedisLockTestController : 库存不足~
2021-08-15 21:01:22.622 INFO 66913 --- [o-8080-exec-177] c.l.r.c.RedisLockTestController : 库存不足~
2021-08-15 21:01:22.622 INFO 66913 --- [io-8080-exec-92] c.l.r.c.RedisLockTestController : 剩余图书===-34
2021-08-15 21:01:22.622 INFO 66913 --- [io-8080-exec-86] c.l.r.c.RedisLockTestController : 剩余图书===-37
2021-08-15 21:01:22.642 INFO 66913 --- [io-8080-exec-11] c.l.r.c.RedisLockTestController : 库存不足~
2021-08-15 21:01:22.642 INFO 66913 --- [o-8080-exec-115] c.l.r.c.RedisLockTestController : 库存不足~
2021-08-15 21:01:22.642 INFO 66913 --- [io-8080-exec-72] c.l.r.c.RedisLockTestController : 剩余图书===-33
2021-08-15 21:01:22.643 INFO 66913 --- [nio-8080-exec-3] c.l.r.c.RedisLockTestController : 库存不足~
3. redis setnx
主要是用redis的 setnx (set not exists)命令实现分布式锁。
3.1 编写逻辑
在超买的场景中,我们了解了分布式锁的必要性。
上面的场景如果是单机的话,直接使用jvm锁就能解决问题,但是在分布式场景下下jvm锁无法处理。
接下来我们将使用redis命令来解决一下超卖问题。
-
新增了锁标识key。
-
在进行业务处理之前,给redis中
setIfAbsent(LOCK_KEY, clientId, 30, TimeUnit.SECONDS)
作为lock。LOCK_KEY
:锁的标识,比如秒杀的商品id_lock
:当对该商品进行秒杀下单时,加锁使其线性执行。clientId
:当前请求的唯一值,为了在删除锁时进行锁判断。即只能删除自己加的锁。防止误删锁。30
:失效时间,防止死锁(如果加锁时不设置过期时间,当系统执行完加锁还未进行解锁时系统宕机,那其他节点也无法进行下单,因为锁一直在)。 -
解锁逻辑最好放在
finally
中进行,防止报错导致死锁。
// 锁标识
private static final String LOCK_KEY = "book_lock";
/**
* 购买图书
*/
@GetMapping("buy1")
public String buy1() {
String clientId = UUID.randomUUID().toString();
/*
* 给redis设置一个key,并设置过期时间防止死锁
* setIfAbsent(setnx):当key不存在时才设置值
* flag=true:值设置成功(获取锁) flag=false:设置值失败(获取锁失败)
*/
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_KEY, clientId, 30, TimeUnit.SECONDS);
try {
if(Objects.nonNull(flag) && flag) {
String buyBefore = stringRedisTemplate.opsForValue().get(KEY);
if(Objects.isNull(buyBefore)) {
log.error("未找到\"{}\"的库存信息~", KEY);
return "暂未上架~";
}
long buyBeforeL = Long.parseLong(buyBefore);
if(buyBeforeL > 0) {
Long buyAfter = stringRedisTemplate.opsForValue().decrement(KEY);
log.info("剩余图书==={}", buyAfter);
return "购买成功~";
}
else {
log.info("库存不足~");
return "库存不足~";
}
}
else {
log.error("获取锁失败~");
}
}
catch(Exception e) {
e.printStackTrace();
}
finally {
// 防止误删锁
if(clientId.equals(stringRedisTemplate.opsForValue().get(LOCK_KEY))) {
stringRedisTemplate.delete(LOCK_KEY);
}
}
return "系统错误~";
}
3.2 启动测试
启动两个服务,并配置nginx负载均衡。
nginx配置如下:
jemter配置如下:
启动测试:
部分日志如下:
redis中查看库存:
打完收工~
3.3 小节
这里主要是用了setnx
来实现分布式锁。虽然解决了超卖问题,但其中还是有很多缺陷。比如:
- 当请求获取锁失败时,能不能尝试重新获取锁或者阻塞等待获取锁,而不是直接返回
系统繁忙
之类的提示语。 - 如果持有锁的请求处理时间超过了设置的超时时间,也就是业务逻辑还没有处理完呢,但锁已经失效了,此时刚好又进来一个请求,又有并发问题了