begin refactor for shared socket with many dbs

This commit is contained in:
AJ ONeal 2015-11-09 20:49:53 +00:00
parent d80eb63c13
commit 3b0b2f7097
7 changed files with 644 additions and 324 deletions

518
client.js
View File

@ -1,54 +1,73 @@
'use strict'; 'use strict';
/*global Promise*/ // I'm not a fan of singletons (as they are EVIL).
// However, this module is, by intent, meant to
// represent a worker process in a cluster.
// Thus, it is appropriately a singleton.
var processWebSocket;
var promiseWebSocket;
var PromiseA = Promise; function startServer(opts, verbs, myServer) {
try { if (myServer) {
PromiseA = require('bluebird').Promise; return verbs.Promise.resolve(myServer);
} catch(e) { }
console.warn("For better Promise support please use bluebird"); return require('./server').create(opts, verbs).then(function (server) {
}
// TODO iterate over the prototype
// translate request / response
var sqlite3real = require('sqlite3');
/*
function createConnection(opts) {
var server = ;
return server.create(opts).then(function () {
// created and listening
});
}
*/
function startServer(opts) {
return require('./server').create(opts).then(function (server) {
// this process doesn't need to connect to itself // this process doesn't need to connect to itself
// through a socket // through a socket
return server.masterClient; return server.masterClient;
}); });
} }
function getConnection(opts) { // connection is scoped per-process, nothing more
return new PromiseA(function (resolve) { function getConnection(opts, verbs, mySocket) {
function incr(ws) {
if (!ws.__count) {
ws.__count = 0;
}
ws.__count += 1;
return ws;
}
if (mySocket || processWebSocket) {
promiseWebSocket = verbs.Promise.resolve(mySocket || processWebSocket);
return promiseWebSocket.then(incr);
}
if (promiseWebSocket) {
return promiseWebSocket.then(incr);
}
promiseWebSocket = new verbs.Promise(function (resolve) {
//setTimeout(function () { //setTimeout(function () {
var WebSocket = require('ws'); var WebSocket = require('ws');
var ws = new WebSocket('ws+unix:' + opts.sock); // TODO how to include path and such?
// http://unix:/absolute/path/to/unix.socket:/request/path
// https://github.com/websockets/ws/issues/236
var address = require('url').parse('ws+unix:' + opts.sock);
var ws;
address.pathname = opts.sock;
address.path = '/' + require('cluster').worker.id + '/' + opts.ipcKey;
address.query = {
ipcKey: opts.ipcKey
, ipc_key: opts.ipcKey
, worker_id: require('cluster').worker.id
};
address.path += '?' + require('querystring').stringify(address.query);
ws = new WebSocket(address);
ws.on('error', function (err) { ws.on('error', function (err) {
console.error('[ERROR] ws connection failed, retrying'); console.error('[ERROR] ws connection failed, retrying');
console.error(err); console.error(err.stack);
function retry() { function retry() {
// TODO eventually throw up
setTimeout(function () { setTimeout(function () {
getConnection(opts).then(resolve, retry); getConnection(opts, verbs, mySocket).then(resolve, retry);
}, 100 + (Math.random() * 250)); }, 100 + (Math.random() * 250));
} }
if (!opts.connect && ('ENOENT' === err.code || 'ECONNREFUSED' === err.code)) { if (!opts.connect && ('ENOENT' === err.code || 'ECONNREFUSED' === err.code)) {
console.log('[NO SERVER] attempting to create a server #######################'); console.log('[NO SERVER] attempting to create a server #######################');
return startServer(opts).then(function (client) { return startServer(opts, verbs).then(function (client) {
// ws.masterClient = client; // ws.masterClient = client;
resolve({ masterClient: client }); resolve({ masterClient: client });
}, function (err) { }, function (err) {
@ -62,66 +81,266 @@ function getConnection(opts) {
}); });
ws.on('open', function () { ws.on('open', function () {
resolve(ws); resolve(ws);
}); });
//}, 100 + (Math.random() * 250)); //}, 100 + (Math.random() * 250));
}); }).then(function (ws) {
} if (!processWebSocket) {
processWebSocket = ws;
function create(opts) {
if (!opts.sock) {
opts.sock = opts.filename + '.sock';
}
var promise;
var numcpus = require('os').cpus().length;
if (opts.standalone || (1 === numcpus && !opts.serve && !opts.connect)) {
return require('./wrapper').create(opts);
}
function retryServe() {
return startServer(opts).then(function (client) {
// ws.masterClient = client;
return { masterClient: client };
}, function (err) {
console.error('[ERROR] retryServe()');
console.error(err);
retryServe();
});
}
if (opts.serve) {
promise = retryServe();
} else {
promise = getConnection(opts);
}
/*
if (opts.connect) {
}
*/
// TODO maybe use HTTP POST instead?
return promise.then(function (ws) {
if (ws.masterClient) {
return ws.masterClient;
} }
var db = {}; return ws;
var proto = sqlite3real.Database.prototype; });
var messages = [];
function init(opts) { return promiseWebSocket.then(incr);
return new PromiseA(function (resolve, reject) { }
var id = Math.random();
module.exports.createClientFactory = function (conf, verbs, _socket) {
// TODO distinguish between defaults and mandates
if (!conf.ipcKey) {
throw new Error("[E_NO_IPCKEY] Your config must specify an ipcKey.");
}
return {
create: function (opts, _s) {
var copy = {};
if (_socket && _s) {
throw new Error("[E_USR_SOCKET] Your parent has decided that you may not choose your own SOCKET. Don't get mad at me, take it up with them.");
}
if (opts.dirname && conf.dirname) {
throw new Error("[E_USR_TENANT] Your parent has decided that you may not choose your own TENANT. Don't get mad at me, take it up with them.");
}
if (opts.tenant && conf.tenant) {
throw new Error("[E_USR_TENANT] Your parent has decided that you may not choose your own TENANT. Don't get mad at me, take it up with them.");
}
if (opts.prefix && conf.prefix) {
throw new Error("[E_USR_PREFIX] Your parent has decided that you may not choose your own PREFIX. Don't get mad at me, take it up with them.");
}
if (opts.suffix && conf.suffix) {
throw new Error("[E_USR_SUFFIX] Your parent has decided that you may not choose your own SUFFIX. Don't get mad at me, take it up with them.");
}
if (opts.ext && conf.ext) {
throw new Error("[E_USR_EXT] Your parent has decided that you may not choose your own EXT. Don't get mad at me, take it up with them.");
}
if (opts.serve) {
throw new Error("[E_USR_SERVE] Your parent have forbidden you to SERVE. Don't get mad at me, take it up with them.");
}
if (opts.sock && conf.sock) {
throw new Error("[E_USR_SERVE] Your parent have forbidden you to choose your own SOCK. Don't get mad at me, take it up with them.");
}
Object.keys(conf).forEach(function (key) {
copy[key] = conf[key];
});
Object.keys(opts).forEach(function (key) {
copy[key] = opts[key];
});
if (!verbs) {
verbs = {
Promise: null
};
}
if (!verbs.Promise) {
verbs.Promise = require('bluebird');
}
copy.connect = true;
copy.sock = conf.sock;
copy.tenant = conf.tenant;
copy.ipcKey = conf.ipcKey;
return module.exports.create(copy, verbs, _socket || _s || processWebSocket);
}
};
};
module.exports.create = function (opts, verbs, mySocket) {
if (!verbs) {
verbs = {};
}
var PromiseA = verbs && verbs.Promise || require('bluebird');
verbs.Promise = PromiseA;
// TODO iterate over the prototype
// translate request / response
var sqlite3real = require('sqlite3');
if (!mySocket) {
mySocket = processWebSocket;
}
function create(opts) {
if (!opts.tenant) {
opts.tenant = "";
}
if (!opts.subtenant) {
opts.subtenant = "";
}
if (!opts.prefix) {
opts.prefix = '';
}
if (!opts.suffix) {
opts.suffix = '';
}
if (!opts.ext) {
opts.ext = '.sqlite3'; // also .sqlcipher
}
if (!opts.dbname) {
throw new Error("Please specify opts.dbname as the name of the database");
}
var promise;
var numcpus = require('os').cpus().length;
// if we're only on one core, use the lib directly, no socket
if (opts.standalone || (1 === numcpus && !opts.serve && !opts.connect)) {
return require('./wrapper').create(opts, verbs);
}
function retryServe() {
return startServer(opts, verbs).then(function (client) {
// ws.masterClient = client;
return { masterClient: client };
}, function (err) {
console.error('[ERROR] retryServe()');
console.error(err);
retryServe();
});
}
if (!opts.sock) {
throw new Error("Please specify opts.sock as the path to the master socket. '/tmp/sqlite3-cluster' would do nicely.");
}
if (opts.serve) {
promise = retryServe(opts);
} else {
promise = getConnection(opts, verbs, mySocket).then(function (socket) {
mySocket = socket;
return mySocket;
});
}
// TODO maybe use HTTP POST instead?
return promise.then(function (ws) {
if (ws.masterClient) {
// for the server
return ws.masterClient;
}
var db = {};
var proto = sqlite3real.Database.prototype;
var messages = [];
var idprefix = require('crypto').randomBytes(12).toString('base64');
var idcount = 0;
function genId() {
idcount += 1;
return idprefix + idcount;
}
function init(opts) {
return new PromiseA(function (resolve, reject) {
// TODO needs to reject by a timeout
var id = genId();
ws.send(JSON.stringify({
type: 'init'
, args: [opts]
, func: 'init'
, dirname: opts.dirname
, prefix: opts.prefix
, subtenant: opts.subtenant
, tenant: opts.tenant
, dbname: opts.dbname
, suffix: opts.suffix
, ext: opts.ext
, id: id
}));
function onMessage(data) {
var cmd;
if (
(data.dbname !== opts.dbname)
|| (data.dirname !== opts.dirname)
|| (data.prefix !== opts.prefix)
|| (data.subtenant !== opts.subtenant)
|| (data.tenant !== opts.tenant)
|| (data.suffix !== opts.suffix)
|| (data.ext !== opts.ext)
) {
return reject(new Error("suxors to rejexors"));
}
try {
cmd = JSON.parse(data.toString('utf8'));
} catch(e) {
console.error('[ERROR] in client, from sql server parse json');
console.error(e);
console.error(data);
console.error();
// ignore this message, it came out of order
return reject(new Error("suxors to rejexors"));
}
if (cmd.id !== id) {
// ignore this message, it came out of order
return;
}
if (cmd.self) {
cmd.args = [db];
}
messages.splice(messages.indexOf(onMessage), 1);
if ('error' === cmd.type) {
reject(cmd.args[0]);
return;
}
//console.log('RESOLVING INIT');
resolve(cmd.args[0]);
return;
}
messages.push(onMessage);
});
}
function rpcThunk(fname, args) {
var id;
var cb;
if ('function' === typeof args[args.length - 1]) {
id = genId();
cb = args.pop();
}
ws.send(JSON.stringify({ ws.send(JSON.stringify({
type: 'init' type: 'rpc'
, args: [opts] , func: fname
, func: 'init' , args: args
, filename: opts.filename , dirname: opts.dirname
, prefix: opts.prefix
, subtenant: opts.subtenant
, tenant: opts.tenant
, dbname: opts.dbname
, suffix: opts.suffix
, ext: opts.ext
, id: id , id: id
})); }));
if (!cb) {
return;
}
function onMessage(data) { function onMessage(data) {
var cmd; var cmd;
@ -138,119 +357,74 @@ function create(opts) {
} }
if (cmd.id !== id) { if (cmd.id !== id) {
// ignore this message, it probably came out of order
return; return;
} }
if (cmd.self) { if (cmd.self) {
cmd.args = [db]; cmd.args = [db];
} }
//console.log('RESOLVING RPC', cmd.this, cmd.args);
cb.apply(cmd.this, cmd.args);
messages.splice(messages.indexOf(onMessage), 1); if ('on' !== fname) {
var index = messages.indexOf(onMessage);
if ('error' === cmd.type) { messages.splice(index, 1);
reject(cmd.args[0]);
return;
} }
resolve(cmd.args[0]);
} }
messages.push(onMessage); messages.push(onMessage);
});
}
function rpcThunk(fname, args) {
var id;
var cb;
if ('function' === typeof args[args.length - 1]) {
id = Math.random();
cb = args.pop();
} }
ws.send(JSON.stringify({ db.sanitize = require('./wrapper').sanitize;
type: 'rpc' db.escape = require('./wrapper').escape;
, func: fname
, args: args
, filename: opts.filename
, id: id
}));
if (!cb) { // TODO get methods from server (cluster-store does this)
return; // instead of using the prototype
} Object.keys(sqlite3real.Database.prototype).forEach(function (key) {
function onMessage(data) { if ('function' === typeof proto[key]) {
var cmd; db[key] = function () {
rpcThunk(key, Array.prototype.slice.call(arguments));
try { };
cmd = JSON.parse(data.toString('utf8'));
} catch(e) {
console.error('[ERROR] in client, from sql server parse json');
console.error(e);
console.error(data);
console.error();
//ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } }));
return;
}
if (cmd.id !== id) {
return;
}
if (cmd.self) {
cmd.args = [db];
}
cb.apply(cmd.this, cmd.args);
if ('on' !== fname) {
var index = messages.indexOf(onMessage);
messages.splice(index, 1);
}
}
messages.push(onMessage);
}
db.sanitize = require('./wrapper').sanitize;
db.escape = require('./wrapper').escape;
// TODO get methods from server (cluster-store does this)
// instead of using the prototype
Object.keys(sqlite3real.Database.prototype).forEach(function (key) {
if ('function' === typeof proto[key]) {
db[key] = function () {
rpcThunk(key, Array.prototype.slice.call(arguments));
};
}
});
db.init = init;
ws.on('message', function (data) {
messages.forEach(function (fn) {
try {
fn(data);
} catch(e) {
console.error("[ERROR] ws.on('message', fn) (multi-callback)");
console.error(e);
// ignore
} }
}); });
db.init = init;
ws.on('message', function (data) {
messages.forEach(function (fn) {
try {
fn(data);
} catch(e) {
console.error("[ERROR] ws.on('message', fn) (multi-callback)");
console.error(e);
// ignore
}
});
});
// serialize
// parallel
db.serialize = db.parallel = function () {
throw new Error('NOT IMPLEMENTED in SQLITE3-remote');
};
db.close = function (fn) {
ws.__count -= 1;
if (!ws.__count) {
// waiting for https://github.com/websockets/ws/issues/613 to land
// 1000 means 'normal' https://github.com/websockets/ws/blob/master/lib/ErrorCodes.js
ws.close(1000, null, fn);
}
};
return db;
}); });
}
// serialize return create(opts);
// parallel };
db.serialize = db.parallel = function () {
throw new Error('NOT IMPLEMENTED in SQLITE3-remote');
};
return db;
});
}
module.exports.sanitize = require('./wrapper').sanitize; module.exports.sanitize = require('./wrapper').sanitize;
module.exports.escape = require('./wrapper').escape; module.exports.escape = require('./wrapper').escape;
module.exports.create = create;

109
server.js
View File

@ -1,52 +1,72 @@
'use strict'; 'use strict';
/*global Promise*/
var PromiseA = Promise; var PromiseA = require('bluebird').Promise;
try {
PromiseA = require('bluebird').Promise;
} catch(e) {
console.warn("For better Promise support please use bluebird");
}
var wsses = {}; var wsses = {};
function createApp(server, options) { function createApp(servers, options) {
if (wsses[options.sock]) {
if (wsses[options.filename]) { return PromiseA.resolve(wsses[options.sock]);
return PromiseA.resolve(wsses[options.filename]);
} }
return require('./wrapper').create(options).then(function (db) {
var url = require('url'); var url = require('url');
//var express = require('express'); //var express = require('express');
//var app = express(); //var app = express();
var wss = server.wss; var wss = servers.wss;
var server = servers.server;
function app(req, res) { function app(req, res) {
res.end('NOT IMPLEMENTED'); res.end('NOT IMPLEMENTED');
}
wss.on('connection', function (ws) {
if (!wss.__count) {
wss.__count = 0;
}
wss.__count += 1;
var location = url.parse(ws.upgradeReq.url, true);
// you might use location.query.access_token to authenticate or share sessions
// or ws.upgradeReq.headers.cookie (see http://stackoverflow.com/a/16395220/151312
if (!options.ipcKey) {
console.warn("[SECURITY] please include { ipcKey: crypto.randomBytes(16).toString('base64') }"
+ " in your options and pass it from master to worker processes with worker.send()");
ws._authorized = true;
} else {
ws._authorized = (options.ipcKey === (location.query.ipcKey || location.query.ipc_key));
} }
wss.on('connection', function (ws) { if (!ws._authorized) {
ws.send(JSON.stringify({ error: { message: "Unauthorized: ipc_key does not match", code: 'E_UNAUTHORIZED_IPCKEY' } }));
ws.close();
return;
}
var location = url.parse(ws.upgradeReq.url, true); ws.on('close', function () {
// you might use location.query.access_token to authenticate or share sessions wss.__count -= 1;
// or ws.upgradeReq.headers.cookie (see http://stackoverflow.com/a/16395220/151312 if (!wss.__count) {
wss.close();
server.close();
}
});
ws.on('message', function (buffer) {
var cmd;
var promise;
ws.__session_id = location.query.session_id || Math.random(); try {
cmd = JSON.parse(buffer.toString('utf8'));
} catch(e) {
console.error('[ERROR] parse json');
console.error(e);
console.error(buffer);
console.error();
ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } }));
return;
}
ws.on('message', function (buffer) { // caching and create logic happens in the wrapper stored here below
var cmd; promise = require('./wrapper').create(options, cmd).then(function (db) {
try {
cmd = JSON.parse(buffer.toString('utf8'));
} catch(e) {
console.error('[ERROR] parse json');
console.error(e);
console.error(buffer);
console.error();
ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } }));
return;
}
switch(cmd.type) { switch(cmd.type) {
case 'init': case 'init':
@ -59,6 +79,7 @@ function createApp(server, options) {
myself = true; myself = true;
} }
//console.log('[INIT HAPPENING]');
ws.send(JSON.stringify({ ws.send(JSON.stringify({
id: cmd.id id: cmd.id
, self: myself , self: myself
@ -70,6 +91,7 @@ function createApp(server, options) {
case 'rpc': case 'rpc':
if (!db._initialized) { if (!db._initialized) {
//console.log('[RPC NOT HAPPENING]');
ws.send(JSON.stringify({ ws.send(JSON.stringify({
type: 'error' type: 'error'
, id: cmd.id , id: cmd.id
@ -88,6 +110,7 @@ function createApp(server, options) {
myself = true; myself = true;
} }
//console.log('[RPC HAPPENING]', args, cmd.id);
ws.send(JSON.stringify({ ws.send(JSON.stringify({
this: (!err && this !== global) ? this : {} this: (!err && this !== global) ? this : {}
, args: args , args: args
@ -106,15 +129,13 @@ function createApp(server, options) {
} }
}); });
ws.send(JSON.stringify({ type: 'session', value: ws.__session_id }));
}); });
app.masterClient = db;
wsses[options.filename] = app;
return app;
}); });
//app.masterClient = db;
wsses[options.sock] = app;
return PromiseA.resolve(app);
} }
function create(options) { function create(options) {
@ -136,7 +157,7 @@ function create(options) {
ps.push(createApp({ server: server, wss: wss }, options).then(function (app) { ps.push(createApp({ server: server, wss: wss }, options).then(function (app) {
server.on('request', app); server.on('request', app);
return { masterClient: app.masterClient }; return { masterClient: app.masterClient || true };
})); }));
return PromiseA.all(ps).then(function (results) { return PromiseA.all(ps).then(function (results) {

56
test-cluster-master.js Normal file
View File

@ -0,0 +1,56 @@
'use strict';
var cluster = require('cluster');
var minCores = 2;
var numCores = Math.max(minCores, require('os').cpus().length);
var i;
function run(connect, ipcKey) {
var sqlite3 = require('./cluster');
return sqlite3.create({
bits: 128
, dirname: '/tmp/'
, prefix: 'foobar.'
, dbname: 'cluster'
, suffix: '.test'
, ext: '.sqlcipher'
, verbose: null
, standalone: null
, serve: !connect
, connect: connect
, sock: '/tmp/foobar.sqlite3-cluster.test.sock'
, ipcKey: ipcKey
});
}
var ipcKey = require('crypto').randomBytes(16).toString('hex');
// not a bad idea to setup the master before forking the workers
run(false, ipcKey).then(function () {
var w;
function setupWorker(w) {
function sendKey() {
w.send({ ipcKey: ipcKey });
}
w.on('online', sendKey);
}
for (i = 1; i <= numCores; i += 1) {
w = cluster.fork();
setupWorker(w);
}
});
process.on('beforeExit', function () {
console.log("[MASTER] I've got nothing left to live for... ugh... death is upon me...");
});
// The native Promise implementation ignores errors because... dumbness???
process.on('unhandledRejection', function (err) {
console.error('Unhandled Promise Rejection');
console.error(err);
console.error(err.stack);
process.exit(1);
});

122
test-cluster-worker.js Normal file
View File

@ -0,0 +1,122 @@
'use strict';
var cluster = require('cluster');
function testSelect(client) {
var PromiseA = require('bluebird');
return new PromiseA(function (resolve, reject) {
client.run('CREATE TABLE IF NOT EXISTS meta (version TEXT)', function (err) {
if (err) {
console.error('[ERROR]', cluster.isMaster && '0' || cluster.worker.id);
console.error(err.stack);
reject(err);
return;
}
client.get("SELECT version FROM meta", [], function (err, result) {
if (err) {
console.error('[ERROR] create table', cluster.isMaster && '0' || cluster.worker.id);
console.error(err);
reject(err);
return;
}
console.log('[this] Worker #', cluster.isMaster && '0' || cluster.worker.id);
console.log(this);
console.log('[result] Worker #', cluster.isMaster && '0' || cluster.worker.id);
console.log(result);
resolve(client);
});
});
});
}
function init(ipcKey) {
var sqlite3 = require('./cluster');
return sqlite3.create({
bits: 128
, dirname: '/tmp/'
, prefix: 'foobar.'
, dbname: 'cluster'
, suffix: '.test'
, ext: '.sqlcipher'
, verbose: null
, standalone: null
, serve: null
, connect: true
, sock: '/tmp/foobar.sqlite3-cluster.test.sock'
, ipcKey: ipcKey
}).then(function (client) {
//console.log('[INIT] begin');
return client.init({ algorithm: 'aes', bits: 128, key: '00000000000000000000000000000000' }).then(function (args) {
//console.log('[INIT]', args);
return client;
});
});
}
function run(connect, ipcKey) {
var sqlite3 = require('./cluster');
return sqlite3.create({
bits: 128
, dirname: '/tmp/'
, prefix: 'foobar.'
, dbname: 'cluster'
, suffix: '.test'
, ext: '.sqlcipher'
, verbose: null
, standalone: null
, serve: !connect
, connect: connect
, sock: '/tmp/foobar.sqlite3-cluster.test.sock'
, ipcKey: ipcKey
});//.then(testSelect);
}
function onMessage(msg) {
function loseTheWillToLive() {
process.removeListener('message', onMessage);
// child processes do not exit when their event loop is empty
process.nextTick(function () {
process.exit(0);
});
}
console.log('New Worker', cluster.worker.id, msg);
if (1 === cluster.worker.id) {
init(msg.ipcKey).then(testSelect).then(function (client) {
console.log('init worker closing...');
setTimeout(function () {
client.close(loseTheWillToLive);
loseTheWillToLive();
}, 1000);
// waiting for https://github.com/websockets/ws/issues/613 to land
});
} else {
setTimeout(function () {
run(true, msg.ipcKey).then(testSelect).then(function (client) {
console.log('other working closing...');
client.close(loseTheWillToLive);
loseTheWillToLive();
});
}, 100);
}
}
process.on('message', onMessage);
// The native Promise implementation ignores errors because... dumbness???
process.on('beforeExit', function () {
console.log("[WORKER] I've got nothing left to do");
});
process.on('unhandledRejection', function (err) {
console.error('Unhandled Promise Rejection');
console.error(err);
console.error(err.stack);
process.exit(1);
});

View File

@ -1,95 +1,8 @@
'use strict';
var cluster = require('cluster'); var cluster = require('cluster');
//var numCores = 2;
var numCores = require('os').cpus().length;
var i;
function testSelect(client) {
return client.run('CREATE TABLE IF NOT EXISTS meta (version TEXT)', function (err) {
if (err) {
console.error('[ERROR] create table', cluster.isMaster && '0' || cluster.worker.id);
console.error(err);
return;
}
return client.get("SELECT version FROM meta", [], function (err, result) {
if (err) {
console.error('[ERROR]', cluster.isMaster && '0' || cluster.worker.id);
console.error(err);
return;
}
console.log('[this]', cluster.isMaster && '0' || cluster.worker.id);
console.log(this);
console.log('[result]', cluster.isMaster && '0' || cluster.worker.id);
console.log(result);
});
});
}
function init() {
var sqlite3 = require('./cluster');
return sqlite3.create({
bits: 128
, filename: '/tmp/test.cluster.sqlcipher'
, verbose: null
, standalone: null
, serve: null
, connect: null
}).then(function (client) {
console.log('[INIT] begin');
return client.init({ bits: 128, key: '00000000000000000000000000000000' });
}).then(testSelect, function (err) {
console.error('[ERROR]');
console.error(err);
}).then(function () {
console.log('success');
}, function (err) {
console.error('[ERROR 2]');
console.error(err);
});
}
function run() {
var sqlite3 = require('./cluster');
return sqlite3.create({
bits: 128
, filename: '/tmp/test.cluster.sqlcipher'
, verbose: null
, standalone: null
, serve: null
, connect: null
});//.then(testSelect);
}
if (cluster.isMaster) { if (cluster.isMaster) {
// not a bad idea to setup the master before forking the workers require('./test-cluster-master');
run().then(function () { }
for (i = 1; i <= numCores; i += 1) { else {
cluster.fork(); require('./test-cluster-worker');
}
});
} else {
if (1 === cluster.worker.id) {
init().then(testSelect);
return;
} else {
setTimeout(function () {
run().then(testSelect);
}, 100);
}
} }
// The native Promise implementation ignores errors because... dumbness???
process.on('unhandledPromiseRejection', function (err) {
console.error('Unhandled Promise Rejection');
console.error(err);
console.error(err.stack);
process.exit(1);
});

View File

@ -2,16 +2,23 @@
function run() { function run() {
var sqlite3 = require('./standalone'); var sqlite3 = require('./standalone');
var promise;
sqlite3.create({ promise = sqlite3.create({
key: '00000000000000000000000000000000' key: '00000000000000000000000000000000'
, bits: 128 , bits: 128
, filename: '/tmp/test.cluster.sqlcipher' , dirname: '/tmp/'
, prefix: 'foobar.'
, dbname: 'standalone'
, suffix: '.test'
, ext: '.sqlcipher'
, verbose: null , verbose: null
, standalone: true , standalone: true
, serve: null , serve: null
, connect: null , connect: null
}).then(function (client) { });
promise.then(function (client) {
client.all("SELECT ?", ['Hello World!'], function (err, result) { client.all("SELECT ?", ['Hello World!'], function (err, result) {
if (err) { if (err) {
console.error('[ERROR] standalone'); console.error('[ERROR] standalone');
@ -31,7 +38,7 @@ function run() {
run(); run();
// The native Promise implementation ignores errors because... dumbness??? // The native Promise implementation ignores errors because... dumbness???
process.on('unhandledPromiseRejection', function (err) { process.on('unhandledRejection', function (err) {
console.error('Unhandled Promise Rejection'); console.error('Unhandled Promise Rejection');
console.error(err); console.error(err);
console.error(err.stack); console.error(err.stack);

View File

@ -1,12 +1,5 @@
'use strict'; 'use strict';
/*global Promise*/
var PromiseA = Promise;
try {
PromiseA = require('bluebird').Promise;
} catch(e) {
console.warn("For better Promise support please use bluebird");
}
var sqlite3 = require('sqlite3'); var sqlite3 = require('sqlite3');
var dbs = {}; var dbs = {};
@ -14,8 +7,12 @@ function sanitize(str) {
return String(str).replace("'", "''"); return String(str).replace("'", "''");
} }
function create(opts) { function create(opts, verbs) {
if (!verbs) {
verbs = {};
}
var db; var db;
var PromiseA = verbs.Promise || require('bluebird');
if (!opts) { if (!opts) {
opts = {}; opts = {};
@ -25,11 +22,37 @@ function create(opts) {
sqlite3.verbose(); sqlite3.verbose();
} }
if (!dbs[opts.filename]) { // TODO expire unused dbs from cache
dbs[opts.filename] = new sqlite3.Database(opts.filename); var dbname = "";
if (opts.dirname) {
dbname += opts.dirname;
}
if (opts.prefix) {
dbname += opts.prefix;
}
if (opts.subtenant) {
dbname += opts.subtenant + '.';
}
if (opts.tenant) {
dbname += opts.tenant + '.';
}
if (opts.dbname) {
dbname += opts.dbname;
}
if (opts.suffix) {
dbname += opts.suffix;
}
if (opts.ext) {
dbname += opts.ext;
} }
db = dbs[opts.filename]; if (dbs[dbname]) {
return PromiseA.resolve(dbs[dbname]);
}
db = new sqlite3.Database(dbname);
// dbs[dbname] = db //
db.sanitize = sanitize; db.sanitize = sanitize;
db.escape = sanitize; db.escape = sanitize;
@ -43,6 +66,7 @@ function create(opts) {
return new PromiseA(function (resolve, reject) { return new PromiseA(function (resolve, reject) {
if (db._initialized) { if (db._initialized) {
dbs[dbname] = db;
resolve(db); resolve(db);
return; return;
} }
@ -51,13 +75,13 @@ function create(opts) {
if (!bits) { if (!bits) {
db._initialized = true; db._initialized = true;
} }
dbs[dbname] = db;
resolve(db); resolve(db);
return; return;
} }
// TODO test key length // TODO test key length
db._initialized = true;
db.serialize(function () { db.serialize(function () {
var setup = []; var setup = [];
@ -83,13 +107,16 @@ function create(opts) {
PromiseA.all(setup).then(function () { PromiseA.all(setup).then(function () {
// restore original functions // restore original functions
db._initialized = true;
dbs[dbname] = db;
resolve(db); resolve(db);
}, reject); }, reject);
}); });
}); });
}; };
return db.init(opts); dbs[dbname] = db.init(opts);
return dbs[dbname];
} }
module.exports.sanitize = sanitize; module.exports.sanitize = sanitize;