Compare commits
10 Commits
Author | SHA1 | Date |
---|---|---|
|
8a88dbe229 | |
|
01697a19ea | |
|
397b751b42 | |
|
73edc8beeb | |
|
4f53821a2c | |
|
1a84c1db10 | |
|
cf5e734618 | |
|
efc10cab3b | |
|
e6155d9f33 | |
|
3b0b2f7097 |
536
client.js
536
client.js
|
@ -1,59 +1,79 @@
|
|||
'use strict';
|
||||
|
||||
/*global Promise*/
|
||||
// I'm not a fan of singletons (as they are EVIL).
|
||||
// However, this module is, by intent, meant to
|
||||
// represent a worker process in a cluster.
|
||||
// Thus, it is appropriately a singleton.
|
||||
var processWebSocket;
|
||||
var promiseWebSocket;
|
||||
|
||||
var PromiseA = Promise;
|
||||
try {
|
||||
PromiseA = require('bluebird').Promise;
|
||||
} catch(e) {
|
||||
console.warn("For better Promise support please use bluebird");
|
||||
}
|
||||
// TODO iterate over the prototype
|
||||
// translate request / response
|
||||
var sqlite3real = require('sqlite3');
|
||||
|
||||
/*
|
||||
function createConnection(opts) {
|
||||
var server = ;
|
||||
|
||||
return server.create(opts).then(function () {
|
||||
// created and listening
|
||||
});
|
||||
}
|
||||
*/
|
||||
|
||||
function startServer(opts) {
|
||||
return require('./server').create(opts).then(function (server) {
|
||||
function startServer(opts, verbs, myServer) {
|
||||
if (myServer) {
|
||||
return verbs.Promise.resolve(myServer);
|
||||
}
|
||||
return require('./server').create(opts, verbs).then(function (server) {
|
||||
// this process doesn't need to connect to itself
|
||||
// through a socket
|
||||
return server.masterClient;
|
||||
});
|
||||
}
|
||||
|
||||
function getConnection(opts) {
|
||||
return new PromiseA(function (resolve) {
|
||||
// connection is scoped per-process, nothing more
|
||||
function getConnection(opts, verbs, mySocket, retry) {
|
||||
function incr(ws) {
|
||||
if (!ws.__count) {
|
||||
ws.__count = 0;
|
||||
}
|
||||
ws.__count += 1;
|
||||
return ws;
|
||||
}
|
||||
if (!retry && (mySocket || processWebSocket)) {
|
||||
promiseWebSocket = verbs.Promise.resolve(mySocket || processWebSocket);
|
||||
return promiseWebSocket.then(incr);
|
||||
}
|
||||
|
||||
if (!retry && promiseWebSocket) {
|
||||
return promiseWebSocket.then(incr);
|
||||
}
|
||||
|
||||
promiseWebSocket = new verbs.Promise(function (resolve) {
|
||||
//setTimeout(function () {
|
||||
var WebSocket = require('ws');
|
||||
var ws = new WebSocket('ws+unix:' + opts.sock);
|
||||
// TODO how to include path and such?
|
||||
// http://unix:/absolute/path/to/unix.socket:/request/path
|
||||
// https://github.com/websockets/ws/issues/236
|
||||
var ws;
|
||||
var pathname = '/' + (require('cluster').worker||{}).id + '/' + opts.ipcKey;
|
||||
var queryparams = {
|
||||
ipcKey: opts.ipcKey
|
||||
, ipc_key: opts.ipcKey
|
||||
, worker_id: (require('cluster').worker||{}).id
|
||||
};
|
||||
var search = require('querystring').stringify(queryparams);
|
||||
var wspath = 'ws+unix://' + opts.sock + ':' + pathname + '?' + search;
|
||||
// https://github.com/websockets/ws/blob/master/doc/ws.md#unix-domain-sockets
|
||||
// ws+unix:///absolule/path/to/uds_socket:/pathname?search_params
|
||||
ws = new WebSocket(wspath);
|
||||
|
||||
ws.on('error', function (err) {
|
||||
console.error('[ERROR] ws connection failed, retrying');
|
||||
console.error(err);
|
||||
console.error(err.stack || err);
|
||||
|
||||
function retry() {
|
||||
// TODO eventually throw up
|
||||
setTimeout(function () {
|
||||
getConnection(opts).then(resolve, retry);
|
||||
getConnection(opts, verbs, mySocket, true).then(resolve, retry);
|
||||
}, 100 + (Math.random() * 250));
|
||||
}
|
||||
|
||||
if (!opts.connect && ('ENOENT' === err.code || 'ECONNREFUSED' === err.code)) {
|
||||
console.log('[NO SERVER] attempting to create a server #######################');
|
||||
return startServer(opts).then(function (client) {
|
||||
return startServer(opts, verbs).then(function (client) {
|
||||
// ws.masterClient = client;
|
||||
resolve({ masterClient: client });
|
||||
}, function (err) {
|
||||
console.error('[ERROR] failed to connect to sqlite3-cluster service. retrying...');
|
||||
console.error(err);
|
||||
console.error(err.stack || err);
|
||||
retry();
|
||||
});
|
||||
}
|
||||
|
@ -62,66 +82,276 @@ function getConnection(opts) {
|
|||
});
|
||||
|
||||
ws.on('open', function () {
|
||||
|
||||
resolve(ws);
|
||||
});
|
||||
//}, 100 + (Math.random() * 250));
|
||||
});
|
||||
}
|
||||
|
||||
function create(opts) {
|
||||
if (!opts.sock) {
|
||||
opts.sock = opts.filename + '.sock';
|
||||
}
|
||||
|
||||
var promise;
|
||||
var numcpus = require('os').cpus().length;
|
||||
if (opts.standalone || (1 === numcpus && !opts.serve && !opts.connect)) {
|
||||
return require('./wrapper').create(opts);
|
||||
}
|
||||
|
||||
function retryServe() {
|
||||
return startServer(opts).then(function (client) {
|
||||
// ws.masterClient = client;
|
||||
return { masterClient: client };
|
||||
}, function (err) {
|
||||
console.error('[ERROR] retryServe()');
|
||||
console.error(err);
|
||||
retryServe();
|
||||
});
|
||||
}
|
||||
|
||||
if (opts.serve) {
|
||||
promise = retryServe();
|
||||
} else {
|
||||
promise = getConnection(opts);
|
||||
}
|
||||
/*
|
||||
if (opts.connect) {
|
||||
}
|
||||
*/
|
||||
|
||||
// TODO maybe use HTTP POST instead?
|
||||
return promise.then(function (ws) {
|
||||
if (ws.masterClient) {
|
||||
return ws.masterClient;
|
||||
}).then(function (ws) {
|
||||
if (!processWebSocket) {
|
||||
processWebSocket = ws;
|
||||
}
|
||||
|
||||
var db = {};
|
||||
var proto = sqlite3real.Database.prototype;
|
||||
var messages = [];
|
||||
return ws;
|
||||
});
|
||||
|
||||
function init(opts) {
|
||||
return new PromiseA(function (resolve, reject) {
|
||||
var id = Math.random();
|
||||
return promiseWebSocket.then(incr);
|
||||
}
|
||||
|
||||
module.exports.createClientFactory = function (conf, verbs, _socket) {
|
||||
// TODO distinguish between defaults and mandates
|
||||
|
||||
if (!conf.ipcKey) {
|
||||
throw new Error("[E_NO_IPCKEY] Your config must specify an ipcKey.");
|
||||
}
|
||||
|
||||
return {
|
||||
create: function (opts, _s) {
|
||||
var copy = {};
|
||||
|
||||
if (_socket && _s) {
|
||||
throw new Error("[E_USR_SOCKET] Your parent has decided that you may not choose your own SOCKET. Don't get mad at me, take it up with them.");
|
||||
}
|
||||
if (opts.key && conf.key) {
|
||||
throw new Error("[E_USR_KEY] Your parent has decided that you may not choose your own KEY. Don't get mad at me, take it up with them.");
|
||||
}
|
||||
if (opts.algo && conf.algo) {
|
||||
throw new Error("[E_USR_ALGO] Your parent has decided that you may not choose your own ALGO. Don't get mad at me, take it up with them.");
|
||||
}
|
||||
if (opts.bits && conf.bits) {
|
||||
throw new Error("[E_USR_BITS] Your parent has decided that you may not choose your own BITS. Don't get mad at me, take it up with them.");
|
||||
}
|
||||
if (opts.dirname && conf.dirname) {
|
||||
throw new Error("[E_USR_TENANT] Your parent has decided that you may not choose your own TENANT. Don't get mad at me, take it up with them.");
|
||||
}
|
||||
if (opts.tenant && conf.tenant) {
|
||||
throw new Error("[E_USR_TENANT] Your parent has decided that you may not choose your own TENANT. Don't get mad at me, take it up with them.");
|
||||
}
|
||||
if (opts.prefix && conf.prefix) {
|
||||
throw new Error("[E_USR_PREFIX] Your parent has decided that you may not choose your own PREFIX. Don't get mad at me, take it up with them.");
|
||||
}
|
||||
if (opts.suffix && conf.suffix) {
|
||||
throw new Error("[E_USR_SUFFIX] Your parent has decided that you may not choose your own SUFFIX. Don't get mad at me, take it up with them.");
|
||||
}
|
||||
if (opts.ext && conf.ext) {
|
||||
throw new Error("[E_USR_EXT] Your parent has decided that you may not choose your own EXT. Don't get mad at me, take it up with them.");
|
||||
}
|
||||
if (opts.serve) {
|
||||
throw new Error("[E_USR_SERVE] Your parent have forbidden you to SERVE. Don't get mad at me, take it up with them.");
|
||||
}
|
||||
if (opts.sock && conf.sock) {
|
||||
throw new Error("[E_USR_SERVE] Your parent have forbidden you to choose your own SOCK. Don't get mad at me, take it up with them.");
|
||||
}
|
||||
|
||||
Object.keys(conf).forEach(function (key) {
|
||||
copy[key] = conf[key];
|
||||
});
|
||||
Object.keys(opts).forEach(function (key) {
|
||||
copy[key] = opts[key];
|
||||
});
|
||||
|
||||
if (!verbs) {
|
||||
verbs = {
|
||||
Promise: null
|
||||
};
|
||||
}
|
||||
if (!verbs.Promise) {
|
||||
verbs.Promise = require('bluebird');
|
||||
}
|
||||
|
||||
copy.connect = (require('cluster').worker && true);
|
||||
copy.sock = conf.sock;
|
||||
copy.tenant = conf.tenant;
|
||||
copy.ipcKey = conf.ipcKey;
|
||||
return module.exports.create(copy, verbs, _socket || _s || processWebSocket);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
module.exports.create = function (opts, verbs, mySocket) {
|
||||
if (!verbs) {
|
||||
verbs = {};
|
||||
}
|
||||
|
||||
var PromiseA = verbs && verbs.Promise || require('bluebird');
|
||||
verbs.Promise = PromiseA;
|
||||
// TODO iterate over the prototype
|
||||
// translate request / response
|
||||
var sqlite3real = require('sqlite3');
|
||||
|
||||
if (!mySocket) {
|
||||
mySocket = processWebSocket;
|
||||
}
|
||||
|
||||
function create(opts) {
|
||||
function retryServe() {
|
||||
return startServer(opts, verbs).then(function (client) {
|
||||
// ws.masterClient = client;
|
||||
return { masterClient: client };
|
||||
}, function (err) {
|
||||
console.error('[ERROR] retryServe()');
|
||||
console.error(err.stack || err);
|
||||
retryServe();
|
||||
});
|
||||
}
|
||||
|
||||
if (opts.serve) {
|
||||
return retryServe(opts).then(function (servers) {
|
||||
return servers.masterClient;
|
||||
});
|
||||
}
|
||||
|
||||
if (!opts.tenant) {
|
||||
opts.tenant = "";
|
||||
}
|
||||
if (!opts.subtenant) {
|
||||
opts.subtenant = "";
|
||||
}
|
||||
if (!opts.prefix) {
|
||||
opts.prefix = '';
|
||||
}
|
||||
if (!opts.suffix) {
|
||||
opts.suffix = '';
|
||||
}
|
||||
if (!opts.ext) {
|
||||
opts.ext = '.sqlite3'; // also .sqlcipher
|
||||
}
|
||||
if (!opts.dbname) {
|
||||
throw new Error("Please specify opts.dbname as the name of the database");
|
||||
}
|
||||
|
||||
|
||||
var promise;
|
||||
var numcpus = require('os').cpus().length;
|
||||
|
||||
// if we're only on one core, use the lib directly, no socket
|
||||
if (opts.standalone || (1 === numcpus && !opts.serve && !opts.connect)) {
|
||||
return require('./wrapper').create(opts, verbs);
|
||||
}
|
||||
|
||||
if (!opts.sock) {
|
||||
throw new Error("Please specify opts.sock as the path to the master socket. '/tmp/sqlite3-cluster' would do nicely.");
|
||||
}
|
||||
|
||||
promise = getConnection(opts, verbs, mySocket, true).then(function (socket) {
|
||||
mySocket = socket;
|
||||
return mySocket;
|
||||
});
|
||||
|
||||
// TODO maybe use HTTP POST instead?
|
||||
return promise.then(function (ws) {
|
||||
var db = {};
|
||||
var proto = sqlite3real.Database.prototype;
|
||||
var messages = [];
|
||||
var idprefix = require('crypto').randomBytes(12).toString('base64');
|
||||
var idcount = 0;
|
||||
|
||||
function genId() {
|
||||
idcount += 1;
|
||||
return idprefix + idcount;
|
||||
}
|
||||
|
||||
function init(iopts) {
|
||||
//console.log('CLIENT INIT');
|
||||
if (db._initPromise) {
|
||||
return db._initPromise;
|
||||
}
|
||||
|
||||
db._initPromise = new PromiseA(function (resolve, reject) {
|
||||
// TODO needs to reject by a timeout
|
||||
var id = genId();
|
||||
ws.send(JSON.stringify({
|
||||
type: 'init'
|
||||
, args: [{
|
||||
// encryption
|
||||
bits: opts.bits || iopts.bits
|
||||
, algorithm: opts.algo || opts.algorithm || iopts.algorithm || iopts.algo
|
||||
, algo: opts.algo || opts.algorithm || iopts.algorithm || iopts.algo
|
||||
, encmode: opts.mode || iopts.mode
|
||||
}]
|
||||
, func: 'init'
|
||||
// db
|
||||
, dirname: opts.dirname
|
||||
, prefix: opts.prefix
|
||||
, subtenant: opts.subtenant
|
||||
, tenant: opts.tenant
|
||||
, dbname: opts.dbname
|
||||
, suffix: opts.suffix
|
||||
, ext: opts.ext
|
||||
// session
|
||||
, id: id
|
||||
}));
|
||||
|
||||
function onMessage(data) {
|
||||
var cmd;
|
||||
|
||||
try {
|
||||
cmd = JSON.parse(data.toString('utf8'));
|
||||
} catch(e) {
|
||||
console.error('[ERROR] in client, from sql server parse json');
|
||||
console.error(e.stack || e);
|
||||
console.error(data);
|
||||
console.error();
|
||||
|
||||
// ignore this message, it came out of order
|
||||
return reject(new Error("suxors to rejexors parse"));
|
||||
}
|
||||
|
||||
if (cmd.id !== id) {
|
||||
// ignore this message, it came out of order
|
||||
return;
|
||||
}
|
||||
|
||||
if (cmd.self) {
|
||||
cmd.args = [db];
|
||||
}
|
||||
|
||||
messages.splice(messages.indexOf(onMessage), 1);
|
||||
|
||||
if ('error' === cmd.type) {
|
||||
//console.log('ERROR ARGS');
|
||||
//console.log(cmd);
|
||||
reject(cmd.args[0]);
|
||||
return;
|
||||
}
|
||||
|
||||
//console.log('CLIENT RESOLVING INIT');
|
||||
resolve(cmd.args[0]);
|
||||
return;
|
||||
}
|
||||
|
||||
messages.push(onMessage);
|
||||
});
|
||||
|
||||
return db._initPromise;
|
||||
}
|
||||
|
||||
function rpcThunk(fname, args) {
|
||||
var id;
|
||||
var cb;
|
||||
|
||||
if ('function' === typeof args[args.length - 1]) {
|
||||
id = genId();
|
||||
cb = args.pop();
|
||||
}
|
||||
|
||||
ws.send(JSON.stringify({
|
||||
type: 'init'
|
||||
, args: [opts]
|
||||
, func: 'init'
|
||||
, filename: opts.filename
|
||||
type: 'rpc'
|
||||
, func: fname
|
||||
, args: args
|
||||
, dirname: opts.dirname
|
||||
, prefix: opts.prefix
|
||||
, subtenant: opts.subtenant
|
||||
, tenant: opts.tenant
|
||||
, dbname: opts.dbname
|
||||
, suffix: opts.suffix
|
||||
, ext: opts.ext
|
||||
, id: id
|
||||
}));
|
||||
|
||||
if (!cb) {
|
||||
return;
|
||||
}
|
||||
|
||||
function onMessage(data) {
|
||||
var cmd;
|
||||
|
||||
|
@ -129,7 +359,7 @@ function create(opts) {
|
|||
cmd = JSON.parse(data.toString('utf8'));
|
||||
} catch(e) {
|
||||
console.error('[ERROR] in client, from sql server parse json');
|
||||
console.error(e);
|
||||
console.error(e.stack || e);
|
||||
console.error(data);
|
||||
console.error();
|
||||
|
||||
|
@ -138,119 +368,77 @@ function create(opts) {
|
|||
}
|
||||
|
||||
if (cmd.id !== id) {
|
||||
// ignore this message, it probably came out of order
|
||||
return;
|
||||
}
|
||||
|
||||
if (cmd.self) {
|
||||
cmd.args = [db];
|
||||
}
|
||||
//console.log('RESOLVING RPC', cmd.this, cmd.args);
|
||||
cb.apply(cmd.this, cmd.args);
|
||||
|
||||
messages.splice(messages.indexOf(onMessage), 1);
|
||||
|
||||
if ('error' === cmd.type) {
|
||||
reject(cmd.args[0]);
|
||||
return;
|
||||
if ('on' !== fname) {
|
||||
var index = messages.indexOf(onMessage);
|
||||
messages.splice(index, 1);
|
||||
}
|
||||
resolve(cmd.args[0]);
|
||||
}
|
||||
|
||||
messages.push(onMessage);
|
||||
});
|
||||
}
|
||||
|
||||
function rpcThunk(fname, args) {
|
||||
var id;
|
||||
var cb;
|
||||
|
||||
if ('function' === typeof args[args.length - 1]) {
|
||||
id = Math.random();
|
||||
cb = args.pop();
|
||||
}
|
||||
|
||||
ws.send(JSON.stringify({
|
||||
type: 'rpc'
|
||||
, func: fname
|
||||
, args: args
|
||||
, filename: opts.filename
|
||||
, id: id
|
||||
}));
|
||||
db.sanitize = require('./wrapper').sanitize;
|
||||
db.escape = require('./wrapper').escape;
|
||||
|
||||
if (!cb) {
|
||||
return;
|
||||
}
|
||||
// TODO get methods from server (cluster-store does this)
|
||||
// instead of using the prototype
|
||||
Object.keys(sqlite3real.Database.prototype).forEach(function (key) {
|
||||
|
||||
function onMessage(data) {
|
||||
var cmd;
|
||||
|
||||
try {
|
||||
cmd = JSON.parse(data.toString('utf8'));
|
||||
} catch(e) {
|
||||
console.error('[ERROR] in client, from sql server parse json');
|
||||
console.error(e);
|
||||
console.error(data);
|
||||
console.error();
|
||||
|
||||
//ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } }));
|
||||
return;
|
||||
}
|
||||
|
||||
if (cmd.id !== id) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (cmd.self) {
|
||||
cmd.args = [db];
|
||||
}
|
||||
cb.apply(cmd.this, cmd.args);
|
||||
|
||||
if ('on' !== fname) {
|
||||
var index = messages.indexOf(onMessage);
|
||||
messages.splice(index, 1);
|
||||
}
|
||||
}
|
||||
|
||||
messages.push(onMessage);
|
||||
}
|
||||
|
||||
db.sanitize = require('./wrapper').sanitize;
|
||||
db.escape = require('./wrapper').escape;
|
||||
|
||||
// TODO get methods from server (cluster-store does this)
|
||||
// instead of using the prototype
|
||||
Object.keys(sqlite3real.Database.prototype).forEach(function (key) {
|
||||
|
||||
if ('function' === typeof proto[key]) {
|
||||
db[key] = function () {
|
||||
rpcThunk(key, Array.prototype.slice.call(arguments));
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
db.init = init;
|
||||
|
||||
ws.on('message', function (data) {
|
||||
messages.forEach(function (fn) {
|
||||
try {
|
||||
fn(data);
|
||||
} catch(e) {
|
||||
console.error("[ERROR] ws.on('message', fn) (multi-callback)");
|
||||
console.error(e);
|
||||
// ignore
|
||||
if ('function' === typeof proto[key]) {
|
||||
db[key] = function () {
|
||||
rpcThunk(key, Array.prototype.slice.call(arguments));
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
db.init = init;
|
||||
|
||||
ws.on('message', function (data) {
|
||||
messages.forEach(function (fn) {
|
||||
try {
|
||||
fn(data);
|
||||
} catch(e) {
|
||||
console.error("[ERROR] ws.on('message', fn) (multi-callback)");
|
||||
console.error(e.stack || e);
|
||||
// ignore
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// serialize
|
||||
// parallel
|
||||
db.serialize = db.parallel = function () {
|
||||
throw new Error('NOT IMPLEMENTED in SQLITE3-remote');
|
||||
};
|
||||
|
||||
db.close = function (fn) {
|
||||
ws.__count -= 1;
|
||||
if (!ws.__count) {
|
||||
// waiting for https://github.com/websockets/ws/issues/613 to land
|
||||
// 1000 means 'normal' https://github.com/websockets/ws/blob/master/lib/ErrorCodes.js
|
||||
ws.close(1000, null, fn);
|
||||
}
|
||||
};
|
||||
|
||||
if (opts.init) {
|
||||
return db.init(opts);
|
||||
}
|
||||
return db;
|
||||
});
|
||||
}
|
||||
|
||||
// serialize
|
||||
// parallel
|
||||
db.serialize = db.parallel = function () {
|
||||
throw new Error('NOT IMPLEMENTED in SQLITE3-remote');
|
||||
};
|
||||
|
||||
return db;
|
||||
});
|
||||
}
|
||||
|
||||
return create(opts);
|
||||
};
|
||||
|
||||
module.exports.sanitize = require('./wrapper').sanitize;
|
||||
module.exports.escape = require('./wrapper').escape;
|
||||
module.exports.create = create;
|
||||
|
|
|
@ -20,3 +20,5 @@ function create(opts) {
|
|||
module.exports.sanitize = sqlite3.sanitize;
|
||||
module.exports.escape = sqlite3.escape;
|
||||
module.exports.create = create;
|
||||
module.exports.createServer = sqlite3.createServer;
|
||||
module.exports.createMasterClient = sqlite3.createMasterClient;
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
var cluster = require('cluster');
|
||||
|
||||
if (cluster.isMaster) {
|
||||
require('./master');
|
||||
}
|
||||
else {
|
||||
require('./worker');
|
||||
}
|
|
@ -0,0 +1,56 @@
|
|||
'use strict';
|
||||
|
||||
var cluster = require('cluster');
|
||||
var minCores = 2;
|
||||
var numCores = Math.max(minCores, require('os').cpus().length);
|
||||
var i;
|
||||
|
||||
function run(connect, ipcKey) {
|
||||
var sqlite3 = require('../../cluster');
|
||||
|
||||
return sqlite3.create({
|
||||
bits: 128
|
||||
, dirname: '/tmp/'
|
||||
, prefix: 'foobar.'
|
||||
, dbname: 'cluster'
|
||||
, suffix: '.test'
|
||||
, ext: '.sqlcipher'
|
||||
, verbose: null
|
||||
, standalone: null
|
||||
, serve: !connect
|
||||
, connect: connect
|
||||
, sock: '/tmp/foobar.sqlite3-cluster.test.sock'
|
||||
, ipcKey: ipcKey
|
||||
});
|
||||
}
|
||||
|
||||
var ipcKey = require('crypto').randomBytes(16).toString('hex');
|
||||
// not a bad idea to setup the master before forking the workers
|
||||
|
||||
run(false, ipcKey).then(function () {
|
||||
var w;
|
||||
|
||||
function setupWorker(w) {
|
||||
function sendKey() {
|
||||
w.send({ ipcKey: ipcKey });
|
||||
}
|
||||
w.on('online', sendKey);
|
||||
}
|
||||
|
||||
for (i = 1; i <= numCores; i += 1) {
|
||||
w = cluster.fork();
|
||||
setupWorker(w);
|
||||
}
|
||||
});
|
||||
|
||||
process.on('beforeExit', function () {
|
||||
console.log("[MASTER] I've got nothing left to live for... ugh... death is upon me...");
|
||||
});
|
||||
// The native Promise implementation ignores errors because... dumbness???
|
||||
process.on('unhandledRejection', function (err) {
|
||||
console.error('Unhandled Promise Rejection');
|
||||
console.error(err);
|
||||
console.error(err.stack);
|
||||
|
||||
process.exit(1);
|
||||
});
|
|
@ -0,0 +1,122 @@
|
|||
'use strict';
|
||||
|
||||
var cluster = require('cluster');
|
||||
|
||||
function testSelect(client) {
|
||||
var PromiseA = require('bluebird');
|
||||
|
||||
return new PromiseA(function (resolve, reject) {
|
||||
client.run('CREATE TABLE IF NOT EXISTS meta (version TEXT)', function (err) {
|
||||
if (err) {
|
||||
console.error('[ERROR]', cluster.isMaster && '0' || cluster.worker.id);
|
||||
console.error(err.stack);
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
client.get("SELECT version FROM meta", [], function (err, result) {
|
||||
if (err) {
|
||||
console.error('[ERROR] create table', cluster.isMaster && '0' || cluster.worker.id);
|
||||
console.error(err);
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('[this] Worker #', cluster.isMaster && '0' || cluster.worker.id);
|
||||
console.log(this);
|
||||
|
||||
console.log('[result] Worker #', cluster.isMaster && '0' || cluster.worker.id);
|
||||
console.log(result);
|
||||
|
||||
resolve(client);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function init(ipcKey) {
|
||||
var sqlite3 = require('../../cluster');
|
||||
|
||||
return sqlite3.create({
|
||||
bits: 128
|
||||
, dirname: '/tmp/'
|
||||
, prefix: 'foobar.'
|
||||
, dbname: 'cluster'
|
||||
, suffix: '.test'
|
||||
, ext: '.sqlcipher'
|
||||
, verbose: null
|
||||
, standalone: null
|
||||
, serve: null
|
||||
, connect: true
|
||||
, sock: '/tmp/foobar.sqlite3-cluster.test.sock'
|
||||
, ipcKey: ipcKey
|
||||
}).then(function (client) {
|
||||
//console.log('[INIT] begin');
|
||||
return client.init({ algorithm: 'aes', bits: 128, key: '00000000000000000000000000000000' }).then(function (args) {
|
||||
//console.log('[INIT]', args);
|
||||
return client;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function run(connect, ipcKey) {
|
||||
var sqlite3 = require('../../cluster');
|
||||
|
||||
return sqlite3.create({
|
||||
bits: 128
|
||||
, dirname: '/tmp/'
|
||||
, prefix: 'foobar.'
|
||||
, dbname: 'cluster'
|
||||
, suffix: '.test'
|
||||
, ext: '.sqlcipher'
|
||||
, verbose: null
|
||||
, standalone: null
|
||||
, serve: !connect
|
||||
, connect: connect
|
||||
, sock: '/tmp/foobar.sqlite3-cluster.test.sock'
|
||||
, ipcKey: ipcKey
|
||||
});//.then(testSelect);
|
||||
}
|
||||
|
||||
function onMessage(msg) {
|
||||
function loseTheWillToLive() {
|
||||
process.removeListener('message', onMessage);
|
||||
// child processes do not exit when their event loop is empty
|
||||
process.nextTick(function () {
|
||||
process.exit(0);
|
||||
});
|
||||
}
|
||||
|
||||
console.log('New Worker', cluster.worker.id, msg);
|
||||
if (1 === cluster.worker.id) {
|
||||
init(msg.ipcKey).then(testSelect).then(function (client) {
|
||||
console.log('init worker closing...');
|
||||
setTimeout(function () {
|
||||
client.close(loseTheWillToLive);
|
||||
loseTheWillToLive();
|
||||
}, 1000);
|
||||
// waiting for https://github.com/websockets/ws/issues/613 to land
|
||||
});
|
||||
} else {
|
||||
setTimeout(function () {
|
||||
run(true, msg.ipcKey).then(testSelect).then(function (client) {
|
||||
console.log('other working closing...');
|
||||
client.close(loseTheWillToLive);
|
||||
loseTheWillToLive();
|
||||
});
|
||||
}, 100);
|
||||
}
|
||||
}
|
||||
process.on('message', onMessage);
|
||||
|
||||
// The native Promise implementation ignores errors because... dumbness???
|
||||
process.on('beforeExit', function () {
|
||||
console.log("[WORKER] I've got nothing left to do");
|
||||
});
|
||||
process.on('unhandledRejection', function (err) {
|
||||
console.error('Unhandled Promise Rejection');
|
||||
console.error(err);
|
||||
console.error(err.stack);
|
||||
|
||||
process.exit(1);
|
||||
});
|
|
@ -0,0 +1,8 @@
|
|||
var cluster = require('cluster');
|
||||
|
||||
if (cluster.isMaster) {
|
||||
require('./master');
|
||||
}
|
||||
else {
|
||||
require('./worker');
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
'use strict';
|
||||
|
||||
var cluster = require('cluster');
|
||||
var minCores = 3;
|
||||
var numCores = Math.max(minCores, require('os').cpus().length);
|
||||
var i;
|
||||
var sock = '/tmp/foobar.sqlite3-cluster.test.sock';
|
||||
|
||||
function run(connect, ipcKey) {
|
||||
var sqlite3 = require('../../cluster');
|
||||
|
||||
return sqlite3.create({
|
||||
verbose: null
|
||||
, standalone: null
|
||||
, serve: sock
|
||||
, connect: null
|
||||
, sock: sock
|
||||
, ipcKey: ipcKey
|
||||
});
|
||||
}
|
||||
|
||||
var ipcKey = require('crypto').randomBytes(16).toString('hex');
|
||||
// not a bad idea to setup the master before forking the workers
|
||||
|
||||
run(false, ipcKey).then(function () {
|
||||
var w;
|
||||
|
||||
function setupWorker(w) {
|
||||
function sendKey() {
|
||||
w.send({ ipcKey: ipcKey, sock: sock });
|
||||
}
|
||||
w.on('online', sendKey);
|
||||
}
|
||||
|
||||
for (i = 1; i <= numCores; i += 1) {
|
||||
w = cluster.fork();
|
||||
setupWorker(w);
|
||||
}
|
||||
});
|
||||
|
||||
process.on('beforeExit', function () {
|
||||
console.log("[MASTER] I've got nothing left to live for... ugh... death is upon me...");
|
||||
});
|
||||
// The native Promise implementation ignores errors because... dumbness???
|
||||
process.on('unhandledRejection', function (err) {
|
||||
console.error('Unhandled Promise Rejection');
|
||||
console.error(err);
|
||||
console.error(err.stack);
|
||||
|
||||
process.exit(1);
|
||||
});
|
|
@ -0,0 +1,118 @@
|
|||
'use strict';
|
||||
|
||||
var cluster = require('cluster');
|
||||
|
||||
console.log("[Worker #" + cluster.worker.id + "]");
|
||||
|
||||
function testSelect(client) {
|
||||
var PromiseA = require('bluebird');
|
||||
|
||||
return new PromiseA(function (resolve, reject) {
|
||||
client.run('CREATE TABLE IF NOT EXISTS meta (version TEXT)', function (err) {
|
||||
if (err) {
|
||||
if ('E_NO_INIT' === err.code) {
|
||||
console.error(err.message);
|
||||
resolve(client);
|
||||
return;
|
||||
}
|
||||
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
client.get("SELECT version FROM meta", [], function (err, result) {
|
||||
if (err) {
|
||||
console.error('[ERROR] create table', cluster.isMaster && '0' || cluster.worker.id);
|
||||
console.error(err.stack || err);
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('[this] Worker #', cluster.isMaster && '0' || cluster.worker.id);
|
||||
console.log(this);
|
||||
|
||||
console.log('[result] Worker #', cluster.isMaster && '0' || cluster.worker.id);
|
||||
console.log(result);
|
||||
|
||||
resolve(client);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function run(ipcKey, sock) {
|
||||
var sqlite3 = require('../../cluster');
|
||||
|
||||
return sqlite3.create({
|
||||
bits: 128
|
||||
, dirname: '/tmp/'
|
||||
, prefix: 'foobar.'
|
||||
, dbname: 'cluster'
|
||||
, suffix: '.test'
|
||||
, ext: '.sqlcipher'
|
||||
, verbose: null
|
||||
, standalone: null
|
||||
, serve: null
|
||||
, connect: sock
|
||||
, sock: sock
|
||||
, ipcKey: ipcKey
|
||||
});
|
||||
}
|
||||
|
||||
function init(client) {
|
||||
//console.log('[INIT] begin');
|
||||
return client.init({
|
||||
algorithm: 'aes'
|
||||
, bits: 128
|
||||
, mode: 'cbc'
|
||||
, key: '00000000000000000000000000000000'
|
||||
}).then(function (/*args*/) {
|
||||
//console.log('[INIT]', args);
|
||||
return client;
|
||||
});
|
||||
}
|
||||
|
||||
function onMessage(msg) {
|
||||
function loseTheWillToLive() {
|
||||
process.removeListener('message', onMessage);
|
||||
// child processes do not exit when their event loop is empty
|
||||
setTimeout(function () {
|
||||
console.log('#' + cluster.worker.id + ' dead');
|
||||
process.exit(0);
|
||||
}, 100);
|
||||
}
|
||||
|
||||
console.log('New Worker', cluster.worker.id, msg);
|
||||
if (1 === cluster.worker.id) {
|
||||
//setTimeout(function () {
|
||||
run(msg.ipcKey, msg.sock).then(testSelect).then(function (client) {
|
||||
setTimeout(function () {
|
||||
console.log('init worker closing...');
|
||||
client.close(loseTheWillToLive);
|
||||
loseTheWillToLive();
|
||||
}, 1000);
|
||||
// waiting for https://github.com/websockets/ws/issues/613 to land
|
||||
});
|
||||
//}, 1000);
|
||||
} else {
|
||||
setTimeout(function () {
|
||||
run(msg.ipcKey, msg.sock).then(init).then(testSelect).then(function (client) {
|
||||
console.log('#' + cluster.worker.id + ' other working closing...');
|
||||
client.close(loseTheWillToLive);
|
||||
loseTheWillToLive();
|
||||
});
|
||||
}, 100 * cluster.worker.id);
|
||||
}
|
||||
}
|
||||
process.on('message', onMessage);
|
||||
|
||||
// The native Promise implementation ignores errors because... dumbness???
|
||||
process.on('beforeExit', function () {
|
||||
console.log("[WORKER] I've got nothing left to do");
|
||||
});
|
||||
process.on('unhandledRejection', function (err) {
|
||||
console.error('Unhandled Promise Rejection');
|
||||
console.error(err.stack || err);
|
||||
|
||||
process.exit(1);
|
||||
});
|
|
@ -0,0 +1,8 @@
|
|||
var cluster = require('cluster');
|
||||
|
||||
if (cluster.isMaster) {
|
||||
require('./master');
|
||||
}
|
||||
else {
|
||||
require('./worker');
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
'use strict';
|
||||
|
||||
var cluster = require('cluster');
|
||||
var minCores = 3;
|
||||
var numCores = Math.max(minCores, require('os').cpus().length);
|
||||
var i;
|
||||
var sock = '/tmp/foobar.sqlite3-cluster.test.sock';
|
||||
|
||||
function run(connect, ipcKey) {
|
||||
var sqlite3 = require('../../server');
|
||||
|
||||
return sqlite3.createServer({
|
||||
verbose: null
|
||||
, sock: sock
|
||||
, ipcKey: ipcKey
|
||||
});
|
||||
}
|
||||
|
||||
var ipcKey = require('crypto').randomBytes(16).toString('hex');
|
||||
// not a bad idea to setup the master before forking the workers
|
||||
|
||||
run(false, ipcKey).then(function () {
|
||||
var w;
|
||||
|
||||
function setupWorker(w) {
|
||||
function sendKey() {
|
||||
w.send({ ipcKey: ipcKey, sock: sock });
|
||||
}
|
||||
w.on('online', sendKey);
|
||||
}
|
||||
|
||||
for (i = 1; i <= numCores; i += 1) {
|
||||
w = cluster.fork();
|
||||
setupWorker(w);
|
||||
}
|
||||
});
|
||||
|
||||
process.on('beforeExit', function () {
|
||||
console.log("[MASTER] I've got nothing left to live for... ugh... death is upon me...");
|
||||
});
|
||||
// The native Promise implementation ignores errors because... dumbness???
|
||||
process.on('unhandledRejection', function (err) {
|
||||
console.error('Unhandled Promise Rejection');
|
||||
console.error(err);
|
||||
console.error(err.stack);
|
||||
|
||||
process.exit(1);
|
||||
});
|
|
@ -0,0 +1,115 @@
|
|||
'use strict';
|
||||
|
||||
var cluster = require('cluster');
|
||||
|
||||
console.log("[Worker #" + cluster.worker.id + "]");
|
||||
|
||||
function testSelect(client) {
|
||||
var PromiseA = require('bluebird');
|
||||
|
||||
return new PromiseA(function (resolve, reject) {
|
||||
client.run('CREATE TABLE IF NOT EXISTS meta (version TEXT)', function (err) {
|
||||
if (err) {
|
||||
if ('E_NO_INIT' === err.code) {
|
||||
console.error(err.message);
|
||||
resolve(client);
|
||||
return;
|
||||
}
|
||||
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
client.get("SELECT version FROM meta", [], function (err, result) {
|
||||
if (err) {
|
||||
console.error('[ERROR] create table', cluster.isMaster && '0' || cluster.worker.id);
|
||||
console.error(err.stack || err);
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('Worker #', (cluster.isMaster && '0' || cluster.worker.id), '[this], [result]:');
|
||||
console.log(this);
|
||||
console.log(result);
|
||||
|
||||
resolve(client);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function run(clientFactory) {
|
||||
return clientFactory.create({
|
||||
init: false
|
||||
, dbname: 'factory.' + cluster.worker.id
|
||||
});
|
||||
}
|
||||
|
||||
function init(client) {
|
||||
return client.init({
|
||||
algorithm: 'aes'
|
||||
, bits: 128
|
||||
, mode: 'cbc'
|
||||
, key: '00000000000000000000000000000000'
|
||||
}).then(function (/*db*/) {
|
||||
return client;
|
||||
});
|
||||
}
|
||||
|
||||
function onMessage(msg) {
|
||||
function loseTheWillToLive() {
|
||||
process.removeListener('message', onMessage);
|
||||
// child processes do not exit when their event loop is empty
|
||||
setTimeout(function () {
|
||||
console.log('#' + cluster.worker.id + ' dead');
|
||||
process.exit(0);
|
||||
}, 100);
|
||||
}
|
||||
|
||||
var clientFactory = require('../../client').createClientFactory({
|
||||
algorithm: 'aes'
|
||||
, bits: 128
|
||||
, mode: 'cbc'
|
||||
, dirname: '/tmp/'
|
||||
, prefix: 'foobar.'
|
||||
//, dbname: 'cluster'
|
||||
, suffix: '.test'
|
||||
, ext: '.sqlcipher'
|
||||
, sock: msg.sock
|
||||
, ipcKey: msg.ipcKey
|
||||
});
|
||||
|
||||
console.log('New Worker', cluster.worker.id, msg);
|
||||
if (1 === cluster.worker.id) {
|
||||
//setTimeout(function () {
|
||||
run(clientFactory).then(testSelect).then(function (client) {
|
||||
setTimeout(function () {
|
||||
console.log('init worker closing...');
|
||||
client.close(loseTheWillToLive);
|
||||
loseTheWillToLive();
|
||||
}, 1000);
|
||||
// waiting for https://github.com/websockets/ws/issues/613 to land
|
||||
});
|
||||
//}, 1000);
|
||||
} else {
|
||||
setTimeout(function () {
|
||||
run(clientFactory).then(init).then(testSelect).then(function (client) {
|
||||
console.log('#' + cluster.worker.id + ' other working closing...');
|
||||
client.close(loseTheWillToLive);
|
||||
loseTheWillToLive();
|
||||
});
|
||||
}, 100 * cluster.worker.id);
|
||||
}
|
||||
}
|
||||
process.on('message', onMessage);
|
||||
|
||||
// The native Promise implementation ignores errors because... dumbness???
|
||||
process.on('beforeExit', function () {
|
||||
console.log("[WORKER] I've got nothing left to do");
|
||||
});
|
||||
process.on('unhandledRejection', function (err) {
|
||||
console.error('Unhandled Promise Rejection');
|
||||
console.error(err.stack || err);
|
||||
|
||||
process.exit(1);
|
||||
});
|
|
@ -1,17 +1,24 @@
|
|||
'use strict';
|
||||
|
||||
function run() {
|
||||
var sqlite3 = require('./standalone');
|
||||
var sqlite3 = require('../standalone');
|
||||
var promise;
|
||||
|
||||
sqlite3.create({
|
||||
promise = sqlite3.create({
|
||||
key: '00000000000000000000000000000000'
|
||||
, bits: 128
|
||||
, filename: '/tmp/test.cluster.sqlcipher'
|
||||
, dirname: '/tmp/'
|
||||
, prefix: 'foobar.'
|
||||
, dbname: 'standalone'
|
||||
, suffix: '.test'
|
||||
, ext: '.sqlcipher'
|
||||
, verbose: null
|
||||
, standalone: true
|
||||
, serve: null
|
||||
, connect: null
|
||||
}).then(function (client) {
|
||||
});
|
||||
|
||||
promise.then(function (client) {
|
||||
client.all("SELECT ?", ['Hello World!'], function (err, result) {
|
||||
if (err) {
|
||||
console.error('[ERROR] standalone');
|
||||
|
@ -31,7 +38,7 @@ function run() {
|
|||
run();
|
||||
|
||||
// The native Promise implementation ignores errors because... dumbness???
|
||||
process.on('unhandledPromiseRejection', function (err) {
|
||||
process.on('unhandledRejection', function (err) {
|
||||
console.error('Unhandled Promise Rejection');
|
||||
console.error(err);
|
||||
console.error(err.stack);
|
14
package.json
14
package.json
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "sqlite3-cluster",
|
||||
"version": "1.1.2",
|
||||
"version": "2.1.2",
|
||||
"description": "A wrapper to enable the use of sqlite3 with node cluster via a socket server (i.e. for Raspberry Pi 2).",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
|
@ -9,7 +9,7 @@
|
|||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "git+https://github.com/coolaj86/sqlite3-cluster.git"
|
||||
"url": "git+https://git.coolaj86.com/coolaj86/sqlite3-cluster.js.git"
|
||||
},
|
||||
"keywords": [
|
||||
"sqlite3",
|
||||
|
@ -18,14 +18,14 @@
|
|||
"cluster",
|
||||
"rpi2"
|
||||
],
|
||||
"author": "AJ ONeal <coolaj86@gmail.com> (http://coolaj86.com/)",
|
||||
"license": "Apache-2.0",
|
||||
"author": "AJ ONeal <coolaj86@gmail.com> (https://coolaj86.com/)",
|
||||
"license": "(MIT OR Apache-2.0)",
|
||||
"bugs": {
|
||||
"url": "https://github.com/coolaj86/sqlite3-cluster/issues"
|
||||
"url": "https://git.coolaj86.com/coolaj86/sqlite3-cluster.js/issues"
|
||||
},
|
||||
"homepage": "https://github.com/coolaj86/sqlite3-cluster#readme",
|
||||
"homepage": "https://git.coolaj86.com/coolaj86/sqlite3-cluster.js",
|
||||
"dependencies": {
|
||||
"sqlite3": "^3.0.9",
|
||||
"ws": "^0.7.2"
|
||||
"ws": "^2.3.1"
|
||||
}
|
||||
}
|
||||
|
|
202
server.js
202
server.js
|
@ -1,55 +1,90 @@
|
|||
'use strict';
|
||||
/*global Promise*/
|
||||
|
||||
var PromiseA = Promise;
|
||||
try {
|
||||
PromiseA = require('bluebird').Promise;
|
||||
} catch(e) {
|
||||
console.warn("For better Promise support please use bluebird");
|
||||
}
|
||||
var PromiseA = require('bluebird').Promise;
|
||||
var wsses = {};
|
||||
|
||||
function createApp(server, options) {
|
||||
function createApp(servers, options) {
|
||||
var url = require('url');
|
||||
var wss = servers.wss;
|
||||
//var server = servers.server;
|
||||
|
||||
if (wsses[options.filename]) {
|
||||
return PromiseA.resolve(wsses[options.filename]);
|
||||
//var express = require('express');
|
||||
//var app = express();
|
||||
/*
|
||||
function app(req, res) {
|
||||
res.end('NOT IMPLEMENTED');
|
||||
}
|
||||
*/
|
||||
|
||||
return require('./wrapper').create(options).then(function (db) {
|
||||
wss.on('connection', function (ws) {
|
||||
var location = url.parse(ws.upgradeReq.url, true);
|
||||
// you might use location.query.access_token to authenticate or share sessions
|
||||
// or ws.upgradeReq.headers.cookie (see http://stackoverflow.com/a/16395220/151312
|
||||
|
||||
var url = require('url');
|
||||
//var express = require('express');
|
||||
//var app = express();
|
||||
var wss = server.wss;
|
||||
|
||||
function app(req, res) {
|
||||
res.end('NOT IMPLEMENTED');
|
||||
if (!options.ipcKey) {
|
||||
console.warn("[S] [SECURITY] please include { ipcKey: crypto.randomBytes(16).toString('base64') }"
|
||||
+ " in your options and pass it from master to worker processes with worker.send()");
|
||||
ws._authorized = true;
|
||||
} else {
|
||||
ws._authorized = (options.ipcKey === (location.query.ipcKey || location.query.ipc_key));
|
||||
}
|
||||
|
||||
wss.on('connection', function (ws) {
|
||||
if (!ws._authorized) {
|
||||
ws.send(JSON.stringify({ error: { message: "Unauthorized: ipc_key does not match", code: 'E_UNAUTHORIZED_IPCKEY' } }));
|
||||
ws.close();
|
||||
return;
|
||||
}
|
||||
|
||||
var location = url.parse(ws.upgradeReq.url, true);
|
||||
// you might use location.query.access_token to authenticate or share sessions
|
||||
// or ws.upgradeReq.headers.cookie (see http://stackoverflow.com/a/16395220/151312
|
||||
if (!wss.__count) {
|
||||
wss.__count = 0;
|
||||
}
|
||||
wss.__count += 1;
|
||||
|
||||
ws.__session_id = location.query.session_id || Math.random();
|
||||
function decrWs() {
|
||||
wss.__count -= 1;
|
||||
if (!wss.__count) {
|
||||
console.log('[S] client count is zero, but server will be left open');
|
||||
/*
|
||||
wss.close(function () {
|
||||
console.log('wss.closed');
|
||||
});
|
||||
server.close(function () {
|
||||
console.log('server closed, but will not exit due to bug? in wss.close()');
|
||||
process.exit(0);
|
||||
});
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
ws.on('message', function (buffer) {
|
||||
var cmd;
|
||||
ws.on('error', function (err) {
|
||||
console.error('[S] [WebSocket error]');
|
||||
console.error(err.stack);
|
||||
decrWs();
|
||||
});
|
||||
ws.on('close', decrWs);
|
||||
ws.on('message', function (buffer) {
|
||||
var cmd;
|
||||
var promise;
|
||||
|
||||
try {
|
||||
cmd = JSON.parse(buffer.toString('utf8'));
|
||||
} catch(e) {
|
||||
console.error('[ERROR] parse json');
|
||||
console.error(e);
|
||||
console.error(buffer);
|
||||
console.error();
|
||||
ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } }));
|
||||
return;
|
||||
}
|
||||
try {
|
||||
cmd = JSON.parse(buffer.toString('utf8'));
|
||||
} catch(e) {
|
||||
console.error('[S] [ERROR] parse json');
|
||||
console.error(e.stack || e);
|
||||
console.error(buffer);
|
||||
console.error();
|
||||
ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } }));
|
||||
return;
|
||||
}
|
||||
|
||||
//console.log('cmd');
|
||||
//console.log(cmd);
|
||||
// caching and create logic happens in the wrapper stored here below
|
||||
promise = require('./wrapper').create(cmd && cmd.dbname && cmd || options).then(function (db) {
|
||||
|
||||
switch(cmd.type) {
|
||||
case 'init':
|
||||
//console.log('[S] init', cmd);
|
||||
db[cmd.func].apply(db, cmd.args).then(function () {
|
||||
var args = Array.prototype.slice.call(arguments);
|
||||
var myself;
|
||||
|
@ -59,6 +94,7 @@ function createApp(server, options) {
|
|||
myself = true;
|
||||
}
|
||||
|
||||
//console.log('[INIT HAPPENING]');
|
||||
ws.send(JSON.stringify({
|
||||
id: cmd.id
|
||||
, self: myself
|
||||
|
@ -69,12 +105,13 @@ function createApp(server, options) {
|
|||
break;
|
||||
|
||||
case 'rpc':
|
||||
if (!db._initialized) {
|
||||
if ('close' !== cmd.func && !db._initialized) {
|
||||
//console.log('[RPC NOT HAPPENING]');
|
||||
ws.send(JSON.stringify({
|
||||
type: 'error'
|
||||
, id: cmd.id
|
||||
, args: [{ message: 'database has not been initialized' }]
|
||||
, error: { message: 'database has not been initialized' }
|
||||
, args: [{ message: 'database has not been initialized', code: 'E_NO_INIT' }]
|
||||
, error: { message: 'database has not been initialized', code: 'E_NO_INIT' }
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
@ -88,6 +125,7 @@ function createApp(server, options) {
|
|||
myself = true;
|
||||
}
|
||||
|
||||
//console.log('[RPC HAPPENING]', args, cmd.id);
|
||||
ws.send(JSON.stringify({
|
||||
this: (!err && this !== global) ? this : {}
|
||||
, args: args
|
||||
|
@ -106,42 +144,80 @@ function createApp(server, options) {
|
|||
}
|
||||
|
||||
});
|
||||
|
||||
ws.send(JSON.stringify({ type: 'session', value: ws.__session_id }));
|
||||
});
|
||||
});
|
||||
|
||||
app.masterClient = db;
|
||||
wsses[options.filename] = app;
|
||||
// wsses[options.sock] = app;
|
||||
return PromiseA.resolve();
|
||||
}
|
||||
|
||||
return app;
|
||||
function newSocket(options) {
|
||||
if (wsses[options.sock]) {
|
||||
return PromiseA.resolve(wsses[options.sock]);
|
||||
}
|
||||
|
||||
wsses[options.sock] = new PromiseA(function (resolve) {
|
||||
var fs = require('fs');
|
||||
fs.unlink(options.sock, function () {
|
||||
var server = require('http').createServer();
|
||||
// ignore error when socket doesn't exist
|
||||
server.listen(options.sock, function () {
|
||||
resolve(server);
|
||||
});
|
||||
});
|
||||
}).then(function (server) {
|
||||
var WebSocketServer = require('ws').Server;
|
||||
var servers = {
|
||||
server: require('http').createServer()
|
||||
, wss: new WebSocketServer({ server: server })
|
||||
};
|
||||
|
||||
return createApp(servers, options).then(function (/*app*/) {
|
||||
// server.on('request', app);
|
||||
wsses[options.sock] = servers;
|
||||
|
||||
return wsses[options.sock];
|
||||
});
|
||||
});
|
||||
|
||||
return wsses[options.sock];
|
||||
}
|
||||
|
||||
function createMasterClient(options) {
|
||||
return require('./wrapper').create(options, null).then(function (db) {
|
||||
return db;
|
||||
});
|
||||
}
|
||||
|
||||
function createServer(options) {
|
||||
if (!options.sock) {
|
||||
throw new Error("Please provide options.sock as the socket to serve from");
|
||||
}
|
||||
options.server = options.sock;
|
||||
|
||||
return newSocket(options).then(function () {
|
||||
var result = {};
|
||||
|
||||
Object.keys(wsses[options.sock]).forEach(function (key) {
|
||||
result[key] = wsses[options.sock][key];
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function create(options) {
|
||||
var server = require('http').createServer();
|
||||
var WebSocketServer = require('ws').Server;
|
||||
var wss = new WebSocketServer({ server: server });
|
||||
//var port = process.env.PORT || process.argv[0] || 4080;
|
||||
return createServer(options).then(function (result) {
|
||||
if (!options.dbname) {
|
||||
return result;
|
||||
}
|
||||
|
||||
var fs = require('fs');
|
||||
var ps = [];
|
||||
return createMasterClient(options).then(function (db) {
|
||||
result.masterClient = db;
|
||||
|
||||
ps.push(new PromiseA(function (resolve) {
|
||||
fs.unlink(options.sock, function () {
|
||||
// ignore error when socket doesn't exist
|
||||
|
||||
server.listen(options.sock, resolve);
|
||||
return result;
|
||||
});
|
||||
}));
|
||||
|
||||
ps.push(createApp({ server: server, wss: wss }, options).then(function (app) {
|
||||
server.on('request', app);
|
||||
return { masterClient: app.masterClient };
|
||||
}));
|
||||
|
||||
return PromiseA.all(ps).then(function (results) {
|
||||
return results[1];
|
||||
});
|
||||
}
|
||||
|
||||
module.exports.create = create;
|
||||
module.exports.createMasterClient = createMasterClient;
|
||||
module.exports.createServer = createServer;
|
||||
|
|
|
@ -1,95 +0,0 @@
|
|||
'use strict';
|
||||
|
||||
var cluster = require('cluster');
|
||||
//var numCores = 2;
|
||||
var numCores = require('os').cpus().length;
|
||||
var i;
|
||||
|
||||
function testSelect(client) {
|
||||
return client.run('CREATE TABLE IF NOT EXISTS meta (version TEXT)', function (err) {
|
||||
if (err) {
|
||||
console.error('[ERROR] create table', cluster.isMaster && '0' || cluster.worker.id);
|
||||
console.error(err);
|
||||
return;
|
||||
}
|
||||
|
||||
return client.get("SELECT version FROM meta", [], function (err, result) {
|
||||
|
||||
if (err) {
|
||||
console.error('[ERROR]', cluster.isMaster && '0' || cluster.worker.id);
|
||||
console.error(err);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('[this]', cluster.isMaster && '0' || cluster.worker.id);
|
||||
console.log(this);
|
||||
|
||||
console.log('[result]', cluster.isMaster && '0' || cluster.worker.id);
|
||||
console.log(result);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function init() {
|
||||
var sqlite3 = require('./cluster');
|
||||
|
||||
return sqlite3.create({
|
||||
bits: 128
|
||||
, filename: '/tmp/test.cluster.sqlcipher'
|
||||
, verbose: null
|
||||
, standalone: null
|
||||
, serve: null
|
||||
, connect: null
|
||||
}).then(function (client) {
|
||||
console.log('[INIT] begin');
|
||||
return client.init({ bits: 128, key: '00000000000000000000000000000000' });
|
||||
}).then(testSelect, function (err) {
|
||||
console.error('[ERROR]');
|
||||
console.error(err);
|
||||
}).then(function () {
|
||||
console.log('success');
|
||||
}, function (err) {
|
||||
console.error('[ERROR 2]');
|
||||
console.error(err);
|
||||
});
|
||||
}
|
||||
|
||||
function run() {
|
||||
var sqlite3 = require('./cluster');
|
||||
|
||||
return sqlite3.create({
|
||||
bits: 128
|
||||
, filename: '/tmp/test.cluster.sqlcipher'
|
||||
, verbose: null
|
||||
, standalone: null
|
||||
, serve: null
|
||||
, connect: null
|
||||
});//.then(testSelect);
|
||||
}
|
||||
|
||||
if (cluster.isMaster) {
|
||||
// not a bad idea to setup the master before forking the workers
|
||||
run().then(function () {
|
||||
for (i = 1; i <= numCores; i += 1) {
|
||||
cluster.fork();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
if (1 === cluster.worker.id) {
|
||||
init().then(testSelect);
|
||||
return;
|
||||
} else {
|
||||
setTimeout(function () {
|
||||
run().then(testSelect);
|
||||
}, 100);
|
||||
}
|
||||
}
|
||||
|
||||
// The native Promise implementation ignores errors because... dumbness???
|
||||
process.on('unhandledPromiseRejection', function (err) {
|
||||
console.error('Unhandled Promise Rejection');
|
||||
console.error(err);
|
||||
console.error(err.stack);
|
||||
|
||||
process.exit(1);
|
||||
});
|
90
wrapper.js
90
wrapper.js
|
@ -1,39 +1,42 @@
|
|||
'use strict';
|
||||
|
||||
/*global Promise*/
|
||||
var PromiseA = Promise;
|
||||
try {
|
||||
PromiseA = require('bluebird').Promise;
|
||||
} catch(e) {
|
||||
console.warn("For better Promise support please use bluebird");
|
||||
}
|
||||
var sqlite3 = require('sqlite3');
|
||||
// TODO expire unused dbs from cache
|
||||
var dbs = {};
|
||||
|
||||
function sanitize(str) {
|
||||
return String(str).replace("'", "''");
|
||||
}
|
||||
|
||||
function create(opts) {
|
||||
var db;
|
||||
|
||||
function create(opts, verbs) {
|
||||
if (!verbs) {
|
||||
verbs = {};
|
||||
}
|
||||
if (!opts) {
|
||||
opts = {};
|
||||
}
|
||||
|
||||
if (opts.verbose) {
|
||||
sqlite3.verbose();
|
||||
var db;
|
||||
var PromiseA = verbs.Promise || require('bluebird');
|
||||
var dbname = "";
|
||||
|
||||
dbname += (opts.prefix || '');
|
||||
if (opts.subtenant) {
|
||||
dbname += opts.subtenant + '.';
|
||||
}
|
||||
|
||||
if (!dbs[opts.filename]) {
|
||||
dbs[opts.filename] = new sqlite3.Database(opts.filename);
|
||||
if (opts.tenant) {
|
||||
dbname += opts.tenant + '.';
|
||||
}
|
||||
dbname += (opts.dbname || '');
|
||||
dbname += (opts.suffix || '');
|
||||
dbname += (opts.ext || '');
|
||||
dbname = require('path').resolve(opts.dirname || '', dbname);
|
||||
|
||||
db = dbs[opts.filename];
|
||||
db.sanitize = sanitize;
|
||||
db.escape = sanitize;
|
||||
function initDb(newOpts) {
|
||||
if (dbs[dbname].initPromise) {
|
||||
return dbs[dbname].initPromise;
|
||||
}
|
||||
|
||||
db.init = function (newOpts) {
|
||||
if (!newOpts) {
|
||||
newOpts = {};
|
||||
}
|
||||
|
@ -41,23 +44,25 @@ function create(opts) {
|
|||
var key = newOpts.key || opts.key;
|
||||
var bits = newOpts.bits || opts.bits;
|
||||
|
||||
return new PromiseA(function (resolve, reject) {
|
||||
if (db._initialized) {
|
||||
dbs[dbname].initPromise = new PromiseA(function (resolve, reject) {
|
||||
if (dbs[dbname].db._initialized) {
|
||||
resolve(db);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!key) {
|
||||
if (!bits) {
|
||||
db._initialized = true;
|
||||
//console.log("INITIALIZED WITHOUT KEY");
|
||||
//console.log(opts);
|
||||
dbs[dbname].db._initialized = true;
|
||||
}
|
||||
dbs[dbname].db = db;
|
||||
resolve(db);
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO test key length
|
||||
|
||||
db._initialized = true;
|
||||
db.serialize(function () {
|
||||
var setup = [];
|
||||
|
||||
|
@ -83,13 +88,48 @@ function create(opts) {
|
|||
|
||||
PromiseA.all(setup).then(function () {
|
||||
// restore original functions
|
||||
dbs[dbname].db._initialized = true;
|
||||
dbs[dbname].db = db;
|
||||
|
||||
resolve(db);
|
||||
}, reject);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
return db.init(opts);
|
||||
return dbs[dbname].initPromise;
|
||||
}
|
||||
|
||||
function newDb() {
|
||||
// dbs[dbname] = db //
|
||||
db = new sqlite3.Database(dbname);
|
||||
db.init = initDb;
|
||||
db.sanitize = sanitize;
|
||||
db.escape = sanitize;
|
||||
|
||||
if (opts.verbose) {
|
||||
sqlite3.verbose();
|
||||
}
|
||||
|
||||
return db;
|
||||
}
|
||||
|
||||
// Could be any of:
|
||||
// * db object
|
||||
// * init promise
|
||||
|
||||
if (!dbs[dbname]) {
|
||||
dbs[dbname] = { db: newDb() };
|
||||
}
|
||||
|
||||
if (dbs[dbname].db._initialized) {
|
||||
return PromiseA.resolve(dbs[dbname].db);
|
||||
}
|
||||
|
||||
if (opts.init || ('init' === opts.type) || (opts.bits && opts.key)) {
|
||||
dbs[dbname].initPromise = db.init(opts);
|
||||
}
|
||||
|
||||
return dbs[dbname].initPromise || PromiseA.resolve(dbs[dbname].db);
|
||||
}
|
||||
|
||||
module.exports.sanitize = sanitize;
|
||||
|
|
Loading…
Reference in New Issue