Node.js 多进程与进程通讯

Node.js 开启多进程的方式及进程间通讯方法

问题

Node.js 如何开启多进程?进程之间如何通讯?

解答

Node.js 是单线程的,但可以通过 child_processcluster 模块开启多进程,充分利用多核 CPU。

1. child_process 模块

fork 方式

fork 专门用于创建 Node.js 子进程,自带通讯通道。

// master.js
const { fork } = require('child_process');

// 创建子进程
const child = fork('./child.js');

// 向子进程发送消息
child.send({ type: 'start', data: 100 });

// 接收子进程消息
child.on('message', (msg) => {
  console.log('主进程收到:', msg);
});

// 子进程退出
child.on('exit', (code) => {
  console.log('子进程退出,code:', code);
});
// child.js
// 接收主进程消息
process.on('message', (msg) => {
  console.log('子进程收到:', msg);
  
  // 处理任务
  const result = msg.data * 2;
  
  // 向主进程发送消息
  process.send({ type: 'result', data: result });
});

spawn 方式

spawn 用于执行命令,通过 stdio 流通讯。

const { spawn } = require('child_process');

// 执行 ls 命令
const ls = spawn('ls', ['-la']);

// 通过 stdout 获取输出
ls.stdout.on('data', (data) => {
  console.log(`输出: ${data}`);
});

ls.stderr.on('data', (data) => {
  console.error(`错误: ${data}`);
});

ls.on('close', (code) => {
  console.log(`退出码: ${code}`);
});

exec 方式

exec 执行命令并缓冲输出,适合输出量小的场景。

const { exec } = require('child_process');

exec('node -v', (error, stdout, stderr) => {
  if (error) {
    console.error('执行错误:', error);
    return;
  }
  console.log('Node 版本:', stdout.trim());
});

2. cluster 模块

cluster 用于创建共享端口的多进程服务器。

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`主进程 ${process.pid} 启动`);

  // 创建与 CPU 核数相同的 worker
  for (let i = 0; i < numCPUs; i++) {
    const worker = cluster.fork();
    
    // 向 worker 发送消息
    worker.send({ type: 'init', id: i });
    
    // 接收 worker 消息
    worker.on('message', (msg) => {
      console.log(`收到 worker ${worker.id} 消息:`, msg);
    });
  }

  // worker 退出时重启
  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} 退出`);
    cluster.fork();
  });

} else {
  // worker 进程
  
  // 接收主进程消息
  process.on('message', (msg) => {
    console.log(`Worker ${process.pid} 收到:`, msg);
  });

  // 创建 HTTP 服务器,所有 worker 共享 8000 端口
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Worker ${process.pid} 处理请求\n`);
    
    // 向主进程发送消息
    process.send({ type: 'request', pid: process.pid });
  }).listen(8000);

  console.log(`Worker ${process.pid} 启动`);
}

3. 进程通讯方式对比

方式适用场景通讯方法
forkNode.js 子进程send/on(‘message’)
spawn执行命令stdin/stdout/stderr 流
exec简单命令回调获取结果
cluster多进程服务器send/on(‘message’)

4. 实际应用示例:计算密集型任务

// master.js - 主进程分发任务
const { fork } = require('child_process');
const os = require('os');

const numCPUs = os.cpus().length;
const tasks = [1, 2, 3, 4, 5, 6, 7, 8]; // 待处理任务
const results = [];
let completed = 0;

// 创建 worker 池
const workers = [];
for (let i = 0; i < numCPUs; i++) {
  const worker = fork('./worker.js');
  
  worker.on('message', (msg) => {
    results.push(msg);
    completed++;
    
    // 分配下一个任务
    if (tasks.length > 0) {
      worker.send(tasks.shift());
    }
    
    // 所有任务完成
    if (completed === 8) {
      console.log('所有结果:', results);
      workers.forEach(w => w.kill());
    }
  });
  
  workers.push(worker);
}

// 初始分配任务
workers.forEach(worker => {
  if (tasks.length > 0) {
    worker.send(tasks.shift());
  }
});
// worker.js - 处理计算任务
process.on('message', (num) => {
  // 模拟耗时计算
  const result = fibonacci(num * 5);
  process.send({ input: num, result });
});

function fibonacci(n) {
  if (n <= 1) return n;
  return fibonacci(n - 1) + fibonacci(n - 2);
}

关键点

  • fork 创建 Node.js 子进程,通过 send()on('message') 通讯
  • spawn 执行命令,通过 stdin/stdout 流通讯
  • cluster 创建共享端口的多进程服务器,适合 HTTP 服务
  • 进程通讯基于 IPC(进程间通信)通道,底层使用管道或 Unix Domain Socket
  • 多进程适合 CPU 密集型任务,可充分利用多核 CPU