cluster-rpc.js/worker.js

126 lines
2.8 KiB
JavaScript
Raw Permalink Normal View History

2016-09-08 22:23:40 +00:00
'use strict';
// Message types (prefix is 'cluster-rpc.' + opts.name):
// cluster-rpc.<name>.init - receive token and methods
// cluster-rpc.<name>.rpc - send function and args and callback id
// cluster-rpc.<name>.result - receive results and callback id
/**
 * Create the worker-side end of a cluster-rpc channel.
 *
 * Listens on the worker transport for an init message carrying a token and a
 * list of method names, then builds an object exposing one function per
 * method name. Calling such a function sends an rpc message over the
 * transport; if the last argument is a function it is treated as a callback
 * and invoked when a result message with the matching id arrives.
 *
 * @param {Object} opts
 * @param {string} opts.name - channel name; message types are prefixed with
 *   'cluster-rpc.' + opts.name.
 * @param {Object} [opts.worker] - transport exposing `send(msg)` and
 *   `on('message', fn)`. Defaults to a wrapper around the current process.
 *   The resolved transport is written back to `opts.worker`.
 * @param {Function} [opts.PromiseA] - Promise constructor to use
 *   (defaults to the global Promise).
 * @returns {Promise<Object>} resolves with the proxy object once the init
 *   message has been received.
 */
module.exports.create = function (opts) {
  var PromiseA = opts.PromiseA || global.Promise;
  var crypto = require('crypto');
  var token = null;
  var inst = {};
  var ws = opts.worker = opts.worker || require('./process/worker').create(process);
  var msgPrefix = 'cluster-rpc.' + opts.name;
  var rpcPrefix = msgPrefix + '.rpc';
  var resultPrefix = msgPrefix + '.result';
  var initPrefix = msgPrefix + '.init';
  // One-shot callbacks that never get an answer are dropped after 1 minute.
  var LISTENER_TTL = 60 * 1000;

  ws.___listeners = [];

  // Remove a listener if (and only if) it is still registered. Guarding the
  // index matters: splice(-1, 1) would delete the last, unrelated listener.
  function removeListener(fn) {
    var index = ws.___listeners.indexOf(fn);

    if (-1 !== index) {
      ws.___listeners.splice(index, 1);
    }
  }

  // Send an rpc message for `fname`. A trailing function in `args` is taken
  // out of the argument list and registered as the result callback.
  function rpc(fname, args) {
    var id;
    var cb;

    if ('function' === typeof args[args.length - 1]) {
      id = crypto.randomBytes(4).toString('hex');
      cb = args.pop();
    }

    ws.send({
      args: args
    , func: fname
    , id: id
    , type: rpcPrefix
    , _token: token
    });

    if (!cb) {
      return;
    }

    function onCallback(cmd) {
      if (cmd.id !== id) {
        return;
      }

      // 'on' subscriptions may fire many times; everything else is one-shot.
      if ('on' !== fname) {
        removeListener(onCallback);
      }

      cb.apply(cmd.this, cmd.args);
    }

    onCallback._createdAt = Date.now();
    if ('on' === fname) {
      onCallback._live = true;
    }

    ws.___listeners.push(onCallback);
  }

  // Offer a result message to every registered callback and prune expired
  // one-shot callbacks.
  function onResult(cmd) {
    var now = Date.now();

    // Iterate over a snapshot: callbacks remove themselves from the live
    // array, which would otherwise make forEach skip the following entry.
    ws.___listeners.slice().forEach(function (fn) {
      try {
        fn(cmd);
      } catch(e) {
        // A failing callback must not prevent the others from running.
        console.error("[ERROR] ws.on('message', fn) (multi-callback)");
        console.error(e);
      }

      // Live ('on') subscriptions never expire.
      if (!fn._live && (now - fn._createdAt > LISTENER_TTL)) {
        removeListener(fn);
      }
    });
  }

  // Store the token and build the proxy functions from the method list.
  // Only the first init message is honored.
  function onInit(cmd) {
    if (token) {
      console.warn('[cluster-ipc/worker] Warn: re-init');
      return;
    }

    token = cmd._token;
    ws._methods = cmd.methods;

    ws._methods.forEach(function (fname) {
      inst[fname] = function () {
        rpc(fname, Array.prototype.slice.call(arguments));
      };
    });
  }

  return new PromiseA(function (resolve) {
    ws.on('message', function (cmd) {
      // Ignore anything that is not a well-formed message for this channel.
      if (!cmd || 'string' !== typeof cmd.type || 0 !== cmd.type.indexOf(msgPrefix)) {
        return;
      }

      // Once initialized, only accept messages carrying our token.
      if (token && token !== cmd._token) {
        return;
      }

      switch (cmd.type) {
        case initPrefix:
          onInit(cmd);
          resolve(inst);
          break;
        case resultPrefix:
          onResult(cmd);
          break;
        default:
          break;
      }
    });
  });
};