// sqlite3-cluster.js/server.js
//
// (file-viewer metadata: 224 lines, 6.0 KiB, JavaScript)

'use strict';
var PromiseA = require('bluebird').Promise;
// Cache keyed by socket path (options.sock). newSocket() first stores the
// pending setup promise here and later replaces it with the servers object.
var wsses = {};
/**
 * Attaches the JSON command protocol to a WebSocket server.
 *
 * Each client message is parsed as JSON and is expected to look like
 * `{ type: 'init' | 'rpc', id, func, args, dbname }`. A database handle is
 * obtained from `./wrapper` (using the command itself as options when it
 * carries a `dbname`, otherwise `options`) and `db[func]` is invoked with
 * `args`. The outcome is sent back as a JSON message tagged with the same `id`.
 *
 * @param {Object} servers - Only `servers.wss` (the WebSocket server) is used.
 * @param {Object} options - Server options. When `options.ipcKey` is set,
 *   clients must supply the same value as the `ipcKey` or `ipc_key` query
 *   parameter of the upgrade URL; otherwise every client is accepted.
 * @returns {Promise} Resolves with no value once the handlers are registered.
 */
function createApp(servers, options) {
  var url = require('url');
  var wss = servers.wss;

  // Sends a failure in the same shape as the E_NO_INIT response below, so
  // clients need only one code path to handle errors.
  function sendError(ws, id, err, fallbackCode) {
    var wireErr = {
      message: (err && err.message) || String(err)
    , code: (err && err.code) || fallbackCode
    };

    ws.send(JSON.stringify({
      type: 'error'
    , id: id
    , args: [wireErr]
    , error: wireErr
    }));
  }

  // Looks up the requested method on the db handle, failing loudly when the
  // client asked for something that is not callable.
  function getMethod(db, cmd) {
    var err;

    if ('function' !== typeof db[cmd.func]) {
      err = new Error("unknown function '" + cmd.func + "'");
      err.code = 'E_UNKNOWN_FUNC';
      throw err;
    }

    return db[cmd.func];
  }

  wss.on('connection', function (ws, req) {
    // ws < 3 exposes the HTTP upgrade request as ws.upgradeReq;
    // ws >= 3 passes it as the second argument of the 'connection' event.
    var upgradeReq = req || ws.upgradeReq;
    var location = url.parse(upgradeReq.url, true);
    var counted = false;
    // you might use location.query.access_token to authenticate or share sessions
    // or ws.upgradeReq.headers.cookie (see http://stackoverflow.com/a/16395220/151312

    if (!options.ipcKey) {
      console.warn("[S] [SECURITY] please include { ipcKey: crypto.randomBytes(16).toString('base64') }"
        + " in your options and pass it from master to worker processes with worker.send()");
      ws._authorized = true;
    } else {
      ws._authorized = (options.ipcKey === (location.query.ipcKey || location.query.ipc_key));
    }

    if (!ws._authorized) {
      ws.send(JSON.stringify({ error: { message: "Unauthorized: ipc_key does not match", code: 'E_UNAUTHORIZED_IPCKEY' } }));
      ws.close();
      return;
    }

    wss.__count = (wss.__count || 0) + 1;
    counted = true;

    // Registered for both 'error' and 'close'. The guard makes sure a
    // connection that fires both is only subtracted from the count once.
    function decrWs() {
      if (!counted) {
        return;
      }
      counted = false;

      wss.__count -= 1;
      if (!wss.__count) {
        console.log('[S] client count is zero, but server will be left open');
      }
    }

    ws.on('error', function (err) {
      console.error('[S] [WebSocket error]');
      console.error(err.stack);
      decrWs();
    });
    ws.on('close', decrWs);

    ws.on('message', function (buffer) {
      var cmd;

      try {
        cmd = JSON.parse(buffer.toString('utf8'));
      } catch(e) {
        console.error('[S] [ERROR] parse json');
        console.error(e.stack || e);
        console.error(buffer);
        console.error();
        ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } }));
        return;
      }

      // Valid JSON that is not an object (null, a number, a string, ...)
      // cannot carry a command.
      if (!cmd || 'object' !== typeof cmd) {
        sendError(ws, undefined, new Error('command must be a JSON object'), 'E_INVALID_CMD');
        return;
      }

      // caching and create logic happens in the wrapper stored here below
      require('./wrapper').create(cmd.dbname ? cmd : options).then(function (db) {
        var rpcArgs;

        switch(cmd.type) {
          case 'init':
            // Returned so that a failing init reaches the rejection handler below.
            return getMethod(db, cmd).apply(db, cmd.args).then(function () {
              var args = Array.prototype.slice.call(arguments);
              var myself;

              // The db handle itself cannot be serialized; flag it instead.
              if (args[0] === db) {
                args = [];
                myself = true;
              }

              ws.send(JSON.stringify({
                id: cmd.id
              , self: myself
              , args: args
              }));
            });

          case 'rpc':
            if ('close' !== cmd.func && !db._initialized) {
              ws.send(JSON.stringify({
                type: 'error'
              , id: cmd.id
              , args: [{ message: 'database has not been initialized', code: 'E_NO_INIT' }]
              , error: { message: 'database has not been initialized', code: 'E_NO_INIT' }
              }));
              return;
            }

            // Tolerate a command without an args array; the callback is
            // always appended as the last argument.
            rpcArgs = Array.isArray(cmd.args) ? cmd.args : [];
            rpcArgs.push(function (err) {
              var args = Array.prototype.slice.call(arguments);
              var myself;

              if (args[0] === db) {
                args = [];
                myself = true;
              }

              ws.send(JSON.stringify({
                // `this` is whatever context the db method invoked the callback with
                this: (!err && this !== global) ? this : {}
              , args: args
              , self: myself
              , id: cmd.id
              , error: err
              }));
            });

            getMethod(db, cmd).apply(db, rpcArgs);
            return;

          default:
            throw new Error('UNKNOWN TYPE');
        }
      }).then(null, function (err) {
        // Without this handler a bad command (unknown type or func, failing
        // init, ...) would only surface as an unhandled rejection and the
        // client would never receive a reply for cmd.id.
        console.error('[S] [ERROR] command failed');
        console.error((err && err.stack) || err);
        sendError(ws, cmd.id, err, 'E_COMMAND_FAILED');
      });
    });
  });

  return PromiseA.resolve();
}
/**
 * Creates (or returns the cached) HTTP + WebSocket server pair listening on
 * the socket path `options.sock`.
 *
 * The entry in `wsses` is the pending promise while setup is in progress and
 * the `{ server, wss }` object once it has finished. If setup fails the entry
 * is removed so that a later call can try again.
 *
 * @param {Object} options - Must contain `sock`, the path to listen on; the
 *   whole object is also handed to createApp().
 * @returns {Promise<Object>} Resolves with `{ server, wss }`.
 */
function newSocket(options) {
  var pending;

  if (wsses[options.sock]) {
    return PromiseA.resolve(wsses[options.sock]);
  }

  pending = new PromiseA(function (resolve, reject) {
    var fs = require('fs');

    // Remove a leftover socket file first; the unlink error is deliberately
    // ignored because the file usually does not exist.
    fs.unlink(options.sock, function () {
      var server = require('http').createServer();

      // A failed listen() is reported through the 'error' event.
      server.once('error', reject);
      server.listen(options.sock, function () {
        // From here on errors are no longer setup failures; do not swallow them.
        server.removeListener('error', reject);
        resolve(server);
      });
    });
  }).then(function (server) {
    var WebSocketServer = require('ws').Server;
    var servers = {
      // Expose the server that is actually listening, not a fresh unused one.
      server: server
    , wss: new WebSocketServer({ server: server })
    };

    return createApp(servers, options).then(function (/*app*/) {
      wsses[options.sock] = servers;

      return servers;
    });
  }).then(null, function (err) {
    // Do not cache a failed setup forever.
    if (wsses[options.sock] === pending) {
      delete wsses[options.sock];
    }
    throw err;
  });

  wsses[options.sock] = pending;

  return pending;
}
function createMasterClient(options) {
return require('./wrapper').create(options, null).then(function (db) {
return db;
});
}
/**
 * Starts (or reuses) the socket server for `options.sock`.
 *
 * Side effect: sets `options.server` to `options.sock`.
 *
 * @param {Object} options - Must contain `sock`, the socket path to serve from.
 * @returns {Promise<Object>} Resolves with a shallow copy of the servers
 *   object produced by newSocket(), so callers can add properties without
 *   touching the cached object.
 * @throws {Error} Synchronously, when `options.sock` is missing.
 */
function createServer(options) {
  if (!options.sock) {
    throw new Error("Please provide options.sock as the socket to serve from");
  }

  options.server = options.sock;

  return newSocket(options).then(function (servers) {
    var result = {};

    Object.keys(servers).forEach(function (key) {
      result[key] = servers[key];
    });

    // The copy has to be returned; otherwise callers receive undefined.
    return result;
  });
}
/**
 * Starts the socket server and, when `options.dbname` is set, also opens a
 * master client whose handle is attached to the result as `masterClient`.
 *
 * @param {Object} options - Forwarded to createServer() and, if `dbname` is
 *   present, to createMasterClient().
 * @returns {Promise<Object>} Resolves with the value createServer() resolved
 *   with, optionally extended with `masterClient`.
 */
function create(options) {
  function attachMasterClient(result) {
    if (options.dbname) {
      return createMasterClient(options).then(function (db) {
        result.masterClient = db;
        return result;
      });
    }

    return result;
  }

  return createServer(options).then(attachMasterClient);
}
// Public API: create() starts the server and optionally a master client;
// the other two expose those steps individually.
module.exports.create = create;
module.exports.createMasterClient = createMasterClient;
module.exports.createServer = createServer;