Posts 基于 Redis 的分布式锁的实现方案
Post
Cancel

基于 Redis 的分布式锁的实现方案

  经过上次的调研(这篇blog),最终和 Leader 确定使用 Redis 来实现我们的分布式锁.

实现思路

   首先, 我们的 redis 是集群(Cluster)模式,集群里面的 redis 机器都会有一个 backup. 然后当我们的微服务部署后,一般来说分布式锁的名字都是会带上一个数据库中表数据的 id 来保证唯一性,并且使用{}来包裹这个id. 这样在集群模式下 redis 在计算把这个 key 分配给那个节点的时候只会对{}包裹的部分进行计算,并不需要计算整个字符串.可以节约点性能. 举个例子, 我们的一个接口是购买一个商品, 首先我们需要检查库存避免超售.那么我们就需要在业务开始的时候去请求锁.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// logger 是 winston loggger 对象
// POD_ID 是 K8S 中每个 pod 的唯一标识
// CONSUMER_NAME 是消费者的名字, 因为使用了 redis stream 来发生解锁的通知 而不是加锁失败然后重试, 这样也减轻 redis 的请求压力
const CONSUMER_NAME = process.env.POD_ID ? `${appName}-consumer-${process.env.POD_ID}` : `${appName}-consumer`
const lockName = `buy_{${goodsId}}`;
const redlock = new RedLock(lockName, 'EX', 5, CONSUMER_NAME, logger);

// begin: get lock
await redlock.lock();

// then: do some thing

// end: release lock
await redlock.unlock();

   redis 在集群模式下有一个 slot 的概念, redis cluster 模式可以保证分布在多个机器的相同的服务在向 redis 请求同一个 key 的时候能访问到同一个节点. 并且上面的这段代码中 goodsId 是一个变量, 所以在购买不同商品会有不同的锁. redis 面对不同的 id 的锁也会均衡的把请求分配到不同的节点上.

   接下来就是我们的分布式锁实现的具体代码了, @comm/redis.ts 文件中只是 redis 相关的配置, RedLock.tsLockManager.ts 是实现分布式的两个文件, 其实理论上只需要 RedLock.ts 的相关代码就能实现这样的一个分布式锁. 但是我们还引入了 LockManager.ts 来实现本机服务队列的这样的功能.如果超过队列的最大限制,我们就拒绝请求.具体细节可以看下面的代码和细节.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// @comm/redis.ts
import ioredis from 'ioredis';

const isUseLocalServer = !!process.env.IS_LOCAL_SERVER;

const config: ioredis.RedisOptions = {
  commandTimeout: 1000,
  connectTimeout: 1000,
  db: isUseLocalServer ? 0 : (process.env.REDIS_INDEX ?? 0),
};

const uriSingle = process.env.REDIS ?? 'redis://localhost:6379';
const uriCluster = [
  'redis://redis-0.redis.kube-node-test.svc.cluster.local:6379',
  'redis://redis-1.redis.kube-node-test.svc.cluster.local:6379',
  'redis://redis-2.redis.kube-node-test.svc.cluster.local:6379',
  'redis://redis-3.redis.kube-node-test.svc.cluster.local:6379',
  'redis://redis-4.redis.kube-node-test.svc.cluster.local:6379',
  'redis://redis-5.redis.kube-node-test.svc.cluster.local:6379',
];

const redis = isUseLocalServer ? new ioredis(uriSingle, config) : new ioredis.Cluster(uriCluster, config);

export default redis;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// RedLock.ts
import { nanoid } from 'nanoid';
import redis from '@comm/redis';
import Logger from '@/utils/logger';
import LockManger from './LockManager';

type ExpireModel = 'EX' | 'PX';
type ConText = {
  resolve: () => void;
  reject: (err: Error) => void;
}

class RedLock {
  private context?: ConText;
  private timeout?: NodeJS.Timeout;
  private readonly id: string;                // 锁的唯一id 只有知道这个 id 才可以解锁, 也就是只有自己能解锁, 避免被别的服务解锁
  private readonly lockName: string;          // 锁的名字
  private readonly expireModel: ExpireModel   // 锁的过期模式 'EX' | 'PX'
  private readonly ttl: number;               // 锁的过期时间
  private readonly consumerName;              // 消费者的名字 redis stream 中的一个消费者的概念
  private readonly logger: Logger;            // winston logger
  private readonly maxWaitingLength: number;  // 最大等待队列长度

  constructor(lockName: string, expireModel: ExpireModel, ttl: number, consumerName: string, logger: Logger, maxWaitingLength = 100) {
    this.id = nanoid(36);
    this.ttl = ttl; 
    this.logger = logger; 
    this.lockName = lockName; 
    this.expireModel = expireModel; 
    this.consumerName = consumerName; 
    this.maxWaitingLength = maxWaitingLength;
  }

  getId = (): string => this.id;
  getTtl = (): number => this.ttl;
  getLogger = (): Logger => this.logger;
  getLockName = (): string => this.lockName;
  getConsumerName = (): string => this.consumerName;
  getContext = (): ConText | undefined => this.context;
  getExpireModel = (): ExpireModel => this.expireModel;
  getMaxWaitingLength = (): number => this.maxWaitingLength;

  onResume(): void {
    this.context?.resolve();
  }

  onError(err: Error): void {
    this.context?.reject(err);
  }

  // 对外暴露的加锁的方法
  async lock(): Promise<void> { 
    const lockList = LockManger.getLockListMap();
    const data = lockList.get(this.lockName);
    // 首先确保我们的等待队列不超过 maxWaitingLength 如果超过了,直接抛出服务正忙的异常
    if ((data?.waiting?.length ?? 0) > this.maxWaitingLength) {
      this.logger.warn(`lock ${this.lockName} waiting length is ${data?.waiting?.length}, max waiting length is ${this.maxWaitingLength}`);
      throw new Error('Server is busy, please try again later');
    }
    // 判断当前请求前面是否有别的请求在等待, 如果有则 await. 等待 onResume() 方法被调用唤醒, onResume 方法会在 LockManager 的 execNext 方法中被调用
    if (LockManger.isQueueLocked(this)) {
      this.logger.info(`lock ${this.lockName} is locked.`);
      await new Promise<void>((resolve, reject) => {
        this.context = {
          resolve,
          reject,
        };
      });
    }
    await this.getRedisLock();
  }

  // 对外暴露解锁的方法
  async unlock(): Promise<void> {
    this.logger.info(`clear timeout for lock ${this.lockName}, id: ${this.id}, timer: ${this.timeout}`);
    // 解锁的时候清除定时器
    if (this.timeout != null) {
      clearTimeout(this.timeout);
      await this.onRelease();
    }
  }

  // 在 redis 加锁的实际逻辑
  private async getRedisLock(): Promise<void> {
    LockManger.setCurrentLock(this);
    this.logger.info(`Set current lock, lockName: ${this.lockName}, id: ${this.id}`);
    const result = await redis.set(this.lockName, this.id, this.expireModel, this.ttl, 'NX');
    this.logger.info(`get redis lock return value: ${result}`);
    if (result === 'OK') {
      // 加锁成功后设置一个定时器,如果没有调用 unlock() 方法, 则会在锁过期后自动触发 onTimeout() 方法自动解锁 
      const time = this.expireModel === 'PX' ? this.ttl : this.ttl * 1000;
      this.timeout = setTimeout(this.onTimeout.bind(this), time);
    } else {
      // 如果加锁失败, 则监听 redis stream 等待服务向 stream 发送解锁消息. 当收到解锁消息后再去请求锁
      const messages = await redis.xreadgroup('GROUP', LockManger.group, this.consumerName, 'block', 0, 'STREAMS', LockManger.streams, '>');
      const [_, [[key, [lockName, message]]]] = messages[0];
      this.logger.info(`consumerName ${this.consumerName}, listen unlock message, key: ${key}, lockName: ${lockName}, message: ${message}`);
      // 收到解锁消息,并 ack 这条消息, 表示这个消息已经被处理过了
      await redis.xack(LockManger.streams, LockManger.group, key);
      this.logger.info(`ack unlock message success, key: ${key}, streams: ${LockManger.streams}, message: ${LockManger.group}`);
      await this.getRedisLock();
    }
  }

  // 锁到了过期时间后被定时器调用释放锁 
  private async onTimeout(): Promise<void> {
    this.logger.warn(`Lock ${this.lockName} id: ${this.id} is expired in ${this.ttl} seconds`);
    this.timeout = undefined;
    await this.onRelease();
  }

  // 执行 lua 脚本释放锁 并且发送一个 unlock 消息到 redis stream
  // 这里记录了很多 warn 日志, 因为可能会存在锁超时而业务代码还没有执行完的情况, 所以我们需要定期检查下这些高危日志, 这也是我们分布式锁的一个问题
  // 当然 如果我们也可以自动给锁续期, 虽然能解决这个问题,但是如果业务一直不结束, 那么我们的锁也就会有死锁问题. 所以最终我们选定的方案是给锁设置一个大大超过业务执行时间的一个过期时间
  private async onRelease(): Promise<void> {
    const script = 'if redis.call("get",KEYS[1]) == ARGV[1] then' +
      '   return redis.call("del",KEYS[1]) ' +
      'else' +
      '   return 0 ' +
      'end';
    const result = await redis.eval(script, 1, this.lockName, this.id);
    if (result === 1) {
      this.logger.info(`unlock ${this.lockName}, id: ${this.id} success`);
      // 判断是否解锁消息发送成功,不成功则重试
      let isSendMessageSuccess = false;
      let retryCount = 0;
      do {
        try {
          const key = await redis.xadd(LockManger.streams, '*', this.lockName, `unlock ${this.lockName}`);
          this.logger.info(`send unlock message to ${LockManger.streams} success, key: ${key}`);
          isSendMessageSuccess = true;
        } catch (error) {
          this.logger.warn(`xadd unlock message failed with error: ${error}`);
          retryCount++;
          await Time.waitForMs(100); // 不做任何操作 只是休眠 100ms
        }
      } while (!isSendMessageSuccess && retryCount < 10);
    } else {
      this.logger.warn(`unlock ${this.lockName}, id: ${this.id} failed, ${this.lockName} maybe expired`);
    }
    this.logger.info('begin exec next request');
    LockManger.execNext(this);
  }
}

export default RedLock;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// LockManger.ts
import redis from '@comm/redis';
import Logger from '@/utils/logger';
import RedLock from './RedLock';

// winston logger
const logger = new Logger('LockManager');

interface RedLockQueue {
  current: RedLock | null;
  waiting: RedLock[];
}

class LockManager {
  private lockListmap = new Map<string, RedLockQueue>();
  constructor(public streams = 'lockStream', public group = 'lockStreamGroup') {}

  // 服务启动的时候创建一个 redis stream 和消费组, 这个是原子操作, 多个微服务只会有一个创建成功
  async initialize() {
    try {
      await redis.xgroup('create', this.streams, this.group, '$');
      logger.info(`streams: ${this.streams}, group: ${this.group} created successfully.`);
    } catch (error) {
      logger.info(`streams: ${this.streams}, group: ${this.group} created faile with error: ${error}`);
    }
  }

  getLockListMap = (): Map<string, RedLockQueue> => this.lockListmap;

  setCurrentLock(lock: RedLock) {
    const lockName = lock.getLockName();
    const data = this.lockListmap.get(lock.getLockName());
    if (!data) {
      this.lockListmap.set(lockName, {
        current: lock,
        waiting: [],
      });
    } else {
      data.current = lock;
    }
  }

  // 判断当时锁是否有需要等待的请求,有就排队
  isQueueLocked(lock: RedLock) {
    const lockName = lock.getLockName();
    const data = this.lockListmap.get(lockName);
    logger.info(`check lock queue, lockName: ${lockName}, current lockName: ${data?.current?.getLockName()}, waiting length: ${data?.waiting.length}`);
    if (!data?.current) {
      return false;
    } else {
      this.pushWaitingQueue(lock);
      return true;
    }
  }

  execNext(lock: RedLock) {
    const id = lock.getId();
    const lockName = lock.getLockName();
    const data = this.lockListmap.get(lockName);
    if (!data) {
      logger.info(`cannot find ${lockName} in lockListmap`);
      return;
    }

    const item = data.waiting.pop();
    logger.info(`exec next request, next lock: ${item?.getLockName()}, waiting length: ${data.waiting.length}`);
    if (item) {
      item.onResume();
      data.current = item;
    } else {
      data.current = null;
      logger.info(`waiting queue is empty, lockName: ${lockName}`);
    }
  }
  
  private pushWaitingQueue(lock: RedLock) {
    lock.getLockName();
    const lockName = lock.getLockName();
    const data = this.lockListmap.get(lockName);
    if (data) {
      data.waiting.push(lock);
    }
  }
}

const LockManger = new LockManager();
LockManger.initialize();
export default LockManger;

   这份代码是第一版的实现方案,RedLock这个类的配置信息和构造参数目前来说还是太多了,有些细节还需要再调整下.比如说可以将RedLock这个类作为一个基类, 每个微服务根据业务的不同可以继承RedLock类然后把一些构造参数和配置固定, 然后在 new RedLock 的时候就可以简单一些了.

This post is licensed under CC BY 4.0 by the author.

Javascript 设计模式与开发实践总结

我的2021年终总结