'use strict';

// com.daplie.ipc.init - receive token and methods
// com.daplie.ipc.rpc - send function and args and callback id
// com.daplie.ipc.result - receive results and callback id

/**
 * Create the worker-side RPC client.
 *
 * Listens for messages on the worker transport; once the init message
 * arrives it stores the token, builds one proxy function per advertised
 * method name, and resolves the returned promise with that proxy object.
 *
 * @param {Object} opts
 * @param {Function} [opts.PromiseA] - Promise constructor to use (falls back
 *   to global.Promise, then bluebird).
 * @param {Object} [opts.worker] - message transport exposing `send(msg)` and
 *   `on('message', fn)`; when omitted one is created from `process` and
 *   stored back on `opts.worker`.
 * @param {boolean} [opts.debug] - log every received message.
 * @returns {Promise<Object>} resolves with the object of proxied methods.
 */
module.exports.create = function (opts) {
  var PromiseA = opts.PromiseA || global.Promise || require('bluebird');
  var crypto = require('crypto');
  var token = null;
  var inst = {};
  var prefixes = require('./prefixes.js').create(opts);
  var ws = opts.worker = opts.worker || require('./process/worker').create(process, prefixes);

  // One-shot callbacks that have waited longer than this are dropped.
  // 1 minute = 60 * 1000
  var STALE_AFTER_MS = 60 * 1000;

  ws.___listeners = [];

  // Remove `fn` from the listener list if (and only if) it is still there.
  // The guard matters: splice(-1, 1) would delete the LAST listener.
  function removeListener(fn) {
    var index = ws.___listeners.indexOf(fn);
    if (-1 !== index) {
      ws.___listeners.splice(index, 1);
    }
  }

  // Send an rpc message for `fname`. If the last argument is a function it
  // is kept locally as the callback and matched to results by a random id.
  function rpc(fname, args) {
    var id;
    var cb;

    if ('function' === typeof args[args.length - 1]) {
      id = crypto.randomBytes(4).toString('hex');
      cb = args.pop();
    }

    ws.send({
      args: args
    , func: fname
    , id: id
    , type: prefixes.rpc
    , _token: token
    });

    if (!cb) {
      return;
    }

    function onCallback(cmd) {
      if (cmd.id !== id) {
        return;
      }

      // 'on' subscriptions may fire many times; everything else is one-shot
      if ('on' !== fname) {
        removeListener(onCallback);
      }

      cb.apply(cmd.this, cmd.args);
    }
    onCallback._createdAt = Date.now();
    if ('on' === fname) {
      onCallback._live = true;
    }

    ws.___listeners.push(onCallback);
  }

  // Offer a result message to every pending callback, then prune one-shot
  // callbacks that have been waiting too long.
  function onResult(cmd) {
    var now = Date.now();

    // Iterate over a snapshot: listeners remove themselves (and are pruned)
    // during dispatch, and mutating the array being iterated would skip the
    // listener that follows each removed one.
    ws.___listeners.slice().forEach(function (fn) {
      try {
        fn(cmd);
      } catch(e) {
        console.error("[ERROR] ws.on('message', fn) (multi-callback)");
        console.error(e);
        // ignore
      }

      // `_live` (set in rpc for 'on') marks subscriptions that never expire
      if (!fn._live && (now - fn._createdAt > STALE_AFTER_MS)) {
        removeListener(fn);
      }
    });
  }

  // Store the token and expose each advertised method name as a function on
  // `inst` that forwards its arguments through rpc(). Only the first init
  // is honoured.
  function onInit(cmd) {
    if (token) {
      console.warn('[cluster-ipc/worker] Warn: re-init');
      return;
    }

    token = cmd._token;
    ws._methods = cmd.methods;

    ws._methods.forEach(function (fname) {
      inst[fname] = function () {
        rpc(fname, Array.prototype.slice.call(arguments));
      };
    });
  }

  return new PromiseA(function (resolve) {
    ws.on('message', function (cmd) {
      if (opts.debug) { console.log('[cluster-rpc] [worker] message received', cmd); }

      // a null/undefined message cannot carry a type; ignore it
      if (!cmd) {
        return;
      }

      // only handle messages whose type starts with our prefix
      if (0 !== (cmd.type||'').indexOf(prefixes.root)) {
        return;
      }

      // once initialized, drop anything that does not carry our token
      if (token && token !== cmd._token) {
        return;
      }

      switch (cmd.type) {
        case prefixes.init:
          onInit(cmd);
          resolve(inst);
          break;
        case prefixes.result:
          onResult(cmd);
          break;
        default:
          break;
      }
    });
  });
};