From 3b0b2f70970ed87aa5220f956ac3211b7a35f481 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Mon, 9 Nov 2015 20:49:53 +0000 Subject: [PATCH] begin refactor for shared socket with many dbs --- client.js | 518 +++++++++++++++++++++++++++-------------- server.js | 109 +++++---- test-cluster-master.js | 56 +++++ test-cluster-worker.js | 122 ++++++++++ test-cluster.js | 95 +------- test-standalone.js | 15 +- wrapper.js | 53 +++-- 7 files changed, 644 insertions(+), 324 deletions(-) create mode 100644 test-cluster-master.js create mode 100644 test-cluster-worker.js diff --git a/client.js b/client.js index 5d03375..180fbd8 100644 --- a/client.js +++ b/client.js @@ -1,54 +1,73 @@ 'use strict'; -/*global Promise*/ +// I'm not a fan of singletons (as they are EVIL). +// However, this module is, by intent, meant to +// represent a worker process in a cluster. +// Thus, it is appropriately a singleton. +var processWebSocket; +var promiseWebSocket; -var PromiseA = Promise; -try { - PromiseA = require('bluebird').Promise; -} catch(e) { - console.warn("For better Promise support please use bluebird"); -} -// TODO iterate over the prototype -// translate request / response -var sqlite3real = require('sqlite3'); - -/* -function createConnection(opts) { - var server = ; - - return server.create(opts).then(function () { - // created and listening - }); -} -*/ - -function startServer(opts) { - return require('./server').create(opts).then(function (server) { +function startServer(opts, verbs, myServer) { + if (myServer) { + return verbs.Promise.resolve(myServer); + } + return require('./server').create(opts, verbs).then(function (server) { // this process doesn't need to connect to itself // through a socket return server.masterClient; }); } -function getConnection(opts) { - return new PromiseA(function (resolve) { +// connection is scoped per-process, nothing more +function getConnection(opts, verbs, mySocket) { + function incr(ws) { + if (!ws.__count) { + ws.__count = 0; + } + ws.__count += 1; + return ws; + } + if (mySocket || processWebSocket) { + promiseWebSocket = verbs.Promise.resolve(mySocket || processWebSocket); + return promiseWebSocket.then(incr); + } + + if (promiseWebSocket) { + return promiseWebSocket.then(incr); + } + + promiseWebSocket = new verbs.Promise(function (resolve) { //setTimeout(function () { var WebSocket = require('ws'); - var ws = new WebSocket('ws+unix:' + opts.sock); + // TODO how to include path and such? + // http://unix:/absolute/path/to/unix.socket:/request/path + // https://github.com/websockets/ws/issues/236 + var address = require('url').parse('ws+unix:' + opts.sock); + var ws; + address.pathname = opts.sock; + address.path = '/' + require('cluster').worker.id + '/' + opts.ipcKey; + address.query = { + ipcKey: opts.ipcKey + , ipc_key: opts.ipcKey + , worker_id: require('cluster').worker.id + }; + address.path += '?' + require('querystring').stringify(address.query); + ws = new WebSocket(address); ws.on('error', function (err) { console.error('[ERROR] ws connection failed, retrying'); - console.error(err); + console.error(err.stack); function retry() { + // TODO eventually throw up setTimeout(function () { - getConnection(opts).then(resolve, retry); + getConnection(opts, verbs, mySocket).then(resolve, retry); }, 100 + (Math.random() * 250)); } if (!opts.connect && ('ENOENT' === err.code || 'ECONNREFUSED' === err.code)) { console.log('[NO SERVER] attempting to create a server #######################'); - return startServer(opts).then(function (client) { + return startServer(opts, verbs).then(function (client) { // ws.masterClient = client; resolve({ masterClient: client }); }, function (err) { @@ -62,66 +81,266 @@ function getConnection(opts) { }); ws.on('open', function () { + resolve(ws); }); //}, 100 + (Math.random() * 250)); - }); -} - -function create(opts) { - if (!opts.sock) { - opts.sock = opts.filename + '.sock'; - } - - var promise; - var numcpus = require('os').cpus().length; - if (opts.standalone || (1 === numcpus && !opts.serve && !opts.connect)) { - return require('./wrapper').create(opts); - } - - function retryServe() { - return startServer(opts).then(function (client) { - // ws.masterClient = client; - return { masterClient: client }; - }, function (err) { - console.error('[ERROR] retryServe()'); - console.error(err); - retryServe(); - }); - } - - if (opts.serve) { - promise = retryServe(); - } else { - promise = getConnection(opts); - } - /* - if (opts.connect) { - } - */ - - // TODO maybe use HTTP POST instead? - return promise.then(function (ws) { - if (ws.masterClient) { - return ws.masterClient; + }).then(function (ws) { + if (!processWebSocket) { + processWebSocket = ws; } - var db = {}; - var proto = sqlite3real.Database.prototype; - var messages = []; + return ws; + }); - function init(opts) { - return new PromiseA(function (resolve, reject) { - var id = Math.random(); + return promiseWebSocket.then(incr); +} + +module.exports.createClientFactory = function (conf, verbs, _socket) { + // TODO distinguish between defaults and mandates + + if (!conf.ipcKey) { + throw new Error("[E_NO_IPCKEY] Your config must specify an ipcKey."); + } + + return { + create: function (opts, _s) { + var copy = {}; + + if (_socket && _s) { + throw new Error("[E_USR_SOCKET] Your parent has decided that you may not choose your own SOCKET. Don't get mad at me, take it up with them."); + } + if (opts.dirname && conf.dirname) { + throw new Error("[E_USR_TENANT] Your parent has decided that you may not choose your own TENANT. Don't get mad at me, take it up with them."); + } + if (opts.tenant && conf.tenant) { + throw new Error("[E_USR_TENANT] Your parent has decided that you may not choose your own TENANT. Don't get mad at me, take it up with them."); + } + if (opts.prefix && conf.prefix) { + throw new Error("[E_USR_PREFIX] Your parent has decided that you may not choose your own PREFIX. Don't get mad at me, take it up with them."); + } + if (opts.suffix && conf.suffix) { + throw new Error("[E_USR_SUFFIX] Your parent has decided that you may not choose your own SUFFIX. Don't get mad at me, take it up with them."); + } + if (opts.ext && conf.ext) { + throw new Error("[E_USR_EXT] Your parent has decided that you may not choose your own EXT. Don't get mad at me, take it up with them."); + } + if (opts.serve) { + throw new Error("[E_USR_SERVE] Your parent have forbidden you to SERVE. Don't get mad at me, take it up with them."); + } + if (opts.sock && conf.sock) { + throw new Error("[E_USR_SERVE] Your parent have forbidden you to choose your own SOCK. Don't get mad at me, take it up with them."); + } + + Object.keys(conf).forEach(function (key) { + copy[key] = conf[key]; + }); + Object.keys(opts).forEach(function (key) { + copy[key] = opts[key]; + }); + + if (!verbs) { + verbs = { + Promise: null + }; + } + if (!verbs.Promise) { + verbs.Promise = require('bluebird'); + } + + copy.connect = true; + copy.sock = conf.sock; + copy.tenant = conf.tenant; + copy.ipcKey = conf.ipcKey; + return module.exports.create(copy, verbs, _socket || _s || processWebSocket); + } + }; +}; + +module.exports.create = function (opts, verbs, mySocket) { + if (!verbs) { + verbs = {}; + } + + var PromiseA = verbs && verbs.Promise || require('bluebird'); + verbs.Promise = PromiseA; + // TODO iterate over the prototype + // translate request / response + var sqlite3real = require('sqlite3'); + + if (!mySocket) { + mySocket = processWebSocket; + } + + function create(opts) { + if (!opts.tenant) { + opts.tenant = ""; + } + if (!opts.subtenant) { + opts.subtenant = ""; + } + if (!opts.prefix) { + opts.prefix = ''; + } + if (!opts.suffix) { + opts.suffix = ''; + } + if (!opts.ext) { + opts.ext = '.sqlite3'; // also .sqlcipher + } + if (!opts.dbname) { + throw new Error("Please specify opts.dbname as the name of the database"); + } + + + var promise; + var numcpus = require('os').cpus().length; + + // if we're only on one core, use the lib directly, no socket + if (opts.standalone || (1 === numcpus && !opts.serve && !opts.connect)) { + return require('./wrapper').create(opts, verbs); + } + + function retryServe() { + return startServer(opts, verbs).then(function (client) { + // ws.masterClient = client; + return { masterClient: client }; + }, function (err) { + console.error('[ERROR] retryServe()'); + console.error(err); + retryServe(); + }); + } + + if (!opts.sock) { + throw new Error("Please specify opts.sock as the path to the master socket. '/tmp/sqlite3-cluster' would do nicely."); + } + + if (opts.serve) { + promise = retryServe(opts); + } else { + promise = getConnection(opts, verbs, mySocket).then(function (socket) { + mySocket = socket; + return mySocket; + }); + } + + // TODO maybe use HTTP POST instead? + return promise.then(function (ws) { + if (ws.masterClient) { + // for the server + return ws.masterClient; + } + + var db = {}; + var proto = sqlite3real.Database.prototype; + var messages = []; + var idprefix = require('crypto').randomBytes(12).toString('base64'); + var idcount = 0; + + function genId() { + idcount += 1; + return idprefix + idcount; + } + + function init(opts) { + return new PromiseA(function (resolve, reject) { + // TODO needs to reject by a timeout + + var id = genId(); + ws.send(JSON.stringify({ + type: 'init' + , args: [opts] + , func: 'init' + , dirname: opts.dirname + , prefix: opts.prefix + , subtenant: opts.subtenant + , tenant: opts.tenant + , dbname: opts.dbname + , suffix: opts.suffix + , ext: opts.ext + , id: id + })); + + function onMessage(data) { + var cmd; + + if ( + (data.dbname !== opts.dbname) + || (data.dirname !== opts.dirname) + || (data.prefix !== opts.prefix) + || (data.subtenant !== opts.subtenant) + || (data.tenant !== opts.tenant) + || (data.suffix !== opts.suffix) + || (data.ext !== opts.ext) + ) { + return reject(new Error("suxors to rejexors")); + } + + try { + cmd = JSON.parse(data.toString('utf8')); + } catch(e) { + console.error('[ERROR] in client, from sql server parse json'); + console.error(e); + console.error(data); + console.error(); + + // ignore this message, it came out of order + return reject(new Error("suxors to rejexors")); + } + + if (cmd.id !== id) { + // ignore this message, it came out of order + return; + } + + if (cmd.self) { + cmd.args = [db]; + } + + messages.splice(messages.indexOf(onMessage), 1); + + if ('error' === cmd.type) { + reject(cmd.args[0]); + return; + } + + //console.log('RESOLVING INIT'); + resolve(cmd.args[0]); + return; + } + + messages.push(onMessage); + }); + } + + function rpcThunk(fname, args) { + var id; + var cb; + + if ('function' === typeof args[args.length - 1]) { + id = genId(); + cb = args.pop(); + } ws.send(JSON.stringify({ - type: 'init' - , args: [opts] - , func: 'init' - , filename: opts.filename + type: 'rpc' + , func: fname + , args: args + , dirname: opts.dirname + , prefix: opts.prefix + , subtenant: opts.subtenant + , tenant: opts.tenant + , dbname: opts.dbname + , suffix: opts.suffix + , ext: opts.ext , id: id })); + if (!cb) { + return; + } + function onMessage(data) { var cmd; @@ -138,119 +357,74 @@ function create(opts) { } if (cmd.id !== id) { + // ignore this message, it probably came out of order return; } if (cmd.self) { cmd.args = [db]; } + //console.log('RESOLVING RPC', cmd.this, cmd.args); + cb.apply(cmd.this, cmd.args); - messages.splice(messages.indexOf(onMessage), 1); - - if ('error' === cmd.type) { - reject(cmd.args[0]); - return; + if ('on' !== fname) { + var index = messages.indexOf(onMessage); + messages.splice(index, 1); } - resolve(cmd.args[0]); } messages.push(onMessage); - }); - } - - function rpcThunk(fname, args) { - var id; - var cb; - - if ('function' === typeof args[args.length - 1]) { - id = Math.random(); - cb = args.pop(); } - ws.send(JSON.stringify({ - type: 'rpc' - , func: fname - , args: args - , filename: opts.filename - , id: id - })); + db.sanitize = require('./wrapper').sanitize; + db.escape = require('./wrapper').escape; - if (!cb) { - return; - } + // TODO get methods from server (cluster-store does this) + // instead of using the prototype + Object.keys(sqlite3real.Database.prototype).forEach(function (key) { - function onMessage(data) { - var cmd; - - try { - cmd = JSON.parse(data.toString('utf8')); - } catch(e) { - console.error('[ERROR] in client, from sql server parse json'); - console.error(e); - console.error(data); - console.error(); - - //ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } })); - return; - } - - if (cmd.id !== id) { - return; - } - - if (cmd.self) { - cmd.args = [db]; - } - cb.apply(cmd.this, cmd.args); - - if ('on' !== fname) { - var index = messages.indexOf(onMessage); - messages.splice(index, 1); - } - } - - messages.push(onMessage); - } - - db.sanitize = require('./wrapper').sanitize; - db.escape = require('./wrapper').escape; - - // TODO get methods from server (cluster-store does this) - // instead of using the prototype - Object.keys(sqlite3real.Database.prototype).forEach(function (key) { - - if ('function' === typeof proto[key]) { - db[key] = function () { - rpcThunk(key, Array.prototype.slice.call(arguments)); - }; - } - }); - - db.init = init; - - ws.on('message', function (data) { - messages.forEach(function (fn) { - try { - fn(data); - } catch(e) { - console.error("[ERROR] ws.on('message', fn) (multi-callback)"); - console.error(e); - // ignore + if ('function' === typeof proto[key]) { + db[key] = function () { + rpcThunk(key, Array.prototype.slice.call(arguments)); + }; } }); + + db.init = init; + + ws.on('message', function (data) { + messages.forEach(function (fn) { + try { + fn(data); + } catch(e) { + console.error("[ERROR] ws.on('message', fn) (multi-callback)"); + console.error(e); + // ignore + } + }); + }); + + // serialize + // parallel + db.serialize = db.parallel = function () { + throw new Error('NOT IMPLEMENTED in SQLITE3-remote'); + }; + + db.close = function (fn) { + ws.__count -= 1; + if (!ws.__count) { + // waiting for https://github.com/websockets/ws/issues/613 to land + // 1000 means 'normal' https://github.com/websockets/ws/blob/master/lib/ErrorCodes.js + ws.close(1000, null, fn); + } + }; + + return db; }); + } - // serialize - // parallel - db.serialize = db.parallel = function () { - throw new Error('NOT IMPLEMENTED in SQLITE3-remote'); - }; - - return db; - }); -} - + return create(opts); +}; module.exports.sanitize = require('./wrapper').sanitize; module.exports.escape = require('./wrapper').escape; -module.exports.create = create; diff --git a/server.js b/server.js index 513f7f2..9ad97a6 100644 --- a/server.js +++ b/server.js @@ -1,52 +1,72 @@ 'use strict'; -/*global Promise*/ -var PromiseA = Promise; -try { - PromiseA = require('bluebird').Promise; -} catch(e) { - console.warn("For better Promise support please use bluebird"); -} +var PromiseA = require('bluebird').Promise; var wsses = {}; -function createApp(server, options) { - - if (wsses[options.filename]) { - return PromiseA.resolve(wsses[options.filename]); +function createApp(servers, options) { + if (wsses[options.sock]) { + return PromiseA.resolve(wsses[options.sock]); } - return require('./wrapper').create(options).then(function (db) { - var url = require('url'); - //var express = require('express'); - //var app = express(); - var wss = server.wss; + var url = require('url'); + //var express = require('express'); + //var app = express(); + var wss = servers.wss; + var server = servers.server; - function app(req, res) { - res.end('NOT IMPLEMENTED'); + function app(req, res) { + res.end('NOT IMPLEMENTED'); + } + + wss.on('connection', function (ws) { + if (!wss.__count) { + wss.__count = 0; + } + wss.__count += 1; + + var location = url.parse(ws.upgradeReq.url, true); + // you might use location.query.access_token to authenticate or share sessions + // or ws.upgradeReq.headers.cookie (see http://stackoverflow.com/a/16395220/151312 + + if (!options.ipcKey) { + console.warn("[SECURITY] please include { ipcKey: crypto.randomBytes(16).toString('base64') }" + + " in your options and pass it from master to worker processes with worker.send()"); + ws._authorized = true; + } else { + ws._authorized = (options.ipcKey === (location.query.ipcKey || location.query.ipc_key)); } - wss.on('connection', function (ws) { + if (!ws._authorized) { + ws.send(JSON.stringify({ error: { message: "Unauthorized: ipc_key does not match", code: 'E_UNAUTHORIZED_IPCKEY' } })); + ws.close(); + return; + } - var location = url.parse(ws.upgradeReq.url, true); - // you might use location.query.access_token to authenticate or share sessions - // or ws.upgradeReq.headers.cookie (see http://stackoverflow.com/a/16395220/151312 + ws.on('close', function () { + wss.__count -= 1; + if (!wss.__count) { + wss.close(); + server.close(); + } + }); + ws.on('message', function (buffer) { + var cmd; + var promise; - ws.__session_id = location.query.session_id || Math.random(); + try { + cmd = JSON.parse(buffer.toString('utf8')); + } catch(e) { + console.error('[ERROR] parse json'); + console.error(e); + console.error(buffer); + console.error(); + ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } })); + return; + } - ws.on('message', function (buffer) { - var cmd; - - try { - cmd = JSON.parse(buffer.toString('utf8')); - } catch(e) { - console.error('[ERROR] parse json'); - console.error(e); - console.error(buffer); - console.error(); - ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } })); - return; - } + // caching and create logic happens in the wrapper stored here below + promise = require('./wrapper').create(options, cmd).then(function (db) { switch(cmd.type) { case 'init': @@ -59,6 +79,7 @@ function createApp(server, options) { myself = true; } + //console.log('[INIT HAPPENING]'); ws.send(JSON.stringify({ id: cmd.id , self: myself @@ -70,6 +91,7 @@ function createApp(server, options) { case 'rpc': if (!db._initialized) { + //console.log('[RPC NOT HAPPENING]'); ws.send(JSON.stringify({ type: 'error' , id: cmd.id @@ -88,6 +110,7 @@ function createApp(server, options) { myself = true; } + //console.log('[RPC HAPPENING]', args, cmd.id); ws.send(JSON.stringify({ this: (!err && this !== global) ? this : {} , args: args @@ -106,15 +129,13 @@ function createApp(server, options) { } }); - - ws.send(JSON.stringify({ type: 'session', value: ws.__session_id })); }); - - app.masterClient = db; - wsses[options.filename] = app; - - return app; }); + + //app.masterClient = db; + wsses[options.sock] = app; + + return PromiseA.resolve(app); } function create(options) { @@ -136,7 +157,7 @@ function create(options) { ps.push(createApp({ server: server, wss: wss }, options).then(function (app) { server.on('request', app); - return { masterClient: app.masterClient }; + return { masterClient: app.masterClient || true }; })); return PromiseA.all(ps).then(function (results) { diff --git a/test-cluster-master.js b/test-cluster-master.js new file mode 100644 index 0000000..49627eb --- /dev/null +++ b/test-cluster-master.js @@ -0,0 +1,56 @@ +'use strict'; + +var cluster = require('cluster'); +var minCores = 2; +var numCores = Math.max(minCores, require('os').cpus().length); +var i; + +function run(connect, ipcKey) { + var sqlite3 = require('./cluster'); + + return sqlite3.create({ + bits: 128 + , dirname: '/tmp/' + , prefix: 'foobar.' + , dbname: 'cluster' + , suffix: '.test' + , ext: '.sqlcipher' + , verbose: null + , standalone: null + , serve: !connect + , connect: connect + , sock: '/tmp/foobar.sqlite3-cluster.test.sock' + , ipcKey: ipcKey + }); +} + +var ipcKey = require('crypto').randomBytes(16).toString('hex'); +// not a bad idea to setup the master before forking the workers + +run(false, ipcKey).then(function () { + var w; + + function setupWorker(w) { + function sendKey() { + w.send({ ipcKey: ipcKey }); + } + w.on('online', sendKey); + } + + for (i = 1; i <= numCores; i += 1) { + w = cluster.fork(); + setupWorker(w); + } +}); + +process.on('beforeExit', function () { + console.log("[MASTER] I've got nothing left to live for... ugh... death is upon me..."); +}); +// The native Promise implementation ignores errors because... dumbness??? +process.on('unhandledRejection', function (err) { + console.error('Unhandled Promise Rejection'); + console.error(err); + console.error(err.stack); + + process.exit(1); +}); diff --git a/test-cluster-worker.js b/test-cluster-worker.js new file mode 100644 index 0000000..b3998a8 --- /dev/null +++ b/test-cluster-worker.js @@ -0,0 +1,122 @@ +'use strict'; + +var cluster = require('cluster'); + +function testSelect(client) { + var PromiseA = require('bluebird'); + + return new PromiseA(function (resolve, reject) { + client.run('CREATE TABLE IF NOT EXISTS meta (version TEXT)', function (err) { + if (err) { + console.error('[ERROR]', cluster.isMaster && '0' || cluster.worker.id); + console.error(err.stack); + reject(err); + return; + } + + client.get("SELECT version FROM meta", [], function (err, result) { + if (err) { + console.error('[ERROR] create table', cluster.isMaster && '0' || cluster.worker.id); + console.error(err); + reject(err); + return; + } + + console.log('[this] Worker #', cluster.isMaster && '0' || cluster.worker.id); + console.log(this); + + console.log('[result] Worker #', cluster.isMaster && '0' || cluster.worker.id); + console.log(result); + + resolve(client); + }); + }); + }); +} + +function init(ipcKey) { + var sqlite3 = require('./cluster'); + + return sqlite3.create({ + bits: 128 + , dirname: '/tmp/' + , prefix: 'foobar.' + , dbname: 'cluster' + , suffix: '.test' + , ext: '.sqlcipher' + , verbose: null + , standalone: null + , serve: null + , connect: true + , sock: '/tmp/foobar.sqlite3-cluster.test.sock' + , ipcKey: ipcKey + }).then(function (client) { + //console.log('[INIT] begin'); + return client.init({ algorithm: 'aes', bits: 128, key: '00000000000000000000000000000000' }).then(function (args) { + //console.log('[INIT]', args); + return client; + }); + }); +} + +function run(connect, ipcKey) { + var sqlite3 = require('./cluster'); + + return sqlite3.create({ + bits: 128 + , dirname: '/tmp/' + , prefix: 'foobar.' + , dbname: 'cluster' + , suffix: '.test' + , ext: '.sqlcipher' + , verbose: null + , standalone: null + , serve: !connect + , connect: connect + , sock: '/tmp/foobar.sqlite3-cluster.test.sock' + , ipcKey: ipcKey + });//.then(testSelect); +} + +function onMessage(msg) { + function loseTheWillToLive() { + process.removeListener('message', onMessage); + // child processes do not exit when their event loop is empty + process.nextTick(function () { + process.exit(0); + }); + } + + console.log('New Worker', cluster.worker.id, msg); + if (1 === cluster.worker.id) { + init(msg.ipcKey).then(testSelect).then(function (client) { + console.log('init worker closing...'); + setTimeout(function () { + client.close(loseTheWillToLive); + loseTheWillToLive(); + }, 1000); + // waiting for https://github.com/websockets/ws/issues/613 to land + }); + } else { + setTimeout(function () { + run(true, msg.ipcKey).then(testSelect).then(function (client) { + console.log('other working closing...'); + client.close(loseTheWillToLive); + loseTheWillToLive(); + }); + }, 100); + } +} +process.on('message', onMessage); + +// The native Promise implementation ignores errors because... dumbness??? +process.on('beforeExit', function () { + console.log("[WORKER] I've got nothing left to do"); +}); +process.on('unhandledRejection', function (err) { + console.error('Unhandled Promise Rejection'); + console.error(err); + console.error(err.stack); + + process.exit(1); +}); diff --git a/test-cluster.js b/test-cluster.js index 6148201..7797d9c 100644 --- a/test-cluster.js +++ b/test-cluster.js @@ -1,95 +1,8 @@ -'use strict'; - var cluster = require('cluster'); -//var numCores = 2; -var numCores = require('os').cpus().length; -var i; - -function testSelect(client) { - return client.run('CREATE TABLE IF NOT EXISTS meta (version TEXT)', function (err) { - if (err) { - console.error('[ERROR] create table', cluster.isMaster && '0' || cluster.worker.id); - console.error(err); - return; - } - - return client.get("SELECT version FROM meta", [], function (err, result) { - - if (err) { - console.error('[ERROR]', cluster.isMaster && '0' || cluster.worker.id); - console.error(err); - return; - } - - console.log('[this]', cluster.isMaster && '0' || cluster.worker.id); - console.log(this); - - console.log('[result]', cluster.isMaster && '0' || cluster.worker.id); - console.log(result); - }); - }); -} - -function init() { - var sqlite3 = require('./cluster'); - - return sqlite3.create({ - bits: 128 - , filename: '/tmp/test.cluster.sqlcipher' - , verbose: null - , standalone: null - , serve: null - , connect: null - }).then(function (client) { - console.log('[INIT] begin'); - return client.init({ bits: 128, key: '00000000000000000000000000000000' }); - }).then(testSelect, function (err) { - console.error('[ERROR]'); - console.error(err); - }).then(function () { - console.log('success'); - }, function (err) { - console.error('[ERROR 2]'); - console.error(err); - }); -} - -function run() { - var sqlite3 = require('./cluster'); - - return sqlite3.create({ - bits: 128 - , filename: '/tmp/test.cluster.sqlcipher' - , verbose: null - , standalone: null - , serve: null - , connect: null - });//.then(testSelect); -} if (cluster.isMaster) { - // not a bad idea to setup the master before forking the workers - run().then(function () { - for (i = 1; i <= numCores; i += 1) { - cluster.fork(); - } - }); -} else { - if (1 === cluster.worker.id) { - init().then(testSelect); - return; - } else { - setTimeout(function () { - run().then(testSelect); - }, 100); - } + require('./test-cluster-master'); +} +else { + require('./test-cluster-worker'); } - -// The native Promise implementation ignores errors because... dumbness??? -process.on('unhandledPromiseRejection', function (err) { - console.error('Unhandled Promise Rejection'); - console.error(err); - console.error(err.stack); - - process.exit(1); -}); diff --git a/test-standalone.js b/test-standalone.js index 9b87546..a11ce08 100644 --- a/test-standalone.js +++ b/test-standalone.js @@ -2,16 +2,23 @@ function run() { var sqlite3 = require('./standalone'); + var promise; - sqlite3.create({ + promise = sqlite3.create({ key: '00000000000000000000000000000000' , bits: 128 - , filename: '/tmp/test.cluster.sqlcipher' + , dirname: '/tmp/' + , prefix: 'foobar.' + , dbname: 'standalone' + , suffix: '.test' + , ext: '.sqlcipher' , verbose: null , standalone: true , serve: null , connect: null - }).then(function (client) { + }); + + promise.then(function (client) { client.all("SELECT ?", ['Hello World!'], function (err, result) { if (err) { console.error('[ERROR] standalone'); @@ -31,7 +38,7 @@ function run() { run(); // The native Promise implementation ignores errors because... dumbness??? -process.on('unhandledPromiseRejection', function (err) { +process.on('unhandledRejection', function (err) { console.error('Unhandled Promise Rejection'); console.error(err); console.error(err.stack); diff --git a/wrapper.js b/wrapper.js index 3c00d3e..6f4284a 100644 --- a/wrapper.js +++ b/wrapper.js @@ -1,12 +1,5 @@ 'use strict'; -/*global Promise*/ -var PromiseA = Promise; -try { - PromiseA = require('bluebird').Promise; -} catch(e) { - console.warn("For better Promise support please use bluebird"); -} var sqlite3 = require('sqlite3'); var dbs = {}; @@ -14,8 +7,12 @@ function sanitize(str) { return String(str).replace("'", "''"); } -function create(opts) { +function create(opts, verbs) { + if (!verbs) { + verbs = {}; + } var db; + var PromiseA = verbs.Promise || require('bluebird'); if (!opts) { opts = {}; @@ -25,11 +22,37 @@ function create(opts) { sqlite3.verbose(); } - if (!dbs[opts.filename]) { - dbs[opts.filename] = new sqlite3.Database(opts.filename); + // TODO expire unused dbs from cache + var dbname = ""; + if (opts.dirname) { + dbname += opts.dirname; + } + if (opts.prefix) { + dbname += opts.prefix; + } + if (opts.subtenant) { + dbname += opts.subtenant + '.'; + } + if (opts.tenant) { + dbname += opts.tenant + '.'; + } + if (opts.dbname) { + dbname += opts.dbname; + } + if (opts.suffix) { + dbname += opts.suffix; + } + if (opts.ext) { + dbname += opts.ext; } - db = dbs[opts.filename]; + if (dbs[dbname]) { + return PromiseA.resolve(dbs[dbname]); + } + + + db = new sqlite3.Database(dbname); + // dbs[dbname] = db // db.sanitize = sanitize; db.escape = sanitize; @@ -43,6 +66,7 @@ function create(opts) { return new PromiseA(function (resolve, reject) { if (db._initialized) { + dbs[dbname] = db; resolve(db); return; } @@ -51,13 +75,13 @@ function create(opts) { if (!bits) { db._initialized = true; } + dbs[dbname] = db; resolve(db); return; } // TODO test key length - db._initialized = true; db.serialize(function () { var setup = []; @@ -83,13 +107,16 @@ function create(opts) { PromiseA.all(setup).then(function () { // restore original functions + db._initialized = true; + dbs[dbname] = db; resolve(db); }, reject); }); }); }; - return db.init(opts); + dbs[dbname] = db.init(opts); + return dbs[dbname]; } module.exports.sanitize = sanitize;