原创

分布式锁


分布式锁

基于Redis的分布式锁

/
 * Redis分布式锁
 *
 * @author lzlg
 * 2022/11/27 12:22
 */
public final class RedisLock implements Lock {

    private final StringRedisTemplate redisTemplate;

    private final String lockName;

    private final String uuid;

    private long expire;

    RedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid, long expire) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = uuid + ":" + Thread.currentThread().getId();
        this.expire = expire;
    }

    @Override
    public void lock() {
        this.tryLock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new InterruptedException();
    }

    @Override
    public boolean tryLock() {
        try {
            return this.tryLock(-1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (time != -1) {
            this.expire = unit.toSeconds(time);
        }
        // 获取锁的lua脚本
        String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
                "then " +
                "   redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
                "   redis.call('expire', KEYS[1], ARGV[2]) " +
                "   return 1 " +
                "else " +
                "   return 0 " +
                "end";
        // 创建执行脚本对象
        RedisScript<Boolean> redisScript = new DefaultRedisScript<>(script, Boolean.class);
        // 如果没有上锁成功,则循环进行获取锁
        while (!Optional.ofNullable(redisTemplate.execute(
                redisScript,
                Collections.singletonList(lockName),
                uuid, String.valueOf(expire)))
                .orElse(Boolean.FALSE)) {
            TimeUnit.MILLISECONDS.sleep(50);
        }
        // 进行定时的锁延期任务
        this.renewExpire();
        return true;
    }

    @Override
    public void unlock() {
        // 解锁的lua脚本
        String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
                "then " +
                "   return nil " +
                "elseif " +
                "   redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
                "then " +
                "   return redis.call('del', KEYS[1]) " +
                "else " +
                "   return 0 " +
                "end";
        // 创建执行脚本对象
        RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
        // 执行脚本,进行解锁
        Long flag = redisTemplate.execute(redisScript, Collections.singletonList(lockName), uuid);
        // 如果返回null,则是非法解锁,报异常
        if (ObjectUtils.isEmpty(flag)) {
            throw new IllegalMonitorStateException("this lock is not yours.");
        }
    }

    @Override
    public Condition newCondition() {
        throw new IllegalArgumentException();
    }

    /
     * 使用Timer定时器进行锁的自动续期
     * 判断自己的锁是否还存在,如果存在,则进行续期,配合lua脚本
     * if redis.call('hexists', 'lock', uuid) == 1
     * then
     *   return redis.call('expire', 'lock', 30)
     * else
     *   return 0
     * end
     *
     * keys: lock
     * args: uuid 过期时间
     *
     * if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end
     *
     * redis运行测试:
     * eval "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end" 1 lock 12345678 30
     *
     */
    private void renewExpire() {
        String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
                "then " +
                "   return redis.call('expire', KEYS[1], ARGV[2]) " +
                "else " +
                "   return 0 " +
                "end";
        RedisScript<Boolean> redisScript = new DefaultRedisScript<>(script, Boolean.class);
        // 延期三分之一的时间进行执行
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                // 如果续期成功,则再次执行续期方法,否则不再进行续期(因为自己的锁不存在了)
                if (Optional.ofNullable(redisTemplate.execute(redisScript,
                        Collections.singletonList(lockName),
                        uuid, String.valueOf(expire)))
                        .orElse(Boolean.FALSE)) {
                    renewExpire();
                }
            }
        }, expire * 1000 / 3);
    }
}

基于Zookeeper的分布式锁

/
 * Zookeeper客户端
 *
 * @author lzlg
 * 2022/11/27 19:49
 */
@Component
public final class ZookeeperClient {

    private ZooKeeper zooKeeper;

    /
     * 程序启动初始化ZooKeeper客户端
     */
    @PostConstruct
    public void init() {
        CountDownLatch cdl = new CountDownLatch(1);
        try {
            zooKeeper = new ZooKeeper("192.168.174.160:2181", 30_000, event -> {
                Watcher.Event.KeeperState state = event.getState();
                if (state.equals(SyncConnected) && event.getType().equals(None)) {
                    cdl.countDown();
                    System.out.println("初始化ZooKeeper完成");
                }
            });
            cdl.await();
        } catch (IOException | InterruptedException e) {
            // 初始化失败,则抛出异常
            throw new IllegalStateException(e.getMessage());
        }
    }

    /
     * 程序停止销毁ZooKeeper客户端
     */
    @PreDestroy
    public void destroy() {
        if (!ObjectUtils.isEmpty(zooKeeper)) {
            try {
                zooKeeper.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /
     * 获取Zookeeper锁
     *
     * @param lockName 锁名称
     * @return Zookeeper锁
     */
    public ZookeeperLock getLock(String lockName) {
        return new ZookeeperLock(zooKeeper, lockName);
    }
}

/
 * Zookeeper分布式锁
 * @author lzlg
 * 2022/11/27 19:49
 */
public final class ZookeeperLock implements Lock {

    private final ZooKeeper zooKeeper;

    private final String lockName;

    private String currentNodePath;

    // 锁的根路径
    private static final String ROOT_PATH = "/locks";

    private static final ThreadLocal<Integer> renewCount = new ThreadLocal<>();

    ZookeeperLock(ZooKeeper zooKeeper, String lockName) {
        this.zooKeeper = Objects.requireNonNull(zooKeeper);
        this.lockName = Objects.requireNonNull(lockName);
        try {
            // 如果根路径节点不存在,则进行创建
            if (ObjectUtils.isEmpty(this.zooKeeper.exists(ROOT_PATH, false))) {
                // 创建根永久节点
                String result = this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
                System.out.println("初始化根节点完成:" + result);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void lock() {
        this.tryLock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new InterruptedException();
    }

    /
     * 加锁方法,如果已经存在LockName对应的节点,再次创建Zookeeper会创建失败
     * 注意,必须创建临时节点,如果使用永久节点,万一服务器宕机,则会发生死锁问题.
     *
     * 使用临时序列化节点+监听机制实现阻塞锁,不再递归调用
     */
    @Override
    public boolean tryLock() {
        // 如果已经有重入记录且重入次数大于0,则直接获取锁
        Integer flag = renewCount.get();
        if (!ObjectUtils.isEmpty(flag) && flag > 0) {
            renewCount.set(flag + 1);
            return true;
        }

        try {
            // 使用临时序列化节点,返回的是当前序列化节点路径
            currentNodePath = this.zooKeeper.create(this.lockPath(), null,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("获取锁成功,结果:" + currentNodePath);

            // 获取前一个节点名称
            Optional<String> preNodeOptional = this.getPreNode();
            // 如果有前一个节点
            if (preNodeOptional.isPresent()) {
                String preNode = preNodeOptional.get();
                CountDownLatch cdl = new CountDownLatch(1);
                // 判断前一个节点是否存在
                if (ObjectUtils.isEmpty(this.zooKeeper.exists(ROOT_PATH + "/" + preNode, event -> {
                    cdl.countDown();
                }))) {
                    // 如果节点不存在,则当前节点已经是第一个了,获取锁成功
                    renewCount.set(1);
                    return true;
                }
                // 如果有前一个节点,应该阻塞,直到前一个节点获取锁完成业务逻辑删除锁
                cdl.await();
            }
            // 返回空则是当前节点
            renewCount.set(1);
            return true;
        } catch (KeeperException | InterruptedException e) {
            System.out.println("加锁失败:" + e.getMessage());
            // 如果加锁失败,则进行重试
//            try {
//                TimeUnit.MILLISECONDS.sleep(60);
//                // 再次请求获取锁
//                this.tryLock();
//            } catch (InterruptedException ex) {
//                ex.printStackTrace();
//            }
        }
        return false;
    }

    /
     * 获取前一个节点名称
     *
     * @return 前节点名称
     */
    private Optional<String> getPreNode() {
        // 获取根路径下的子节点列表
        try {
            List<String> children = this.zooKeeper.getChildren(ROOT_PATH, false);
            // 如果没有子节点,则抛出异常,因为刚刚增加了一个临时序列节点
            if (ObjectUtils.isEmpty(children)) {
                throw new IllegalMonitorStateException("非法操作");
            }
            // 取出是相同前缀的子节点
            List<String> prefixSameNodes = children.stream().filter(n -> n.startsWith(lockName + "-"))
                    .collect(Collectors.toList());
            // 如果没有,则也是抛出异常
            if (ObjectUtils.isEmpty(prefixSameNodes)) {
                throw new IllegalMonitorStateException("非法操作");
            }
            // 进行排序
            Collections.sort(prefixSameNodes);
            // 找到当前节点的下标
            String currentNodeName = currentNodePath.substring(currentNodePath.lastIndexOf("/") + 1);
            int index = prefixSameNodes.indexOf(currentNodeName);
            // 如果当前节点下标小于0,则为非法操作
            if (index < 0) {
                throw new IllegalMonitorStateException("非法操作");
                // 如果当前节点大于0,则返回前一个节点名称
            } else if (index > 0) {
                // 获取前一个节点的名称
                return Optional.of(prefixSameNodes.get(index - 1));
            }
            return Optional.empty();
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
            throw new IllegalMonitorStateException("非法操作");
        }

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new InterruptedException();
    }

    /
     * 解锁方法,就是删除对应LockName的节点就可以了
     */
    @Override
    public void unlock() {
        Integer flag = renewCount.get();
        if (ObjectUtils.isEmpty(flag)) {
            throw new IllegalMonitorStateException("非法操作");
        }
        // 重入次数减1
        renewCount.set(flag - 1);
        // 如果重入次数为0,则删除锁
        if (renewCount.get() == 0) {
            try {
                this.zooKeeper.delete(currentNodePath, -1);
            } catch (InterruptedException | KeeperException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public Condition newCondition() {
        throw new IllegalArgumentException();
    }

    /
     * 获取锁路径
     *
     * @return 锁路径
     */
    private String lockPath() {
        return ROOT_PATH + "/" + lockName + "-";
    }
}

基于MySQL的分布式锁

@Service
public class LockServiceImpl extends ServiceImpl<LockMapper, Lock> implements LockService {

    @Autowired
    private RedisStockService redisStockService;

    /
     * 扣减库存
     */
    @Override
    public void stockDeduct() {
        Lock lock = new Lock();
        // 字段lock_name是唯一索引
        lock.setLockName("mysql-lock");
        try {
            this.save(lock);
            // 插入成功,加锁成功,则进行业务处理
            redisStockService.stockDeduct();
            // 处理完成,进行解锁
            this.removeById(lock.getId());
        } catch (Exception e) {
            System.out.println("加锁失败:" + e.getMessage());
            // 如果插入出现异常,则进行重试
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            this.stockDeduct();
        }
    }
}
程序员内功
码出好代码
  • 作者:lzlg520
  • 发表时间:2022-10-19 00:45
  • 版权声明:自由转载-非商用-非衍生-保持署名
  • 公众号转载:请在文末添加作者公众号二维码