Posts 对于 Promise.all() 的误解
Post
Cancel

对于 Promise.all() 的误解

   今天在做一个需求的时候 sequelize 抛出了这样的一个问题 Error: commit has been called on this transaction(724d4efa-4707-4931-bf7e-5172eb7e5b49), you can no longer use it. 后来分析发现自己原来一直对 Promise.all() 的使用有些误解.

   简单讲下业务逻辑,就是需要实现一个 cdkey 兑换的功能,然后因为 cdkey 有不同类型,而不同类型的 cdkey 兑换逻辑有差异,但是奖励是一样的.所以单独写了一个函数 sendUnblockchainReward 用来发送奖励. 业务逻辑的执行顺序是: 路由方法调用 exchangeCommon 函数,然后调用 sendUnblockchainReward 函数.附上有问题的代码(省略了无关的代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
app.Router.post('/exchangeCdkey', async (ctx, next) => {
  // some code ......
  const transaction = await sequelize.transaction();
  const userLock = new RedLock(`post_exchangeCdkey_{${userId}}`, 10000, CONSUMER_NAME, logger); 
  const cedkeyLock = new RedLock(`post_exchangeCdkey_{${cdkey}}`, 10000, CONSUMER_NAME, logger); 
  try {
    await Promise.all([userLock.lock(), cedkeyLock.lock()]);
    // some business logic ......
    await exchangeCommonCdkey(userInfo, existedCdkeyInfo, transaction, logger);
    // some business logic ......
  } catch (e) {
    logger.error(`User: ${userId} request /exchangeCdkey faile. Error:${e}`);
    await transaction.rollback();
  } finally {
    await Promise.all([userLock.unlock(), cedkeyLock.unlock()]);
    await next();
  }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
export const exchangeCommonCdkey = async (
  userInfo: User, 
  existedCdkeyInfo: CdkeyModel, 
  transaction: Transaction, 
  logger: Logger
): Promise<CdkeyUseHistoryModel> => {
  // some code ......
  const databaseBatchOperation: Promise<unknown>[] = [];
  const exchangeCdkeyUseHistory = CdkeyUseHistoryModel.build({ cdkeyId, userId, groupId });

  existedCdkeyInfo.cdkeyAvailableCount -= 1;
  if (existedCdkeyInfo.cdkeyAvailableCount === 0) {
    logger.info(`cdkey ${cdkey} is used up, delete it.`);
    databaseBatchOperation.push(existedCdkeyInfo.destroy({ transaction }));
    databaseBatchOperation.push(HistoryExchangedCdkeyModel.create(existedCdkeyInfo.toJSON(), { transaction }));
  } else {
    databaseBatchOperation.push(existedCdkeyInfo.save({ transaction }));
  }

  if (!isBlockchainReward) {
    sendUnblockchainReward(userInfo, existedCdkeyInfo, exchangeCdkeyUseHistory, databaseBatchOperation, transaction, logger);
  }

  logger.info(`cdkey ${cdkey} is exchanged. operator: ${userId}, begin write change to database.`);
  await Promise.all<unknown>(databaseBatchOperation);
  return exchangeCdkeyUseHistory;
};

const sendUnblockchainReward = (
  userInfo: User,
  existedCdkeyInfo: CdkeyModel,
  exchangeCdkeyUseHistory: CdkeyUseHistoryModel,
  databaseBatchOperation: Promise<unknown>[],
  transaction: Transaction,
  logger: Logger,
): void => {
  try {
    // some code ......
    exchangeCdkeyUseHistory.isSend = true;
    exchangeCdkeyUseHistory.sendTime = new Date();
    databaseBatchOperation.push(userInfo.save({ transaction }));
    databaseBatchOperation.push(exchangeCdkeyUseHistory.save({ transaction }));
  } catch (error) {
    throw new Error(`'Database config error'. Error: ${error}`);
  }
};

  乍一看,这个代码很符合逻辑,先销毁 cdkey 并添加记录到 cdkey 使用历史表中,然后在给用户发送奖励.但是运行正常的情况下,这个接口是没问题的.上文提到的问题出现在了 sendUnblockchainReward 函数中. 就是 sendUnblockchainReward 函数中出现异常后, 会抛出异常给 exchangeCommonCdkey 函数,但是 exchangeCommonCdkey 函数并没有做异常处理,所以异常抛向了路由函数,而路由函数在捕获异常后执行了 await transaction.rollback() 回滚数据库. 当时我的想法就是, await transaction.rollback() 执行了回滚,那就说明肯定出现了异常, await Promise.all<unknown>(databaseBatchOperation) 这段代码肯定是不会执行的, 那既然这段代码不执行, databaseBatchOperation 的方法都不会执行, 那究竟是哪里使用到了 transaction呢?

   想了半天没想明白问题在哪里,因为这个时候我已经陷入了误区了.后来我才明白了,其实往 databaseBatchOperation push 的时候,对数据的操作就已经执行了只不过 Promise.all() 只是去取回这些 Promise 对象的结果. 换句话说, Promise.all() 只关注所有的 Promise 对象什么时候结束(什么时候能去取结果), 所以的 Promise 对象并不是在 Promise.all() 调用的时候才一起去操作, 它们是在一开始的地方就被调用了. 而因为 Promise.all() 还需要取回这些 Promise 对象的结果, 这个时候这些 Promise 对象结果的回调还在事件循环的微任务中等待当前同步代码执行完毕后执行 await Promise.all<unknown>(databaseBatchOperation) 来取回结果. 而因为出现了异常,导致这段代码没执行,所以回调还没被处理.而先去执行了回滚. 最终事务先被回滚了, 而回调函数还在使用这个事务所以就 sequelize 就抛出了上文的异常.

   解决办法就是提前 sendUnblockchainReward 函数先于其他数据库操作执行. 因为 sendUnblockchainReward 函数中的数据库操作和 exchangeCommonCdkey 函数中的数据库操作是在同一个事务中的,所以可以保证数据一致性,虽然业务顺序不合通常逻辑. 附上改动后的正确代码(只保留需要修改的函数):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
export const exchangeCommonCdkey = async (
  userInfo: User, 
  existedCdkeyInfo: CdkeyModel, 
  transaction: Transaction, 
  logger: Logger
): Promise<CdkeyUseHistoryModel> => {
  // some code ......
  const databaseBatchOperation: Promise<unknown>[] = [];
  const exchangeCdkeyUseHistory = CdkeyUseHistoryModel.build({ cdkeyId, userId, groupId });

  if (!isBlockchainReward) {
    sendUnblockchainReward(userInfo, existedCdkeyInfo, exchangeCdkeyUseHistory, databaseBatchOperation, transaction, logger);
  }

  existedCdkeyInfo.cdkeyAvailableCount -= 1;
  if (existedCdkeyInfo.cdkeyAvailableCount === 0) {
    logger.info(`cdkey ${cdkey} is used up, delete it.`);
    databaseBatchOperation.push(existedCdkeyInfo.destroy({ transaction }));
    databaseBatchOperation.push(HistoryExchangedCdkeyModel.create(existedCdkeyInfo.toJSON(), { transaction }));
  } else {
    databaseBatchOperation.push(existedCdkeyInfo.save({ transaction }));
  }

  logger.info(`cdkey ${cdkey} is exchanged. operator: ${userId}, begin write change to database.`);
  await Promise.all<unknown>(databaseBatchOperation);
  return exchangeCdkeyUseHistory;
};

This post is licensed under CC BY 4.0 by the author.

重新理解 Node.js 事件循环

minikube(k8s)日志持久化方案