better handlers

This commit is contained in:
AJ ONeal 2016-09-30 02:46:02 -04:00
parent ac99146874
commit ab0731dd56
1 changed files with 85 additions and 58 deletions

137
mux-ws.js
View File

@ -79,15 +79,15 @@ require('cluster-store').create().then(function (store) {
var tcp3000 = net.createServer(); var tcp3000 = net.createServer();
wss.on('connection', function (ws) { wss.on('connection', function (ws) {
console.log('todo connection');
var location = url.parse(ws.upgradeReq.url, true); var location = url.parse(ws.upgradeReq.url, true);
//var token = jwt.decode(location.query.access_token); //var token = jwt.decode(location.query.access_token);
var token = jwt.verify(location.query.access_token, secret); var token = jwt.verify(location.query.access_token, secret);
if (!token || !token.name) {
console.log('location, token'); console.log('location, token');
console.log(location.query.access_token); console.log(location.query.access_token);
console.log(token); console.log(token);
}
if (!token) { if (!token) {
ws.send({ error: { message: "invalid access token", code: "E_INVALID_TOKEN" } }); ws.send({ error: { message: "invalid access token", code: "E_INVALID_TOKEN" } });
@ -95,19 +95,53 @@ require('cluster-store').create().then(function (store) {
return; return;
} }
console.log('token.name:', token.name);
if (!token.name) { if (!token.name) {
ws.send({ error: { message: "invalid server name", code: "E_INVALID_NAME" } }); ws.send({ error: { message: "invalid server name", code: "E_INVALID_NAME" } });
ws.close(); ws.close();
return; return;
} }
ws.on('close', function () {
console.log("TODO cleanup");
});
var remote = remotes[token.name] = remotes[token.name] || {}; var remote = remotes[token.name] = remotes[token.name] || {};
var handlers = {
onmessage: function (opts) {
// opts.data
var cid = addrToId(opts);
var cstream = remote.clients[cid];
console.log("remote '" + remote.servername + " : " + remote.id + "' has data for '" + cid + "'", opts.data.byteLength);
if (!cstream) {
remote.ws.send(packer.pack(opts, null, 'error'));
return;
}
cstream.browser.write(opts.data);
}
, onend: function (opts) {
var cid = addrToId(opts);
console.log('[TunnelEnd]', cid);
handlers._onend(cid);
}
, onerror: function (opts) {
var cid = addrToId(opts);
console.log('[TunnelError]', cid);
handlers._onend(cid);
}
, _onend: function (cid) {
var c = remote.clients[cid];
delete remote.clients[cid];
try {
c.browser.end();
} catch(e) {
// ignore
}
try {
c.wrapped.end();
} catch(e) {
// ignore
}
}
};
// TODO allow more than one remote per servername // TODO allow more than one remote per servername
remote.ws = ws; remote.ws = ws;
remote.servername = token.name; remote.servername = token.name;
@ -119,27 +153,26 @@ require('cluster-store').create().then(function (store) {
// TODO how to allow a child process to communicate with this one? // TODO how to allow a child process to communicate with this one?
remote.clients = {}; remote.clients = {};
remote.handle = { address: null, handle: null }; remote.handle = { address: null, handle: null };
remote.unpacker = packer.create({ onMessage: function (opts) { remote.unpacker = packer.create(handlers);
// opts.data
var cid = addrToId(opts);
var cstream = remote.clients[cid];
console.log("remote '" + remote.servername + " : " + remote.id + "' has data for '" + cid + "'", opts.data.byteLength);
console.log('cstream.remoteAddress', cstream.remoteAddress, cstream.remoteAddress);
if (!cstream) {
// TODO end
console.log('TODO: [end] no client for', cid, opts.data.toString('utf8').substr(0, 100));
//remote.socket.write(packer.pack(opts, Buffer.from('|__END__|')));
return;
}
cstream.browser.write(opts.data);
} });
ws.on('message', function (chunk) { ws.on('message', function (chunk) {
console.log('message from home cloud to tunneler to browser', chunk.byteLength); console.log('message from home cloud to tunneler to browser', chunk.byteLength);
console.log(chunk.toString());
remote.unpacker.fns.addChunk(chunk); remote.unpacker.fns.addChunk(chunk);
}); });
ws.on('close', function () {
// the remote will handle closing its local connections
Object.keys(remote.clients).forEach(function (cid) {
try {
remote.clients[cid].browser.end();
} catch(e) {
// ignore
}
});
});
ws.on('error', function () {
// ignore
// the remote will retry if it wants to
});
store.set(token.name, remote.handle); store.set(token.name, remote.handle);
}); });
@ -152,41 +185,15 @@ require('cluster-store').create().then(function (store) {
var tls3000 = tls.createServer(tlsOpts, function (tlsSocket) { var tls3000 = tls.createServer(tlsOpts, function (tlsSocket) {
console.log('tls connection'); console.log('tls connection');
/*
tlsSocket.on('data', function (chunk) {
console.log('secure chunk');
console.log('');
console.log(chunk.toString());
console.log('');
});
*/
console.log('');
console.log('');
console.log('');
console.log('tlsSocket.*Address');
console.log('');
//console.log(tlsSocket._handle._parentWrap._handle.owner.stream);
console.log('r', tlsSocket._handle._parentWrap._handle.owner.stream.remoteAddress);
tlsSocket._remoteFamily = tlsSocket._handle._parentWrap._handle.owner.stream.remoteFamily; tlsSocket._remoteFamily = tlsSocket._handle._parentWrap._handle.owner.stream.remoteFamily;
tlsSocket._remoteAddress = tlsSocket._handle._parentWrap._handle.owner.stream.remoteAddress; tlsSocket._remoteAddress = tlsSocket._handle._parentWrap._handle.owner.stream.remoteAddress;
tlsSocket._remotePort = tlsSocket._handle._parentWrap._handle.owner.stream.remotePort; tlsSocket._remotePort = tlsSocket._handle._parentWrap._handle.owner.stream.remotePort;
console.log('r', tlsSocket.remoteAddress); // TODO BUG XXX
// https://github.com/nodejs/node/issues/8854 // https://github.com/nodejs/node/issues/8854
// tlsSocket._remoteAddress = remoteAddress; // causes core dump // tlsSocket.remoteAddress = remoteAddress; // causes core dump
// console.log(tlsSocket.remoteAddress);
console.log('s', tlsSocket._remoteAddress); console.log('s', tlsSocket._remoteAddress);
//tlsSocket.remoteFamily = tlsSocket.remoteFamily || tlsSocket._handle._parentWrap._handle.owner.stream.remoteFamily;
//tlsSocket.remoteAddress = tlsSocket.remoteAddress || tlsSocket._handle._parentWrap._handle.owner.remoteAddress;
//tlsSocket.remotePort = tlsSocket.remotePort || tlsSocket._handle._parentWrap._handle.owner.remotePort;
//console.log(tlsSocket.localAddress);
//console.log(tlsSocket.address);
httpServer.emit('connection', tlsSocket); httpServer.emit('connection', tlsSocket);
/*
tlsSocket.on('data', function (chunk) {
console.log('chunk', chunk.byteLength);
});
*/
}); });
var Dup = { var Dup = {
@ -265,6 +272,7 @@ require('cluster-store').create().then(function (store) {
var bstream = remote.clients[cid] = { var bstream = remote.clients[cid] = {
wrapped: browser.pipe(wrapForRemote) wrapped: browser.pipe(wrapForRemote)
, browser: browser , browser: browser
, address: baddress
}; };
//var bstream = remote.clients[cid] = wrapForRemote.pipe(browser); //var bstream = remote.clients[cid] = wrapForRemote.pipe(browser);
bstream.wrapped.on('data', function (pchunk) { bstream.wrapped.on('data', function (pchunk) {
@ -274,10 +282,29 @@ require('cluster-store').create().then(function (store) {
ws.send(pchunk, { binary: true }); ws.send(pchunk, { binary: true });
}); });
bstream.wrapped.on('error', function () { bstream.wrapped.on('error', function () {
// TODO send end to tunneler try {
ws.send(packer.pack(baddress, null, 'error'), { binary: true });
} catch(e) {
// ignore
}
try {
bstream.browser.end();
} catch(e) {
// ignore
}
delete remote.clients[cid]; delete remote.clients[cid];
}); });
bstream.wrapped.on('end', function () { bstream.wrapped.on('end', function () {
try {
ws.send(packer.pack(baddress, null, 'end'), { binary: true });
} catch(e) {
// ignore
}
try {
bstream.browser.end();
} catch(e) {
// ignore
}
delete remote.clients[cid]; delete remote.clients[cid];
}); });
} }