实现有并行限制的 Promise 调度器
实现一个 Promise 调度器,控制并发执行的 Promise 数量,当有任务完成时自动补充新任务
问题
在实际开发中,我们经常需要控制异步任务的并发数量,比如:
- 批量请求接口时,避免同时发起过多请求导致服务器压力过大
- 批量下载文件时,限制同时下载的文件数量
- 爬虫任务中,控制并发请求数避免被封禁
需要实现一个 Promise 调度器(Scheduler),支持添加异步任务,并且能够控制同时执行的任务数量上限。当有任务完成时,自动从队列中取出新任务执行。
解答
class Scheduler {
constructor(limit) {
this.limit = limit; // 并发上限
this.runningCount = 0; // 当前正在执行的任务数
this.queue = []; // 等待执行的任务队列
}
add(promiseCreator) {
return new Promise((resolve, reject) => {
// 将任务包装后加入队列
this.queue.push({
promiseCreator,
resolve,
reject
});
// 尝试执行任务
this.run();
});
}
run() {
// 如果当前执行数量已达上限,或队列为空,则不执行
if (this.runningCount >= this.limit || this.queue.length === 0) {
return;
}
// 从队列中取出第一个任务
const { promiseCreator, resolve, reject } = this.queue.shift();
this.runningCount++;
// 执行任务
promiseCreator()
.then(resolve)
.catch(reject)
.finally(() => {
// 任务完成后,减少计数并尝试执行下一个任务
this.runningCount--;
this.run();
});
}
}
使用示例
// 创建一个并发限制为 2 的调度器
const scheduler = new Scheduler(2);
// 模拟异步任务
const timeout = (time, value) => {
return new Promise((resolve) => {
setTimeout(() => {
console.log(`任务${value}完成`);
resolve(value);
}, time);
});
};
// 添加任务
const addTask = (time, order) => {
scheduler.add(() => timeout(time, order)).then((result) => {
console.log(`任务${result}返回结果`);
});
};
// 测试
addTask(1000, '1'); // 1秒后完成
addTask(500, '2'); // 0.5秒后完成
addTask(300, '3'); // 0.3秒后完成
addTask(400, '4'); // 0.4秒后完成
// 执行顺序:
// 任务1和任务2先执行(并发限制为2)
// 0.5秒后,任务2完成,任务3开始执行
// 0.8秒后,任务3完成,任务4开始执行
// 1秒后,任务1完成
// 1.2秒后,任务4完成
// 输出顺序:
// 任务2完成
// 任务2返回结果
// 任务3完成
// 任务3返回结果
// 任务1完成
// 任务1返回结果
// 任务4完成
// 任务4返回结果
关键点
-
队列管理:使用数组维护等待执行的任务队列,采用先进先出(FIFO)的策略
-
并发控制:通过
runningCount计数器追踪当前正在执行的任务数量,确保不超过limit限制 -
Promise 包装:在
add方法中返回新的 Promise,将原始任务的resolve和reject保存到队列中,实现异步结果的正确传递 -
自动调度:任务完成后在
finally中递归调用run方法,自动从队列中取出新任务执行,实现连续调度 -
状态判断:在
run方法开始时检查是否达到并发上限或队列为空,避免无效执行 -
错误处理:通过
catch捕获任务执行中的错误,确保即使任务失败也能正确释放并发位并继续执行后续任务
目录