分布式锁
基于Redis的分布式锁
/
* Redis分布式锁
*
* @author lzlg
* 2022/11/27 12:22
*/
public final class RedisLock implements Lock {
private final StringRedisTemplate redisTemplate;
private final String lockName;
private final String uuid;
private long expire;
RedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid, long expire) {
this.redisTemplate = redisTemplate;
this.lockName = lockName;
this.uuid = uuid + ":" + Thread.currentThread().getId();
this.expire = expire;
}
@Override
public void lock() {
this.tryLock();
}
@Override
public void lockInterruptibly() throws InterruptedException {
throw new InterruptedException();
}
@Override
public boolean tryLock() {
try {
return this.tryLock(-1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time != -1) {
this.expire = unit.toSeconds(time);
}
// 获取锁的lua脚本
String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
"then " +
" redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
" redis.call('expire', KEYS[1], ARGV[2]) " +
" return 1 " +
"else " +
" return 0 " +
"end";
// 创建执行脚本对象
RedisScript<Boolean> redisScript = new DefaultRedisScript<>(script, Boolean.class);
// 如果没有上锁成功,则循环进行获取锁
while (!Optional.ofNullable(redisTemplate.execute(
redisScript,
Collections.singletonList(lockName),
uuid, String.valueOf(expire)))
.orElse(Boolean.FALSE)) {
TimeUnit.MILLISECONDS.sleep(50);
}
// 进行定时的锁延期任务
this.renewExpire();
return true;
}
@Override
public void unlock() {
// 解锁的lua脚本
String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
"then " +
" return nil " +
"elseif " +
" redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
"then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
// 创建执行脚本对象
RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
// 执行脚本,进行解锁
Long flag = redisTemplate.execute(redisScript, Collections.singletonList(lockName), uuid);
// 如果返回null,则是非法解锁,报异常
if (ObjectUtils.isEmpty(flag)) {
throw new IllegalMonitorStateException("this lock is not yours.");
}
}
@Override
public Condition newCondition() {
throw new IllegalArgumentException();
}
/
* 使用Timer定时器进行锁的自动续期
* 判断自己的锁是否还存在,如果存在,则进行续期,配合lua脚本
* if redis.call('hexists', 'lock', uuid) == 1
* then
* return redis.call('expire', 'lock', 30)
* else
* return 0
* end
*
* keys: lock
* args: uuid 过期时间
*
* if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end
*
* redis运行测试:
* eval "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end" 1 lock 12345678 30
*
*/
private void renewExpire() {
String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
"then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";
RedisScript<Boolean> redisScript = new DefaultRedisScript<>(script, Boolean.class);
// 延期三分之一的时间进行执行
new Timer().schedule(new TimerTask() {
@Override
public void run() {
// 如果续期成功,则再次执行续期方法,否则不再进行续期(因为自己的锁不存在了)
if (Optional.ofNullable(redisTemplate.execute(redisScript,
Collections.singletonList(lockName),
uuid, String.valueOf(expire)))
.orElse(Boolean.FALSE)) {
renewExpire();
}
}
}, expire * 1000 / 3);
}
}
基于Zookeeper的分布式锁
/
* Zookeeper客户端
*
* @author lzlg
* 2022/11/27 19:49
*/
@Component
public final class ZookeeperClient {
private ZooKeeper zooKeeper;
/
* 程序启动初始化ZooKeeper客户端
*/
@PostConstruct
public void init() {
CountDownLatch cdl = new CountDownLatch(1);
try {
zooKeeper = new ZooKeeper("192.168.174.160:2181", 30_000, event -> {
Watcher.Event.KeeperState state = event.getState();
if (state.equals(SyncConnected) && event.getType().equals(None)) {
cdl.countDown();
System.out.println("初始化ZooKeeper完成");
}
});
cdl.await();
} catch (IOException | InterruptedException e) {
// 初始化失败,则抛出异常
throw new IllegalStateException(e.getMessage());
}
}
/
* 程序停止销毁ZooKeeper客户端
*/
@PreDestroy
public void destroy() {
if (!ObjectUtils.isEmpty(zooKeeper)) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/
* 获取Zookeeper锁
*
* @param lockName 锁名称
* @return Zookeeper锁
*/
public ZookeeperLock getLock(String lockName) {
return new ZookeeperLock(zooKeeper, lockName);
}
}
/
* Zookeeper分布式锁
* @author lzlg
* 2022/11/27 19:49
*/
public final class ZookeeperLock implements Lock {
private final ZooKeeper zooKeeper;
private final String lockName;
private String currentNodePath;
// 锁的根路径
private static final String ROOT_PATH = "/locks";
private static final ThreadLocal<Integer> renewCount = new ThreadLocal<>();
ZookeeperLock(ZooKeeper zooKeeper, String lockName) {
this.zooKeeper = Objects.requireNonNull(zooKeeper);
this.lockName = Objects.requireNonNull(lockName);
try {
// 如果根路径节点不存在,则进行创建
if (ObjectUtils.isEmpty(this.zooKeeper.exists(ROOT_PATH, false))) {
// 创建根永久节点
String result = this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println("初始化根节点完成:" + result);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void lock() {
this.tryLock();
}
@Override
public void lockInterruptibly() throws InterruptedException {
throw new InterruptedException();
}
/
* 加锁方法,如果已经存在LockName对应的节点,再次创建Zookeeper会创建失败
* 注意,必须创建临时节点,如果使用永久节点,万一服务器宕机,则会发生死锁问题.
*
* 使用临时序列化节点+监听机制实现阻塞锁,不再递归调用
*/
@Override
public boolean tryLock() {
// 如果已经有重入记录且重入次数大于0,则直接获取锁
Integer flag = renewCount.get();
if (!ObjectUtils.isEmpty(flag) && flag > 0) {
renewCount.set(flag + 1);
return true;
}
try {
// 使用临时序列化节点,返回的是当前序列化节点路径
currentNodePath = this.zooKeeper.create(this.lockPath(), null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("获取锁成功,结果:" + currentNodePath);
// 获取前一个节点名称
Optional<String> preNodeOptional = this.getPreNode();
// 如果有前一个节点
if (preNodeOptional.isPresent()) {
String preNode = preNodeOptional.get();
CountDownLatch cdl = new CountDownLatch(1);
// 判断前一个节点是否存在
if (ObjectUtils.isEmpty(this.zooKeeper.exists(ROOT_PATH + "/" + preNode, event -> {
cdl.countDown();
}))) {
// 如果节点不存在,则当前节点已经是第一个了,获取锁成功
renewCount.set(1);
return true;
}
// 如果有前一个节点,应该阻塞,直到前一个节点获取锁完成业务逻辑删除锁
cdl.await();
}
// 返回空则是当前节点
renewCount.set(1);
return true;
} catch (KeeperException | InterruptedException e) {
System.out.println("加锁失败:" + e.getMessage());
// 如果加锁失败,则进行重试
// try {
// TimeUnit.MILLISECONDS.sleep(60);
// // 再次请求获取锁
// this.tryLock();
// } catch (InterruptedException ex) {
// ex.printStackTrace();
// }
}
return false;
}
/
* 获取前一个节点名称
*
* @return 前节点名称
*/
private Optional<String> getPreNode() {
// 获取根路径下的子节点列表
try {
List<String> children = this.zooKeeper.getChildren(ROOT_PATH, false);
// 如果没有子节点,则抛出异常,因为刚刚增加了一个临时序列节点
if (ObjectUtils.isEmpty(children)) {
throw new IllegalMonitorStateException("非法操作");
}
// 取出是相同前缀的子节点
List<String> prefixSameNodes = children.stream().filter(n -> n.startsWith(lockName + "-"))
.collect(Collectors.toList());
// 如果没有,则也是抛出异常
if (ObjectUtils.isEmpty(prefixSameNodes)) {
throw new IllegalMonitorStateException("非法操作");
}
// 进行排序
Collections.sort(prefixSameNodes);
// 找到当前节点的下标
String currentNodeName = currentNodePath.substring(currentNodePath.lastIndexOf("/") + 1);
int index = prefixSameNodes.indexOf(currentNodeName);
// 如果当前节点下标小于0,则为非法操作
if (index < 0) {
throw new IllegalMonitorStateException("非法操作");
// 如果当前节点大于0,则返回前一个节点名称
} else if (index > 0) {
// 获取前一个节点的名称
return Optional.of(prefixSameNodes.get(index - 1));
}
return Optional.empty();
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
throw new IllegalMonitorStateException("非法操作");
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
throw new InterruptedException();
}
/
* 解锁方法,就是删除对应LockName的节点就可以了
*/
@Override
public void unlock() {
Integer flag = renewCount.get();
if (ObjectUtils.isEmpty(flag)) {
throw new IllegalMonitorStateException("非法操作");
}
// 重入次数减1
renewCount.set(flag - 1);
// 如果重入次数为0,则删除锁
if (renewCount.get() == 0) {
try {
this.zooKeeper.delete(currentNodePath, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
@Override
public Condition newCondition() {
throw new IllegalArgumentException();
}
/
* 获取锁路径
*
* @return 锁路径
*/
private String lockPath() {
return ROOT_PATH + "/" + lockName + "-";
}
}
基于MySQL的分布式锁
@Service
public class LockServiceImpl extends ServiceImpl<LockMapper, Lock> implements LockService {
@Autowired
private RedisStockService redisStockService;
/
* 扣减库存
*/
@Override
public void stockDeduct() {
Lock lock = new Lock();
// 字段lock_name是唯一索引
lock.setLockName("mysql-lock");
try {
this.save(lock);
// 插入成功,加锁成功,则进行业务处理
redisStockService.stockDeduct();
// 处理完成,进行解锁
this.removeById(lock.getId());
} catch (Exception e) {
System.out.println("加锁失败:" + e.getMessage());
// 如果插入出现异常,则进行重试
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
this.stockDeduct();
}
}
}