异步并发数限制

实现一个异步任务调度器,控制同时执行的异步任务数量,常用于接口请求限流、资源加载控制等场景

问题

在实际开发中,我们经常需要处理大量异步任务(如批量请求接口、批量下载文件等),如果同时发起所有请求,可能会导致:

  • 浏览器并发连接数限制
  • 服务器压力过大
  • 内存占用过高

因此需要实现一个异步任务调度器,限制同时执行的异步任务数量,当有任务完成后,自动执行队列中的下一个任务。

解答

class AsyncLimit {
  constructor(limit) {
    this.limit = limit; // 最大并发数
    this.count = 0; // 当前正在执行的任务数
    this.queue = []; // 等待执行的任务队列
  }

  /**
   * 添加异步任务
   * @param {Function} fn - 返回 Promise 的异步函数
   * @returns {Promise} 返回任务执行结果
   */
  async add(fn) {
    // 如果当前执行数量超过限制,则将任务加入队列等待
    if (this.count >= this.limit) {
      await new Promise(resolve => this.queue.push(resolve));
    }

    // 执行任务
    this.count++;
    try {
      const result = await fn();
      return result;
    } catch (error) {
      throw error;
    } finally {
      this.count--;
      // 任务完成后,从队列中取出下一个任务执行
      if (this.queue.length > 0) {
        const resolve = this.queue.shift();
        resolve();
      }
    }
  }
}

// 方法二:使用 Promise.race 实现
class AsyncLimitRace {
  constructor(limit) {
    this.limit = limit;
    this.tasks = []; // 存储正在执行的任务
  }

  /**
   * 批量执行异步任务
   * @param {Array} taskList - 任务函数数组
   * @returns {Promise<Array>} 返回所有任务结果
   */
  async run(taskList) {
    const results = [];
    
    for (let i = 0; i < taskList.length; i++) {
      const task = taskList[i]().then(
        result => {
          results[i] = { status: 'fulfilled', value: result };
          return result;
        },
        error => {
          results[i] = { status: 'rejected', reason: error };
          throw error;
        }
      );

      this.tasks.push(task);

      // 当达到并发限制时,等待最快完成的任务
      if (this.tasks.length >= this.limit) {
        await Promise.race(this.tasks).catch(() => {});
        // 移除已完成的任务
        this.tasks = this.tasks.filter(t => t !== task);
      }
    }

    // 等待所有剩余任务完成
    await Promise.allSettled(this.tasks);
    return results;
  }
}

使用示例

// 模拟异步请求
function createRequest(url, delay) {
  return () => {
    return new Promise((resolve) => {
      console.log(`开始请求: ${url}`);
      setTimeout(() => {
        console.log(`完成请求: ${url}`);
        resolve(`${url} 的响应数据`);
      }, delay);
    });
  };
}

// 示例1:使用 AsyncLimit 类
async function example1() {
  const scheduler = new AsyncLimit(2); // 最多同时执行 2 个任务

  const requests = [
    createRequest('url1', 1000),
    createRequest('url2', 2000),
    createRequest('url3', 1500),
    createRequest('url4', 1000),
    createRequest('url5', 500),
  ];

  const promises = requests.map(request => scheduler.add(request));
  const results = await Promise.all(promises);
  console.log('所有请求完成:', results);
}

// 示例2:使用 AsyncLimitRace 类
async function example2() {
  const scheduler = new AsyncLimitRace(3); // 最多同时执行 3 个任务

  const tasks = [
    createRequest('api/1', 1000),
    createRequest('api/2', 2000),
    createRequest('api/3', 1500),
    createRequest('api/4', 1000),
    createRequest('api/5', 500),
  ];

  const results = await scheduler.run(tasks);
  console.log('所有任务完成:', results);
}

// 示例3:实际应用场景 - 批量上传文件
async function batchUpload(files) {
  const uploader = new AsyncLimit(3);

  const uploadTasks = files.map(file => {
    return uploader.add(async () => {
      // 模拟文件上传
      const formData = new FormData();
      formData.append('file', file);
      const response = await fetch('/upload', {
        method: 'POST',
        body: formData
      });
      return response.json();
    });
  });

  return Promise.all(uploadTasks);
}

// 执行示例
example1();

关键点

  • 并发控制:通过计数器 count 跟踪当前执行的任务数,当达到限制时将新任务加入队列等待

  • 队列管理:使用数组存储等待执行的任务,每个任务完成后从队列中取出下一个任务执行

  • Promise 封装:将队列中的等待逻辑封装成 Promise,通过 resolve 来唤醒等待的任务

  • 错误处理:使用 try-catch-finally 确保无论任务成功或失败,都能正确更新计数器并执行下一个任务

  • 两种实现方式

    • 方法一适合动态添加任务的场景
    • 方法二适合一次性批量执行已知任务列表的场景
  • 实际应用场景

    • 批量接口请求限流
    • 大文件分片上传控制
    • 图片懒加载并发控制
    • 爬虫请求频率限制