'use strict';

// Worker-side client for cluster-rpc.
//
// Message types handled here (each is 'cluster-rpc.' + opts.name + suffix):
//   <prefix>.init   - received: carries the token and the list of method names
//   <prefix>.rpc    - sent: function name, args and (optional) callback id
//   <prefix>.result - received: results plus the callback id they belong to

/**
 * Creates the worker-side RPC client.
 *
 * @param {Object} opts
 * @param {string} opts.name - appended to 'cluster-rpc.' to form the message prefix.
 * @param {Object} [opts.worker] - transport exposing `on('message', fn)` and
 *   `send(msg)`; when omitted it is built from './process/worker' and stored
 *   back on `opts.worker`.
 * @param {Function} [opts.PromiseA] - Promise constructor to use; falls back to
 *   `global.Promise`, then to bluebird.
 * @returns {Object} a promise (instance of the chosen constructor) that is
 *   resolved with the RPC instance when the init message arrives. Each method
 *   named in the init message becomes a function on that instance; if the last
 *   argument passed to such a function is a function, it is used as the
 *   callback for the matching result message.
 */
module.exports.create = function (opts) {
  var PromiseA = opts.PromiseA || global.Promise || require('bluebird');
  var crypto = require('crypto');
  var token = null;
  var inst = {};
  var ws = opts.worker = opts.worker || require('./process/worker').create(process);
  var msgPrefix = 'cluster-rpc.' + opts.name;
  var rpcPrefix = msgPrefix + '.rpc';
  var resultPrefix = msgPrefix + '.result';
  var initPrefix = msgPrefix + '.init';

  // One-shot callbacks that never got a result are dropped after 1 minute.
  var LISTENER_TTL_MS = 60 * 1000;

  ws.___listeners = [];

  // Removes `fn` from the listener list if (and only if) it is still there.
  // The guard matters: splice(-1, 1) would remove the *last* listener instead.
  function removeListener(fn) {
    var index = ws.___listeners.indexOf(fn);
    if (-1 !== index) {
      ws.___listeners.splice(index, 1);
    }
  }

  // Sends an rpc message for `fname`. If the last element of `args` is a
  // function it is popped off and registered as the callback for the result.
  function rpc(fname, args) {
    var id;
    var cb;

    if ('function' === typeof args[args.length - 1]) {
      id = crypto.randomBytes(4).toString('hex');
      cb = args.pop();
    }

    ws.send({
      args: args
    , func: fname
    , id: id
    , type: rpcPrefix
    , _token: token
    });

    if (!cb) {
      return;
    }

    function onCallback(cmd) {
      if (cmd.id !== id) {
        return;
      }

      // 'on' subscriptions may fire many times; everything else is one-shot
      if ('on' !== fname) {
        removeListener(onCallback);
      }

      cb.apply(cmd.this, cmd.args);
    }
    onCallback._createdAt = Date.now();
    if ('on' === fname) {
      onCallback._live = true;
    }

    ws.___listeners.push(onCallback);
  }

  // Dispatches a result message to every registered listener and expires
  // stale one-shot listeners.
  function onResult(cmd) {
    var now = Date.now();

    // Iterate over a snapshot: listeners remove themselves while being called,
    // and mutating the array under forEach would skip the following entry.
    ws.___listeners.slice().forEach(function (fn) {
      try {
        fn(cmd);
      } catch(e) {
        console.error("[ERROR] ws.on('message', fn) (multi-callback)");
        console.error(e);
        // ignore
      }

      // Live ('on') listeners never expire; the flag is `_live`, as set in rpc().
      if (!fn._live && (now - fn._createdAt > LISTENER_TTL_MS)) {
        removeListener(fn);
      }
    });
  }

  // Stores the token and exposes one function on `inst` per advertised method.
  function onInit(cmd) {
    if (token) {
      console.warn('[cluster-ipc/worker] Warn: re-init');
      return;
    }

    token = cmd._token;
    ws._methods = cmd.methods;

    ws._methods.forEach(function (fname) {
      inst[fname] = function () {
        rpc(fname, Array.prototype.slice.call(arguments));
      };
    });
  }

  return new PromiseA(function (resolve) {
    ws.on('message', function (cmd) {
      // ignore messages that are not addressed to this rpc channel
      if (0 !== (cmd.type||'').indexOf(msgPrefix)) {
        return;
      }

      // once initialized, ignore messages that do not carry our token
      if (token && token !== cmd._token) {
        return;
      }

      switch (cmd.type) {
        case initPrefix:
          onInit(cmd);
          resolve(inst);
          break;
        case resultPrefix:
          onResult(cmd);
          break;
        default:
          break;
      }
    });
  });
};