并发请求调度器
实现限制并发数量的请求调度器
问题
实现一个调度器 Scheduler,限制同时运行的异步任务数量。
const scheduler = new Scheduler(2) // 最多同时执行 2 个任务
scheduler.add(() => fetch('/api/1')) // 立即执行
scheduler.add(() => fetch('/api/2')) // 立即执行
scheduler.add(() => fetch('/api/3')) // 等待前面的完成
scheduler.add(() => fetch('/api/4')) // 等待前面的完成
解答
class Scheduler {
constructor(maxConcurrent) {
this.maxConcurrent = maxConcurrent // 最大并发数
this.running = 0 // 当前运行数
this.queue = [] // 等待队列
}
add(task) {
return new Promise((resolve, reject) => {
// 将任务和对应的 resolve/reject 存入队列
this.queue.push({ task, resolve, reject })
this.run()
})
}
run() {
// 当有空闲槽位且队列不为空时,执行任务
while (this.running < this.maxConcurrent && this.queue.length > 0) {
const { task, resolve, reject } = this.queue.shift()
this.running++
// 执行任务,完成后释放槽位
Promise.resolve(task())
.then(resolve)
.catch(reject)
.finally(() => {
this.running--
this.run() // 尝试执行下一个任务
})
}
}
}
测试代码
const scheduler = new Scheduler(2)
const delay = (ms, name) => {
return new Promise(resolve => {
console.log(`${name} 开始`)
setTimeout(() => {
console.log(`${name} 结束`)
resolve(name)
}, ms)
})
}
scheduler.add(() => delay(1000, '任务1'))
scheduler.add(() => delay(500, '任务2'))
scheduler.add(() => delay(300, '任务3'))
scheduler.add(() => delay(400, '任务4'))
// 输出顺序:
// 任务1 开始
// 任务2 开始
// 任务2 结束
// 任务3 开始
// 任务3 结束
// 任务4 开始
// 任务1 结束
// 任务4 结束
简化版本
如果不需要获取任务返回值:
class Scheduler {
constructor(max) {
this.max = max
this.running = 0
this.queue = []
}
add(task) {
this.queue.push(task)
this.run()
}
run() {
while (this.running < this.max && this.queue.length) {
this.running++
this.queue.shift()().finally(() => {
this.running--
this.run()
})
}
}
}
关键点
- 用队列存储待执行任务,用计数器跟踪当前并发数
add返回 Promise,让调用者能获取任务结果- 任务完成后在
finally中释放槽位并触发下一个任务 Promise.resolve(task())兼容同步和异步任务while循环确保一次性填满所有空闲槽位
目录