200 lines
4.8 KiB
JavaScript
200 lines
4.8 KiB
JavaScript
'use strict';
|
|
|
|
/*global Promise*/
|
|
|
|
function startServer(opts) {
|
|
return require('./server').create(opts).then(function (server) {
|
|
// this process doesn't need to connect to itself
|
|
// through a socket
|
|
return server.masterClient;
|
|
});
|
|
}
|
|
|
|
function getConnection(opts) {
|
|
return new Promise(function (resolve) {
|
|
//setTimeout(function () {
|
|
var WebSocket = require('ws');
|
|
var ws = new WebSocket('ws+unix:' + opts.sock);
|
|
|
|
ws.on('error', function (err) {
|
|
console.error('[ERROR] ws connection failed, retrying');
|
|
console.error(err);
|
|
|
|
function retry() {
|
|
setTimeout(function () {
|
|
getConnection(opts).then(resolve, retry);
|
|
}, 100 + (Math.random() * 250));
|
|
}
|
|
|
|
if (!opts.connect && ('ENOENT' === err.code || 'ECONNREFUSED' === err.code)) {
|
|
console.log('[NO SERVER] attempting to create a server #######################');
|
|
return startServer(opts).then(function (client) {
|
|
// ws.masterClient = client;
|
|
resolve({ masterClient: client });
|
|
}, function () {
|
|
retry();
|
|
});
|
|
}
|
|
|
|
retry();
|
|
});
|
|
|
|
/*
|
|
ws.on('open', function () {
|
|
resolve(ws);
|
|
});
|
|
*/
|
|
ws.___listeners = [];
|
|
ws.on('message', function (data) {
|
|
ws.___listeners.forEach(function (fn) {
|
|
try {
|
|
fn(data);
|
|
} catch(e) {
|
|
console.error("[ERROR] ws.on('message', fn) (multi-callback)");
|
|
console.error(e);
|
|
// ignore
|
|
}
|
|
});
|
|
});
|
|
|
|
function onInitMessage(str) {
|
|
// TODO there's no way to remove a listener... what to do?
|
|
var data;
|
|
|
|
try {
|
|
data = JSON.parse(str);
|
|
} catch(e) {
|
|
console.error('[ERROR]');
|
|
console.error(e);
|
|
}
|
|
|
|
if ('methods' !== data.type) {
|
|
return;
|
|
}
|
|
|
|
var index = ws.___listeners.indexOf(onInitMessage);
|
|
ws.___listeners.splice(index, 1);
|
|
ws._methods = data.methods;
|
|
|
|
resolve(ws);
|
|
}
|
|
|
|
ws.___listeners.push(onInitMessage);
|
|
//}, 100 + (Math.random() * 250));
|
|
});
|
|
}
|
|
|
|
function create(opts) {
|
|
if (!opts.sock) {
|
|
opts.sock = '/tmp/memstore' + '.sock';
|
|
}
|
|
|
|
var promise;
|
|
var numcpus = require('os').cpus().length;
|
|
if (opts.standalone || (1 === numcpus && !opts.serve && !opts.connect)) {
|
|
return require('./memstore').create(opts);
|
|
}
|
|
|
|
function retryServe() {
|
|
return startServer(opts).then(function (client) {
|
|
// ws.masterClient = client;
|
|
return { masterClient: client };
|
|
}, function (err) {
|
|
console.error('[ERROR] retryServe()');
|
|
console.error(err);
|
|
retryServe();
|
|
});
|
|
}
|
|
|
|
if (opts.serve) {
|
|
promise = retryServe();
|
|
} else {
|
|
promise = getConnection(opts);
|
|
}
|
|
|
|
// TODO maybe use HTTP POST instead?
|
|
return promise.then(function (ws) {
|
|
if (ws.masterClient) {
|
|
return ws.masterClient;
|
|
}
|
|
|
|
var db = {};
|
|
|
|
function rpc(fname, args) {
|
|
var id;
|
|
var cb;
|
|
|
|
if ('function' === typeof args[args.length - 1]) {
|
|
// TODO if off, search for cb and derive id from previous onMessage
|
|
id = Math.random();
|
|
cb = args.pop();
|
|
}
|
|
|
|
ws.send(JSON.stringify({
|
|
type: 'rpc'
|
|
, func: fname
|
|
, args: args
|
|
, hasCallback: !!cb
|
|
, filename: opts.filename
|
|
, id: id
|
|
}));
|
|
|
|
if (!cb) {
|
|
return;
|
|
}
|
|
|
|
function onMessage(data) {
|
|
var cmd;
|
|
|
|
try {
|
|
cmd = JSON.parse(data.toString('utf8'));
|
|
} catch(e) {
|
|
console.error('[ERROR] in client, from sql server parse json');
|
|
console.error(e);
|
|
console.error(data);
|
|
console.error();
|
|
|
|
//ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } }));
|
|
return;
|
|
}
|
|
|
|
if (cmd.id !== id) {
|
|
return;
|
|
}
|
|
|
|
/*
|
|
// TODO not sure how to handle 'emit' or 'off'...
|
|
// it'll just be broken for now
|
|
if ('off' === fname || 'remove.*Listener'.test(fname)) {
|
|
var index = ws.___listeners.indexOf(onMessage);
|
|
ws.___listeners.splice(index, 1);
|
|
}
|
|
*/
|
|
|
|
if ('on' !== fname && ! /add.*Listener/.test(fname)) {
|
|
var index = ws.___listeners.indexOf(onMessage);
|
|
ws.___listeners.splice(index, 1);
|
|
}
|
|
|
|
cb.apply(cmd.this, cmd.args);
|
|
}
|
|
|
|
// TODO search index by cb for 'off'
|
|
// and pass it along to the rpc with the original id
|
|
onMessage.__cb = cb;
|
|
onMessage.__id = id;
|
|
ws.___listeners.push(onMessage);
|
|
}
|
|
|
|
ws._methods.forEach(function (key) {
|
|
db[key] = function () {
|
|
rpc(key, Array.prototype.slice.call(arguments));
|
|
};
|
|
});
|
|
|
|
return db;
|
|
});
|
|
}
|
|
|
|
module.exports.create = create;
|