A simple way to wrap a single-instance module to enable it to work with node cluster.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

119 lines
2.6 KiB

'use strict';
// com.daplie.ipc.init - receive token and methods
// com.daplie.ipc.rpc - send function and args and callback id
// com.daplie.ipc.result - receive results and callback id
module.exports.create = function (opts) {
// Shared closure state for the worker-side RPC client built by create(opts).

// Promise constructor: caller-supplied opts.PromiseA wins, then the global
// Promise, and only if neither exists is bluebird loaded.
var PromiseA = opts.PromiseA || global.Promise || require('bluebird');
// Used by rpc() to generate random callback ids.
var crypto = require('crypto');
// Stays null until onInit() stores the token from the init message; after that
// it is sent with every rpc message and incoming messages carrying a different
// _token are ignored.
var token = null;
// Object the returned promise resolves with; onInit() adds one proxy function
// per method name announced in the init message.
var inst = {};
// Message-type strings; this file reads .root, .init, .rpc and .result from it.
// The actual values are defined in ./prefixes.js (not visible here).
var prefixes = require('./prefixes.js').create(opts);
// IPC channel: reuse opts.worker when given, otherwise build one from
// ./process/worker. The chosen channel is also written back to opts.worker.
// This file calls ws.send(message) and ws.on('message', handler) on it.
var ws = opts.worker = opts.worker || require('./process/worker').create(process, prefixes);
// Pending callback listeners registered by rpc() and invoked by onResult().
// NOTE(review): this is reset on every create() call, so a caller-supplied
// opts.worker that already had ___listeners loses them here - confirm intended.
ws.___listeners = [];
// Send a call for the remote method `fname` over the IPC channel.
//
// fname - name of the method to invoke on the other side
// args  - array of call arguments; when its last element is a function, that
//         function is popped off the array and kept locally as the callback
//
// Without a callback the message is fire-and-forget and its `id` is undefined.
// With a callback, a random 8-hex-character id is sent along and a listener is
// appended to ws.___listeners. The listener ignores messages whose id differs
// and otherwise applies the callback with the `this` and `args` carried by the
// message. A listener for 'on' is flagged `_live` and stays registered after
// firing; any other listener unregisters itself on its first matching message.
// Always returns undefined.
function rpc(fname, args) {
  var keepsListening = ('on' === fname);
  var wantsReply = ('function' === typeof args[args.length - 1]);
  var callId;
  var reply;
  var listener;

  if (wantsReply) {
    callId = crypto.randomBytes(4).toString('hex');
    reply = args.pop();
  }

  ws.send({
    args: args,
    func: fname,
    id: callId,
    type: prefixes.rpc,
    _token: token
  });

  if (!wantsReply) {
    return;
  }

  listener = function (cmd) {
    if (cmd.id !== callId) {
      return;
    }

    // One-shot calls unregister before running the callback.
    if (!keepsListening) {
      ws.___listeners.splice(ws.___listeners.indexOf(listener), 1);
    }

    reply.apply(cmd.this, cmd.args);
  };

  // Timestamp read by onResult() when it expires old listeners.
  listener._createdAt = Date.now();
  if (keepsListening) {
    listener._live = true;
  }

  ws.___listeners.push(listener);
}
// Deliver a result message to every registered callback listener, then drop
// listeners that have outlived the one-minute timeout.
//
// cmd - the result message; it is handed to each listener unchanged
//
// A listener that throws is logged and does not stop delivery to the others.
// A listener is expired only when it is not flagged `_live` (the flag rpc()
// sets on 'on' subscriptions) and its `_createdAt` is more than 60 seconds old.
function onResult(cmd) {
  var now = Date.now();
  // 1 minute = 60 * 1000
  var maxAge = 60000;

  // Iterate over a snapshot: listeners remove themselves (and are expired
  // below) while we loop, and splicing the array being walked would make
  // forEach skip the listener that follows each removed one.
  ws.___listeners.slice().forEach(function (fn) {
    var index;

    try {
      fn(cmd);
    } catch(e) {
      console.error("[ERROR] ws.on('message', fn) (multi-callback)");
      console.error(e);
      // ignore
    }

    if (fn._live || !(now - fn._createdAt > maxAge)) {
      return;
    }

    // The listener may already have unregistered itself during fn(cmd);
    // splice(-1, 1) would then delete the last, unrelated listener.
    index = ws.___listeners.indexOf(fn);
    if (-1 !== index) {
      ws.___listeners.splice(index, 1);
    }
  });
}
// Handle the init message: remember the session token and build one proxy
// function on `inst` for each method name listed in cmd.methods.
//
// cmd - init message; this function reads cmd._token and cmd.methods
//
// Each proxy forwards its arguments (as a real array) to rpc() under the
// method's name and returns undefined. Once a token has been stored, any
// further init message only logs a warning and changes nothing.
function onInit(cmd) {
  function expose(methodName) {
    inst[methodName] = function () {
      var callArgs = [].slice.call(arguments);
      rpc(methodName, callArgs);
    };
  }

  if (!token) {
    token = cmd._token;
    ws._methods = cmd.methods;
    ws._methods.forEach(expose);
    return;
  }

  console.warn('[cluster-ipc/worker] Warn: re-init');
}
return new PromiseA(function (resolve) {
ws.on('message', function (cmd) {
if (opts.debug) { console.log('[cluster-rpc] [worker] message received', cmd); }
if (0 !== (cmd.type||'').indexOf(prefixes.root)) {
return;
}
if (token && token !== cmd._token) {
return;
}
switch (cmd.type) {
case prefixes.init:
onInit(cmd);
resolve(inst);
break;
case prefixes.result:
onResult(cmd);
break;
default:
break;
}
});
});
};