From f945f0b4961aed3f411804d925227ace7f7b0576 Mon Sep 17 00:00:00 2001 From: tigerbot Date: Fri, 8 Sep 2017 10:54:47 -0600 Subject: [PATCH] implemented throttling when we buffer too much data --- bin/stunnel.js | 6 +- wsclient.js | 179 +++++++++++++++++++++++++++++++++---------------- 2 files changed, 126 insertions(+), 59 deletions(-) diff --git a/bin/stunnel.js b/bin/stunnel.js index fc0c5b6..9261807 100755 --- a/bin/stunnel.js +++ b/bin/stunnel.js @@ -8,6 +8,9 @@ var program = require('commander'); var url = require('url'); var stunnel = require('../wsclient.js'); +var domainsMap = {}; +var services = {}; + function collectDomains(val, memo) { var vals = val.split(/,/g); @@ -237,9 +240,6 @@ function daplieTunnel() { }); } -var domainsMap = {}; -var services = {}; - program.locals = (program.locals || []).concat(program.domains || []); program.locals.forEach(function (proxy) { // Create a map from which we can derive a list of all domains we want forwarded to us. diff --git a/wsclient.js b/wsclient.js index 6fb7503..ce59518 100644 --- a/wsclient.js +++ b/wsclient.js @@ -28,15 +28,60 @@ function run(copts) { var authenticated = false; var localclients = {}; + var pausedClients = []; var clientHandlers = { - onClose: function (cid, opts, err) { - wsHandlers.sendMessage(Packer.pack(opts, null, err && 'error' || 'end')); - delete localclients[cid]; - console.log('[local onClose] closed "' + cid + '" (' + clientHandlers.count() + ' clients)'); + add: function (conn, cid, opts, servername) { + localclients[cid] = conn; + console.info("[connect] new client '" + cid + "' for '" + servername + "' (" + clientHandlers.count() + " clients)"); + + conn.on('data', function onLocalData(chunk) { + if (conn.tunnelClosing) { + console.warn("[onLocalData] received data for '"+cid+"' over socket after connection was ended"); + return; + } + + // If we have a lot of buffered data waiting to be sent over the websocket we want to slow + // down the data we are getting to send over. We also want to pause all active connections + // if any connections are paused to make things more fair so one connection doesn't get + // stuff waiting for all other connections to finish because it tried writing near the border. + var bufSize = wsHandlers.sendMessage(Packer.pack(opts, chunk)); + if (pausedClients.length || bufSize > 1024*1024) { + console.log('paused connection', cid, 'to allow websocket to catch up'); + conn.pause(); + pausedClients.push(conn); + } + }); + conn.on('error', function onLocalError(err) { + console.info("[onLocalError] connection '" + cid + "' errored:", err); + // The 'close' event should fire soon, so let that send on the websocket + }); + conn.on('close', function onLocalClose(hadErr) { + delete localclients[cid]; + console.log('[onLocalClose] closed "' + cid + '" (' + clientHandlers.count() + ' clients)'); + wsHandlers.sendMessage(Packer.pack(opts, null, hadErr && 'error' || 'end')); + }); } - , onError: function(cid, opts, err) { - console.info("[local onError] closing '" + cid + "' because '" + err.message + "'"); - clientHandlers.onClose(cid, opts, err); + + , write: function (cid, opts) { + var conn = localclients[cid]; + if (!conn) { + return false; + } + //console.log("[=>] received data from '" + cid + "' =>", opts.data.byteLength); + + if (conn.tunnelClosing) { + console.warn("[onmessage] received data for '"+cid+"' over socket after connection was ended"); + return true; + } + + conn.write(opts.data); + if (conn.bufferSize > 1024*1024) { + wsHandlers.sendMessage(Packer.pack(opts, null, 'pause')); + conn.once('drain', function () { + wsHandlers.sendMessage(Packer.pack(opts, null, 'resume')); + }); + } + return true; } , closeSingle: function (cid) { @@ -45,29 +90,39 @@ function run(copts) { } console.log('[closeSingle]', cid); - PromiseA.resolve() - .then(function () { - localclients[cid].end(); + PromiseA.resolve().then(function () { + var conn = localclients[cid]; + conn.tunnelClosing = true; + conn.end(); + + // If no data is buffered for writing then we don't need to wait for it to drain. + if (!conn.bufferSize) { return timeoutPromise(500); - }) - .then(function () { - if (localclients[cid]) { - console.warn('[closeSingle]', cid, 'connection still present after calling `end`'); - localclients[cid].destroy(); - return timeoutPromise(500); - } - }) - .then(function () { - if (localclients[cid]) { - console.error('[closeSingle]', cid, 'connection still present after calling `destroy`'); - delete localclients[cid]; - } - }) - .catch(function (err) { - console.error('[closeSingle] failed to close connection', cid, err.toString()); + } + // Otherwise we want the connection to be able to finish, but we also want to impose + // a time limit for it to drain, since it shouldn't have more than 1MB buffered. + return new PromiseA(function (resolve) { + var timeoutId = setTimeout(resolve, 60*1000); + conn.once('drain', function () { + clearTimeout(timeoutId); + setTimeout(resolve, 500); + }); + }); + }).then(function () { + if (localclients[cid]) { + console.warn('[closeSingle]', cid, 'connection still present after calling `end`'); + localclients[cid].destroy(); + return timeoutPromise(500); + } + }).then(function () { + if (localclients[cid]) { + console.error('[closeSingle]', cid, 'connection still present after calling `destroy`'); delete localclients[cid]; - }) - ; + } + }).catch(function (err) { + console.error('[closeSingle] failed to close connection', cid, err.toString()); + delete localclients[cid]; + }); } , closeAll: function () { console.log('[closeAll]'); @@ -169,6 +224,7 @@ function run(copts) { wsHandlers.sendMessage(Packer.pack(null, [-cmd[0], err], 'control')); } + , onmessage: function (opts) { var net = copts.net || require('net'); var cid = Packer.addrToId(opts); @@ -179,9 +235,7 @@ function run(copts) { var str; var m; - if (localclients[cid]) { - //console.log("[=>] received data from '" + cid + "' =>", opts.data.byteLength); - localclients[cid].write(opts.data); + if (clientHandlers.write(cid, opts)) { return; } if (!portList) { @@ -237,39 +291,39 @@ function run(copts) { , remoteAddress: opts.address , remotePort: opts.port }; - localclients[cid] = net.createConnection(createOpts, function () { + var conn = net.createConnection(createOpts, function () { // this will happen before 'data' or 'readable' is triggered // We use the data from the createOpts object so that the createConnection function has // the oppurtunity of removing/changing it if it wants/needs to handle it differently. if (createOpts.data) { - localclients[cid].write(createOpts.data); + conn.write(createOpts.data); } }); - console.info("[connect] new client '" + cid + "' for '" + servername + "' (" + clientHandlers.count() + " clients)"); - localclients[cid].on('readable', function (size) { - var chunk; - - if (!localclients[cid]) { - console.error("[error] localclients[cid]", cid); - return; - } - if (!localclients[cid].read) { - console.error("[error] localclients[cid].read", cid); - console.log(localclients[cid]); - return; - } - - do { - chunk = localclients[cid].read(size); - if (chunk) { - wsHandlers.sendMessage(Packer.pack(opts, chunk)); - } - } while (chunk); - }); - localclients[cid].on('error', clientHandlers.onError.bind(null, cid, opts)); - localclients[cid].on('end', clientHandlers.onClose.bind(null, cid, opts)); + clientHandlers.add(conn, cid, opts, servername); } + + , onpause: function (opts) { + var cid = Packer.addrToId(opts); + console.log('[TunnelPause]', cid); + if (localclients[cid]) { + localclients[cid].manualPause = true; + localclients[cid].pause(); + } else { + wsHandlers.sendMessage(Packer.pack(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error')); + } + } + , onresume: function (opts) { + var cid = Packer.addrToId(opts); + console.log('[TunnelResume]', cid); + if (localclients[cid]) { + localclients[cid].manualPause = false; + localclients[cid].resume(); + } else { + wsHandlers.sendMessage(Packer.pack(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error')); + } + } + , onend: function (opts) { var cid = Packer.addrToId(opts); //console.log("[end] '" + cid + "'"); @@ -280,6 +334,7 @@ function run(copts) { //console.log("[error] '" + cid + "'", opts.code || '', opts.message); clientHandlers.closeSingle(cid); } + , _onConnectError: function (cid, opts, err) { console.info("[_onConnectError] opening '" + cid + "' failed because " + err.message); wsHandlers.sendMessage(Packer.pack(opts, null, 'error')); @@ -330,6 +385,17 @@ function run(copts) { console.info("[open] connected to '" + copts.stunneld + "'"); wsHandlers.refreshTimeout(); timeoutId = setTimeout(wsHandlers.checkTimeout, activityTimeout); + + wstunneler._socket.on('drain', function () { + pausedClients.forEach(function (conn) { + if (!conn.manualPause) { + console.log('resuming connection', conn.tunnelCid, 'now the websocket has caught up'); + conn.resume(); + } + }); + + pausedClients.length = 0; + }); } , onClose: function () { @@ -363,6 +429,7 @@ function run(copts) { if (wstunneler) { try { wstunneler.send(msg, {binary: true}); + return wstunneler._socket.bufferSize; } catch (err) { // There is a chance that this occurred after the websocket was told to close // and before it finished, in which case we don't need to log the error.