分布式锁的三种实现方式 - Tom-shushu
一、基本概念
1、引入
新的阅读体验:http://www.zhouhong.icu/post/143
本篇文章所有代码:https://github.com/Tom-shushu/Distributed-system-learning-notes/
2、互联网系统架构的演进
3、单体应用锁的局限性
4、什么是分布式锁
5、分布式锁的设计思路
6、目前存在的分布式的方案
- 数据库,通过数据库可以实现分布式锁,但是在高并发的情况下对数据库压力较大,所以很少使用。
- Redis,借助Redis也可以实现分布式锁,而且Redis的Java客户端种类很多,使用的方法也不尽相同。
- Zookeeper,Zookeeper也可以实现分布式锁,同样Zookeeper也存在多个Java客户端,使用方法也不相同。
二、电商平台中针对超卖的解决思路
① 单体架构下针对超卖的解决方案
1、超卖现象一
2、超卖现象二
3、具体代码实现
- 创建数据库表
CREATE DATABASE /*!32312 IF NOT EXISTS*/`distribute` /*!40100 DEFAULT CHARACTER SET utf8mb4 */; USE `distribute`; DROP TABLE IF EXISTS `order`; CREATE TABLE `order` ( `id` int(11) NOT NULL AUTO_INCREMENT, `order_status` int(1) NOT NULL, `receiver_name` varchar(255) NOT NULL, `receiver_mobile` varchar(11) NOT NULL, `order_amount` decimal(11,0) NOT NULL, `create_time` time NOT NULL, `create_user` varchar(255) NOT NULL, `update_time` time NOT NULL, `update_user` varchar(255) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8mb4; DROP TABLE IF EXISTS `order_item`; CREATE TABLE `order_item` ( `id` int(11) NOT NULL AUTO_INCREMENT, `order_id` int(11) NOT NULL, `product_id` int(11) NOT NULL, `purchase_price` decimal(11,0) NOT NULL, `purchase_num` int(3) NOT NULL, `create_time` time NOT NULL, `create_user` varchar(255) NOT NULL, `update_time` time NOT NULL, `update_user` varchar(255) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4; DROP TABLE IF EXISTS `product`; CREATE TABLE `product` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT \'id\', `product_name` varchar(255) NOT NULL COMMENT \'商品名称\', `price` decimal(11,0) NOT NULL COMMENT \'价格\', `count` int(5) NOT NULL COMMENT \'库存\', `product_desc` varchar(255) NOT NULL COMMENT \'描述\', `create_time` time NOT NULL COMMENT \'创建时间\', `create_user` varchar(255) NOT NULL COMMENT \'创建人\', `update_time` time NOT NULL COMMENT \'更新时间\', `update_user` varchar(255) NOT NULL COMMENT \'更新人\', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=100101 DEFAULT CHARSET=utf8mb4; insert into `product`(`id`,`product_name`,`price`,`count`,`product_desc`,`create_time`,`create_user`,`update_time`,`update_user`) values (100100,\'测试商品\',\'1\',1,\'测试商品\',\'18:06:00\',\'周红\',\'19:19:21\',\'xxx\'); /**后续分布式锁需要用到**/ DROP TABLE IF EXISTS `distribute_lock`; CREATE TABLE `distribute_lock` ( `id` int(11) NOT NULL AUTO_INCREMENT, `business_code` varchar(255) NOT NULL COMMENT \'根据业务代码区分,不同业务使用不同锁\', `business_name` varchar(255) NOT NULL COMMENT \'注释,标记编码用途\', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4; insert into `distribute_lock`(`id`,`business_code`,`business_name`) values (1,\'demo\',\'test\');
- 订单创建,库存减1等主要逻辑代码(这里使用 ReentrantLock 当然,也可以使用其他锁 )
// 注意:这边不能使用注解的方式回滚,不然会在事务提交前下一个线程会进来 // @Transactional(rollbackFor = Exception.class) public Integer createOrder() throws Exception{ Product product = null; lock.lock(); try { // 开启事务 TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); // 查询到所要购买的商品 product = productMapper.selectByPrimaryKey(purchaseProductId); if (product==null){ platformTransactionManager.rollback(transaction1); throw new Exception("购买商品:"+purchaseProductId+"不存在"); } // 获取商品当前库存 Integer currentCount = product.getCount(); System.out.println(Thread.currentThread().getName()+"库存数:"+currentCount); // 校验库存 (购买商品数量大于库存数量,抛出异常) if (purchaseProductNum > currentCount){ platformTransactionManager.rollback(transaction1); throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买"); } productMapper.updateProductCount(purchaseProductNum,"xxx",new Date(),product.getId()); platformTransactionManager.commit(transaction1); }finally { lock.unlock(); } // 创建订单 TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition); Order order = new Order(); order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum))); order.setOrderStatus(1);//待处理 order.setReceiverName("xxx"); order.setReceiverMobile("15287653421"); order.setCreateTime(new Date()); order.setCreateUser("不不不不"); order.setUpdateTime(new Date()); order.setUpdateUser("哈哈哈哈"); orderMapper.insertSelective(order); // 创建订单明细 OrderItem orderItem = new OrderItem(); orderItem.setOrderId(order.getId()); orderItem.setProductId(product.getId()); orderItem.setPurchasePrice(product.getPrice()); orderItem.setPurchaseNum(purchaseProductNum); orderItem.setCreateUser("不不不"); orderItem.setCreateTime(new Date()); orderItem.setUpdateTime(new Date()); orderItem.setUpdateUser("哈哈哈哈"); orderItemMapper.insertSelective(orderItem); // 事务提交 platformTransactionManager.commit(transaction); return order.getId(); }
- 测试(使用五个线程同时并发的下单)
@Test public void concurrentOrder() throws InterruptedException { Thread.sleep(60000); CountDownLatch cdl = new CountDownLatch(5); CyclicBarrier cyclicBarrier = new CyclicBarrier(5); // 创建5个线程执行下订单操作 ExecutorService es = Executors.newFixedThreadPool(5); for (int i =0;i<5;i++){ es.execute(()->{ try { // 等5个线程同时达到 await()时再执行创建订单服务,这时候5个线程会堆积到同一时间执行 cyclicBarrier.await(); Integer orderId = orderService.createOrder(); System.out.println("订单id:"+orderId); } catch (Exception e) { e.printStackTrace(); }finally { // 每个线程执行完成之后会减一 cdl.countDown(); } }); } cdl.await(); es.shutdown(); }
② 分布式架构下分布式锁的实现
一、基于数据库实现分布式锁
- 在mapper.xml 里面加入如下自定义的SQL
<select id="selectDistributeLock" resultType="com.example.distributelock.model.DistributeLock"> select * from distribute_lock where business_code = #{businessCode,jdbcType=VARCHAR} for update </select>
- 主要的逻辑实现
@RequestMapping("singleLock") /** * 没有添加 Transactional 注解前,查询分布式锁和sleep二者不是原子操作,在获取到分布式锁后自动提交事务, * 故不会阻止第二个请求获取锁。添加了注解后,在sleep结束前,事务一直未提交,故会等待sleep结束后再行提交事务, * 此时第二个请求才能从数据库中获取分布式锁 */ @Transactional(rollbackFor = Exception.class) public String singleLock() throws Exception { log.info("我进入了方法!"); DistributeLock distributeLock = distributeLockMapper.selectDistributeLock("demo"); if (distributeLock==null) throw new Exception("分布式锁找不到"); log.info("我进入了锁!"); try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } return "我已经执行完成!"; }
- 另一个项目和这个相同,只需要更改端口号即可
- 简单方便,易于理解,易于操作
- 并发量大,对数据库压力较大
- 作为锁的数据库与业务数据库分开
二、基于Redis的SetNX实现分布式锁
① 获取锁的Redis命令
- Set resource_name my_random_value NX PX 30000
- resource_name:资源名称,可根据不同的业务区分不同的锁
- my_random_value:随机值,每个线程的随机值都不相同,用于释放锁时的校验
- NX: key不存在是设置成功,key存在则设置不成功
- PX:自动失效时间,出现异常情况,锁可以过期失效
② 实现原理
- 利用NX的原子性,多个线程并发时,只有一个线程可以设置成功
- 设置成功即获得锁,可以执行后续的业务处理
- 如果出现异常,过了锁的有效期,锁自动释放。
- 释放锁采用了Redis的delete命令
- 释放锁时校验值钱设置的随机数,相同才能释放
- 释放锁的LUA脚本
if redis.call("get",KEYS[1])==argv[1] then return redis.call("del",KEYS[1]) else return 0 end
③ 为什么要添加LUA脚本校验:
④ Redis分布式锁关键代码封装
package com.example.distributelock.lock; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.data.redis.core.types.Expiration; import java.util.Arrays; import java.util.List; import java.util.UUID; @Slf4j public class RedisLock implements AutoCloseable { private RedisTemplate redisTemplate; private String key; private String value; // 过期时间 单位:秒 private int expireTime; public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){ this.redisTemplate = redisTemplate; this.key = key; this.expireTime=expireTime; this.value = UUID.randomUUID().toString(); } /** * 获取分布式锁 * @return */ public boolean getLock(){ RedisCallback<Boolean> redisCallback = connection -> { //设置NX RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent(); //设置过期时间 Expiration expiration = Expiration.seconds(expireTime); //序列化key byte[] redisKey = redisTemplate.getKeySerializer().serialize(key); //序列化value byte[] redisValue = redisTemplate.getValueSerializer().serialize(value); //执行setnx操作 Boolean result = connection.set(redisKey, redisValue, expiration, setOption); return result; }; //获取分布式锁 Boolean lock = (Boolean)redisTemplate.execute(redisCallback); return lock; } // 释放锁 public boolean unLock() { String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class); List<String> keys = Arrays.asList(key); Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value); log.info("释放锁的结果:"+result); return result; } @Override public void close() throws Exception { unLock(); } }
@RequestMapping("redisLock") public String redisLock(){ log.info("我进入了方法!"); try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){ if (redisLock.getLock()) { log.info("我进入了锁!!"); Thread.sleep(15000); } } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } log.info("方法执行完成"); return "方法执行完成"; }
⑥ 在项目中使用ESJOB定时任务没问题,但是如果项目中使用了SpringTask来定时,那么在集群中可能会出现任务重复执行的情况。
⑦ 代码实现
public class SchedulerService { @Autowired private RedisTemplate redisTemplate; @Scheduled(cron = "0/5 * * * * ?") public void sendSms(){ try(RedisLock redisLock = new RedisLock(redisTemplate,"autoSms",30)) { if (redisLock.getLock()){ log.info("每五秒执行这个程序!"); } } catch (Exception e) { e.printStackTrace(); } } }
三、基于Zookeeper实现分布式锁
zookeeper的观察器
- 可设置观察器的3个方法:getData();getChildren();exists();
- 节点数据发生变化,发送给客户端;
- 观察器只能监控一次,再监控需重新设置;
实现原理
- 利用zookeeper的瞬时有序节点的特性;
- 多线程并发创建瞬时节点时,得到有序的序列;
- 序列号最小的线程获得锁;
- 其他线程监听自己序号的前一个序号;
- 前一个线程执行完成,删除自己序号节点;
- 下一个序号的线程得到通知,继续执行;
- 以此类推,创建节点时,已经确定了线程的执行顺序;
代码实现:
package com.example.distributelock.lock; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; @Slf4j public class ZkLock implements Watcher,AutoCloseable { private ZooKeeper zooKeeper; private String businessName; private String znode; public ZkLock(String connectString,String businessName) throws IOException { this.zooKeeper = new ZooKeeper(connectString,30000,this); this.businessName = businessName; } public boolean getLock() throws KeeperException, InterruptedException { Stat existsNode = zooKeeper.exists("/" + businessName, false); if (existsNode == null){ zooKeeper.create("/" + businessName,businessName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } znode = zooKeeper.create("/" + businessName + "/" + businessName + "_", businessName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); znode = znode.substring(znode.lastIndexOf("/")+1); List<String> childrenNodes = zooKeeper.getChildren("/" + businessName, false); Collections.sort(childrenNodes); String firstNode = childrenNodes.get(0); if (!firstNode.equals(znode)){ String lastNode = firstNode; for (String node:childrenNodes){ if (!znode.equals(node)){ lastNode = node; }else { zooKeeper.exists("/"+businessName+"/"+lastNode,true); break; } } synchronized (this){ wait(); } } return true; } @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.NodeDeleted){ synchronized (this){ notify(); } } } @Override public void close() throws Exception { zooKeeper.delete("/"+businessName+"/"+znode,-1); zooKeeper.close(); log.info("我释放了锁"); } }
测试:
@RequestMapping("zkLock") public String zkLock(){ log.info("我进入了方法!"); try (ZkLock zkLock = new ZkLock("localhost:2181","order")){ if (zkLock.getLock()) { log.info("我进入了锁!!"); Thread.sleep(15000); } } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } log.info("方法执行完成"); return "方法执行完成"; }