224 lines
6.0 KiB
JavaScript
224 lines
6.0 KiB
JavaScript
'use strict';
|
|
|
|
var PromiseA = require('bluebird').Promise;
|
|
var wsses = {};
|
|
|
|
function createApp(servers, options) {
|
|
var url = require('url');
|
|
var wss = servers.wss;
|
|
//var server = servers.server;
|
|
|
|
//var express = require('express');
|
|
//var app = express();
|
|
/*
|
|
function app(req, res) {
|
|
res.end('NOT IMPLEMENTED');
|
|
}
|
|
*/
|
|
|
|
wss.on('connection', function (ws) {
|
|
var location = url.parse(ws.upgradeReq.url, true);
|
|
// you might use location.query.access_token to authenticate or share sessions
|
|
// or ws.upgradeReq.headers.cookie (see http://stackoverflow.com/a/16395220/151312
|
|
|
|
if (!options.ipcKey) {
|
|
console.warn("[S] [SECURITY] please include { ipcKey: crypto.randomBytes(16).toString('base64') }"
|
|
+ " in your options and pass it from master to worker processes with worker.send()");
|
|
ws._authorized = true;
|
|
} else {
|
|
ws._authorized = (options.ipcKey === (location.query.ipcKey || location.query.ipc_key));
|
|
}
|
|
|
|
if (!ws._authorized) {
|
|
ws.send(JSON.stringify({ error: { message: "Unauthorized: ipc_key does not match", code: 'E_UNAUTHORIZED_IPCKEY' } }));
|
|
ws.close();
|
|
return;
|
|
}
|
|
|
|
if (!wss.__count) {
|
|
wss.__count = 0;
|
|
}
|
|
wss.__count += 1;
|
|
|
|
function decrWs() {
|
|
wss.__count -= 1;
|
|
if (!wss.__count) {
|
|
console.log('[S] client count is zero, but server will be left open');
|
|
/*
|
|
wss.close(function () {
|
|
console.log('wss.closed');
|
|
});
|
|
server.close(function () {
|
|
console.log('server closed, but will not exit due to bug? in wss.close()');
|
|
process.exit(0);
|
|
});
|
|
*/
|
|
}
|
|
}
|
|
|
|
ws.on('error', function (err) {
|
|
console.error('[S] [WebSocket error]');
|
|
console.error(err.stack);
|
|
decrWs();
|
|
});
|
|
ws.on('close', decrWs);
|
|
ws.on('message', function (buffer) {
|
|
var cmd;
|
|
var promise;
|
|
|
|
try {
|
|
cmd = JSON.parse(buffer.toString('utf8'));
|
|
} catch(e) {
|
|
console.error('[S] [ERROR] parse json');
|
|
console.error(e.stack || e);
|
|
console.error(buffer);
|
|
console.error();
|
|
ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } }));
|
|
return;
|
|
}
|
|
|
|
//console.log('cmd');
|
|
//console.log(cmd);
|
|
// caching and create logic happens in the wrapper stored here below
|
|
promise = require('./wrapper').create(cmd && cmd.dbname && cmd || options).then(function (db) {
|
|
|
|
switch(cmd.type) {
|
|
case 'init':
|
|
//console.log('[S] init', cmd);
|
|
db[cmd.func].apply(db, cmd.args).then(function () {
|
|
var args = Array.prototype.slice.call(arguments);
|
|
var myself;
|
|
|
|
if (args[0] === db) {
|
|
args = [];
|
|
myself = true;
|
|
}
|
|
|
|
//console.log('[INIT HAPPENING]');
|
|
ws.send(JSON.stringify({
|
|
id: cmd.id
|
|
, self: myself
|
|
, args: args
|
|
//, this: this
|
|
}));
|
|
});
|
|
break;
|
|
|
|
case 'rpc':
|
|
if ('close' !== cmd.func && !db._initialized) {
|
|
//console.log('[RPC NOT HAPPENING]');
|
|
ws.send(JSON.stringify({
|
|
type: 'error'
|
|
, id: cmd.id
|
|
, args: [{ message: 'database has not been initialized', code: 'E_NO_INIT' }]
|
|
, error: { message: 'database has not been initialized', code: 'E_NO_INIT' }
|
|
}));
|
|
return;
|
|
}
|
|
|
|
cmd.args.push(function (err) {
|
|
var args = Array.prototype.slice.call(arguments);
|
|
var myself;
|
|
|
|
if (args[0] === db) {
|
|
args = [];
|
|
myself = true;
|
|
}
|
|
|
|
//console.log('[RPC HAPPENING]', args, cmd.id);
|
|
ws.send(JSON.stringify({
|
|
this: (!err && this !== global) ? this : {}
|
|
, args: args
|
|
, self: myself
|
|
, id: cmd.id
|
|
, error: err
|
|
}));
|
|
});
|
|
|
|
db[cmd.func].apply(db, cmd.args);
|
|
break;
|
|
|
|
default:
|
|
throw new Error('UNKNOWN TYPE');
|
|
//break;
|
|
}
|
|
|
|
});
|
|
});
|
|
});
|
|
|
|
// wsses[options.sock] = app;
|
|
return PromiseA.resolve();
|
|
}
|
|
|
|
function newSocket(options) {
|
|
if (wsses[options.sock]) {
|
|
return PromiseA.resolve(wsses[options.sock]);
|
|
}
|
|
|
|
wsses[options.sock] = new PromiseA(function (resolve) {
|
|
var fs = require('fs');
|
|
fs.unlink(options.sock, function () {
|
|
var server = require('http').createServer();
|
|
// ignore error when socket doesn't exist
|
|
server.listen(options.sock, function () {
|
|
resolve(server);
|
|
});
|
|
});
|
|
}).then(function (server) {
|
|
var WebSocketServer = require('ws').Server;
|
|
var servers = {
|
|
server: require('http').createServer()
|
|
, wss: new WebSocketServer({ server: server })
|
|
};
|
|
|
|
return createApp(servers, options).then(function (/*app*/) {
|
|
// server.on('request', app);
|
|
wsses[options.sock] = servers;
|
|
|
|
return wsses[options.sock];
|
|
});
|
|
});
|
|
|
|
return wsses[options.sock];
|
|
}
|
|
|
|
function createMasterClient(options) {
|
|
return require('./wrapper').create(options, null).then(function (db) {
|
|
return db;
|
|
});
|
|
}
|
|
|
|
function createServer(options) {
|
|
if (!options.sock) {
|
|
throw new Error("Please provide options.sock as the socket to serve from");
|
|
}
|
|
options.server = options.sock;
|
|
|
|
return newSocket(options).then(function () {
|
|
var result = {};
|
|
|
|
Object.keys(wsses[options.sock]).forEach(function (key) {
|
|
result[key] = wsses[options.sock][key];
|
|
});
|
|
});
|
|
}
|
|
|
|
function create(options) {
|
|
return createServer(options).then(function (result) {
|
|
if (!options.dbname) {
|
|
return result;
|
|
}
|
|
|
|
return createMasterClient(options).then(function (db) {
|
|
result.masterClient = db;
|
|
|
|
return result;
|
|
});
|
|
});
|
|
}
|
|
|
|
module.exports.create = create;
|
|
module.exports.createMasterClient = createMasterClient;
|
|
module.exports.createServer = createServer;
|