From 49835c1a6a265170b5dd067fbd89521741861dfb Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Mon, 3 Sep 2018 18:57:38 -0600 Subject: [PATCH] WIP refactor --- lib/client.js | 245 +++++++++++++++++++++++ lib/remote.js | 486 +++++++++++++++------------------------------ lib/sorting-hat.js | 2 +- lib/ws-readable.js | 75 +++++++ 4 files changed, 478 insertions(+), 330 deletions(-) create mode 100644 lib/client.js create mode 100644 lib/ws-readable.js diff --git a/lib/client.js b/lib/client.js new file mode 100644 index 0000000..0b80e5a --- /dev/null +++ b/lib/client.js @@ -0,0 +1,245 @@ +'use strict'; + +var sni = require('sni'); +var Packer = require('proxy-packer'); + +module.exports.create = function (handlers) { + var client = module.exports; + client.pendingCommands = {}; + client.auth = null; + client.wstunneler = null; + client.localclients = {}; + client.authenticated = false; + + var multiplexed = {}; + var stream = require('stream'); + var Duplex = stream.Duplex; + + function SingleConn(tun, streamOpts) { + // Proper duplex stream with automatic flow control (backpressure) management + if(!(this instanceof SingleConn)) { return new SingleConn(tun, streamOpts); } + Duplex.call(this, streamOpts); + } + SingleConn.create = function (opts) { + return new SingleConn(opts); + }; + + if (!handlers) { handlers = {}; } + // XXX TODO + handlers.onAuthRequest = function (authRequest) { + // XXX out of scope + client._wsTunnelRemote._sendCommand('auth', authRequest).catch(function (err) { console.error('1', err); }); + }; + handlers.onAddToken = function (jwtoken) { + // XXX out of scope + client._wsTunnelRemote._sendCommand('add_token', jwtoken) + .catch(function (err) { + console.error('failed re-adding token', jwtoken, 'after reconnect', err); + // Not sure if we should do something like remove the token here. It worked + // once or it shouldn't have stayed in the list, so it's less certain why + // it would have failed here. + }); + }; + handlers.onAck = function (body) { + var packBody = true; + client._wsTunnelRemote.sendMessage(Packer.packHeader(null, body, 'control', packBody)); + }; + handlers.onNoHandler = function (cmd) { + console.warn("[telebit] state.handlers['" + cmd[1] + "'] not set"); + console.warn(cmd[2]); + }; + // TODO + // make proxy-packer a readable stream? + // create per-connection buffer? + handlers.onNonReadable = function (/*fn*/) { + client.wstunneler.pause(); + //packerHandlers.onReadable = fn; + }; + handlers.onReadable = function () { + //packerHandlers.onReadable(); + client.wstunneler.resume(); + }; + + var authsent = false; + function sendAllTokens() { + if (client.auth) { + authsent = true; + handlers.onAuthRequest(client.auth); + } + client.sharedTokens.forEach(function (jwtoken) { + // XXX out of scope + if (client._state.debug) { console.log('[DEBUG] send token'); } + authsent = true; + handlers.onAddToken(jwtoken); + }); + } + + function hyperPeek(tun) { + var m; + var str; + if (tun.data) { + if ('http' === tun.service) { + str = tun.data.toString(); + m = str.match(/(?:^|[\r\n])Host: ([^\r\n]+)[\r\n]*/im); + tun._name = tun._hostname = (m && m[1].toLowerCase() || '').split(':')[0]; + } + else if ('https' === tun.service || 'tls' === tun.service) { + tun._name = tun._servername = sni(tun.data); + } else { + tun._name = ''; + } + } + } + + var packerHandlers = { + oncontrol: function (opts) { + var cmd, err; + try { + cmd = JSON.parse(opts.data.toString()); + } catch (err) { + // ignore + } + + if (!Array.isArray(cmd) || typeof cmd[0] !== 'number') { + console.warn('received bad command "' + opts.data.toString() + '"'); + return; + } + + if (cmd[0] < 0) { + var cb = client.pendingCommands[-cmd[0]]; + if (!cb) { + console.warn('received response for unknown request:', cmd); + } else { + cb.apply(null, cmd.slice(1)); + } + return; + } + + if (cmd[0] === 0) { + console.warn('received dis-associated error from server', cmd[1]); + if (client.connCallback) { + client.connCallback(cmd[1]); + } + return; + } + + if (cmd[1] === 'hello') { + if (client._state.debug) { console.log('[DEBUG] hello received'); } + sendAllTokens(); + if (client.connCallback) { + client.connCallback(); + } + // TODO: handle the versions and commands provided by 'hello' - isn't super important + // yet since there is only one version and set of commands. + err = null; + } else if (cmd[1] === 'grant') { + client.authenticated = true; + if (client._state.handlers[cmd[1]]) { + client._state.handlers[cmd[1]](cmd[2]); + } else { + handlers.onNoHandler(cmd); + } + return; + } else if (cmd[1] === 'access_token') { + client.authenticated = true; + if (client._state.handlers[cmd[1]]) { + client._state.handlers[cmd[1]](cmd[2]); + } else { + handlers.onNoHandler(cmd); + } + return; + } else { + err = { message: 'unknown command "'+cmd[1]+'"', code: 'E_UNKNOWN_COMMAND' }; + } + + handlers.onAck([-cmd[0], err]); + } + + , onconnection: function (tun, handledCb) { + var cid = tun._id = Packer.addrToId(tun); + if (multiplexed[cid]) { + throw new Error("[Sanity Error] a new connection can't already exist in the connection pool"); + } + + // this data should have been gathered already as part of the proxy protocol + // but if it's available again here we can double check + hyperPeek(tun); + + // the next data that comes in may be the next packet of data for this connection + // and that may happen before the detection and assignment is complete + handlers.onNonReadable(); // pause() + // TODO use readable streams instead + require(client._state.sortingHat).assign(client._state, tun, function (err, conn) { + if (err) { + err.message = err.message.replace(/:tun_id/, tun._id); + console.info("[_onConnectError] opening '" + cid + "' failed because " + err.message); + client._wsTunnelRemote.sendMessage(Packer.packHeader(tun, null, 'error')); + return; + } + //handlers.on + client.clientHandlers.add(conn, cid, tun); + if (tun.data) { conn.write(tun.data); } + handlers.onReadable(); // resume + if ('function' === typeof handledCb) { handledCb(); } + }); + } + + , onmessage: function (tun) { + var cid = tun._id = Packer.addrToId(tun); + var handled; + + hyperPeek(tun); + + handled = client.clientHandlers.write(cid, tun); + + if (!handled) { + throw new Error("No 'message' event came before 'connection' event." + + " You're probably using a different version of proxy-packer on the server than the client"); + } + } + + , onpause: function (opts) { + var cid = Packer.addrToId(opts); + if (client.localclients[cid]) { + console.log("[TunnelPause] pausing '"+cid+"', remote received", opts.data.toString(), 'of', client.localclients[cid].tunnelWritten, 'sent'); + client.localclients[cid].manualPause = true; + client.localclients[cid].pause(); + } else { + console.log('[TunnelPause] remote tried pausing finished connection', cid); + // Often we have enough latency that we've finished sending before we're told to pause, so + // don't worry about sending back errors, since we won't be sending data over anyway. + // var packBody = true; + // wsTunnelRemote.sendMessage(Packer.packHeader(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error', packBody)); + } + } + , onresume: function (opts) { + var cid = Packer.addrToId(opts); + if (client.localclients[cid]) { + console.log("[TunnelResume] resuming '"+cid+"', remote received", opts.data.toString(), 'of', client.localclients[cid].tunnelWritten, 'sent'); + client.localclients[cid].manualPause = false; + client.localclients[cid].resume(); + } else { + console.log('[TunnelResume] remote tried resuming finished connection', cid); + // var packBody = true; + // wsTunnelRemote.sendMessage(Packer.packHeader(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error', packBody)); + } + } + + , onend: function (opts) { + var cid = Packer.addrToId(opts); + //console.log("[end] '" + cid + "'"); + client.clientHandlers.closeSingle(cid); + } + , onerror: function (opts) { + var cid = Packer.addrToId(opts); + //console.log("[error] '" + cid + "'", opts.code || '', opts.message); + client.clientHandlers.closeSingle(cid); + } + }; + + + client.machine = Packer.create(packerHandlers); + client.sharedTokens = []; + + return client; +}; diff --git a/lib/remote.js b/lib/remote.js index 7261d74..fa9c8f6 100644 --- a/lib/remote.js +++ b/lib/remote.js @@ -1,6 +1,9 @@ (function () { 'use strict'; +// TODO use stream-based ws +// https://github.com/websockets/ws/issues/596 + var PromiseA; try { PromiseA = require('bluebird'); @@ -8,7 +11,6 @@ try { PromiseA = global.Promise; } var WebSocket = require('ws'); -var sni = require('sni'); var Packer = require('proxy-packer'); var os = require('os'); @@ -19,35 +21,36 @@ function timeoutPromise(duration) { } function _connect(state) { - // jshint latedef:false - var defaultHttpTimeout = (2 * 60); - var activityTimeout = state.activityTimeout || (defaultHttpTimeout - 5) * 1000; - var pongTimeout = state.pongTimeout || 10*1000; // Allow the tunnel client to be created with no token. This will prevent the connection from // being established initialy and allows the caller to use `.append` for the first token so // they can get a promise that will provide feedback about invalid tokens. - var tokens = []; - var auth; if(!state.sortingHat) { state.sortingHat = "./sorting-hat.js"; } + + var sharedPausedClients = []; + var sharedTimeoutId; + var client = require('./client').create({}); + // client.wstunneler = null; + // client.pendingCommands = {}; + // client.machine = machine + // client.auth = null; + // client.sharedTokens = []; + // client.localclients = {}; + // client.authenticated = false; + client._state = state; + if (state.token) { if ('undefined' === state.token) { throw new Error("passed string 'undefined' as token"); } - tokens.push(state.token); + client.sharedTokens.push(state.token); } - var wstunneler; - var authenticated = false; - var authsent = false; - var initialConnect = true; - - var localclients = {}; - var pausedClients = []; var clientHandlers = { - add: function (conn, cid, tun) { - localclients[cid] = conn; + _initialConnect: true + , add: function (conn, cid, tun) { + client.localclients[cid] = conn; console.info("[connect] new client '" + cid + "' for '" + tun.name + ":" + tun.serviceport + "' " + "(" + clientHandlers.count() + " clients)"); @@ -59,6 +62,7 @@ function _connect(state) { } conn.tunnelWritten = 0; + // TODO use readable conn.on('data', function onLocalData(chunk) { //var chunk = conn.read(); if (conn.tunnelClosing) { @@ -72,13 +76,13 @@ function _connect(state) { // down the data we are getting to send over. We also want to pause all active connections // if any connections are paused to make things more fair so one connection doesn't get // stuff waiting for all other connections to finish because it tried writing near the border. - var bufSize = wsHandlers.sendMessage(Packer.packHeader(tun, chunk)); + var bufSize = wsTunnelRemote.sendMessage(Packer.packHeader(tun, chunk)); // Sending 2 messages instead of copying the buffer - var bufSize2 = wsHandlers.sendMessage(chunk); - if (pausedClients.length || (bufSize + bufSize2) > 1024*1024) { + var bufSize2 = wsTunnelRemote.sendMessage(chunk); + if (sharedPausedClients.length || (bufSize + bufSize2) > 1024*1024) { // console.log('[onLocalData] paused connection', cid, 'to allow websocket to catch up'); conn.pause(); - pausedClients.push(conn); + sharedPausedClients.push(conn); } }); @@ -87,7 +91,7 @@ function _connect(state) { console.info("[onLocalEnd] connection '" + cid + "' ended, will probably close soon"); conn.tunnelClosing = true; if (!sentEnd) { - wsHandlers.sendMessage(Packer.packHeader(tun, null, 'end')); + wsTunnelRemote.sendMessage(Packer.packHeader(tun, null, 'end')); sentEnd = true; } }); @@ -95,22 +99,22 @@ function _connect(state) { console.info("[onLocalError] connection '" + cid + "' errored:", err); if (!sentEnd) { var packBody = true; - wsHandlers.sendMessage(Packer.packHeader(tun, {message: err.message, code: err.code}, 'error', packBody)); + wsTunnelRemote.sendMessage(Packer.packHeader(tun, {message: err.message, code: err.code}, 'error', packBody)); sentEnd = true; } }); conn.on('close', function onLocalClose(hadErr) { - delete localclients[cid]; + delete client.localclients[cid]; console.log('[onLocalClose] closed "' + cid + '" read:'+conn.tunnelRead+', wrote:'+conn.tunnelWritten+' (' + clientHandlers.count() + ' clients)'); if (!sentEnd) { - wsHandlers.sendMessage(Packer.packHeader(tun, null, hadErr && 'error' || 'end')); + wsTunnelRemote.sendMessage(Packer.packHeader(tun, null, hadErr && 'error' || 'end')); sentEnd = true; } }); } , write: function (cid, opts) { - var conn = localclients[cid]; + var conn = client.localclients[cid]; if (!conn) { return false; } @@ -128,12 +132,12 @@ function _connect(state) { if (!conn.remotePaused && conn.bufferSize > 1024*1024) { var packBody = true; - wsHandlers.sendMessage(Packer.packHeader(opts, conn.tunnelRead, 'pause', packBody)); + wsTunnelRemote.sendMessage(Packer.packHeader(opts, conn.tunnelRead, 'pause', packBody)); conn.remotePaused = true; conn.once('drain', function () { var packBody = true; - wsHandlers.sendMessage(Packer.packHeader(opts, conn.tunnelRead, 'resume', packBody)); + wsTunnelRemote.sendMessage(Packer.packHeader(opts, conn.tunnelRead, 'resume', packBody)); conn.remotePaused = false; }); } @@ -141,13 +145,13 @@ function _connect(state) { } , closeSingle: function (cid) { - if (!localclients[cid]) { + if (!client.localclients[cid]) { return; } console.log('[closeSingle]', cid); PromiseA.resolve().then(function () { - var conn = localclients[cid]; + var conn = client.localclients[cid]; conn.tunnelClosing = true; conn.end(); @@ -158,341 +162,160 @@ function _connect(state) { // Otherwise we want the connection to be able to finish, but we also want to impose // a time limit for it to drain, since it shouldn't have more than 1MB buffered. return new PromiseA(function (resolve) { - var timeoutId = setTimeout(resolve, 60*1000); + var myTimeoutId = setTimeout(resolve, 60*1000); conn.once('drain', function () { - clearTimeout(timeoutId); + clearTimeout(myTimeoutId); setTimeout(resolve, 500); }); }); }).then(function () { - if (localclients[cid]) { + if (client.localclients[cid]) { console.warn('[closeSingle]', cid, 'connection still present after calling `end`'); - localclients[cid].destroy(); + client.localclients[cid].destroy(); return timeoutPromise(500); } }).then(function () { - if (localclients[cid]) { + if (client.localclients[cid]) { console.error('[closeSingle]', cid, 'connection still present after calling `destroy`'); - delete localclients[cid]; + delete client.localclients[cid]; } }).catch(function (err) { console.error('[closeSingle] failed to close connection', cid, err.toString()); - delete localclients[cid]; + delete client.localclients[cid]; }); } , closeAll: function () { console.log('[closeAll]'); - Object.keys(localclients).forEach(function (cid) { + Object.keys(client.localclients).forEach(function (cid) { clientHandlers.closeSingle(cid); }); } , count: function () { - return Object.keys(localclients).length; + return Object.keys(client.localclients).length; } }; - var pendingCommands = {}; - function sendCommand(name) { - var id = Math.ceil(1e9 * Math.random()); - var cmd = [id, name].concat(Array.prototype.slice.call(arguments, 1)); - if (state.debug) { console.log('[DEBUG] command sending', cmd); } - - var packBody = true; - wsHandlers.sendMessage(Packer.packHeader(null, cmd, 'control', packBody)); - setTimeout(function () { - if (pendingCommands[id]) { - console.warn('command', name, id, 'timed out'); - pendingCommands[id]({ - message: 'response not received in time' - , code: 'E_TIMEOUT' - }); - } - }, pongTimeout); - - return new PromiseA(function (resolve, reject) { - pendingCommands[id] = function (err, result) { - delete pendingCommands[id]; - if (err) { - reject(err); - } else { - resolve(result); - } - }; - }); - } - - function sendAllTokens() { - if (auth) { - authsent = true; - sendCommand('auth', auth).catch(function (err) { console.error('1', err); }); - } - tokens.forEach(function (jwtoken) { - if (state.debug) { console.log('[DEBUG] send token'); } - authsent = true; - sendCommand('add_token', jwtoken) - .catch(function (err) { - console.error('failed re-adding token', jwtoken, 'after reconnect', err); - // Not sure if we should do something like remove the token here. It worked - // once or it shouldn't have stayed in the list, so it's less certain why - // it would have failed here. - }); - }); - } - - function noHandler(cmd) { - console.warn("[telebit] state.handlers['" + cmd[1] + "'] not set"); - console.warn(cmd[2]); - } - - var connCallback; - - function hyperPeek(tun) { - var m; - var str; - if (tun.data) { - if ('http' === tun.service) { - str = tun.data.toString(); - m = str.match(/(?:^|[\r\n])Host: ([^\r\n]+)[\r\n]*/im); - tun._name = tun._hostname = (m && m[1].toLowerCase() || '').split(':')[0]; - } - else if ('https' === tun.service || 'tls' === tun.service) { - tun._name = tun._servername = sni(tun.data); - } else { - tun._name = ''; - } - } - } - - var packerHandlers = { - oncontrol: function (opts) { - var cmd, err; - try { - cmd = JSON.parse(opts.data.toString()); - } catch (err) {} - if (!Array.isArray(cmd) || typeof cmd[0] !== 'number') { - console.warn('received bad command "' + opts.data.toString() + '"'); - return; - } - - if (cmd[0] < 0) { - var cb = pendingCommands[-cmd[0]]; - if (!cb) { - console.warn('received response for unknown request:', cmd); - } else { - cb.apply(null, cmd.slice(1)); - } - return; - } - - if (cmd[0] === 0) { - console.warn('received dis-associated error from server', cmd[1]); - if (connCallback) { - connCallback(cmd[1]); - } - return; - } - - if (cmd[1] === 'hello') { - if (state.debug) { console.log('[DEBUG] hello received'); } - sendAllTokens(); - if (connCallback) { - connCallback(); - } - // TODO: handle the versions and commands provided by 'hello' - isn't super important - // yet since there is only one version and set of commands. - err = null; - } else if (cmd[1] === 'grant') { - authenticated = true; - if (state.handlers[cmd[1]]) { - state.handlers[cmd[1]](cmd[2]); - } else { - noHandler(cmd); - } - return; - } else if (cmd[1] === 'access_token') { - authenticated = true; - if (state.handlers[cmd[1]]) { - state.handlers[cmd[1]](cmd[2]); - } else { - noHandler(cmd); - } - return; - } else { - err = { message: 'unknown command "'+cmd[1]+'"', code: 'E_UNKNOWN_COMMAND' }; - } + var DEFAULT_HTTP_TIMEOUT = (2 * 60); + var wsTunnelRemote = { + _activityTimeout: state.activityTimeout || (DEFAULT_HTTP_TIMEOUT - 5) * 1000 + , _pongTimeout: state.pongTimeout || 10*1000 + , _lastActivity: 0 + , _sendCommand: function (name) { + var id = Math.ceil(1e9 * Math.random()); + var cmd = [id, name].concat(Array.prototype.slice.call(arguments, 1)); + if (state.debug) { console.log('[DEBUG] command sending', cmd); } var packBody = true; - wsHandlers.sendMessage(Packer.packHeader(null, [-cmd[0], err], 'control', packBody)); - } - - , onconnection: function (tun) { - var cid = tun._id = Packer.addrToId(tun); - - // this data should have been gathered already as part of the proxy protocol - // but if it's available again here we can double check - hyperPeek(tun); - - // TODO use readable streams instead - wstunneler.pause(); - require(state.sortingHat).assign(state, tun, function (err, conn) { - if (err) { - err.message = err.message.replace(/:tun_id/, tun._id); - packerHandlers._onConnectError(cid, tun, err); - return; + wsTunnelRemote.sendMessage(Packer.packHeader(null, cmd, 'control', packBody)); + setTimeout(function () { + if (client.pendingCommands[id]) { + console.warn('command', name, id, 'timed out'); + client.pendingCommands[id]({ + message: 'response not received in time' + , code: 'E_TIMEOUT' + }); } - clientHandlers.add(conn, cid, tun); - if (tun.data) { conn.write(tun.data); } - wstunneler.resume(); + }, wsTunnelRemote._pongTimeout); + + return new PromiseA(function (resolve, reject) { + client.pendingCommands[id] = function (err, result) { + delete client.pendingCommands[id]; + if (err) { + reject(err); + } else { + resolve(result); + } + }; }); } - - , onmessage: function (tun) { - var cid = tun._id = Packer.addrToId(tun); - var handled; - - hyperPeek(tun); - - handled = clientHandlers.write(cid, tun); - - // quasi backwards compat - if (!handled) { console.log("[debug] did not get 'connection' event"); packerHandlers.onconnection(tun); } - } - - , onpause: function (opts) { - var cid = Packer.addrToId(opts); - if (localclients[cid]) { - console.log("[TunnelPause] pausing '"+cid+"', remote received", opts.data.toString(), 'of', localclients[cid].tunnelWritten, 'sent'); - localclients[cid].manualPause = true; - localclients[cid].pause(); - } else { - console.log('[TunnelPause] remote tried pausing finished connection', cid); - // Often we have enough latency that we've finished sending before we're told to pause, so - // don't worry about sending back errors, since we won't be sending data over anyway. - // var packBody = true; - // wsHandlers.sendMessage(Packer.packHeader(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error', packBody)); - } - } - , onresume: function (opts) { - var cid = Packer.addrToId(opts); - if (localclients[cid]) { - console.log("[TunnelResume] resuming '"+cid+"', remote received", opts.data.toString(), 'of', localclients[cid].tunnelWritten, 'sent'); - localclients[cid].manualPause = false; - localclients[cid].resume(); - } else { - console.log('[TunnelResume] remote tried resuming finished connection', cid); - // var packBody = true; - // wsHandlers.sendMessage(Packer.packHeader(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error', packBody)); - } - } - - , onend: function (opts) { - var cid = Packer.addrToId(opts); - //console.log("[end] '" + cid + "'"); - clientHandlers.closeSingle(cid); - } - , onerror: function (opts) { - var cid = Packer.addrToId(opts); - //console.log("[error] '" + cid + "'", opts.code || '', opts.message); - clientHandlers.closeSingle(cid); - } - - , _onConnectError: function (cid, opts, err) { - console.info("[_onConnectError] opening '" + cid + "' failed because " + err.message); - wsHandlers.sendMessage(Packer.packHeader(opts, null, 'error')); - } - }; - - var lastActivity; - var timeoutId; - var wsHandlers = { - refreshTimeout: function () { - lastActivity = Date.now(); + , refreshTimeout: function () { + wsTunnelRemote._lastActivity = Date.now(); } , checkTimeout: function () { - if (!wstunneler) { + if (!client.wstunneler) { console.warn('checkTimeout called when websocket already closed'); return; } // Determine how long the connection has been "silent", ie no activity. - var silent = Date.now() - lastActivity; + var silent = Date.now() - wsTunnelRemote._lastActivity; // If we have had activity within the last activityTimeout then all we need to do is // call this function again at the soonest time when the connection could be timed out. - if (silent < activityTimeout) { - timeoutId = setTimeout(wsHandlers.checkTimeout, activityTimeout-silent); + if (silent < wsTunnelRemote._activityTimeout) { + sharedTimeoutId = setTimeout(wsTunnelRemote.checkTimeout, wsTunnelRemote._activityTimeout-silent); } // Otherwise we check to see if the pong has also timed out, and if not we send a ping // and call this function again when the pong will have timed out. - else if (silent < activityTimeout + pongTimeout) { + else if (silent < wsTunnelRemote._activityTimeout + wsTunnelRemote._pongTimeout) { console.log('pinging tunnel server'); try { - wstunneler.ping(); + client.wstunneler.ping(); } catch (err) { console.warn('failed to ping tunnel server', err); } - timeoutId = setTimeout(wsHandlers.checkTimeout, pongTimeout); + sharedTimeoutId = setTimeout(wsTunnelRemote.checkTimeout, wsTunnelRemote._pongTimeout); } // Last case means the ping we sent before didn't get a response soon enough, so we // need to close the websocket connection. else { console.log('connection timed out'); - wstunneler.close(1000, 'connection timeout'); + client.wstunneler.close(1000, 'connection timeout'); } } , onOpen: function () { console.info("[open] connected to '" + (state.wss || state.relay) + "'"); - wsHandlers.refreshTimeout(); + wsTunnelRemote.refreshTimeout(); - timeoutId = setTimeout(wsHandlers.checkTimeout, activityTimeout); + sharedTimeoutId = setTimeout(wsTunnelRemote.checkTimeout, wsTunnelRemote._activityTimeout); - wstunneler._socket.on('drain', function () { + client.wstunneler._socket.on('drain', function () { // the websocket library has it's own buffer apart from node's socket buffer, but that one // is much more difficult to watch, so we watch for the lower level buffer to drain and // then check to see if the upper level buffer is still too full to write to. Note that // the websocket library buffer has something to do with compression, so I'm not requiring // that to be 0 before we start up again. - if (wstunneler.bufferedAmount > 128*1024) { + if (client.wstunneler.bufferedAmount > 128*1024) { return; } - pausedClients.forEach(function (conn) { + sharedPausedClients.forEach(function (conn) { if (!conn.manualPause) { // console.log('resuming connection', conn.tunnelCid, 'now the websocket has caught up'); conn.resume(); } }); - pausedClients.length = 0; + sharedPausedClients.length = 0; }); //Call either Open or Reconnect handlers. - if(state.handlers.onOpen && initialConnect) { + if(state.handlers.onOpen && clientHandlers._initialConnect) { state.handlers.onOpen(); - } else if (state.handlers.onReconnect && !initialConnect) { + } else if (state.handlers.onReconnect && !clientHandlers._initialConnect) { state.handlers.onReconnect(); } - initialConnect = false; + clientHandlers._initialConnect = false; } , onClose: function () { - clearTimeout(timeoutId); - wstunneler = null; + clearTimeout(sharedTimeoutId); + client.wstunneler = null; clientHandlers.closeAll(); var error = new Error('websocket connection closed before response'); error.code = 'E_CONN_CLOSED'; - Object.keys(pendingCommands).forEach(function (id) { - pendingCommands[id](error); + Object.keys(client.pendingCommands).forEach(function (id) { + client.pendingCommands[id](error); }); - if (connCallback) { - connCallback(error); + if (client.connCallback) { + client.connCallback(error); } - if (!authenticated) { + if (!client.authenticated) { if(state.handlers.onError) { var err = new Error('Failed to connect on first attempt... check authentication'); state.handlers.onError(err); @@ -501,14 +324,14 @@ function _connect(state) { state.handlers.onClose(); } console.info('[close] failed on first attempt... check authentication.'); - timeoutId = null; + sharedTimeoutId = null; } - else if (tokens.length) { + else if (client.sharedTokens.length) { if(state.handlers.onDisconnect) { state.handlers.onDisconnect(); } console.info('[retry] disconnected and waiting...'); - timeoutId = setTimeout(connect, 5000); + sharedTimeoutId = setTimeout(connect, 5000); } else { if(state.handlers.onClose) { state.handlers.onClose(); @@ -519,25 +342,25 @@ function _connect(state) { , onError: function (err) { if ('ENOTFOUND' === err.code) { // DNS issue, probably network is disconnected - timeoutId = setTimeout(connect, 90 * 1000); + sharedTimeoutId = setTimeout(connect, 90 * 1000); return; } console.error("[tunnel error] " + err.message); console.error(err); - if (connCallback) { - connCallback(err); + if (client.connCallback) { + client.connCallback(err); } } , sendMessage: function (msg) { - if (wstunneler) { + if (client.wstunneler) { try { - wstunneler.send(msg, {binary: true}); - return wstunneler.bufferedAmount; + client.wstunneler.send(msg, {binary: true}); + return client.wstunneler.bufferedAmount; } catch (err) { // There is a chance that this occurred after the websocket was told to close // and before it finished, in which case we don't need to log the error. - if (wstunneler.readyState !== wstunneler.CLOSING) { + if (client.wstunneler.readyState !== client.wstunneler.CLOSING) { console.warn('[sendMessage] error sending websocket message', err); } } @@ -546,13 +369,13 @@ function _connect(state) { }; function connect() { - if (wstunneler) { + if (client.wstunneler) { console.warn('attempted to connect with connection already active'); return; } - if (!tokens.length) { + if (!client.sharedTokens.length) { if (state.config.email) { - auth = { + client.auth = { subject: state.config.email , subject_scheme: 'mailto' // TODO create domains list earlier @@ -567,44 +390,44 @@ function _connect(state) { }; } } - timeoutId = null; - var machine = Packer.create(packerHandlers); + sharedTimeoutId = null; console.info("[connect] '" + (state.wss || state.relay) + "'"); - var tunnelUrl = (state.wss || state.relay).replace(/\/$/, '') + '/'; // + auth; - wstunneler = new WebSocket(tunnelUrl, { rejectUnauthorized: !state.insecure }); - wstunneler.on('open', wsHandlers.onOpen); - wstunneler.on('close', wsHandlers.onClose); - wstunneler.on('error', wsHandlers.onError); + var tunnelUrl = (state.wss || state.relay).replace(/\/$/, '') + '/'; // + client.auth; + var wsOpts = { binary: true, rejectUnauthorized: !state.insecure }; + client.wstunneler = new WebSocket(tunnelUrl, wsOpts); + client.wsreader = require('./ws-readable').create(client.wstunneler); + client.wstunneler.on('open', wsTunnelRemote.onOpen); + client.wstunneler.on('close', wsTunnelRemote.onClose); + client.wstunneler.on('error', wsTunnelRemote.onError); // Our library will automatically handle sending the pong respose to ping requests. - wstunneler.on('ping', wsHandlers.refreshTimeout); - wstunneler.on('pong', wsHandlers.refreshTimeout); - wstunneler.on('message', function (data, flags) { - wsHandlers.refreshTimeout(); + client.wstunneler.on('ping', wsTunnelRemote.refreshTimeout); + client.wstunneler.on('pong', wsTunnelRemote.refreshTimeout); + client.wstunneler.on('message', function (data, flags) { + wsTunnelRemote.refreshTimeout(); if (data.error || '{' === data[0]) { console.log(data); return; } - machine.fns.addChunk(data, flags); + client.machine.fns.addChunk(data, flags); }); } - connect(); - var connPromise; - return { - end: function(cb) { - tokens.length = 0; - if (timeoutId) { - clearTimeout(timeoutId); - timeoutId = null; + var xyzHandlers = { + _connPromise: null + , end: function(cb) { + client.sharedTokens.length = 0; + if (sharedTimeoutId) { + clearTimeout(sharedTimeoutId); + sharedTimeoutId = null; } - if (wstunneler) { + if (client.wstunneler) { try { - wstunneler.close(cb); + client.wstunneler.close(cb); } catch(e) { - console.error("[error] wstunneler.close()"); + console.error("[error] client.wstunneler.close()"); console.error(e); } } @@ -616,28 +439,28 @@ function _connect(state) { if ('undefined' === token) { throw new Error("attempted to append token as the string 'undefined'"); } - if (tokens.indexOf(token) >= 0) { + if (client.sharedTokens.indexOf(token) >= 0) { return PromiseA.resolve(); } - tokens.push(token); + client.sharedTokens.push(token); var prom; - if (tokens.length === 1 && !wstunneler) { + if (client.sharedTokens.length === 1 && !client.wstunneler) { // We just added the only token in the list, and the websocket connection isn't up // so we need to restart the connection. - if (timeoutId) { + if (sharedTimeoutId) { // Handle the case were the last token was removed and this token added between // reconnect attempts to make sure we don't try openning multiple connections. - clearTimeout(timeoutId); - timeoutId = null; + clearTimeout(sharedTimeoutId); + sharedTimeoutId = null; } // We want this case to behave as much like the other case as we can, but we don't have // the same kind of reponses when we open brand new connections, so we have to rely on // the 'hello' and the 'un-associated' error commands to determine if the token is good. - prom = connPromise = new PromiseA(function (resolve, reject) { - connCallback = function (err) { - connCallback = null; - connPromise = null; + prom = xyzHandlers._connPromise = new PromiseA(function (resolve, reject) { + client.connCallback = function (err) { + client.connCallback = null; + xyzHandlers._connPromise = null; if (err) { reject(err); } else { @@ -647,19 +470,19 @@ function _connect(state) { }); connect(); } - else if (connPromise) { - prom = connPromise.then(function () { - return sendCommand('add_token', token); + else if (xyzHandlers._connPromise) { + prom = xyzHandlers._connPromise.then(function () { + return wsTunnelRemote._sendCommand('add_token', token); }); } else { - prom = sendCommand('add_token', token); + prom = wsTunnelRemote._sendCommand('add_token', token); } prom.catch(function (err) { console.error('adding token', token, 'failed:', err); // Most probably an invalid token of some kind, so we don't really want to keep it. - tokens.splice(tokens.indexOf(token), 1); + client.sharedTokens.splice(client.sharedTokens.indexOf(token), 1); }); return prom; @@ -670,16 +493,16 @@ function _connect(state) { } if (token === '*') { - tokens.length = 0; + client.sharedTokens.length = 0; } else { - var index = tokens.indexOf(token); + var index = client.sharedTokens.indexOf(token); if (index < 0) { return PromiseA.resolve(); } - tokens.splice(index); + client.sharedTokens.splice(index); } - var prom = sendCommand('delete_token', token); + var prom = wsTunnelRemote._sendCommand('delete_token', token); prom.catch(function (err) { console.error('clearing token', token, 'failed:', err); }); @@ -687,6 +510,11 @@ function _connect(state) { return prom; } }; + + client._wsTunnelRemote = wsTunnelRemote; + client.clientHandlers = clientHandlers; + connect(); + return xyzHandlers; } module.exports.connect = _connect; diff --git a/lib/sorting-hat.js b/lib/sorting-hat.js index b7719f0..eda9ce5 100644 --- a/lib/sorting-hat.js +++ b/lib/sorting-hat.js @@ -372,7 +372,7 @@ module.exports.assign = function (state, tun, cb) { serveStatic = state._serveStatic(conf.handler, { dotfiles: 'allow', index: [ 'index.html' ] }); dlStatic = state._serveStatic(conf.handler, { acceptRanges: false, dotfiles: 'allow', index: [ 'index.html' ] }); serveIndex = state._serveIndex(conf.handler, { - hidden: true, icons: true + hidden: true, icons: true, view: 'tiles' , template: require('serve-tpl-attachment')({ privatefiles: 'ignore' }) }); } diff --git a/lib/ws-readable.js b/lib/ws-readable.js new file mode 100644 index 0000000..af1a997 --- /dev/null +++ b/lib/ws-readable.js @@ -0,0 +1,75 @@ +// https://github.com/websockets/ws/issues/596 +var WSStream = module.exports = (function(){ + var util = require('util'); + var stream = require('stream'); + var Duplex = stream.Duplex || require('readable-stream').Duplex; + var WSStream = function(ws, wsOptions, streamingOptions) { // Only the websocket (ws) is mandatory + // Proper duplex stream with automatic flow control (backpressure) management + if(!(this instanceof WSStream)) return new WSStream(ws, wsOptions, streamingOptions); + if(!(wsOptions instanceof Object)) wsOptions = {binary: false}; + Duplex.call(this, streamingOptions); + this.waitingForData = true; + this.writeModBufferEmpty = true; + this.webSocket = ws; + this.webSocketOptions = wsOptions; + this.on('finish', finishEventHandler(this)); + ws._socket.write = writeMod(ws._socket.write, this); + ws._socket.on('drain', drainEventHandler(this)); + ws._socket.on('error', errorRouter(this)); + ws.on('close', closeEventHandler(this)); + ws.on('message', messageHandler(this)); + }, + finishEventHandler = function(self) { + return function() { + if(self.webSocket !== null) { + self.webSocket.close(); + self.webSocket = null; + }; + self.emit('close'); + }; + }, + writeMod = function(nativeWriteFunction, self) { + return function() { + return self.writeModBufferEmpty = nativeWriteFunction.apply(this, arguments); + }; + }, + drainEventHandler = function(self) { + return function() { + self.writeModBufferEmpty = true; + self.emit('drain'); + }; + }, + closeEventHandler = function(self) { + return function() { + self.push(null); + } + }, + errorRouter = function(self) { + return function() { + self.emit.apply(self, ['error'].concat(arguments)); + }; + }, + messageHandler = function(self) { + return function(data) { + if(!self.push(data) && self.waitingForData) { + self.webSocket._socket.pause(); + self.waitingForData = false; + } + }; + }; + util.inherits(WSStream, Duplex); + WSStream.prototype._read = function(size) { + if(!this.waitingForData) { + this.waitingForData = true; + this.webSocket._socket.resume(); + } + }; + WSStream.prototype._write = function(chunk, encoding, callback) { + this.webSocket.send(chunk, this.webSocketOptions, callback); + return this.writeModBufferEmpty; + }; + return WSStream; + }()); +module.exports.create = function (ws, wsOpts, streamOpts) { + return new WSStream(ws, wsOpts, streamOpts); +};