diff --git a/master.js b/master.js index 9e89799..a29ec6a 100644 --- a/master.js +++ b/master.js @@ -30,24 +30,21 @@ function setup(opts) { var crypto = require('crypto'); var methods = getMethods(opts.instance, opts.methods); var token = crypto.randomBytes(16).toString('hex'); - var msgPrefix = 'cluster-rpc.' + opts.name; - var rpcPrefix = msgPrefix + '.rpc'; - var resultPrefix = msgPrefix + '.result'; - var initPrefix = msgPrefix + '.init'; var inst = opts.instance; + var prefixes = require('./prefixes.js').create(opts); // uses opts.name - opts.master = opts.master || require('./process/master').create(); + opts.master = opts.master || require('./process/master').create(prefixes); opts.master.on('connection', function (w) { - //console.log('debug w: worker connection'); + if (opts.debug) { console.log('[cluster-rpc] [master] worker connected'); } w.send({ methods: methods , _token: token - , type: initPrefix + , type: prefixes.init }); w.on('message', function (cmd) { - if (0 !== (cmd.type||'').indexOf(msgPrefix)) { + if (0 !== (cmd.type||'').indexOf(prefixes.root)) { //console.log('debug w: got unknown message type'); return; } @@ -62,7 +59,7 @@ function setup(opts) { } switch (cmd.type) { - case rpcPrefix: + case prefixes.rpc: cmd.args.push(function callback() { // args is probably err, data in most cases var args = Array.prototype.slice.call(arguments); @@ -72,7 +69,7 @@ function setup(opts) { , id: cmd.id //, this: this , _token: token - , type: resultPrefix + , type: prefixes.result }); }); @@ -89,12 +86,14 @@ function setup(opts) { } module.exports.create = function (opts) { + if (opts.debug) { console.log('[cluster-rpc] [master] create'); } var cluster = require('cluster'); var PromiseA = opts.PromiseA || global.Promise || require('bluebird'); var init = false; opts._promise = PromiseA.resolve(opts.instance); opts._promise.addWorker = function (w) { + if (opts.debug) { console.log('[cluster-rpc] [master] addWorker wrapper'); } if (!init) { init = true; setup(opts); @@ -103,6 +102,7 @@ module.exports.create = function (opts) { }; if (false !== opts.addOnFork) { + if (opts.debug) { console.log('[cluster-rpc] [master] -- will call addWorker on each fork'); } cluster.on('fork', opts._promise.addWorker); } diff --git a/package.json b/package.json index bbb4acd..ef8eaea 100644 --- a/package.json +++ b/package.json @@ -1,14 +1,15 @@ { "name": "cluster-rpc", - "version": "1.0.4", + "version": "1.0.6", "description": "A simple way to wrap a single-instance module to enable it to work with node cluster.", + "homepage": "https://git.coolaj86.com/coolaj86/cluster-rpc.js", "main": "index.js", "scripts": { "test": "node test.js" }, "repository": { "type": "git", - "url": "git+ssh://git@github.com/coolaj86/cluster-rpc.git" + "url": "git+https://git.coolaj86.com/coolaj86/cluster-rpc.js.git" }, "keywords": [ "cluster", @@ -17,7 +18,6 @@ "author": "AJ ONeal (https://coolaj86.com/)", "license": "(MIT OR Apache-2.0)", "bugs": { - "url": "https://github.com/coolaj86/cluster-rpc/issues" - }, - "homepage": "https://github.com/coolaj86/cluster-rpc#readme" + "url": "https://git.coolaj86.com/coolaj86/cluster-rpc.js/issues" + } } diff --git a/prefixes.js b/prefixes.js new file mode 100644 index 0000000..7f4531f --- /dev/null +++ b/prefixes.js @@ -0,0 +1,18 @@ +'use strict'; + +module.exports.create = function (opts) { + //var msgPrefix = 'cluster-rpc.' + opts.name; + //var rpcPrefix = msgPrefix + '.rpc'; + //var resultPrefix = msgPrefix + '.result'; + //var initPrefix = msgPrefix + '.init'; + var root = 'com.daplie.cluster-rpc.' + (opts.name ? opts.name + '.' : ''); + return { + root: root + , rpc: root + 'rpc' + , result: root + 'result' + , init: root + 'init' + , connect: root + 'connect' + // TODO the things that are using this should probably accept opts + , debug: opts.debug + }; +}; diff --git a/process/master.js b/process/master.js index 46ebc2e..f30e310 100644 --- a/process/master.js +++ b/process/master.js @@ -1,27 +1,45 @@ 'use strict'; -module.exports.create = function () { +module.exports.create = function (prefixes) { + if (prefixes.debug) { console.log('[cluster-rpc] master created'); } var m = new (require('events').EventEmitter)(); m.addWorker = function (worker) { + if (prefixes.debug) { console.log('[cluster-rpc] [master] adding worker'); } m._workers = []; var w = new (require('events').EventEmitter)(); - worker.on('online', function () { - //console.log('debug mw: worker is up') + function emitConnection() { + if (w.__online) { + return; + } + + w.__online = true; m.emit('connection', w); + } + + worker.on('online', function () { + if (prefixes.debug) { console.log('[cluster-rpc] [master] worker came online, at fork'); } + emitConnection(); }); worker.on('message', function (data) { - //console.log('debug mw: worker sends message', data) + if (prefixes.connect === data.type) { + if (prefixes.debug) { console.log('[cluster-rpc] [master] worker connected, manually'); } + emitConnection(); + return; + } + if (prefixes.debug) { console.log('[cluster-rpc] [master] worker sent message', data); } w.emit('message', data); }); w.send = function (data) { + if (prefixes.debug) { console.log('[cluster-rpc] [master] sending', data); } worker.send(data); }; + // TODO remove workers that exit m._workers.push(w); }; diff --git a/process/worker.js b/process/worker.js index 488d49a..d4548a5 100644 --- a/process/worker.js +++ b/process/worker.js @@ -1,6 +1,7 @@ 'use strict'; -module.exports.create = function (process) { +module.exports.create = function (process, prefixes) { + if (prefixes.debug) { console.log('[cluster-rpc] worker created'); } var w = new (require('events').EventEmitter)(); process.on('message', function (data) { @@ -11,5 +12,10 @@ module.exports.create = function (process) { process.send(data); }; + // if this were a web / unix socket there would be a 'connection' event + // emulating this is useful since the worker may create its cluster rpc + // at any time, (which means it may miss the 'fork' event) + w.send({ type: prefixes.connect }); + return w; }; diff --git a/test.js b/test.js index 475a09d..bdf60b2 100644 --- a/test.js +++ b/test.js @@ -2,19 +2,18 @@ var cluster = require('cluster'); var crpc; -var db = { - get: function (key, cb) { - cb(null, db[key]); - } -, put: function (key, val, cb) { - db[key] = val; - cb(null); - } -}; +function runMaster() { -if (cluster.isMaster) { - + var db = { + get: function (key, cb) { + cb(null, db[key]); + } + , put: function (key, val, cb) { + db[key] = val; + if (cb) { cb(null); } + } + }; crpc = require('./master').create({ instance: db @@ -26,10 +25,9 @@ if (cluster.isMaster) { }); cluster.fork(); - } -else { +function runWorker() { crpc = require('./worker').create({ name: 'foo-level' @@ -38,6 +36,17 @@ else { } +if (cluster.isMaster) { + + runMaster(); + +} +else { + + runWorker(); + +} + crpc.then(function (db) { setTimeout(function () { diff --git a/worker.js b/worker.js index bc57b53..9a4f641 100644 --- a/worker.js +++ b/worker.js @@ -9,11 +9,8 @@ module.exports.create = function (opts) { var crypto = require('crypto'); var token = null; var inst = {}; - var ws = opts.worker = opts.worker || require('./process/worker').create(process); - var msgPrefix = 'cluster-rpc.' + opts.name; - var rpcPrefix = msgPrefix + '.rpc'; - var resultPrefix = msgPrefix + '.result'; - var initPrefix = msgPrefix + '.init'; + var prefixes = require('./prefixes.js').create(opts); + var ws = opts.worker = opts.worker || require('./process/worker').create(process, prefixes); ws.___listeners = []; @@ -30,7 +27,7 @@ module.exports.create = function (opts) { args: args , func: fname , id: id - , type: rpcPrefix + , type: prefixes.rpc , _token: token }); @@ -96,24 +93,21 @@ module.exports.create = function (opts) { return new PromiseA(function (resolve) { ws.on('message', function (cmd) { - //console.log('debug m: mesage', cmd); - if (0 !== (cmd.type||'').indexOf(msgPrefix)) { - //console.log('debug m: ignore msg', cmd); - //console.log(cmd.type, msgPrefix); - //console.log(cmd.type.indexOf(msgPrefix)); + if (opts.debug) { console.log('[cluster-rpc] [worker] message received', cmd); } + + if (0 !== (cmd.type||'').indexOf(prefixes.root)) { return; } if (token && token !== cmd._token) { - //console.log('debug m: ignore msg', cmd); return; } switch (cmd.type) { - case initPrefix: + case prefixes.init: onInit(cmd); resolve(inst); break; - case resultPrefix: + case prefixes.result: onResult(cmd); break; default: