From e9bf8c035ed08d0f7b9e6fccab2d161132000f93 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Thu, 8 Sep 2016 16:23:40 -0600 Subject: [PATCH] v1.0.0 --- README.md | 109 ++++++++++++++++++++++++++++++++++++++++ index.js | 12 +++++ master.js | 95 +++++++++++++++++++++++++++++++++++ package.json | 23 +++++++++ process/master.js | 29 +++++++++++ process/worker.js | 15 ++++++ test.js | 52 +++++++++++++++++++ worker.js | 125 ++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 460 insertions(+) create mode 100644 README.md create mode 100644 index.js create mode 100644 master.js create mode 100644 package.json create mode 100644 process/master.js create mode 100644 process/worker.js create mode 100644 test.js create mode 100644 worker.js diff --git a/README.md b/README.md new file mode 100644 index 0000000..d5dcbb4 --- /dev/null +++ b/README.md @@ -0,0 +1,109 @@ +cluster-rpc +=========== + +A simple way to wrap node.js modules for use with cluster even though they +were designed to be used in a single non-cluster instance. + +Install +======= + +``` +npm install --save cluster-rpc +``` + +Usage +===== + +In the **master** process you will create the real instance of whatever module +you're trying to use (for example express-session/session/memory, sqlite3, level) +and then you will supply the names of the methods you wish to export to **worker** +processes. + +You must pass each worker via `addWorker()` so that it signals the worker to creates +its own rpc-wrapped instance. + +### master + +```javascript +// You can pick any module with thunk-style callbacks +// For example: +var db = require('level')('./mydb') + + +// Wrap the instance +var crpc = require('cluster-rpc/master').create({ + instance: db +, methods: [ 'get', 'put' ] +, name: 'foo-level' +}); + + +// You must add each worker +crpc.addWorker(cluster.fork()); + + +crpc.then(function (db) { + // processes are connected and ready + // 'db' is the original instance +}); +``` + +### worker + +```javascript +// retrieve the instance +var crpc = require('cluster-rpc/worker').create({ + name: 'foo-level' +}); + + +// all listed methods will be rpc'd +crpc.then(function (db) { + // db.put, db.get +}); +``` + +Example +======= + +```javascript +'use strict'; + +var cluster = require('cluster'); +var crpc; + + +if (cluster.isMaster) { + + + crpc = require('cluster-rpc/master').create({ + instance: require('level')('./mydb') + , methods: [ 'get', 'put' ] + , name: 'foo-level' + }); + crpc.addWorker(cluster.fork()); + crpc.then(function () { + db.put('foo', 'bar'); + }); + + +} +else { + + + crpc = require('cluster-rpc/worker').create({ + name: 'foo-level' + }); + + +} + + +crpc.then(function (db) { + setTimeout(function () { + db.get('foo', function (err, result) { + console.log("db.get('foo')", result); + }); + }, 250); +}); +``` diff --git a/index.js b/index.js new file mode 100644 index 0000000..6fcac75 --- /dev/null +++ b/index.js @@ -0,0 +1,12 @@ +'use strict'; + +console.error(""); +console.error("One does not simply require('cluster-rpc');"); +console.error(""); +console.error("Usage:"); +console.error("\trequire('cluster-rpc/master').create({ instance: ..., methods: ... name: ... });"); +console.error("\trequire('cluster-rpc/worker').create({ name: ... });"); +console.error(""); +console.error(""); + +process.exit(1); diff --git a/master.js b/master.js new file mode 100644 index 0000000..e8bfc8e --- /dev/null +++ b/master.js @@ -0,0 +1,95 @@ +'use strict'; + +function getInstanceMethods(inst) { + var instanceMethods = Object.keys(inst) + .map(function (key) { return 'function' === typeof inst[key] ? key : null; }) + .filter(function (key) { return key; }) + ; + + var protoMethods = Object.keys(Object.getPrototypeOf(inst)) + .map(function (key) { return 'function' === typeof Object.getPrototypeOf(inst)[key] ? key : null; }) + .filter(function (key) { return key; }) + ; + + return instanceMethods.concat(protoMethods); +} + +function getMethods(inst, keys) { + if (!keys) { + keys = getInstanceMethods(inst); + } + + return keys.filter(function (key) { + if ('function' === typeof inst[key]) { + return true; + } + }); +} + +module.exports.create = function (opts) { + var PromiseA = opts.PromiseA || global.Promise; + var crypto = require('crypto'); + var inst = opts.instance; + var methods = getMethods(opts.instance, opts.methods); + var token = crypto.randomBytes(16).toString('hex'); + var msgPrefix = 'cluster-rpc.' + opts.name; + var rpcPrefix = msgPrefix + '.rpc'; + var resultPrefix = msgPrefix + '.result'; + var initPrefix = msgPrefix + '.init'; + + opts.master = opts.master || require('./process/master').create(); + + opts.master.on('connection', function (w) { + //console.log('debug w: worker connection'); + w.send({ + methods: methods + , _token: token + , type: initPrefix + }); + + w.on('message', function (cmd) { + if (0 !== (cmd.type||'').indexOf(msgPrefix)) { + //console.log('debug w: got unknown message type'); + return; + } + + if (token !== cmd._token) { + //console.log('debug w: got bad token'); + return; + } + + if (!Array.isArray(cmd.args)) { + throw new Error("[Sanity Fail] 'args' should be array of arguments"); + } + + switch (cmd.type) { + case rpcPrefix: + cmd.args.push(function callback() { + // args is probably err, data in most cases + var args = Array.prototype.slice.call(arguments); + + w.send({ + args: args + , id: cmd.id + //, this: this + , _token: token + , type: resultPrefix + }); + }); + + inst[cmd.func].apply(inst, cmd.args); + break; + + default: + throw new Error("cluster-ipc UNKNOWN TYPE"); + //break; + } + + }); + }); + + opts._promise = PromiseA.resolve(inst); + opts._promise.addWorker = opts.master.addWorker; + + return opts._promise; +}; diff --git a/package.json b/package.json new file mode 100644 index 0000000..8d14b7a --- /dev/null +++ b/package.json @@ -0,0 +1,23 @@ +{ + "name": "cluster-rpc", + "version": "1.0.0", + "description": "A simple way to wrap a single-instance module to enable it to work with node cluster.", + "main": "index.js", + "scripts": { + "test": "node test.js" + }, + "repository": { + "type": "git", + "url": "git+ssh://git@github.com/coolaj86/cluster-rpc.git" + }, + "keywords": [ + "cluster", + "rpc" + ], + "author": "AJ ONeal (https://coolaj86.com/)", + "license": "(MIT OR Apache-2.0)", + "bugs": { + "url": "https://github.com/coolaj86/cluster-rpc/issues" + }, + "homepage": "https://github.com/coolaj86/cluster-rpc#readme" +} diff --git a/process/master.js b/process/master.js new file mode 100644 index 0000000..46ebc2e --- /dev/null +++ b/process/master.js @@ -0,0 +1,29 @@ +'use strict'; + +module.exports.create = function () { + var m = new (require('events').EventEmitter)(); + + m.addWorker = function (worker) { + m._workers = []; + + var w = new (require('events').EventEmitter)(); + + worker.on('online', function () { + //console.log('debug mw: worker is up') + m.emit('connection', w); + }); + + worker.on('message', function (data) { + //console.log('debug mw: worker sends message', data) + w.emit('message', data); + }); + + w.send = function (data) { + worker.send(data); + }; + + m._workers.push(w); + }; + + return m; +}; diff --git a/process/worker.js b/process/worker.js new file mode 100644 index 0000000..488d49a --- /dev/null +++ b/process/worker.js @@ -0,0 +1,15 @@ +'use strict'; + +module.exports.create = function (process) { + var w = new (require('events').EventEmitter)(); + + process.on('message', function (data) { + w.emit('message', data); + }); + + w.send = function (data) { + process.send(data); + }; + + return w; +}; diff --git a/test.js b/test.js new file mode 100644 index 0000000..a7f5bd2 --- /dev/null +++ b/test.js @@ -0,0 +1,52 @@ +'use strict'; + +var cluster = require('cluster'); +var crpc; +var db = { + get: function (key, cb) { + cb(null, db[key]); + } +, put: function (key, val, cb) { + db[key] = val; + cb(null); + } +}; + + +if (cluster.isMaster) { + + + crpc = require('./master').create({ + instance: db + , methods: [ 'get', 'put' ] + , name: 'foo-level' + }); + crpc.addWorker(cluster.fork()); + crpc.then(function () { + db.put('foo', 'bar'); + }); + + +} +else { + + + crpc = require('./worker').create({ + name: 'foo-level' + }); + + +} + + +crpc.then(function (db) { + setTimeout(function () { + db.get('foo', function (err, result) { + console.log(cluster.isMaster && '0' || cluster.worker.id.toString(), "db.get('foo')", result); + + if (!cluster.isMaster) { + process.exit(0); + } + }); + }, 250); +}); diff --git a/worker.js b/worker.js new file mode 100644 index 0000000..b04fa88 --- /dev/null +++ b/worker.js @@ -0,0 +1,125 @@ +'use strict'; + +// com.daplie.ipc.init - receive token and methods +// com.daplie.ipc.rpc - send function and args and callback id +// com.daplie.ipc.result - receive results and callback id + +module.exports.create = function (opts) { + var PromiseA = opts.PromiseA || global.Promise; + var crypto = require('crypto'); + var token = null; + var inst = {}; + var ws = opts.worker = opts.worker || require('./process/worker').create(process); + var msgPrefix = 'cluster-rpc.' + opts.name; + var rpcPrefix = msgPrefix + '.rpc'; + var resultPrefix = msgPrefix + '.result'; + var initPrefix = msgPrefix + '.init'; + + ws.___listeners = []; + + function rpc(fname, args) { + var id; + var cb; + + if ('function' === typeof args[args.length - 1]) { + id = crypto.randomBytes(4).toString('hex'); + cb = args.pop(); + } + + ws.send({ + args: args + , func: fname + , id: id + , type: rpcPrefix + , _token: token + }); + + if (!cb) { + return; + } + + function onCallback(cmd) { + if (cmd.id !== id) { + return; + } + + if ('on' !== fname) { + var index = ws.___listeners.indexOf(onCallback); + ws.___listeners.splice(index, 1); + } + + cb.apply(cmd.this, cmd.args); + } + onCallback._createdAt = Date.now(); + if ('on' === fname) { + onCallback._live = true; + } + + ws.___listeners.push(onCallback); + } + + function onResult(cmd) { + var now = Date.now(); + + ws.___listeners.forEach(function (fn) { + try { + fn(cmd); + } catch(e) { + console.error("[ERROR] ws.on('message', fn) (multi-callback)"); + console.error(e); + // ignore + } + + // 1 minute = 60 * 1000 + if (!fn.live && (now - fn._createdAt > 60000)) { + var index = ws.___listeners.indexOf(fn); + ws.___listeners.splice(index, 1); + } + }); + } + + function onInit(cmd) { + if (token) { + console.warn('[cluster-ipc/worker] Warn: re-init'); + return; + } + + token = cmd._token; + ws._methods = cmd.methods; + + ws._methods.forEach(function (fname) { + inst[fname] = function () { + rpc(fname, Array.prototype.slice.call(arguments)); + }; + }); + } + + return new PromiseA(function (resolve) { + ws.on('message', function (cmd) { + //console.log('debug m: mesage', cmd); + if (0 !== (cmd.type||'').indexOf(msgPrefix)) { + //console.log('debug m: ignore msg', cmd); + //console.log(cmd.type, msgPrefix); + //console.log(cmd.type.indexOf(msgPrefix)); + return; + } + if (token && token !== cmd._token) { + //console.log('debug m: ignore msg', cmd); + return; + } + + switch (cmd.type) { + case initPrefix: + onInit(cmd); + resolve(inst); + break; + case resultPrefix: + onResult(cmd); + break; + default: + break; + } + + }); + }); +};