Node.js API详解之 cluster

Node.js在单个线程中运行单个实例。 用户(开发者)为了使用现在的多核系统,有时候,用户(开发者)会用一串Node.js进程去处理负载任务。
cluster 模块允许简单容易的创建共享服务器端口的子进程。详细介绍之前,我们先看一个简单的例子:

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if(cluster.isMaster) {
  console.log(`主进程 ${process.pid} 正在运行`);

  // 衍生工作进程。
  for(let i = 0; i < numCPUs; i++){
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`工作进程 ${worker.process.pid} 已退出`);
  });
}else{

  // 工作进程可以共享任何 TCP 连接。
  // 在本例子中,共享的是一个 HTTP 服务器。
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('你好世界\n');
  }).listen(8000);

  console.log(`工作进程 ${process.pid} 已启动`);
}

// 主进程 63576 正在运行
// 工作进程 63578 已启动
// 工作进程 63579 已启动
// 工作进程 63577 已启动
// 工作进程 63580 已启动

上面的例子会在工作进程(指代子进程)之间共享8000端口

目录:

cluster:

Class: Worker

cluster.fork([env])

说明:

衍生出一个新的工作进程。只能通过主进程调用。
env:增加进程环境变量,以Key/value对的形式。
返回一个Worker实例

demo:

const cluster = require('cluster');
const http = require('http');

if(cluster.isMaster) {
  	console.log(`主进程 ${process.pid} 正在运行`);

  	// 衍生工作进程。
	cluster.fork();	

}else{
  	// 工作进程可以共享任何 TCP 连接。
  	// 在本例子中,共享的是一个 HTTP 服务器。
  	http.createServer((req, res) => {
		res.writeHead(200);
    	res.end('你好世界\n');
  	}).listen(8000);

  	console.log(`工作进程 ${process.pid} 已启动`);
}

// 主进程 63692 正在运行
// 工作进程 63693 已启动

listening 事件

说明:

当一个工作进程调用listen()后,工作进程上的server会触发’listening’ 事件,
同时主进程上的 cluster 也会被触发’listening’事件。
事件回调函数接收两个参数,其中worker包含了工作进程对象,
address 包含了以下连接属性: address、port 和 addressType。
当工作进程同时监听多个地址时,这些参数非常有用。

demo:

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if(cluster.isMaster) {
  console.log(`主进程 ${process.pid} 正在运行`);

  // 衍生工作进程。
  for(let i = 0; i < numCPUs; i++){
    cluster.fork();
  }

  cluster.on('listening', (worker, address) => {
  	console.log(`工作进程 ${worker.process.pid} 已启动,端口是:${address.port}`);
  });

  cluster.on('exit', (worker, code, signal) => {
    console.log(`工作进程 ${worker.process.pid} 已退出`);
  });
}else{
  // 工作进程可以共享任何 TCP 连接。
  // 在本例子中,共享的是一个 HTTP 服务器。
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('你好世界\n');
  }).listen(8000);
}
// 主进程 13369 正在运行
// 工作进程 13370 已启动,端口是:8000
// 工作进程 13373 已启动,端口是:8000
// 工作进程 13372 已启动,端口是:8000
// 工作进程 13371 已启动,端口是:8000

fork 事件

说明:

当新的工作进程被fork时,cluster模块将触发’fork’事件。

demo:

const cluster = require('cluster');
const http = require('http');

cluster.on('fork', (worker) => {
	console.log(`一个工作进程被创建:${worker.id}`)
});

if(cluster.isMaster) {
  	console.log(`主进程 ${process.pid} 正在运行`);

  	// 衍生工作进程。
	cluster.fork();	
}
// 主进程 64608 正在运行
// 一个工作进程被创建:1

online 事件

说明:

当新建一个工作进程后,工作进程应当响应一个online消息给主进程。当主进程收到online消息后触发这个事件。
‘fork’ 事件和 ‘online’事件的不同之处在于,前者是在主进程新建工作进程后触发,而后者是在工作进程运行的时候触发。

demo:

cluster.on('online', (worker) => {
  console.log('Yay, the worker responded after it was forked');
});

message 事件

说明:

当cluster主进程接收任意工作进程发送的消息后被触发。
回调函数接受三个参数worker,message和handle

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	//主进程
  	cluster.on('message', (worker, message, handle) => {
		console.log(`收到消息:${message.name}`)
	});

  	// 衍生工作进程。
	cluster.fork();	

}else{
	//工作进程

  	cluster.worker.send({name:'xiaoqiang'});
}
// 收到消息:xiaoqiang

cluster.isMaster

说明:

当该进程是主进程时,返回 true。这是由process.env.NODE_UNIQUE_ID决定的,
当process.env.NODE_UNIQUE_ID未定义时,isMaster为true。

demo:

const cluster = require('cluster');

console.log(cluster.isMaster);
// true
console.log(process.env.NODE_UNIQUE_ID);
// undefined

cluster.isWorker

说明:

当进程不是主进程时,返回 true。(和cluster.isMaster刚好相反)

demo:

const cluster = require('cluster');

console.log(cluster.isWorker);
// false

cluster.settings

说明:

调用.setupMaster() 或 fork()后,这个settings对象将会包含以下设置项:
execArgv:传递给Node.js可执行文件的参数列表。 (Default=process.execArgv)
exec:worker文件路径。 (Default=process.argv[1])
args:传递给worker的参数。(Default=process.argv.slice(2))
silent:是否需要发送输出值父进程的stdio。(Default=false)
stdio:配置fork进程的stdio。 由于cluster模块运行依赖于IPC,这个配置必须包含’ipc’。当提供了这个选项后,将撤销silent。
uid:设置进程的user标识符。
gid:置进程的group标识符。
inspectPort:设置worker检查端口,可以是数字或返回数字的函数。默认情况下每个worker都有自己的端口,从master的process.debugPort属性递增。
这个对象不打算被修改或手动设置。

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	//主进程
	console.log(process.debugPort);
	// 9229

  	// 衍生工作进程。
	cluster.fork();	
	console.log(cluster.settings)
}
// { args: [],
//   exec: '/Users/xiaoqiang/Documents/work/demo/NodeApi/app.js',
//   execArgv: [],
//   silent: false }

cluster.setupMaster([settings])

说明:

settings:cluster.settings
用于修改默认’fork’ 行为。一旦调用,将会按照cluster.settings进行设置。
所有的设置只对后来的 .fork()调用有效,对之前的工作进程无影响
唯一无法通过 .setupMaster()设置的属性是传递给.fork()的env属性
上述的默认值只在第一次调用时有效,当后续调用时,将采用cluster.setupMaster()调用时的当前值。

demo:

const cluster = require('cluster');
cluster.setupMaster({
  exec: 'worker.js',
  args: ['--use', 'https'],
  silent: true
});
cluster.fork(); // https worker
cluster.setupMaster({
  exec: 'worker.js',
  args: ['--use', 'http']
});
cluster.fork(); // http worker

setup 事件

说明:

每当 cluster.setupMaster() 被调用的时候触发此事件。
会将调用 cluster.setupMaster() 时传递的setting参数传递到回调函数中。

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	//主进程
  	cluster.on('setup', (setting) => {
		console.log('setting发生变化:', setting);
	});

  	cluster.setupMaster({
	  args: ['user=isjs'],
	  silent: true
	});
}
// setting发生变化: { args: [ 'user=isjs' ],
//   exec: '/Users/xiaoqiang/Documents/work/demo/NodeApi/app.js',
//   execArgv: [],
//   silent: true }

cluster.schedulingPolicy

说明:

通过设置不同的值实现不同的调度策略,支持设置两种值:通过循环计数实现的均衡策略 cluster.SCHED_RR,以及由操作系统决定的均衡策略cluster.SCHED_NONE。
这是一个全局设置,当第一个工作进程被衍生或者调动cluster.setupMaster()时,都将第一时间生效。
除Windows外的所有操作系统中,SCHED_RR都是默认设置。
只要libuv可以有效地分发IOCP handle,而不会导致严重的性能冲击的话,Windows系统也会更改为SCHED_RR。
cluster.schedulingPolicy 也可以通过设置NODE_CLUSTER_SCHED_POLICY环境变量来实现。这个环境变量的有效值包括”rr” 和 “none”。

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	//主进程
  	cluster.on('setup', (setting) => {
  		console.log(cluster.schedulingPolicy);
  		// 2
  		cluster.schedulingPolicy =  cluster.SCHED_NONE; 
		
		console.log(cluster.schedulingPolicy);
		// 1
	});

  	cluster.setupMaster({
	  args: ['user=isjs'],
	  silent: true
	});
	cluster.fork();
}

cluster.disconnect([callback])

说明:

在cluster.workers的每个工作进程中调用 .disconnect()。
当所有工作进程断开连接后,所有内部handle将会关闭,这个时候如果没有等待事件的话,运行主进程优雅地关闭。
这个方法可以选择添加一个回调参数,当结束时会调用这个回调函数。
这个方法只能由主进程调用。
callback:当所有工作进程都断开连接并且所有handle关闭的时候调用。

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	//主进程
	cluster.fork();
	cluster.disconnect(() => {
		console.log('主进程关闭。')
	});
}
// 主进程关闭。

disconnect 事件

说明:

当进程结束时调用

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	//主进程
	cluster.fork();

	cluster.on('disconnect', () => {
		console.log('进程已关闭。');
	});

	cluster.disconnect();
}
// 主进程关闭。

exit 事件

说明:

当任何一个工作进程关闭的时候,cluster模块都将触发’exit’事件。
可以被用来重启工作进程。通过调用.fork())。
worker:关闭的工作进程
code:正常退出情况下,是退出代码.
signal:导致进程被kill的信号名称 (例如 ‘SIGHUP’)

demo:

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
const maxSatrtCount = 5;
var satrtCount = 1;

if(cluster.isMaster) {
	cluster.on('exit', (worker, code, signal) => {
	    console.log(`工作进程 ${worker.process.pid} 已退出,信号:${ signal || code }`);
	    //重启进程
	    if(satrtCount < maxSatrtCount){
	    	cluster.fork();
	    	satrtCount++;
	    }else{
	    	console.log('进程重启失败!');
	    }
	});
	cluster.fork();
}else{
	process.exit()
}
// 工作进程 13968 已退出,信号:0
// 工作进程 13969 已退出,信号:0
// 工作进程 13970 已退出,信号:0
// 工作进程 13971 已退出,信号:0
// 工作进程 13972 已退出,信号:0
// 进程重启失败!

cluster.worker

说明:

当前工作进程对象的引用,对于主进程则无效。

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	console.log('I am master');
	cluster.fork();
	cluster.fork();
}else{
	console.log(`I am worker #${cluster.worker.id}`);
}
// I am master
// I am worker #1
// I am worker #2

cluster.workers

说明:

这是一个哈希表,储存了活跃的工作进程对象,id作为key。
有了它,可以方便地遍历所有工作进程。只能在主进程中调用。
工作进程断开连接以及退出后,将会从cluster.workers里面移除。
这两个事件的先后顺序并不能预先确定,
但可以保证的是, cluster.workers的移除工作在'disconnect' 和 'exit'两个事件中的最后一个触发之前完成。

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	console.log('I am master');
	cluster.fork();
	cluster.fork();
	console.log(cluster.workers);
}else{
	console.log(`I am worker #${cluster.worker.id}`);
}
// { '1': Worker {...}, '2': Worker {...} }

Class: Worker

说明:

Worker对象包含了关于工作进程的所有public信息和方法。
在一个主进程里,可以使用cluster.workers来获取Worker对象。
在一个工作进程里,可以使用cluster.worker来获取Worker对象。
下面我们来介绍Worker对象的属性与方法。

worker.process

说明:

所有的工作进程都是通过child_process.fork()来创建的,这个方法返回的对象被存储为.process。
在工作进程中, process属于全局对象。
需要注意:当process上发生 'disconnect'事件,并且 exitedAfterDisconnect 的值不是true时,
工作进程会调用 process.exit(0)。这样就可以防止连接意外断开。

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	cluster.fork();
}else{
	console.log('工作进程已创建:', process.pid);
}
// 工作进程已创建: 14974

worker.send(message[, sendHandle][, callback])

说明:

发送一个消息给工作进程或主进程,也可以附带发送一个handle。
主进程调用这个方法会发送消息给具体的工作进程。还有一个等价的方法是ChildProcess.send()。
工作进程调用这个方法会发送消息给主进程。还有一个等价方法是process.send()。

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	const worker = cluster.fork();
	worker.send({name: 'isjs'});
	worker.on('message', (msg) => {
		console.log('主进程收到消息:', msg);
	});
}else{
	process.on('message', (msg) => {
		console.log('工作进程收到消息:', msg);
		process.send({name: 'work'});
	});
}
// 工作进程收到消息: { name: 'isjs' }
// 主进程收到消息: { name: 'work' }

online 事件

说明:

和cluster.on('online')事件类似,但针对特定的工作进程。
本事件不会在工作进程内部被触发。

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	cluster.on('online', () => {
		console.log('cluster online Event')
	})

	const worker = cluster.fork();
	
	worker.on('online', () => {
		console.log('Worker online Event')
	});
}
// Worker online Event
// cluster online Event

message 事件

说明:

和cluster.on('message')事件类似,但针对特定的工作进程。
在工作进程内,可以使用process.on('message')

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	const worker = cluster.fork();
	worker.send({name: 'isjs'});
	worker.on('message', (msg) => {
		console.log('主进程收到消息:', msg);
	});
}else{
	process.on('message', (msg) => {
		console.log('工作进程收到消息:', msg);
		process.send({name: 'work'});
	});
}
// 工作进程收到消息: { name: 'isjs' }
// 主进程收到消息: { name: 'work' }

listening 事件

说明:

和cluster.on('listening')事件类似,但针对特定的工作进程。

demo:

const cluster = require('cluster');
const http = require('http');

if(cluster.isMaster) {
	const worker = cluster.fork();
	worker.on('listening', (msg) => {
		console.log(' Worker is listening');
	});
}else{
	http.createServer((req, res) => {
    	res.writeHead(200);
    	res.end('你好世界\n');
  	}).listen(8000);
}
 // Worker is listening

worker.disconnect()

说明:

在一个工作进程内,调用此方法会关闭所有的server,并等待这些server的 'close'事件执行,然后关闭IPC管道。
在主进程内调用,会给工作进程发送一个内部消息,导致工作进程自身调用.disconnect()。

demo:

const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
  	const worker = cluster.fork();
  	let timeout;

  	worker.on('listening', (address) => {
    	worker.send('shutdown');
    	worker.disconnect();
    	timeout = setTimeout(() => {
      		worker.kill();
    	}, 2000);
  	});

  	worker.on('disconnect', () => {
    	clearTimeout(timeout);
  	});

} else if (cluster.isWorker) {
  	const net = require('net');
  	const server = net.createServer((socket) => {
    	// 连接永远不会结束
  	});

  	server.listen(8000);

  	process.on('message', (msg) => {
    	if (msg === 'shutdown') {
      	// 将所有与服务器的连接优雅关闭
		}
  	});
}

disconnect 事件

说明:

和cluster.on('disconnect')事件类似,但针对特定的工作进程。

demo:

const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
  	const worker = cluster.fork();

  	worker.disconnect();
  
  	worker.on('disconnect', () => {
    	console.log('进程已退出');
  	});
}
// 进程已退出

worker.id

说明:

每一个新衍生的工作进程都会被赋予自己独一无二的编号,这个编号就是储存在id属性里面。
当工作进程还存活时,id可以作为在cluster.workers中的索引。

demo:

const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
  	const worker = cluster.fork();

  	console.log(worker.id);
}
// 1

worker.isConnected()

说明:

当工作进程通过IPC管道连接至主进程时,这个方法返回true,否则返回false。
一个工作进程在创建后会自动连接到它的主进程,当'disconnect' 事件被触发时才会断开连接。

demo:

const cluster = require('cluster');

if (cluster.isMaster) {
  	const worker = cluster.fork();

  	console.log('工作进程是否连接到主进程?', worker.isConnected());
}
// 工作进程是否连接到主进程? true

worker.isDead()

说明:

当工作进程被终止时(包括自动退出或被发送信号),这个方法返回true ,否则返回false。

demo:

const cluster = require('cluster');

if (cluster.isMaster) {
  	const worker = cluster.fork();

  	worker.disconnect()
  	setTimeout(() => {
  		console.log('工作进程是否已退出?', worker.isDead());
  	},2000);
}
// 工作进程是否已退出? true

worker.exitedAfterDisconnect

说明:

当调用 .kill() 或者 .disconnect()方法时被设置,在这之前都是 undefined。
worker.exitedAfterDisconnect 可以用于区分自发退出还是被动退出,主进程可以根据这个值决定是否重新衍生新的工作进程。

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	cluster.on('exit', (worker, code, signal) => {
	    console.log(`工作进程 ${worker.process.pid} 已退出,信号:${ signal || code }`);
	    //重启进程
	    if(worker.exitedAfterDisconnect === true){
	    	console.log('用户主动退出,不重启进程');
	    }else{
	    	cluster.fork();
	    	console.log('进程已重启!');
	    }
	});
	cluster.fork();
}else{
	cluster.worker.kill();
}
// 工作进程 18311 已退出,信号:0
// 用户主动退出,不重启进程


exit 事件

说明:

和cluster.on('exit')事件类似,但针对特定的工作进程。
code:若正常退出,表示退出代码.
signal:引发进程被kill的信号名称(如'SIGHUP')

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	const worker = cluster.fork();
	worker.on('exit', (code, signal) => {
	 	if (signal) {
		    console.log(`worker was killed by signal: ${signal}`);
	  	} else if (code !== 0) {
		    console.log(`worker exited with error code: ${code}`);
	  	} else {
		    console.log('worker success!');
  		}
	});
}else{
	cluster.worker.kill();
}
// worker success!

error 事件

说明:

此事件和 child_process.fork()提供的error事件相同。
在一个工作进程中,可以使用process.on('error')

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	const worker = cluster.fork();
	worker.on('error', (err) => {
		console.log('系统异常!');
	});
}else{
	cluster.worker.kill();
	process.on('error', (err) => {
		console.log('进程异常!');
	});
}
// worker success!

worker.kill([signal='SIGTERM'])

说明:

这个方法将会kill工作进程。
在主进程中,通过断开与worker.process的连接来实现,一旦断开连接后,通过signal来杀死工作进程。
在工作进程中,通过断开IPC管道来实现,然后以代码0退出进程。
将导致.exitedAfterDisconnect被设置。
为向后兼容,这个方法与worker.destroy()等义。
需要注意的是,在工作进程中有一个方法process.kill() ,这个方法本方法不同,本方法是kill。

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	const worker = cluster.fork();
	worker.on('exit', () => {
		console.log('系统退出!');
	});
	worker.kill()
}
// 系统退出!

worker.suicide (已废弃)

说明:

调用.kill() 或.disconnect()被设置,否则一直是 undefined。
worker.suicide用于区分自发退出或被动退出,主进程可根据这个值来决定是否重新衍生新的工作进程。
使用:worker.exitedAfterDisconnect 替代使用

demo:

const cluster = require('cluster');

if(cluster.isMaster) {
	cluster.on('exit', (worker, code, signal) => {
	    console.log(`工作进程 ${worker.process.pid} 已退出,信号:${ signal || code }`);
	    //重启进程
	    if(worker.suicide === true){
	    	console.log('用户主动退出,不重启进程');
	    }else{
	    	cluster.fork();
	    	console.log('进程已重启!');
	    }
	});
	cluster.fork();
}else{
	cluster.worker.kill();
}
// 工作进程 18311 已退出,信号:0
// 用户主动退出,不重启进程