实现有并行限制的 Promise 调度器

实现一个 Promise 调度器,控制并发执行的 Promise 数量,当有任务完成时自动补充新任务

问题

在实际开发中,我们经常需要控制异步任务的并发数量,比如:

  • 批量请求接口时,避免同时发起过多请求导致服务器压力过大
  • 批量下载文件时,限制同时下载的文件数量
  • 爬虫任务中,控制并发请求数避免被封禁

需要实现一个 Promise 调度器(Scheduler),支持添加异步任务,并且能够控制同时执行的任务数量上限。当有任务完成时,自动从队列中取出新任务执行。

解答

class Scheduler {
  constructor(limit) {
    this.limit = limit; // 并发上限
    this.runningCount = 0; // 当前正在执行的任务数
    this.queue = []; // 等待执行的任务队列
  }

  add(promiseCreator) {
    return new Promise((resolve, reject) => {
      // 将任务包装后加入队列
      this.queue.push({
        promiseCreator,
        resolve,
        reject
      });
      // 尝试执行任务
      this.run();
    });
  }

  run() {
    // 如果当前执行数量已达上限,或队列为空,则不执行
    if (this.runningCount >= this.limit || this.queue.length === 0) {
      return;
    }

    // 从队列中取出第一个任务
    const { promiseCreator, resolve, reject } = this.queue.shift();
    this.runningCount++;

    // 执行任务
    promiseCreator()
      .then(resolve)
      .catch(reject)
      .finally(() => {
        // 任务完成后,减少计数并尝试执行下一个任务
        this.runningCount--;
        this.run();
      });
  }
}

使用示例

// 创建一个并发限制为 2 的调度器
const scheduler = new Scheduler(2);

// 模拟异步任务
const timeout = (time, value) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log(`任务${value}完成`);
      resolve(value);
    }, time);
  });
};

// 添加任务
const addTask = (time, order) => {
  scheduler.add(() => timeout(time, order)).then((result) => {
    console.log(`任务${result}返回结果`);
  });
};

// 测试
addTask(1000, '1'); // 1秒后完成
addTask(500, '2');  // 0.5秒后完成
addTask(300, '3');  // 0.3秒后完成
addTask(400, '4');  // 0.4秒后完成

// 执行顺序:
// 任务1和任务2先执行(并发限制为2)
// 0.5秒后,任务2完成,任务3开始执行
// 0.8秒后,任务3完成,任务4开始执行
// 1秒后,任务1完成
// 1.2秒后,任务4完成

// 输出顺序:
// 任务2完成
// 任务2返回结果
// 任务3完成
// 任务3返回结果
// 任务1完成
// 任务1返回结果
// 任务4完成
// 任务4返回结果

关键点

  • 队列管理:使用数组维护等待执行的任务队列,采用先进先出(FIFO)的策略

  • 并发控制:通过 runningCount 计数器追踪当前正在执行的任务数量,确保不超过 limit 限制

  • Promise 包装:在 add 方法中返回新的 Promise,将原始任务的 resolvereject 保存到队列中,实现异步结果的正确传递

  • 自动调度:任务完成后在 finally 中递归调用 run 方法,自动从队列中取出新任务执行,实现连续调度

  • 状态判断:在 run 方法开始时检查是否达到并发上限或队列为空,避免无效执行

  • 错误处理:通过 catch 捕获任务执行中的错误,确保即使任务失败也能正确释放并发位并继续执行后续任务