From bbc4939a45b1042a95410b04c77c62c6015965cb Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Sun, 26 Jul 2015 01:35:22 -0600 Subject: [PATCH] initial commit --- README.md | 2 +- client.js | 189 +++++++++++++++++++++++++++++++++++++++++++++ cluster.js | 20 +++++ index.js | 3 + memstore.js | 60 ++++++++++++++ package.json | 32 ++++++++ server.js | 143 ++++++++++++++++++++++++++++++++++ standalone.js | 15 ++++ test-cluster.js | 52 +++++++++++++ test-standalone.js | 37 +++++++++ 10 files changed, 552 insertions(+), 1 deletion(-) create mode 100644 client.js create mode 100644 cluster.js create mode 100644 index.js create mode 100644 memstore.js create mode 100644 package.json create mode 100644 server.js create mode 100644 standalone.js create mode 100644 test-cluster.js create mode 100644 test-standalone.js diff --git a/README.md b/README.md index 1bfeca7..1487351 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ var store = require('memstore-cluster'); var numCores = require('os').cpus().length; var opts = { - sock: '/tmp/mydb.memstore.sock' + sock: '/tmp/memstore.sock' // If left 'null' or 'undefined' this defaults to a similar memstore // with no special logic for 'cookie' or 'expires' diff --git a/client.js b/client.js new file mode 100644 index 0000000..afbd018 --- /dev/null +++ b/client.js @@ -0,0 +1,189 @@ +'use strict'; + +/*global Promise*/ + +function startServer(opts) { + return require('./server').create(opts).then(function (server) { + // this process doesn't need to connect to itself + // through a socket + return server.masterClient; + }); +} + +function getConnection(opts) { + return new Promise(function (resolve) { + //setTimeout(function () { + var WebSocket = require('ws'); + var ws = new WebSocket('ws+unix:' + opts.sock); + + ws.on('error', function (err) { + console.error('[ERROR] ws connection failed, retrying'); + console.error(err); + + function retry() { + setTimeout(function () { + getConnection(opts).then(resolve, retry); + }, 100 + (Math.random() * 250)); + } + + if (!opts.connect && ('ENOENT' === err.code || 'ECONNREFUSED' === err.code)) { + console.log('[NO SERVER] attempting to create a server #######################'); + return startServer(opts).then(function (client) { + // ws.masterClient = client; + resolve({ masterClient: client }); + }, function () { + retry(); + }); + } + + retry(); + }); + + /* + ws.on('open', function () { + resolve(ws); + }); + */ + ws.___listeners = []; + ws.on('message', function (data) { + ws.___listeners.forEach(function (fn) { + try { + fn(data); + } catch(e) { + console.error("[ERROR] ws.on('message', fn) (multi-callback)"); + console.error(e); + // ignore + } + }); + }); + + function onInitMessage(str) { + // TODO there's no way to remove a listener... what to do? + var data; + + try { + data = JSON.parse(str); + } catch(e) { + console.error('[ERROR]'); + console.error(e); + } + + if ('methods' !== data.type) { + return; + } + + var index = ws.___listeners.indexOf(onInitMessage); + ws.___listeners.splice(index, 1); + ws._methods = data.methods; + + resolve(ws); + } + + ws.___listeners.push(onInitMessage); + //}, 100 + (Math.random() * 250)); + }); +} + +function create(opts) { + if (!opts.sock) { + opts.sock = '/tmp/memstore' + '.sock'; + } + + var promise; + var numcpus = require('os').cpus().length; + if (opts.standalone || (1 === numcpus && !opts.serve && !opts.connect)) { + return require('./memstore').create(opts); + } + + function retryServe() { + return startServer(opts).then(function (client) { + // ws.masterClient = client; + return { masterClient: client }; + }, function (err) { + console.error('[ERROR] retryServe()'); + console.error(err); + retryServe(); + }); + } + + if (opts.serve) { + promise = retryServe(); + } else { + promise = getConnection(opts); + } + + /* + if (opts.connect) { + } + */ + + // TODO maybe use HTTP POST instead? + return promise.then(function (ws) { + if (ws.masterClient) { + return ws.masterClient; + } + + var db = {}; + + function rpc(fname, args) { + var id; + var cb; + + if ('function' === typeof args[args.length - 1]) { + id = Math.random(); + cb = args.pop(); + } + + ws.send(JSON.stringify({ + type: 'rpc' + , func: fname + , args: args + , filename: opts.filename + , id: id + })); + + if (!cb) { + return; + } + + function onMessage(data) { + var cmd; + + try { + cmd = JSON.parse(data.toString('utf8')); + } catch(e) { + console.error('[ERROR] in client, from sql server parse json'); + console.error(e); + console.error(data); + console.error(); + + //ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } })); + return; + } + + if (cmd.id !== id) { + return; + } + + if ('on' !== fname) { + var index = ws.___listeners.indexOf(onMessage); + ws.___listeners.splice(index, 1); + } + + cb.apply(cmd.this, cmd.args); + } + + ws.___listeners.push(onMessage); + } + + ws._methods.forEach(function (key) { + db[key] = function () { + rpc(key, Array.prototype.slice.call(arguments)); + }; + }); + + return db; + }); +} + +module.exports.create = create; diff --git a/cluster.js b/cluster.js new file mode 100644 index 0000000..37b7bbd --- /dev/null +++ b/cluster.js @@ -0,0 +1,20 @@ +'use strict'; + +var memstore = require('./index'); + +function create(opts) { + var cluster = require('cluster'); + var numCores = require('os').cpus().length; + + if (!opts.serve && ('boolean' !== typeof opts.serve)) { + opts.serve = (numCores > 1) && cluster.isMaster; + } + + if (!opts.connect && ('boolean' !== typeof opts.connect)) { + opts.connect = (numCores > 1) && cluster.isWorker; + } + + return memstore.create(opts); +} + +module.exports.create = create; diff --git a/index.js b/index.js new file mode 100644 index 0000000..a961216 --- /dev/null +++ b/index.js @@ -0,0 +1,3 @@ +'use strict'; + +module.exports = require('./client'); diff --git a/memstore.js b/memstore.js new file mode 100644 index 0000000..8d7d26f --- /dev/null +++ b/memstore.js @@ -0,0 +1,60 @@ +'use strict'; + +/*global Promise*/ +var defer; + +if ('function' === typeof setImmediate) { + defer = setImmediate; +} else { + defer = function (fn) { process.nextTick(fn.bind.apply(fn, arguments)); }; +} + +function create(/*opts*/) { + // don't leak prototypes as implicitly non-null + var db = Object.create(null); + + return Promise.resolve({ + // required / recommended + set: function (id, data, fn) { + db[id] = data; + + if (!fn) { return; } + defer(fn, null); + } + , get: function (id, fn) { + if (!fn) { return; } + defer(fn, null, 'undefined' === typeof db[id] ? null : db[id]); + } + , touch: function (id, data, fn) { + db[id] = data; + + if (!fn) { return; } + defer(fn, null); + } + , destroy: function (id, fn) { + delete db[id]; + + if (!fn) { return; } + defer(fn, null); + } + // optional + , all: function (fn) { + if (!fn) { return; } + defer(fn, null, Object.keys(db).map(function (key) { + return db[key]; + })); + } + , length: function (fn) { + if (!fn) { return; } + defer(fn, null, Object.keys(db).length); + } + , clear: function (fn) { + db = Object.create(null); + + if (!fn) { return; } + defer(fn, null); + } + }); +} + +module.exports.create = create; diff --git a/package.json b/package.json new file mode 100644 index 0000000..9eaec96 --- /dev/null +++ b/package.json @@ -0,0 +1,32 @@ +{ + "name": "memstore-cluster", + "version": "1.0.0", + "description": "A wrapper to enable the use of a in-process store with node cluster via a socket server (i.e. for Raspberry Pi 2).", + "main": "index.js", + "scripts": { + "test": "node test-cluster.js", + "start": "node server.js" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/coolaj86/memstore-cluster.git" + }, + "keywords": [ + "store", + "session", + "connect", + "express", + "memstore", + "cluster", + "rpi2" + ], + "author": "AJ ONeal (http://coolaj86.com/)", + "license": "Apache-2.0", + "bugs": { + "url": "https://github.com/coolaj86/memstore-cluster/issues" + }, + "homepage": "https://github.com/coolaj86/memstore-cluster#readme", + "dependencies": { + "ws": "^0.7.2" + } +} diff --git a/server.js b/server.js new file mode 100644 index 0000000..7d60f65 --- /dev/null +++ b/server.js @@ -0,0 +1,143 @@ +'use strict'; +/*global Promise*/ + +var wsses = {}; + +function createApp(server, options) { + var promise; + + if (wsses[options.filename]) { + return Promise.resolve(wsses[options.filename]); + } + + if (options.store) { + promise = Promise.resolve(options.store); + } else { + promise = require('./memstore').create(options); + } + + return promise.then(function (db) { + var url = require('url'); + //var express = require('express'); + //var app = express(); + var wss = server.wss; + + function app(req, res) { + res.end('NOT IMPLEMENTED'); + } + + function getMethods(db) { + /* + var instanceMethods = Object.keys(db) + .map(function (key) { return 'function' === typeof db[key] ? key : null; }) + .filter(function (key) { return key; }) + ; + + var protoMethods = Object.keys(Object.getPrototypeOf(db)) + .map(function (key) { return 'function' === typeof Object.getPrototypeOf(db)[key] ? key : null; }) + .filter(function (key) { return key; }) + ; + + return instanceMethods.concat(protoMethods); + */ + + return [ + 'set', 'get', 'touch', 'destroy' + , 'all', 'length', 'clear' + , 'on', 'off', 'removeEventListener', 'addEventListener' + ].filter(function (key) { + if ('function' === typeof db[key]) { + return true; + } + }); + } + + wss.on('connection', function (ws) { + ws.send(JSON.stringify({ + type: 'methods' + , methods: getMethods(db) + })); + + var location = url.parse(ws.upgradeReq.url, true); + // you might use location.query.access_token to authenticate or share sessions + // or ws.upgradeReq.headers.cookie (see http://stackoverflow.com/a/16395220/151312 + + ws.__session_id = location.query.session_id || Math.random(); + + ws.on('message', function (buffer) { + var cmd; + + try { + cmd = JSON.parse(buffer.toString('utf8')); + } catch(e) { + console.error('[ERROR] parse json'); + console.error(e); + console.error(buffer); + console.error(); + ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } })); + return; + } + + switch(cmd.type) { + case 'init': + break; + + case 'rpc': + cmd.args.push(function () { + var args = Array.prototype.slice.call(arguments); + + ws.send(JSON.stringify({ + this: this + , args: args + , id: cmd.id + })); + }); + + db[cmd.func].apply(db, cmd.args); + break; + + default: + throw new Error('UNKNOWN TYPE'); + //break; + } + + }); + + ws.send(JSON.stringify({ type: 'session', value: ws.__session_id })); + }); + + app.masterClient = db; + //wsses[options.filename] = app; + + return app; + }); +} + +function create(options) { + var server = require('http').createServer(); + var WebSocketServer = require('ws').Server; + var wss = new WebSocketServer({ server: server }); + //var port = process.env.PORT || process.argv[0] || 4080; + + var fs = require('fs'); + var ps = []; + + ps.push(new Promise(function (resolve) { + fs.unlink(options.sock, function () { + // ignore error when socket doesn't exist + + server.listen(options.sock, resolve); + }); + })); + + ps.push(createApp({ server: server, wss: wss }, options).then(function (app) { + server.on('request', app); + return { masterClient: app.masterClient }; + })); + + return Promise.all(ps).then(function (results) { + return results[1]; + }); +} + +module.exports.create = create; diff --git a/standalone.js b/standalone.js new file mode 100644 index 0000000..29c090f --- /dev/null +++ b/standalone.js @@ -0,0 +1,15 @@ +'use strict'; + +var memstore = require('./index'); + +function create(opts) { + opts.standalone = true; + + // TODO if cluster *is* used issue a warning? + // I suppose the user could be issuing a different filename for each + // ... but then they have no need to use this module, right? + + return memstore.create(opts); +} + +module.exports.create = create; diff --git a/test-cluster.js b/test-cluster.js new file mode 100644 index 0000000..d6b8ff7 --- /dev/null +++ b/test-cluster.js @@ -0,0 +1,52 @@ +'use strict'; + +var cluster = require('cluster'); +//var numCores = 2; +var numCores = require('os').cpus().length; +var id = (cluster.isMaster && '0' || cluster.worker.id).toString(); + +function run() { + var mstore = require('./cluster'); + + return mstore.create({ + standalone: null + , serve: null + , connect: null + }).then(function (store) { + store.set('foo', 'bar', function (err) { + if (err) { console.error(err); return; } + + store.get('baz', function (err, data) { + if (err) { console.error(err); return; } + console.log(id, 'should be null:', data); + }); + + store.get('foo', function (err, data) { + if (err) { console.error(err); return; } + console.log(id, 'should be bar:', data); + }); + }); + }); +} + +if (cluster.isMaster) { + // not a bad idea to setup the master before forking the workers + run().then(function () { + var i; + + for (i = 1; i <= numCores; i += 1) { + cluster.fork(); + } + }); +} else { + run(); +} + +// The native Promise implementation ignores errors because... dumbness??? +process.on('unhandledPromiseRejection', function (err) { + console.error('Unhandled Promise Rejection'); + console.error(err); + console.error(err.stack); + + process.exit(1); +}); diff --git a/test-standalone.js b/test-standalone.js new file mode 100644 index 0000000..fe9ae75 --- /dev/null +++ b/test-standalone.js @@ -0,0 +1,37 @@ +'use strict'; + +function run() { + var mstore = require('./standalone'); + + mstore.create({ + sock: '/tmp/memstore.sock' + , standalone: null + , serve: null + , connect: null + }).then(function (store) { + store.set('foo', 'bar', function (err) { + if (err) { console.error(err); return; } + + store.get('baz', function (err, data) { + if (err) { console.error(err); return; } + console.log('should be null:', data); + }); + + store.get('foo', function (err, data) { + if (err) { console.error(err); return; } + console.log('should be bar:', data); + }); + }); + }); +} + +run(); + +// The native Promise implementation ignores errors because... dumbness??? +process.on('unhandledPromiseRejection', function (err) { + console.error('Unhandled Promise Rejection'); + console.error(err); + console.error(err.stack); + + process.exit(1); +});