异步并发数限制
实现一个异步任务调度器,控制同时执行的异步任务数量,常用于接口请求限流、资源加载控制等场景
问题
在实际开发中,我们经常需要处理大量异步任务(如批量请求接口、批量下载文件等),如果同时发起所有请求,可能会导致:
- 浏览器并发连接数限制
- 服务器压力过大
- 内存占用过高
因此需要实现一个异步任务调度器,限制同时执行的异步任务数量,当有任务完成后,自动执行队列中的下一个任务。
解答
class AsyncLimit {
constructor(limit) {
this.limit = limit; // 最大并发数
this.count = 0; // 当前正在执行的任务数
this.queue = []; // 等待执行的任务队列
}
/**
* 添加异步任务
* @param {Function} fn - 返回 Promise 的异步函数
* @returns {Promise} 返回任务执行结果
*/
async add(fn) {
// 如果当前执行数量超过限制,则将任务加入队列等待
if (this.count >= this.limit) {
await new Promise(resolve => this.queue.push(resolve));
}
// 执行任务
this.count++;
try {
const result = await fn();
return result;
} catch (error) {
throw error;
} finally {
this.count--;
// 任务完成后,从队列中取出下一个任务执行
if (this.queue.length > 0) {
const resolve = this.queue.shift();
resolve();
}
}
}
}
// 方法二:使用 Promise.race 实现
class AsyncLimitRace {
constructor(limit) {
this.limit = limit;
this.tasks = []; // 存储正在执行的任务
}
/**
* 批量执行异步任务
* @param {Array} taskList - 任务函数数组
* @returns {Promise<Array>} 返回所有任务结果
*/
async run(taskList) {
const results = [];
for (let i = 0; i < taskList.length; i++) {
const task = taskList[i]().then(
result => {
results[i] = { status: 'fulfilled', value: result };
return result;
},
error => {
results[i] = { status: 'rejected', reason: error };
throw error;
}
);
this.tasks.push(task);
// 当达到并发限制时,等待最快完成的任务
if (this.tasks.length >= this.limit) {
await Promise.race(this.tasks).catch(() => {});
// 移除已完成的任务
this.tasks = this.tasks.filter(t => t !== task);
}
}
// 等待所有剩余任务完成
await Promise.allSettled(this.tasks);
return results;
}
}
使用示例
// 模拟异步请求
function createRequest(url, delay) {
return () => {
return new Promise((resolve) => {
console.log(`开始请求: ${url}`);
setTimeout(() => {
console.log(`完成请求: ${url}`);
resolve(`${url} 的响应数据`);
}, delay);
});
};
}
// 示例1:使用 AsyncLimit 类
async function example1() {
const scheduler = new AsyncLimit(2); // 最多同时执行 2 个任务
const requests = [
createRequest('url1', 1000),
createRequest('url2', 2000),
createRequest('url3', 1500),
createRequest('url4', 1000),
createRequest('url5', 500),
];
const promises = requests.map(request => scheduler.add(request));
const results = await Promise.all(promises);
console.log('所有请求完成:', results);
}
// 示例2:使用 AsyncLimitRace 类
async function example2() {
const scheduler = new AsyncLimitRace(3); // 最多同时执行 3 个任务
const tasks = [
createRequest('api/1', 1000),
createRequest('api/2', 2000),
createRequest('api/3', 1500),
createRequest('api/4', 1000),
createRequest('api/5', 500),
];
const results = await scheduler.run(tasks);
console.log('所有任务完成:', results);
}
// 示例3:实际应用场景 - 批量上传文件
async function batchUpload(files) {
const uploader = new AsyncLimit(3);
const uploadTasks = files.map(file => {
return uploader.add(async () => {
// 模拟文件上传
const formData = new FormData();
formData.append('file', file);
const response = await fetch('/upload', {
method: 'POST',
body: formData
});
return response.json();
});
});
return Promise.all(uploadTasks);
}
// 执行示例
example1();
关键点
-
并发控制:通过计数器
count跟踪当前执行的任务数,当达到限制时将新任务加入队列等待 -
队列管理:使用数组存储等待执行的任务,每个任务完成后从队列中取出下一个任务执行
-
Promise 封装:将队列中的等待逻辑封装成 Promise,通过
resolve来唤醒等待的任务 -
错误处理:使用
try-catch-finally确保无论任务成功或失败,都能正确更新计数器并执行下一个任务 -
两种实现方式:
- 方法一适合动态添加任务的场景
- 方法二适合一次性批量执行已知任务列表的场景
-
实际应用场景:
- 批量接口请求限流
- 大文件分片上传控制
- 图片懒加载并发控制
- 爬虫请求频率限制
目录