Browse Source

v1.0.0

master v1.0.0
AJ ONeal 8 years ago
commit
e9bf8c035e
  1. 109
      README.md
  2. 12
      index.js
  3. 95
      master.js
  4. 23
      package.json
  5. 29
      process/master.js
  6. 15
      process/worker.js
  7. 52
      test.js
  8. 125
      worker.js

109
README.md

@ -0,0 +1,109 @@
cluster-rpc
===========
A simple way to wrap node.js modules for use with cluster even though they
were designed to be used in a single non-cluster instance.
Install
=======
```
npm install --save cluster-rpc
```
Usage
=====
In the **master** process you will create the real instance of whatever module
you're trying to use (for example express-session/session/memory, sqlite3, level)
and then you will supply the names of the methods you wish to export to **worker**
processes.
You must pass each worker via `addWorker()` so that it signals the worker to creates
its own rpc-wrapped instance.
### master
```javascript
// You can pick any module with thunk-style callbacks
// For example:
var db = require('level')('./mydb')
// Wrap the instance
var crpc = require('cluster-rpc/master').create({
instance: db
, methods: [ 'get', 'put' ]
, name: 'foo-level'
});
// You must add each worker
crpc.addWorker(cluster.fork());
crpc.then(function (db) {
// processes are connected and ready
// 'db' is the original instance
});
```
### worker
```javascript
// retrieve the instance
var crpc = require('cluster-rpc/worker').create({
name: 'foo-level'
});
// all listed methods will be rpc'd
crpc.then(function (db) {
// db.put, db.get
});
```
Example
=======
```javascript
'use strict';
var cluster = require('cluster');
var crpc;
if (cluster.isMaster) {
crpc = require('cluster-rpc/master').create({
instance: require('level')('./mydb')
, methods: [ 'get', 'put' ]
, name: 'foo-level'
});
crpc.addWorker(cluster.fork());
crpc.then(function () {
db.put('foo', 'bar');
});
}
else {
crpc = require('cluster-rpc/worker').create({
name: 'foo-level'
});
}
crpc.then(function (db) {
setTimeout(function () {
db.get('foo', function (err, result) {
console.log("db.get('foo')", result);
});
}, 250);
});
```

12
index.js

@ -0,0 +1,12 @@
'use strict';
console.error("");
console.error("One does not simply require('cluster-rpc');");
console.error("");
console.error("Usage:");
console.error("\trequire('cluster-rpc/master').create({ instance: ..., methods: ... name: ... });");
console.error("\trequire('cluster-rpc/worker').create({ name: ... });");
console.error("");
console.error("");
process.exit(1);

95
master.js

@ -0,0 +1,95 @@
'use strict';
function getInstanceMethods(inst) {
var instanceMethods = Object.keys(inst)
.map(function (key) { return 'function' === typeof inst[key] ? key : null; })
.filter(function (key) { return key; })
;
var protoMethods = Object.keys(Object.getPrototypeOf(inst))
.map(function (key) { return 'function' === typeof Object.getPrototypeOf(inst)[key] ? key : null; })
.filter(function (key) { return key; })
;
return instanceMethods.concat(protoMethods);
}
function getMethods(inst, keys) {
if (!keys) {
keys = getInstanceMethods(inst);
}
return keys.filter(function (key) {
if ('function' === typeof inst[key]) {
return true;
}
});
}
module.exports.create = function (opts) {
var PromiseA = opts.PromiseA || global.Promise;
var crypto = require('crypto');
var inst = opts.instance;
var methods = getMethods(opts.instance, opts.methods);
var token = crypto.randomBytes(16).toString('hex');
var msgPrefix = 'cluster-rpc.' + opts.name;
var rpcPrefix = msgPrefix + '.rpc';
var resultPrefix = msgPrefix + '.result';
var initPrefix = msgPrefix + '.init';
opts.master = opts.master || require('./process/master').create();
opts.master.on('connection', function (w) {
//console.log('debug w: worker connection');
w.send({
methods: methods
, _token: token
, type: initPrefix
});
w.on('message', function (cmd) {
if (0 !== (cmd.type||'').indexOf(msgPrefix)) {
//console.log('debug w: got unknown message type');
return;
}
if (token !== cmd._token) {
//console.log('debug w: got bad token');
return;
}
if (!Array.isArray(cmd.args)) {
throw new Error("[Sanity Fail] 'args' should be array of arguments");
}
switch (cmd.type) {
case rpcPrefix:
cmd.args.push(function callback() {
// args is probably err, data in most cases
var args = Array.prototype.slice.call(arguments);
w.send({
args: args
, id: cmd.id
//, this: this
, _token: token
, type: resultPrefix
});
});
inst[cmd.func].apply(inst, cmd.args);
break;
default:
throw new Error("cluster-ipc UNKNOWN TYPE");
//break;
}
});
});
opts._promise = PromiseA.resolve(inst);
opts._promise.addWorker = opts.master.addWorker;
return opts._promise;
};

23
package.json

@ -0,0 +1,23 @@
{
"name": "cluster-rpc",
"version": "1.0.0",
"description": "A simple way to wrap a single-instance module to enable it to work with node cluster.",
"main": "index.js",
"scripts": {
"test": "node test.js"
},
"repository": {
"type": "git",
"url": "git+ssh://git@github.com/coolaj86/cluster-rpc.git"
},
"keywords": [
"cluster",
"rpc"
],
"author": "AJ ONeal <coolaj86@gmail.com> (https://coolaj86.com/)",
"license": "(MIT OR Apache-2.0)",
"bugs": {
"url": "https://github.com/coolaj86/cluster-rpc/issues"
},
"homepage": "https://github.com/coolaj86/cluster-rpc#readme"
}

29
process/master.js

@ -0,0 +1,29 @@
'use strict';
module.exports.create = function () {
var m = new (require('events').EventEmitter)();
m.addWorker = function (worker) {
m._workers = [];
var w = new (require('events').EventEmitter)();
worker.on('online', function () {
//console.log('debug mw: worker is up')
m.emit('connection', w);
});
worker.on('message', function (data) {
//console.log('debug mw: worker sends message', data)
w.emit('message', data);
});
w.send = function (data) {
worker.send(data);
};
m._workers.push(w);
};
return m;
};

15
process/worker.js

@ -0,0 +1,15 @@
'use strict';
module.exports.create = function (process) {
var w = new (require('events').EventEmitter)();
process.on('message', function (data) {
w.emit('message', data);
});
w.send = function (data) {
process.send(data);
};
return w;
};

52
test.js

@ -0,0 +1,52 @@
'use strict';
var cluster = require('cluster');
var crpc;
var db = {
get: function (key, cb) {
cb(null, db[key]);
}
, put: function (key, val, cb) {
db[key] = val;
cb(null);
}
};
if (cluster.isMaster) {
crpc = require('./master').create({
instance: db
, methods: [ 'get', 'put' ]
, name: 'foo-level'
});
crpc.addWorker(cluster.fork());
crpc.then(function () {
db.put('foo', 'bar');
});
}
else {
crpc = require('./worker').create({
name: 'foo-level'
});
}
crpc.then(function (db) {
setTimeout(function () {
db.get('foo', function (err, result) {
console.log(cluster.isMaster && '0' || cluster.worker.id.toString(), "db.get('foo')", result);
if (!cluster.isMaster) {
process.exit(0);
}
});
}, 250);
});

125
worker.js

@ -0,0 +1,125 @@
'use strict';
// com.daplie.ipc.init - receive token and methods
// com.daplie.ipc.rpc - send function and args and callback id
// com.daplie.ipc.result - receive results and callback id
module.exports.create = function (opts) {
var PromiseA = opts.PromiseA || global.Promise;
var crypto = require('crypto');
var token = null;
var inst = {};
var ws = opts.worker = opts.worker || require('./process/worker').create(process);
var msgPrefix = 'cluster-rpc.' + opts.name;
var rpcPrefix = msgPrefix + '.rpc';
var resultPrefix = msgPrefix + '.result';
var initPrefix = msgPrefix + '.init';
ws.___listeners = [];
function rpc(fname, args) {
var id;
var cb;
if ('function' === typeof args[args.length - 1]) {
id = crypto.randomBytes(4).toString('hex');
cb = args.pop();
}
ws.send({
args: args
, func: fname
, id: id
, type: rpcPrefix
, _token: token
});
if (!cb) {
return;
}
function onCallback(cmd) {
if (cmd.id !== id) {
return;
}
if ('on' !== fname) {
var index = ws.___listeners.indexOf(onCallback);
ws.___listeners.splice(index, 1);
}
cb.apply(cmd.this, cmd.args);
}
onCallback._createdAt = Date.now();
if ('on' === fname) {
onCallback._live = true;
}
ws.___listeners.push(onCallback);
}
function onResult(cmd) {
var now = Date.now();
ws.___listeners.forEach(function (fn) {
try {
fn(cmd);
} catch(e) {
console.error("[ERROR] ws.on('message', fn) (multi-callback)");
console.error(e);
// ignore
}
// 1 minute = 60 * 1000
if (!fn.live && (now - fn._createdAt > 60000)) {
var index = ws.___listeners.indexOf(fn);
ws.___listeners.splice(index, 1);
}
});
}
function onInit(cmd) {
if (token) {
console.warn('[cluster-ipc/worker] Warn: re-init');
return;
}
token = cmd._token;
ws._methods = cmd.methods;
ws._methods.forEach(function (fname) {
inst[fname] = function () {
rpc(fname, Array.prototype.slice.call(arguments));
};
});
}
return new PromiseA(function (resolve) {
ws.on('message', function (cmd) {
//console.log('debug m: mesage', cmd);
if (0 !== (cmd.type||'').indexOf(msgPrefix)) {
//console.log('debug m: ignore msg', cmd);
//console.log(cmd.type, msgPrefix);
//console.log(cmd.type.indexOf(msgPrefix));
return;
}
if (token && token !== cmd._token) {
//console.log('debug m: ignore msg', cmd);
return;
}
switch (cmd.type) {
case initPrefix:
onInit(cmd);
resolve(inst);
break;
case resultPrefix:
onResult(cmd);
break;
default:
break;
}
});
});
};
Loading…
Cancel
Save