diff --git a/doc/api/cluster.markdown b/doc/api/cluster.markdown index efcb02f7514..7df84b1e4c5 100644 --- a/doc/api/cluster.markdown +++ b/doc/api/cluster.markdown @@ -9,10 +9,17 @@ which share server ports. var cluster = require('cluster'); var http = require('http'); + var numCPUs = require('os').cpus().length; if (cluster.isMaster) { - // Start the master process, fork workers. - cluster.startMaster({ workers: 2 }); + // Fork workers. + for (var i = 0; i < numCPUs; i++) { + cluster.fork(); + } + + cluster.on('death', function(worker) { + console.log('worker ' + worker.pid + ' died'); + }); } else { // Worker processes have a http server. http.Server(function(req, res) { @@ -27,37 +34,38 @@ Running node will now share port 8000 between the workers: Worker 2438 online Worker 2437 online -### exports.startMaster([options]) +### cluster.fork() - Spawns the initial worker processes, one per CPU by default. +Spawn a new worker process. This can only be called from the master process. - The following options are supported: +### cluster.isMaster +### cluster.isWorker - - `workerFilename`: script to execute in the worker process, defaults to - `process.argv[1]` - - `args`: worker program arguments, defaulting to `process.argv.slice(2)` - - `workers`: the number of workers, defaulting to `os.cpus().length` +Boolean flags to determine if the current process is a master or a worker +process in a cluster. A process `isMaster` if `process.env.NODE_WORKER_ID` +is undefined. -### exports.spawnWorker([options]) +### cluster.eachWorker(cb) - Spawn a new worker process. This is called within `cluster.startMaster()`, - however it is useful to implement worker resuscitation as described below - in the "Common patterns" section. +Synchronously iterates over all of the workers. - The `options` available are identical to `cluster.startMaster()`. + cluster.eachWorker(function(worker) { + console.log("worker pid=" + worker.pid); + }); -## Common patterns +### cluster.workerCount() -## Worker resuscitation +Returns the number of workers. -The following is an example of how you may implement worker resuscitation, -spawning a new worker process when another exits. +### Event: 'death' - if (cluster.isMaster) { - cluster.startMaster(); - process.on('SIGCHLD', function(){ - console.log('worker killed'); - cluster.spawnWorker(); - }); - } +When any of the workers die the cluster module will emit the 'death' event. +This can be used to restart the worker by calling `fork()` again. + cluster.on('death', function(worker) { + console.log('worker ' + worker.pid + ' died. restart...'); + cluster.fork(); + }); + +Different techniques can be used to restart the worker depending on the +application. diff --git a/lib/cluster.js b/lib/cluster.js index 5cb1c2b0db9..9e8a394785b 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -22,7 +22,9 @@ var assert = require('assert'); var fork = require('child_process').fork; var net = require('net'); -var amMaster; // Used for asserts +var EventEmitter = require('events').EventEmitter; + +var cluster = module.exports = new EventEmitter(); var debug; @@ -38,17 +40,20 @@ if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) { // Used in the master: +var masterStarted = false; var ids = 0; var workers = []; var servers = {}; +var workerFilename; +var workerArgs; // Used in the worker: var workerId = 0; var queryIds = 0; var queryCallbacks = {}; -exports.isWorker = 'NODE_WORKER_ID' in process.env; -exports.isMaster = ! exports.isWorker; +cluster.isWorker = 'NODE_WORKER_ID' in process.env; +cluster.isMaster = ! cluster.isWorker; // Call this from the master process. It will start child workers. // @@ -62,38 +67,23 @@ exports.isMaster = ! exports.isWorker; // // options.workers // The number of workers to start. Defaults to os.cpus().length. -exports.startMaster = function(options) { - amMaster = true; +function startMaster() { + // This can only be called from the master. + assert(cluster.isMaster); - if (!options) { - options = {}; - } + if (masterStarted) return; + masterStarted = true; - if (!options.workerFilename) { - options.workerFilename = process.argv[1]; - } - - if (!options.args) { - options.args = process.argv.slice(2); - } - - if (!options.workers) { - options.workers = require('os').cpus().length; - } - - for (var i = 0; i < options.workers; i++) { - forkWorker(options.workerFilename, options.args); - } + workerFilename = process.argv[1]; + workerArgs = process.argv.slice(2); process.on('uncaughtException', function(e) { // Quickly try to kill all the workers. // TODO: be session leader - will cause auto SIGHUP to the children. - for (var id in workers) { - if (workers[id]) { - debug("kill worker " + id); - workers[id].kill(); - } - } + cluster.eachWorker(function(worker) { + debug("kill worker " + worker.pid); + worker.kill(); + }) console.error("Exception in cluster master process: " + e.message + '\n' + e.stack); @@ -104,7 +94,8 @@ exports.startMaster = function(options) { function handleWorkerMessage(worker, message) { - assert.ok(amMaster); + // This can only be called from the master. + assert(cluster.isMaster); debug("recv " + JSON.stringify(message)); @@ -137,7 +128,34 @@ function handleWorkerMessage(worker, message) { } -function forkWorker(workerFilename, args) { +cluster.eachWorker = function(cb) { + // This can only be called from the master. + assert(cluster.isMaster); + + for (var id in workers) { + if (workers[id]) { + cb(workers[id]); + } + } +}; + + +cluster.workerCount = function() { + var c = 0; + cluster.eachWorker(function() { + c++; + }); + return c; +}; + + +cluster.fork = function() { + // This can only be called from the master. + assert(cluster.isMaster); + + // Lazily start the master process stuff. + startMaster(); + var id = ++ids; var envCopy = {}; @@ -147,9 +165,7 @@ function forkWorker(workerFilename, args) { envCopy['NODE_WORKER_ID'] = id; - var worker = fork(workerFilename, args, { - env: envCopy - }); + var worker = fork(workerFilename, workerArgs, { env: envCopy }); worker.on('message', function(message) { handleWorkerMessage(worker, message); @@ -158,15 +174,16 @@ function forkWorker(workerFilename, args) { worker.on('exit', function() { debug('worker id=' + id + ' died'); delete workers[id]; + cluster.emit('death', worker); }); return worker; } -exports.startWorker = function() { - assert.ok(!amMaster); - amMaster = false; +// Internal function. Called from src/node.js when worker process starts. +cluster._startWorker = function() { + assert(cluster.isWorker); workerId = parseInt(process.env.NODE_WORKER_ID); queryMaster({ cmd: 'online' }); @@ -186,7 +203,7 @@ exports.startWorker = function() { function queryMaster(msg, cb) { - assert.ok(!amMaster); + assert(cluster.isWorker); debug('send ' + JSON.stringify(msg)); @@ -194,7 +211,7 @@ function queryMaster(msg, cb) { msg._queryId = (++queryIds); msg._workerId = workerId; - // Store callback for later. Callback called in startWorker. + // Store callback for later. Callback called in _startWorker. if (cb) { queryCallbacks[msg._queryId] = cb; } @@ -204,8 +221,10 @@ function queryMaster(msg, cb) { } -exports.getServer = function(address, port, addressType, cb) { - assert.ok(!amMaster); +// Internal function. Called by lib/net.js when attempting to bind a +// server. +cluster._getServer = function(address, port, addressType, cb) { + assert(cluster.isWorker); queryMaster({ cmd: "queryServer", diff --git a/lib/net.js b/lib/net.js index 92437f4d9b7..c1d09ee86fd 100644 --- a/lib/net.js +++ b/lib/net.js @@ -714,7 +714,7 @@ Server.prototype._listen2 = function(address, port, addressType) { function listen(self, address, port, addressType) { if (process.env.NODE_WORKER_ID) { - require('cluster').getServer(address, port, addressType, function(handle) { + require('cluster')._getServer(address, port, addressType, function(handle) { self._handle = handle; self._listen2(address, port, addressType); }); diff --git a/src/node.js b/src/node.js index 2e2fc1b33e9..a6788b380f1 100644 --- a/src/node.js +++ b/src/node.js @@ -88,7 +88,7 @@ // channel. if (process.env.NODE_WORKER_ID) { var cluster = NativeModule.require('cluster'); - cluster.startWorker(); + cluster._startWorker(); } var Module = NativeModule.require('module');