implemented throttling when we buffer too much data
This commit is contained in:
		
							parent
							
								
									bbc3ffaf6d
								
							
						
					
					
						commit
						e9c24efd5f
					
				| @ -8,6 +8,9 @@ var program = require('commander'); | ||||
| var url = require('url'); | ||||
| var stunnel = require('../wsclient.js'); | ||||
| 
 | ||||
| var domainsMap = {}; | ||||
| var services = {}; | ||||
| 
 | ||||
| function collectDomains(val, memo) { | ||||
|   var vals = val.split(/,/g); | ||||
| 
 | ||||
| @ -237,9 +240,6 @@ function daplieTunnel() { | ||||
|   }); | ||||
| } | ||||
| 
 | ||||
| var domainsMap = {}; | ||||
| var services = {}; | ||||
| 
 | ||||
| program.locals = (program.locals || []).concat(program.domains || []); | ||||
| program.locals.forEach(function (proxy) { | ||||
|   // Create a map from which we can derive a list of all domains we want forwarded to us.
 | ||||
|  | ||||
							
								
								
									
										153
									
								
								wsclient.js
									
									
									
									
									
								
							
							
						
						
									
										153
									
								
								wsclient.js
									
									
									
									
									
								
							| @ -28,15 +28,60 @@ function run(copts) { | ||||
|   var authenticated = false; | ||||
| 
 | ||||
|   var localclients = {}; | ||||
|   var pausedClients = []; | ||||
|   var clientHandlers = { | ||||
|     onClose: function (cid, opts, err) { | ||||
|       wsHandlers.sendMessage(Packer.pack(opts, null, err && 'error' || 'end')); | ||||
|       delete localclients[cid]; | ||||
|       console.log('[local onClose] closed "' + cid + '" (' + clientHandlers.count() + ' clients)'); | ||||
|     add: function (conn, cid, opts, servername) { | ||||
|       localclients[cid] = conn; | ||||
|       console.info("[connect] new client '" + cid + "' for '" + servername + "' (" + clientHandlers.count() + " clients)"); | ||||
| 
 | ||||
|       conn.on('data', function onLocalData(chunk) { | ||||
|         if (conn.tunnelClosing) { | ||||
|           console.warn("[onLocalData] received data for '"+cid+"' over socket after connection was ended"); | ||||
|           return; | ||||
|         } | ||||
|   , onError: function(cid, opts, err) { | ||||
|       console.info("[local onError] closing '" + cid + "' because '" + err.message + "'"); | ||||
|       clientHandlers.onClose(cid, opts, err); | ||||
| 
 | ||||
|         // If we have a lot of buffered data waiting to be sent over the websocket we want to slow
 | ||||
|         // down the data we are getting to send over. We also want to pause all active connections
 | ||||
|         // if any connections are paused to make things more fair so one connection doesn't get
 | ||||
|         // stuff waiting for all other connections to finish because it tried writing near the border.
 | ||||
|         var bufSize = wsHandlers.sendMessage(Packer.pack(opts, chunk)); | ||||
|         if (pausedClients.length || bufSize > 1024*1024) { | ||||
|           console.log('paused connection', cid, 'to allow websocket to catch up'); | ||||
|           conn.pause(); | ||||
|           pausedClients.push(conn); | ||||
|         } | ||||
|       }); | ||||
|       conn.on('error', function onLocalError(err) { | ||||
|         console.info("[onLocalError] connection '" + cid + "' errored:", err); | ||||
|         // The 'close' event should fire soon, so let that send on the websocket
 | ||||
|       }); | ||||
|       conn.on('close', function onLocalClose(hadErr) { | ||||
|         delete localclients[cid]; | ||||
|         console.log('[onLocalClose] closed "' + cid + '" (' + clientHandlers.count() + ' clients)'); | ||||
|         wsHandlers.sendMessage(Packer.pack(opts, null, hadErr && 'error' || 'end')); | ||||
|       }); | ||||
|     } | ||||
| 
 | ||||
|   , write: function (cid, opts) { | ||||
|       var conn = localclients[cid]; | ||||
|       if (!conn) { | ||||
|         return false; | ||||
|       } | ||||
|       //console.log("[=>] received data from '" + cid + "' =>", opts.data.byteLength);
 | ||||
| 
 | ||||
|       if (conn.tunnelClosing) { | ||||
|         console.warn("[onmessage] received data for '"+cid+"' over socket after connection was ended"); | ||||
|         return true; | ||||
|       } | ||||
| 
 | ||||
|       conn.write(opts.data); | ||||
|       if (conn.bufferSize > 1024*1024) { | ||||
|         wsHandlers.sendMessage(Packer.pack(opts, null, 'pause')); | ||||
|         conn.once('drain', function () { | ||||
|           wsHandlers.sendMessage(Packer.pack(opts, null, 'resume')); | ||||
|         }); | ||||
|       } | ||||
|       return true; | ||||
|     } | ||||
| 
 | ||||
|   , closeSingle: function (cid) { | ||||
| @ -45,29 +90,39 @@ function run(copts) { | ||||
|       } | ||||
| 
 | ||||
|       console.log('[closeSingle]', cid); | ||||
|       PromiseA.resolve() | ||||
|         .then(function () { | ||||
|           localclients[cid].end(); | ||||
|       PromiseA.resolve().then(function () { | ||||
|         var conn = localclients[cid]; | ||||
|         conn.tunnelClosing = true; | ||||
|         conn.end(); | ||||
| 
 | ||||
|         // If no data is buffered for writing then we don't need to wait for it to drain.
 | ||||
|         if (!conn.bufferSize) { | ||||
|           return timeoutPromise(500); | ||||
|         }) | ||||
|         .then(function () { | ||||
|         } | ||||
|         // Otherwise we want the connection to be able to finish, but we also want to impose
 | ||||
|         // a time limit for it to drain, since it shouldn't have more than 1MB buffered.
 | ||||
|         return new PromiseA(function (resolve) { | ||||
|           var timeoutId = setTimeout(resolve, 60*1000); | ||||
|           conn.once('drain', function () { | ||||
|             clearTimeout(timeoutId); | ||||
|             setTimeout(resolve, 500); | ||||
|           }); | ||||
|         }); | ||||
|       }).then(function () { | ||||
|         if (localclients[cid]) { | ||||
|           console.warn('[closeSingle]', cid, 'connection still present after calling `end`'); | ||||
|           localclients[cid].destroy(); | ||||
|           return timeoutPromise(500); | ||||
|         } | ||||
|         }) | ||||
|         .then(function () { | ||||
|       }).then(function () { | ||||
|         if (localclients[cid]) { | ||||
|           console.error('[closeSingle]', cid, 'connection still present after calling `destroy`'); | ||||
|           delete localclients[cid]; | ||||
|         } | ||||
|         }) | ||||
|         .catch(function (err) { | ||||
|       }).catch(function (err) { | ||||
|         console.error('[closeSingle] failed to close connection', cid, err.toString()); | ||||
|         delete localclients[cid]; | ||||
|         }) | ||||
|         ; | ||||
|       }); | ||||
|     } | ||||
|   , closeAll: function () { | ||||
|       console.log('[closeAll]'); | ||||
| @ -169,6 +224,7 @@ function run(copts) { | ||||
| 
 | ||||
|       wsHandlers.sendMessage(Packer.pack(null, [-cmd[0], err], 'control')); | ||||
|     } | ||||
| 
 | ||||
|   , onmessage: function (opts) { | ||||
|       var net = copts.net || require('net'); | ||||
|       var cid = Packer.addrToId(opts); | ||||
| @ -179,9 +235,7 @@ function run(copts) { | ||||
|       var str; | ||||
|       var m; | ||||
| 
 | ||||
|       if (localclients[cid]) { | ||||
|         //console.log("[=>] received data from '" + cid + "' =>", opts.data.byteLength);
 | ||||
|         localclients[cid].write(opts.data); | ||||
|       if (clientHandlers.write(cid, opts)) { | ||||
|         return; | ||||
|       } | ||||
|       if (!portList) { | ||||
| @ -237,39 +291,39 @@ function run(copts) { | ||||
|       , remoteAddress: opts.address | ||||
|       , remotePort: opts.port | ||||
|       }; | ||||
|       localclients[cid] = net.createConnection(createOpts, function () { | ||||
|       var conn = net.createConnection(createOpts, function () { | ||||
|         // this will happen before 'data' or 'readable' is triggered
 | ||||
|         // We use the data from the createOpts object so that the createConnection function has
 | ||||
|         // the oppurtunity of removing/changing it if it wants/needs to handle it differently.
 | ||||
|         if (createOpts.data) { | ||||
|           localclients[cid].write(createOpts.data); | ||||
|           conn.write(createOpts.data); | ||||
|         } | ||||
|       }); | ||||
|       console.info("[connect] new client '" + cid + "' for '" + servername + "' (" + clientHandlers.count() + " clients)"); | ||||
| 
 | ||||
|       localclients[cid].on('readable', function (size) { | ||||
|         var chunk; | ||||
| 
 | ||||
|         if (!localclients[cid]) { | ||||
|           console.error("[error] localclients[cid]", cid); | ||||
|           return; | ||||
|         } | ||||
|         if (!localclients[cid].read) { | ||||
|           console.error("[error] localclients[cid].read", cid); | ||||
|           console.log(localclients[cid]); | ||||
|           return; | ||||
|       clientHandlers.add(conn, cid, opts, servername); | ||||
|     } | ||||
| 
 | ||||
|         do { | ||||
|           chunk = localclients[cid].read(size); | ||||
|           if (chunk) { | ||||
|             wsHandlers.sendMessage(Packer.pack(opts, chunk)); | ||||
|   , onpause: function (opts) { | ||||
|       var cid = Packer.addrToId(opts); | ||||
|       console.log('[TunnelPause]', cid); | ||||
|       if (localclients[cid]) { | ||||
|         localclients[cid].manualPause = true; | ||||
|         localclients[cid].pause(); | ||||
|       } else { | ||||
|         wsHandlers.sendMessage(Packer.pack(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error')); | ||||
|       } | ||||
|         } while (chunk); | ||||
|       }); | ||||
|       localclients[cid].on('error', clientHandlers.onError.bind(null, cid, opts)); | ||||
|       localclients[cid].on('end',   clientHandlers.onClose.bind(null, cid, opts)); | ||||
|     } | ||||
|   , onresume: function (opts) { | ||||
|       var cid = Packer.addrToId(opts); | ||||
|       console.log('[TunnelResume]', cid); | ||||
|       if (localclients[cid]) { | ||||
|         localclients[cid].manualPause = false; | ||||
|         localclients[cid].resume(); | ||||
|       } else { | ||||
|         wsHandlers.sendMessage(Packer.pack(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error')); | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|   , onend: function (opts) { | ||||
|       var cid = Packer.addrToId(opts); | ||||
|       //console.log("[end] '" + cid + "'");
 | ||||
| @ -280,6 +334,7 @@ function run(copts) { | ||||
|       //console.log("[error] '" + cid + "'", opts.code || '', opts.message);
 | ||||
|       clientHandlers.closeSingle(cid); | ||||
|     } | ||||
| 
 | ||||
|   , _onConnectError: function (cid, opts, err) { | ||||
|       console.info("[_onConnectError] opening '" + cid + "' failed because " + err.message); | ||||
|       wsHandlers.sendMessage(Packer.pack(opts, null, 'error')); | ||||
| @ -330,6 +385,17 @@ function run(copts) { | ||||
|       console.info("[open] connected to '" + copts.stunneld + "'"); | ||||
|       wsHandlers.refreshTimeout(); | ||||
|       timeoutId = setTimeout(wsHandlers.checkTimeout, activityTimeout); | ||||
| 
 | ||||
|       wstunneler._socket.on('drain', function () { | ||||
|         pausedClients.forEach(function (conn) { | ||||
|           if (!conn.manualPause) { | ||||
|             console.log('resuming connection', conn.tunnelCid, 'now the websocket has caught up'); | ||||
|             conn.resume(); | ||||
|           } | ||||
|         }); | ||||
| 
 | ||||
|         pausedClients.length = 0; | ||||
|       }); | ||||
|     } | ||||
| 
 | ||||
|   , onClose: function () { | ||||
| @ -363,6 +429,7 @@ function run(copts) { | ||||
|       if (wstunneler) { | ||||
|         try { | ||||
|           wstunneler.send(msg, {binary: true}); | ||||
|           return wstunneler._socket.bufferSize; | ||||
|         } catch (err) { | ||||
|           // There is a chance that this occurred after the websocket was told to close
 | ||||
|           // and before it finished, in which case we don't need to log the error.
 | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user