cleanup and bugfix

This commit is contained in:
AJ ONeal 2018-06-01 07:24:00 +00:00
parent 3fe62c6b02
commit d0ae3a1c0f
1 changed files with 74 additions and 49 deletions

View File

@ -86,9 +86,29 @@ module.exports.create = function (state) {
} }
function onWsConnection(ws, upgradeReq) { function onWsConnection(ws, upgradeReq) {
if (state.debug) { console.log('[ws] connection'); }
var socketId = Packer.socketToId(upgradeReq.socket); var socketId = Packer.socketToId(upgradeReq.socket);
if (state.debug) { console.log('[ws] connection', socketId); }
var remotes = {}; var remotes = {};
var firstToken;
var authn = (upgradeReq.headers.authorization||'').split(/\s+/);
if (authn[0] && 'basic' === authn[0].toLowerCase()) {
try {
authn = new Buffer(authn[1], 'base64').toString('ascii').split(':');
firstToken = authn[1];
} catch (err) { }
}
if (!firstToken) {
firstToken = url.parse(upgradeReq.url, true).query.access_token;
}
if (firstToken) {
var err = addToken(firstToken, true);
if (err) {
sendTunnelMsg(null, [0, err], 'control');
ws.close();
return;
}
}
function logName() { function logName() {
var result = Object.keys(remotes).map(function (jwtoken) { var result = Object.keys(remotes).map(function (jwtoken) {
@ -97,6 +117,7 @@ module.exports.create = function (state) {
return result || socketId; return result || socketId;
} }
function sendTunnelMsg(addr, data, service) { function sendTunnelMsg(addr, data, service) {
ws.send(Packer.pack(addr, data, service), {binary: true}); ws.send(Packer.pack(addr, data, service), {binary: true});
} }
@ -112,6 +133,7 @@ module.exports.create = function (state) {
return browserConn; return browserConn;
} }
function closeBrowserConn(cid) { function closeBrowserConn(cid) {
var remote; var remote;
Object.keys(remotes).some(function (jwtoken) { Object.keys(remotes).some(function (jwtoken) {
@ -194,7 +216,6 @@ module.exports.create = function (state) {
token.ws = ws; token.ws = ws;
token.upgradeReq = upgradeReq; token.upgradeReq = upgradeReq;
token.clients = {}; token.clients = {};
token.dynamicPorts = [];
token.pausedConns = []; token.pausedConns = [];
ws._socket.on('drain', function () { ws._socket.on('drain', function () {
@ -220,20 +241,38 @@ module.exports.create = function (state) {
Devices.add(state.deviceLists, domainname, token); Devices.add(state.deviceLists, domainname, token);
}); });
function handleTcpServer() { if (!firstToken || firstToken === jwtoken) {
firstToken = jwtoken;
token.dynamicPorts = [];
function onDynTcpReady() {
var serviceport = this.address().port; var serviceport = this.address().port;
console.info('[DynTcpConn] Port', serviceport, 'now open for', token.deviceId); console.info('[DynTcpConn] Port', serviceport, 'now open for', token.deviceId);
token.dynamicPorts.push(serviceport); token.dynamicPorts.push(serviceport);
Devices.add(state.deviceLists, serviceport, token); Devices.add(state.deviceLists, serviceport, token);
sendTunnelMsg(
null
, [ 2
, 'grant'
, [ ['tcp', serviceport]
, ['https', 'ssh.test.telebit.cloud' ]
]
]
, 'control'
);
} }
try { try {
token.server = require('net').createServer(onDynTcpConn).listen(0, handleTcpServer); token.server = require('net').createServer(onDynTcpConn).listen(0, onDynTcpReady);
token.server.on('error', function (e) {
console.error("Server Error assigning a dynamic port to a new connection:", e);
});
} catch(e) { } catch(e) {
// what a wonderful problem it will be the day that this bug needs to be fixed // what a wonderful problem it will be the day that this bug needs to be fixed
// (i.e. there are enough users to run out of ports) // (i.e. there are enough users to run out of ports)
console.error("Error assigning a dynamic port to a new connection:", e); console.error("Error assigning a dynamic port to a new connection:", e);
} }
}
remotes[jwtoken] = token; remotes[jwtoken] = token;
console.info("[ws] authorized", socketId, "for", token.deviceId); console.info("[ws] authorized", socketId, "for", token.deviceId);
@ -256,12 +295,14 @@ module.exports.create = function (state) {
}); });
remote.ws = null; remote.ws = null;
remote.upgradeReq = null; remote.upgradeReq = null;
if (remote.server) {
remote.serverPort = remote.server.address().port; remote.serverPort = remote.server.address().port;
remote.server.close(function () { remote.server.close(function () {
console.log("[DynTcpConn] closing server for ", remote.serverPort); console.log("[DynTcpConn] closing server for ", remote.serverPort);
remote.serverPort = null; remote.serverPort = null;
}); });
remote.server = null; remote.server = null;
}
// Close all of the existing browser connections associated with this websocket connection. // Close all of the existing browser connections associated with this websocket connection.
Object.keys(remote.clients).forEach(function (cid) { Object.keys(remote.clients).forEach(function (cid) {
@ -272,26 +313,6 @@ module.exports.create = function (state) {
return null; return null;
} }
var firstToken;
var authn = (upgradeReq.headers.authorization||'').split(/\s+/);
if (authn[0] && 'basic' === authn[0].toLowerCase()) {
try {
authn = new Buffer(authn[1], 'base64').toString('ascii').split(':');
firstToken = authn[1];
} catch (err) { }
}
if (!firstToken) {
firstToken = url.parse(upgradeReq.url, true).query.access_token;
}
if (firstToken) {
var err = addToken(firstToken);
if (err) {
sendTunnelMsg(null, [0, err], 'control');
ws.close();
return;
}
}
var commandHandlers = { var commandHandlers = {
add_token: addToken add_token: addToken
, delete_token: function (token) { , delete_token: function (token) {
@ -413,11 +434,10 @@ module.exports.create = function (state) {
}; };
var unpacker = Packer.create(packerHandlers); var unpacker = Packer.create(packerHandlers);
var lastActivity = Date.now();
var timeoutId;
function refreshTimeout() { function refreshTimeout() {
lastActivity = Date.now(); lastActivity = Date.now();
} }
function checkTimeout() { function checkTimeout() {
// Determine how long the connection has been "silent", ie no activity. // Determine how long the connection has been "silent", ie no activity.
var silent = Date.now() - lastActivity; var silent = Date.now() - lastActivity;
@ -447,18 +467,13 @@ module.exports.create = function (state) {
ws.close(1013, 'connection timeout'); ws.close(1013, 'connection timeout');
} }
} }
timeoutId = setTimeout(checkTimeout, activityTimeout);
// Note that our websocket library automatically handles pong responses on ping requests function forwardMessage(chunk) {
// before it even emits the event.
ws.on('ping', refreshTimeout);
ws.on('pong', refreshTimeout);
ws.on('message', function forwardMessage(chunk) {
refreshTimeout(); refreshTimeout();
if (state.debug) { console.log('[ws] device => client : demultiplexing message ', chunk.byteLength, 'bytes'); } if (state.debug) { console.log('[ws] device => client : demultiplexing message ', chunk.byteLength, 'bytes'); }
//console.log(chunk.toString()); //console.log(chunk.toString());
unpacker.fns.addChunk(chunk); unpacker.fns.addChunk(chunk);
}); }
function hangup() { function hangup() {
clearTimeout(timeoutId); clearTimeout(timeoutId);
@ -469,6 +484,16 @@ module.exports.create = function (state) {
ws.terminate(); ws.terminate();
} }
var lastActivity = Date.now();
var timeoutId;
timeoutId = setTimeout(checkTimeout, activityTimeout);
// Note that our websocket library automatically handles pong responses on ping requests
// before it even emits the event.
ws.on('ping', refreshTimeout);
ws.on('pong', refreshTimeout);
ws.on('message', forwardMessage);
ws.on('close', hangup); ws.on('close', hangup);
ws.on('error', hangup); ws.on('error', hangup);