begin refactor TelebitRemote

This commit is contained in:
AJ ONeal 2018-09-03 22:56:52 -06:00
parent 4870cd1ee0
commit 6cd2d0ac16
2 changed files with 243 additions and 176 deletions

View File

@ -75,7 +75,7 @@ try {
// ignore
}
var controlServer;
var tun;
var myRemote;
var controllers = {};
function saveConfig(cb) {
@ -352,22 +352,21 @@ function serveControlsHelper() {
res.end(JSON.stringify(dumpy));
}
if (/\b(config)\b/.test(opts.pathname) && /get/i.test(req.method)) {
function getConfigOnly() {
var resp = JSON.parse(JSON.stringify(state.config));
resp.version = pkg.version;
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify(resp));
return;
}
//
// without proper config
//
function saveAndReport(err, _tun) {
function saveAndReport(err/*, _tun*/) {
console.log('[DEBUG] saveAndReport config write', confpath);
console.log(YAML.safeDump(snakeCopy(state.config)));
if (err) { throw err; }
tun = _tun;
//myRemote = _tun;
fs.writeFile(confpath, YAML.safeDump(snakeCopy(state.config)), function (err) {
if (err) {
res.statusCode = 500;
@ -380,7 +379,8 @@ function serveControlsHelper() {
listSuccess();
});
}
if (/\b(init|config)\b/.test(opts.pathname)) {
function initOrConfig() {
var conf = {};
if (!opts.body) {
res.statusCode = 422;
@ -474,28 +474,31 @@ function serveControlsHelper() {
return;
}
if (tun) {
console.log('ending existing tunnel, starting anew');
tun.end(function () {
console.log('success ending');
rawTunnel(saveAndReport);
});
tun = null;
setTimeout(function () {
if (!tun) {
console.log('failed to end, but starting anyway');
rawTunnel(saveAndReport);
}
}, 3000);
} else {
if (!myRemote) {
console.log('no tunnel, starting anew');
rawTunnel(saveAndReport);
if (!state.config.disable) {
startTelebitRemote(saveAndReport);
}
return;
}
return;
console.log('ending existing tunnel, starting anew');
myRemote.end();
myRemote.once('end', function () {
console.log('success ending');
startTelebitRemote(saveAndReport);
});
myRemote = null;
setTimeout(function () {
if (!myRemote) {
console.log('failed to end, but starting anyway');
startTelebitRemote(saveAndReport);
}
}, 3000);
}
if (/restart/.test(opts.pathname)) {
tun.end();
function restart() {
if (myRemote) { myRemote.end(); }
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify({ success: true }));
controlServer.close(function () {
@ -505,35 +508,17 @@ function serveControlsHelper() {
process.exit(22); // use non-success exit code
});
});
return;
}
//
// Check for proper config
//
if (!state.config.relay || !state.config.email || !state.config.agreeTos) {
function invalidConfig() {
res.statusCode = 400;
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify({
error: { code: "E_CONFIG", message: "Invalid config file. Please run 'telebit init'" }
}));
return;
}
//
// With proper config
//
if (/http/.test(opts.pathname)) {
controllers.http(req, res, opts);
return;
}
if (/tcp/.test(opts.pathname)) {
controllers.tcp(req, res, opts);
return;
}
if (/save|commit/.test(opts.pathname)) {
function saveAndCommit() {
state.config.servernames = state.servernames;
state.config.ports = state.ports;
fs.writeFile(confpath, YAML.safeDump(snakeCopy(state.config)), function (err) {
@ -547,23 +532,17 @@ function serveControlsHelper() {
}
listSuccess();
});
return;
}
if (/ssh/.test(opts.pathname)) {
controllers.ssh(req, res, opts);
return;
}
if (/enable/.test(opts.pathname)) {
function enable() {
delete state.config.disable;// = undefined;
if (tun) {
if (myRemote) {
listSuccess();
return;
}
rawTunnel(function (err, _tun) {
startTelebitRemote(function (err/*, _tun*/) {
if (err) { throw err; }
tun = _tun;
//myRemote = _tun;
fs.writeFile(confpath, YAML.safeDump(snakeCopy(state.config)), function (err) {
if (err) {
res.statusCode = 500;
@ -576,12 +555,11 @@ function serveControlsHelper() {
listSuccess();
});
});
return;
}
if (/disable/.test(opts.pathname)) {
function disable() {
state.config.disable = true;
if (tun) { tun.end(); tun = null; }
if (myRemote) { myRemote.end(); myRemote = null; }
fs.writeFile(confpath, YAML.safeDump(snakeCopy(state.config)), function (err) {
res.setHeader('Content-Type', 'application/json');
if (err) {
@ -593,23 +571,71 @@ function serveControlsHelper() {
}
res.end('{"success":true}');
});
return;
}
if (/status/.test(opts.pathname)) {
function getStatus() {
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify(
{ status: (state.config.disable ? 'disabled' : 'enabled')
, ready: ((state.config.relay && (state.config.token || state.config.agreeTos)) ? true : false)
, active: !!tun
, active: !!myRemote
, connected: 'maybe (todo)'
, version: pkg.version
, servernames: state.servernames
}
));
return;
}
if (/\b(config)\b/.test(opts.pathname) && /get/i.test(req.method)) {
getConfigOnly();
return;
}
if (/\b(init|config)\b/.test(opts.pathname)) {
initOrConfig();
return;
}
if (/restart/.test(opts.pathname)) {
restart();
return;
}
//
// Check for proper config
//
if (!state.config.relay || !state.config.email || !state.config.agreeTos) {
invalidConfig();
return;
}
//
// With proper config
//
if (/http/.test(opts.pathname)) {
controllers.http(req, res, opts);
return;
}
if (/tcp/.test(opts.pathname)) {
controllers.tcp(req, res, opts);
return;
}
if (/save|commit/.test(opts.pathname)) {
saveAndCommit();
return;
}
if (/ssh/.test(opts.pathname)) {
controllers.ssh(req, res, opts);
return;
}
if (/enable/.test(opts.pathname)) {
enable();
return;
}
if (/disable/.test(opts.pathname)) {
disable();
return;
}
if (/status/.test(opts.pathname)) {
getStatus();
return;
}
if (/list/.test(opts.pathname)) {
listSuccess();
return;
@ -618,7 +644,9 @@ function serveControlsHelper() {
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify({"error":{"message":"unrecognized rpc"}}));
});
if (fs.existsSync(state._ipc.path)) {
console.log("DEBUG ipc path unlink");
fs.unlinkSync(state._ipc.path);
}
// mask is so that processes owned by other users
@ -630,15 +658,18 @@ function serveControlsHelper() {
, exclusive: false
};
if ('socket' === state._ipc.type) {
console.log("DEBUG ipc path make");
require('mkdirp').sync(path.dirname(state._ipc.path));
}
// https://nodejs.org/api/net.html#net_server_listen_options_callback
// path is ignore if port is defined
// https://git.coolaj86.com/coolaj86/telebit.js/issues/23#issuecomment-326
if (state._ipc.port) {
console.log("DEBUG ipc localhost");
serverOpts.host = 'localhost';
serverOpts.port = state._ipc.port;
} else {
console.log("DEBUG ipc socket path");
serverOpts.path = state._ipc.path;
}
controlServer.listen(serverOpts, function () {
@ -655,25 +686,26 @@ function serveControlsHelper() {
function serveControls() {
if (state.config.disable) {
console.info("[info] starting disabled");
serveControlsHelper();
return;
}
if (state.config.relay && (state.config.token || state.config.pretoken)) {
console.info("[info] connecting with stored token");
rawTunnel(function (err, _tun) {
if (err) { throw err; }
if (_tun) { tun = _tun; }
setTimeout(function () {
// TODO attach handler to tunnel
serveControlsHelper();
}, 150);
});
return;
} else {
if (!(state.config.relay && (state.config.token || state.config.pretoken))) {
console.info("[info] waiting for init/authentication (missing relay and/or token)");
serveControlsHelper();
return;
}
serveControlsHelper();
console.info("[info] connecting with stored token");
startTelebitRemote(function (err/*, _tun*/) {
console.log("DEBUG going to serve controls soon...");
if (err) { throw err; }
//if (_tun) { myRemote = _tun; }
setTimeout(function () {
// TODO attach handler to tunnel
serveControlsHelper();
}, 150);
});
}
function parseConfig(err, text) {
@ -735,7 +767,7 @@ function parseConfig(err, text) {
}
}
function rawTunnel(rawCb) {
function startTelebitRemote(rawCb) {
if (state.config.disable || !state.config.relay || !(state.config.token || state.config.agreeTos)) {
rawCb(null, null);
return;
@ -752,11 +784,12 @@ function rawTunnel(rawCb) {
return;
}
if (tun) {
rawCb(null, tun);
if (myRemote) {
rawCb(null, myRemote);
return;
}
// get the wss url
common.api.wss(state, function (err, wss) {
if (err) { rawCb(err); return; }
state.wss = wss;
@ -771,7 +804,7 @@ function rawTunnel(rawCb) {
// TODO sortingHat.print(); ?
// TODO Check undefined vs false for greenlock config
var remote = require('../');
var TelebitRemote = require('../').TelebitRemote;
state.greenlockConfig = {
version: state.greenlockConf.version || 'draft-11'
@ -808,7 +841,7 @@ function rawTunnel(rawCb) {
// { relay, config, servernames, ports, sortingHat, net, insecure, token, handlers, greenlockConfig }
console.log("[DEBUG] token", typeof token, token);
tun = remote.connect({
myRemote = TelebitRemote.createConnection({
relay: state.relay
, wss: state.wss
, config: state.config
@ -821,9 +854,20 @@ function rawTunnel(rawCb) {
, ports: state.ports
, handlers: state.handlers
, greenlockConfig: state.greenlockConfig
}, function () {
rawCb(null, myRemote);
});
myRemote.once('error', function (err) {
// Likely causes:
// * DNS lookup failed (no Internet)
// * Rejected (bad authn)
if ('function' === typeof rawCb) {
rawCb(err);
} else {
console.error('Unhandled TelebitRemote Error:');
console.error(err);
}
});
rawCb(null, tun);
});
}
@ -891,8 +935,8 @@ function sigHandler() {
// that prevents us from exitting, in which case we want the user to be able to send
// the signal again and exit the way it normally would.
process.removeListener('SIGINT', sigHandler);
if (tun) {
tun.end();
if (myRemote) {
myRemote.end();
}
if (controlServer) {
controlServer.close();
@ -914,6 +958,7 @@ state.net = state.net || {
}
};
console.log('DEBUG parse config');
fs.readFile(confpath, 'utf8', parseConfig);
}());

View File

@ -11,6 +11,7 @@ var WebSocket = require('ws');
var sni = require('sni');
var Packer = require('proxy-packer');
var os = require('os');
var EventEmitter = require('events').EventEmitter;
function timeoutPromise(duration) {
return new PromiseA(function (resolve) {
@ -18,8 +19,15 @@ function timeoutPromise(duration) {
});
}
function _connect(state) {
function TelebitRemote(state) {
// jshint latedef:false
if (!(this instanceof TelebitRemote)) {
return new TelebitRemote(state);
}
EventEmitter.call(this);
var me = this;
var defaultHttpTimeout = (2 * 60);
var activityTimeout = state.activityTimeout || (defaultHttpTimeout - 5) * 1000;
var pongTimeout = state.pongTimeout || 10*1000;
@ -315,7 +323,7 @@ function _connect(state) {
if (clientHandlers.write(cid, tun)) { return; }
wstunneler.pause();
wstunneler._socket.pause();
require(state.sortingHat).assign(state, tun, function (err, conn) {
if (err) {
err.message = err.message.replace(/:tun_id/, tun._id);
@ -324,7 +332,7 @@ function _connect(state) {
}
clientHandlers.add(conn, cid, tun);
if (tun.data) { conn.write(tun.data); }
wstunneler.resume();
wstunneler._socket.resume();
});
}
@ -512,7 +520,8 @@ function _connect(state) {
}
};
function connect() {
var connPromise;
me.connect = function connect() {
if (wstunneler) {
console.warn('attempted to connect with connection already active');
return;
@ -540,7 +549,11 @@ function _connect(state) {
console.info("[connect] '" + (state.wss || state.relay) + "'");
var tunnelUrl = (state.wss || state.relay).replace(/\/$/, '') + '/'; // + auth;
wstunneler = new WebSocket(tunnelUrl, { rejectUnauthorized: !state.insecure });
wstunneler.on('open', wsHandlers.onOpen);
// XXXXXX
wstunneler.on('open', function () {
me.emit('connect');
wsHandlers.onOpen();
});
wstunneler.on('close', wsHandlers.onClose);
wstunneler.on('error', wsHandlers.onError);
@ -555,108 +568,117 @@ function _connect(state) {
}
machine.fns.addChunk(data, flags);
});
}
connect();
};
me.end = function() {
tokens.length = 0;
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
var connPromise;
return {
end: function(cb) {
tokens.length = 0;
if (wstunneler) {
try {
wstunneler.close(1000, 're-connect');
wstunneler.on('close', function () {
me.emit('end');
});
} catch(e) {
console.error("[error] wstunneler.close()");
console.error(e);
}
}
};
me.authz = me.append = function (token) {
if (!token) {
throw new Error("attempted to append empty token");
}
if ('undefined' === token) {
throw new Error("attempted to append token as the string 'undefined'");
}
if (tokens.indexOf(token) >= 0) {
return PromiseA.resolve();
}
tokens.push(token);
var prom;
if (tokens.length === 1 && !wstunneler) {
// We just added the only token in the list, and the websocket connection isn't up
// so we need to restart the connection.
if (timeoutId) {
// Handle the case were the last token was removed and this token added between
// reconnect attempts to make sure we don't try openning multiple connections.
clearTimeout(timeoutId);
timeoutId = null;
}
if (wstunneler) {
try {
wstunneler.close(cb);
} catch(e) {
console.error("[error] wstunneler.close()");
console.error(e);
}
}
// We want this case to behave as much like the other case as we can, but we don't have
// the same kind of reponses when we open brand new connections, so we have to rely on
// the 'hello' and the 'un-associated' error commands to determine if the token is good.
prom = connPromise = new PromiseA(function (resolve, reject) {
connCallback = function (err) {
connCallback = null;
connPromise = null;
if (err) {
reject(err);
} else {
resolve();
}
};
});
connect();
}
, append: function (token) {
if (!token) {
throw new Error("attempted to append empty token");
}
if ('undefined' === token) {
throw new Error("attempted to append token as the string 'undefined'");
}
if (tokens.indexOf(token) >= 0) {
else if (connPromise) {
prom = connPromise.then(function () {
return sendCommand('add_token', token);
});
}
else {
prom = sendCommand('add_token', token);
}
prom.catch(function (err) {
console.error('adding token', token, 'failed:', err);
// Most probably an invalid token of some kind, so we don't really want to keep it.
tokens.splice(tokens.indexOf(token), 1);
});
return prom;
};
me.clear = function (token) {
if (typeof token === 'undefined') {
token = '*';
}
if (token === '*') {
tokens.length = 0;
} else {
var index = tokens.indexOf(token);
if (index < 0) {
return PromiseA.resolve();
}
tokens.push(token);
var prom;
if (tokens.length === 1 && !wstunneler) {
// We just added the only token in the list, and the websocket connection isn't up
// so we need to restart the connection.
if (timeoutId) {
// Handle the case were the last token was removed and this token added between
// reconnect attempts to make sure we don't try openning multiple connections.
clearTimeout(timeoutId);
timeoutId = null;
}
// We want this case to behave as much like the other case as we can, but we don't have
// the same kind of reponses when we open brand new connections, so we have to rely on
// the 'hello' and the 'un-associated' error commands to determine if the token is good.
prom = connPromise = new PromiseA(function (resolve, reject) {
connCallback = function (err) {
connCallback = null;
connPromise = null;
if (err) {
reject(err);
} else {
resolve();
}
};
});
connect();
}
else if (connPromise) {
prom = connPromise.then(function () {
return sendCommand('add_token', token);
});
}
else {
prom = sendCommand('add_token', token);
}
prom.catch(function (err) {
console.error('adding token', token, 'failed:', err);
// Most probably an invalid token of some kind, so we don't really want to keep it.
tokens.splice(tokens.indexOf(token), 1);
});
return prom;
tokens.splice(index);
}
, clear: function (token) {
if (typeof token === 'undefined') {
token = '*';
}
if (token === '*') {
tokens.length = 0;
} else {
var index = tokens.indexOf(token);
if (index < 0) {
return PromiseA.resolve();
}
tokens.splice(index);
}
var prom = sendCommand('delete_token', token);
prom.catch(function (err) {
console.error('clearing token', token, 'failed:', err);
});
var prom = sendCommand('delete_token', token);
prom.catch(function (err) {
console.error('clearing token', token, 'failed:', err);
});
return prom;
}
return prom;
};
}
TelebitRemote.prototype = EventEmitter.prototype;
module.exports.connect = _connect;
module.exports.createConnection = _connect;
TelebitRemote.create = function (opts) {
return new TelebitRemote(opts);
};
TelebitRemote.createConnection = function (opts, cb) {
var tunnel = TelebitRemote.create(opts);
tunnel.connect(opts);
tunnel.once('connect', cb);
return tunnel;
};
TelebitRemote.connect = TelebitRemote.createConnection;
module.exports.TelebitRemote = TelebitRemote;
}());