1. 简介

随着技术的快速发展,业务系统规模的不断扩大,分布式系统越来越普及。一个应用往往会部署到多台机器上,在一些业务场景中,为了保证数据的一致性,要求在同一时刻同一任务只在一个节点上运行,保证同一个方法同一时刻只能被一个线程执行。这时候分布式锁就运用而生了。

分布式锁有很多的解决方案。常见的有:

  1. 基于数据库的:悲观锁,乐观锁。

  2. 基于zookeeper的分布式锁。

  3. 本章中讲的基于redis的分布式锁。

2. 超卖

下单减库存是互联网项目中必不可少的环节。然而,如果我么考虑不得当,将会带来很多问题。比如最不能忍受的:超卖

如下代码,一个初始化库存的方法和一个购买图书的方法,我们没有做任何的并发处理,查看下最终结果。

package com.ldx.redisson.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Objects;

/**
 * redis 实现分布式锁
 *
 * @author ludangxin
 * @date 2021/8/15
 */
@Slf4j
@RestController
@RequestMapping("redis")
public class RedisLockTestController {
   @Resource
   private StringRedisTemplate stringRedisTemplate;
   // 商品key
   private static final String KEY = "book";
   // 库存数量
   private static final  Long STOCK = 50L;

   /**
    * 初始化
    */
   @GetMapping("init")
   public String init() {
      stringRedisTemplate.opsForValue().set(KEY, String.valueOf(STOCK));
      return "初始化成功~";
   }

   /**
    * 购买图书
    */
   @GetMapping("buy")
   public String buy() {
      // 获取到当前库存
      String buyBefore = stringRedisTemplate.opsForValue().get(KEY);
      if(Objects.isNull(buyBefore)) {
         log.error("未找到\"{}\"的库存信息~", KEY);
         return "暂未上架~";
      }
      long buyBeforeL = Long.parseLong(buyBefore);
      if(buyBeforeL > 0) {
         // 对库存进行-1操作
         Long buyAfter = stringRedisTemplate.opsForValue().decrement(KEY);
         log.info("剩余图书==={}", buyAfter);
         return "购买成功~";
      }
      else {
         log.info("库存不足~");
         return "库存不足~";
      }
   }
}

启动测试:

​ 这里我们使用jemter来进行并发请求。配置如下:

线程组配置:

请求配置:

请求结果:

只复制了部分日志

​ 通过日志很明显的看到,即使在业务代码中判断了库存 > 0但还是超卖了。

......
2021-08-15 21:01:22.614  INFO 66913 --- [io-8080-exec-30] c.l.r.c.RedisLockTestController          : 库存不足~
2021-08-15 21:01:22.614  INFO 66913 --- [io-8080-exec-99] c.l.r.c.RedisLockTestController          : 剩余图书===-42
2021-08-15 21:01:22.614  INFO 66913 --- [io-8080-exec-29] c.l.r.c.RedisLockTestController          : 库存不足~
2021-08-15 21:01:22.622  INFO 66913 --- [io-8080-exec-89] c.l.r.c.RedisLockTestController          : 剩余图书===-40
2021-08-15 21:01:22.622  INFO 66913 --- [io-8080-exec-90] c.l.r.c.RedisLockTestController          : 剩余图书===-35
2021-08-15 21:01:22.622  INFO 66913 --- [o-8080-exec-135] c.l.r.c.RedisLockTestController          : 库存不足~
2021-08-15 21:01:22.622  INFO 66913 --- [o-8080-exec-177] c.l.r.c.RedisLockTestController          : 库存不足~
2021-08-15 21:01:22.622  INFO 66913 --- [io-8080-exec-92] c.l.r.c.RedisLockTestController          : 剩余图书===-34
2021-08-15 21:01:22.622  INFO 66913 --- [io-8080-exec-86] c.l.r.c.RedisLockTestController          : 剩余图书===-37
2021-08-15 21:01:22.642  INFO 66913 --- [io-8080-exec-11] c.l.r.c.RedisLockTestController          : 库存不足~
2021-08-15 21:01:22.642  INFO 66913 --- [o-8080-exec-115] c.l.r.c.RedisLockTestController          : 库存不足~
2021-08-15 21:01:22.642  INFO 66913 --- [io-8080-exec-72] c.l.r.c.RedisLockTestController          : 剩余图书===-33
2021-08-15 21:01:22.643  INFO 66913 --- [nio-8080-exec-3] c.l.r.c.RedisLockTestController          : 库存不足~

3. redis setnx

主要是用redis的 setnx (set not exists)命令实现分布式锁。

3.1 编写逻辑

在超买的场景中,我们了解了分布式锁的必要性。

上面的场景如果是单机的话,直接使用jvm锁就能解决问题,但是在分布式场景下下jvm锁无法处理。

接下来我们将使用redis命令来解决一下超卖问题。

  1. 新增了锁标识key。

  2. 在进行业务处理之前,给redis中setIfAbsent(LOCK_KEY, clientId, 30, TimeUnit.SECONDS)作为lock。

    LOCK_KEY:锁的标识,比如秒杀的商品id_lock:当对该商品进行秒杀下单时,加锁使其线性执行。

    clientId:当前请求的唯一值,为了在删除锁时进行锁判断。即只能删除自己加的锁。防止误删锁。

    30:失效时间,防止死锁(如果加锁时不设置过期时间,当系统执行完加锁还未进行解锁时系统宕机,那其他节点也无法进行下单,因为锁一直在)。

  3. 解锁逻辑最好放在finally中进行,防止报错导致死锁。

   // 锁标识   
   private static final String LOCK_KEY = "book_lock";

   /**
    * 购买图书
    */
   @GetMapping("buy1")
   public String buy1() {
      String clientId = UUID.randomUUID().toString();
      /*
       * 给redis设置一个key,并设置过期时间防止死锁
       * setIfAbsent(setnx):当key不存在时才设置值
       * flag=true:值设置成功(获取锁) flag=false:设置值失败(获取锁失败)
       */
      Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_KEY, clientId, 30, TimeUnit.SECONDS);
      try {
         if(Objects.nonNull(flag) && flag) {
            String buyBefore = stringRedisTemplate.opsForValue().get(KEY);
            if(Objects.isNull(buyBefore)) {
               log.error("未找到\"{}\"的库存信息~", KEY);
               return "暂未上架~";
            }
            long buyBeforeL = Long.parseLong(buyBefore);
            if(buyBeforeL > 0) {
               Long buyAfter = stringRedisTemplate.opsForValue().decrement(KEY);
               log.info("剩余图书==={}", buyAfter);
               return "购买成功~";
            }
            else {
               log.info("库存不足~");
               return "库存不足~";
            }
         }
         else {
            log.error("获取锁失败~");
         }
      }
      catch(Exception e) {
         e.printStackTrace();
      }
      finally {
         // 防止误删锁
         if(clientId.equals(stringRedisTemplate.opsForValue().get(LOCK_KEY))) {
            stringRedisTemplate.delete(LOCK_KEY);
         }
      }
      return "系统错误~";
   }

3.2 启动测试

​ 启动两个服务,并配置nginx负载均衡。

​ nginx配置如下:

​ jemter配置如下:

启动测试:

​ 部分日志如下:

redis中查看库存:

打完收工~

3.3 小节

这里主要是用了setnx来实现分布式锁。虽然解决了超卖问题,但其中还是有很多缺陷。比如:

  1. 当请求获取锁失败时,能不能尝试重新获取锁或者阻塞等待获取锁,而不是直接返回系统繁忙之类的提示语。
  2. 如果持有锁的请求处理时间超过了设置的超时时间,也就是业务逻辑还没有处理完呢,但锁已经失效了,此时刚好又进来一个请求,又有并发问题了
版权声明:本文为ludangxin原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/ludangxin/p/15145779.html