分布式锁实现

分布式锁实现

电商场景,当用户下单的时候,redis 里库存只有一件,并发执行的时候可能会造成库存超卖问题

通过在执行第二步加锁,可以保证并发请求在下单的时候操作是串行化的,但是并发增多,增加一台机器

此时还是会造成库存超卖问题。原因是:两个系统运行在两个不同的JVM里面,他们加的锁只对属于自己JVM里面的线程有效,对于其他JVM的线程是无效的。即 Java提供的原生锁机制在多机部署场景下失效了

分布式锁:redis 或 zookeeper

1. Redis 实现方式

思路:在redis中设置一个值表示加了锁,然后释放锁的时候就把这个key删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 获取锁
// NX是指如果key不存在就成功,key存在返回false,PX可以指定过期时间
SET anyLock unique_value NX PX 30000


// 释放锁:通过执行一段lua脚本
// 释放锁涉及到两条指令,这两条指令不是原子性的
// 需要用到redis的lua脚本支持特性,redis执行lua脚本是原子性的
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

需要注意的地方:

  1. 一定要用SET key value NX PX milliseconds 命令
    如果不用,先设置了值,再设置过期时间,这个不是原子性操作,有可能在设置过期时间之前宕机,会造成死锁(key永久存在)
  2. value 要具有唯一性
    这个是为了在解锁的时候,需要验证value是和加锁的一致才删除key。
    这是避免了一种情况:假设A获取了锁,过期时间30s,此时35s之后,锁已经自动释放了,A去释放锁,但是此时可能B获取了锁。A客户端就不能删除B的锁了。

这样有可能会有一个问题是:设置了key的过期时间,但是业务处理逻辑的时间可能大于过期时间,这样A获取了锁,但是处理超时了,key被过期,B获取了锁,也有可能会恶性循环

组件 Redission 实现

  1. redisson所有指令都通过lua脚本执行,redis支持lua脚本原子性执行
  2. redisson中有一个watchdog的概念,翻译过来就是看门狗,它会在你获取锁之后,每隔10秒帮你把key的超时时间设为30s。这样的话,就算一直持有锁也不会出现key过期了,其他线程获取到锁的问题了。同时也保证了没有死锁的产生,哪怕机器宕机,key也会在时间到了之后自己过期
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// 加锁逻辑
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 调用一段lua脚本,设置一些key、过期时间
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}

Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
// 看门狗逻辑
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}


<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}



// 看门狗最终会调用了这里
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}

// 这个任务会延迟10s执行
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {

// 这个操作会将key的过期时间重新设置为30s
RFuture<Boolean> future = renewExpirationAsync(threadId);

future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}

if (future.getNow()) {
// reschedule itself
// 通过递归调用本方法,无限循环延长过期时间
scheduleExpirationRenewal(threadId);
}
}
});
}

}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}

2. ZooKeeper 实现方式

Zookeeper是一种提供配置管理、分布式协同以及命名的中心化服务。

zk的模型是这样的:zk包含一系列的节点,叫做znode,就好像文件系统一样每个znode表示一个目录,然后znode有一些特性:

  • 有序节点
    • 假如当前有一个父节点为/lock,我们可以在这个父节点下面创建子节点
      zookeeper提供了一个可选的有序特性,例如我们可以创建子节点“/lock/node-”并且指明有序,那么zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号
      也就是说,如果是第一个创建的子节点,那么生成的子节点为/lock/node-0000000000,下一个节点则为/lock/node-0000000001,依次类推。
  • 临时节点
    • 客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点。
  • 事件监听
    • 在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,zookeeper会通知客户端。当前zookeeper有如下四种事件:
      • 节点创建
      • 节点删除
      • 节点数据修改
      • 子节点变更

实现分布式锁的思路

  1. 使用zk的临时节点和有序节点,每个线程获取锁就是在zk创建一个临时有序的节点,比如在/lock/目录下。
  2. 创建节点成功后,获取/lock目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点
  3. 如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。
  4. 如果当前线程创建的节点不是所有节点序号最小的节点,则对节点序号的前一个节点添加一个事件监听。

比如当前线程获取到的节点序号为 /lock/003,然后所有的节点列表为[/lock/001,/lock/002,/lock/003],则对/lock/002这个节点添加一个事件监听器。
如果锁释放了,会唤醒下一个序号的节点,然后重新执行第3步,判断是否自己的节点序号是最小。比如/lock/001释放了,/lock/002监听到事件,此时节点集合为[/lock/002,/lock/003],则/lock/002为最小序号节点,获取到锁。