并发请求调度器

实现限制并发数量的请求调度器

问题

实现一个调度器 Scheduler,限制同时运行的异步任务数量。

const scheduler = new Scheduler(2) // 最多同时执行 2 个任务

scheduler.add(() => fetch('/api/1')) // 立即执行
scheduler.add(() => fetch('/api/2')) // 立即执行
scheduler.add(() => fetch('/api/3')) // 等待前面的完成
scheduler.add(() => fetch('/api/4')) // 等待前面的完成

解答

class Scheduler {
  constructor(maxConcurrent) {
    this.maxConcurrent = maxConcurrent // 最大并发数
    this.running = 0                    // 当前运行数
    this.queue = []                     // 等待队列
  }

  add(task) {
    return new Promise((resolve, reject) => {
      // 将任务和对应的 resolve/reject 存入队列
      this.queue.push({ task, resolve, reject })
      this.run()
    })
  }

  run() {
    // 当有空闲槽位且队列不为空时,执行任务
    while (this.running < this.maxConcurrent && this.queue.length > 0) {
      const { task, resolve, reject } = this.queue.shift()
      this.running++

      // 执行任务,完成后释放槽位
      Promise.resolve(task())
        .then(resolve)
        .catch(reject)
        .finally(() => {
          this.running--
          this.run() // 尝试执行下一个任务
        })
    }
  }
}

测试代码

const scheduler = new Scheduler(2)

const delay = (ms, name) => {
  return new Promise(resolve => {
    console.log(`${name} 开始`)
    setTimeout(() => {
      console.log(`${name} 结束`)
      resolve(name)
    }, ms)
  })
}

scheduler.add(() => delay(1000, '任务1'))
scheduler.add(() => delay(500, '任务2'))
scheduler.add(() => delay(300, '任务3'))
scheduler.add(() => delay(400, '任务4'))

// 输出顺序:
// 任务1 开始
// 任务2 开始
// 任务2 结束
// 任务3 开始
// 任务3 结束
// 任务4 开始
// 任务1 结束
// 任务4 结束

简化版本

如果不需要获取任务返回值:

class Scheduler {
  constructor(max) {
    this.max = max
    this.running = 0
    this.queue = []
  }

  add(task) {
    this.queue.push(task)
    this.run()
  }

  run() {
    while (this.running < this.max && this.queue.length) {
      this.running++
      this.queue.shift()().finally(() => {
        this.running--
        this.run()
      })
    }
  }
}

关键点

  • 用队列存储待执行任务,用计数器跟踪当前并发数
  • add 返回 Promise,让调用者能获取任务结果
  • 任务完成后在 finally 中释放槽位并触发下一个任务
  • Promise.resolve(task()) 兼容同步和异步任务
  • while 循环确保一次性填满所有空闲槽位