From 6cd2d0ac16bfe827acb77b5f29ca0532959227f8 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Mon, 3 Sep 2018 22:56:52 -0600 Subject: [PATCH] begin refactor TelebitRemote --- bin/telebitd.js | 211 +++++++++++++++++++++++++++++------------------- lib/remote.js | 208 ++++++++++++++++++++++++++--------------------- 2 files changed, 243 insertions(+), 176 deletions(-) diff --git a/bin/telebitd.js b/bin/telebitd.js index 2bf0237..e3f3bbd 100755 --- a/bin/telebitd.js +++ b/bin/telebitd.js @@ -75,7 +75,7 @@ try { // ignore } var controlServer; -var tun; +var myRemote; var controllers = {}; function saveConfig(cb) { @@ -352,22 +352,21 @@ function serveControlsHelper() { res.end(JSON.stringify(dumpy)); } - if (/\b(config)\b/.test(opts.pathname) && /get/i.test(req.method)) { + function getConfigOnly() { var resp = JSON.parse(JSON.stringify(state.config)); resp.version = pkg.version; res.setHeader('Content-Type', 'application/json'); res.end(JSON.stringify(resp)); - return; } // // without proper config // - function saveAndReport(err, _tun) { + function saveAndReport(err/*, _tun*/) { console.log('[DEBUG] saveAndReport config write', confpath); console.log(YAML.safeDump(snakeCopy(state.config))); if (err) { throw err; } - tun = _tun; + //myRemote = _tun; fs.writeFile(confpath, YAML.safeDump(snakeCopy(state.config)), function (err) { if (err) { res.statusCode = 500; @@ -380,7 +379,8 @@ function serveControlsHelper() { listSuccess(); }); } - if (/\b(init|config)\b/.test(opts.pathname)) { + + function initOrConfig() { var conf = {}; if (!opts.body) { res.statusCode = 422; @@ -474,28 +474,31 @@ function serveControlsHelper() { return; } - if (tun) { - console.log('ending existing tunnel, starting anew'); - tun.end(function () { - console.log('success ending'); - rawTunnel(saveAndReport); - }); - tun = null; - setTimeout(function () { - if (!tun) { - console.log('failed to end, but starting anyway'); - rawTunnel(saveAndReport); - } - }, 3000); - } else { + if (!myRemote) { console.log('no tunnel, starting anew'); - rawTunnel(saveAndReport); + if (!state.config.disable) { + startTelebitRemote(saveAndReport); + } + return; } - return; + + console.log('ending existing tunnel, starting anew'); + myRemote.end(); + myRemote.once('end', function () { + console.log('success ending'); + startTelebitRemote(saveAndReport); + }); + myRemote = null; + setTimeout(function () { + if (!myRemote) { + console.log('failed to end, but starting anyway'); + startTelebitRemote(saveAndReport); + } + }, 3000); } - if (/restart/.test(opts.pathname)) { - tun.end(); + function restart() { + if (myRemote) { myRemote.end(); } res.setHeader('Content-Type', 'application/json'); res.end(JSON.stringify({ success: true })); controlServer.close(function () { @@ -505,35 +508,17 @@ function serveControlsHelper() { process.exit(22); // use non-success exit code }); }); - return; } - // - // Check for proper config - // - if (!state.config.relay || !state.config.email || !state.config.agreeTos) { + function invalidConfig() { res.statusCode = 400; res.setHeader('Content-Type', 'application/json'); res.end(JSON.stringify({ error: { code: "E_CONFIG", message: "Invalid config file. Please run 'telebit init'" } })); - return; } - // - // With proper config - // - if (/http/.test(opts.pathname)) { - controllers.http(req, res, opts); - return; - } - - if (/tcp/.test(opts.pathname)) { - controllers.tcp(req, res, opts); - return; - } - - if (/save|commit/.test(opts.pathname)) { + function saveAndCommit() { state.config.servernames = state.servernames; state.config.ports = state.ports; fs.writeFile(confpath, YAML.safeDump(snakeCopy(state.config)), function (err) { @@ -547,23 +532,17 @@ function serveControlsHelper() { } listSuccess(); }); - return; } - if (/ssh/.test(opts.pathname)) { - controllers.ssh(req, res, opts); - return; - } - - if (/enable/.test(opts.pathname)) { + function enable() { delete state.config.disable;// = undefined; - if (tun) { + if (myRemote) { listSuccess(); return; } - rawTunnel(function (err, _tun) { + startTelebitRemote(function (err/*, _tun*/) { if (err) { throw err; } - tun = _tun; + //myRemote = _tun; fs.writeFile(confpath, YAML.safeDump(snakeCopy(state.config)), function (err) { if (err) { res.statusCode = 500; @@ -576,12 +555,11 @@ function serveControlsHelper() { listSuccess(); }); }); - return; } - if (/disable/.test(opts.pathname)) { + function disable() { state.config.disable = true; - if (tun) { tun.end(); tun = null; } + if (myRemote) { myRemote.end(); myRemote = null; } fs.writeFile(confpath, YAML.safeDump(snakeCopy(state.config)), function (err) { res.setHeader('Content-Type', 'application/json'); if (err) { @@ -593,23 +571,71 @@ function serveControlsHelper() { } res.end('{"success":true}'); }); - return; } - if (/status/.test(opts.pathname)) { + function getStatus() { res.setHeader('Content-Type', 'application/json'); res.end(JSON.stringify( { status: (state.config.disable ? 'disabled' : 'enabled') , ready: ((state.config.relay && (state.config.token || state.config.agreeTos)) ? true : false) - , active: !!tun + , active: !!myRemote , connected: 'maybe (todo)' , version: pkg.version , servernames: state.servernames } )); - return; } + if (/\b(config)\b/.test(opts.pathname) && /get/i.test(req.method)) { + getConfigOnly(); + return; + } + if (/\b(init|config)\b/.test(opts.pathname)) { + initOrConfig(); + return; + } + if (/restart/.test(opts.pathname)) { + restart(); + return; + } + // + // Check for proper config + // + if (!state.config.relay || !state.config.email || !state.config.agreeTos) { + invalidConfig(); + return; + } + // + // With proper config + // + if (/http/.test(opts.pathname)) { + controllers.http(req, res, opts); + return; + } + if (/tcp/.test(opts.pathname)) { + controllers.tcp(req, res, opts); + return; + } + if (/save|commit/.test(opts.pathname)) { + saveAndCommit(); + return; + } + if (/ssh/.test(opts.pathname)) { + controllers.ssh(req, res, opts); + return; + } + if (/enable/.test(opts.pathname)) { + enable(); + return; + } + if (/disable/.test(opts.pathname)) { + disable(); + return; + } + if (/status/.test(opts.pathname)) { + getStatus(); + return; + } if (/list/.test(opts.pathname)) { listSuccess(); return; @@ -618,7 +644,9 @@ function serveControlsHelper() { res.setHeader('Content-Type', 'application/json'); res.end(JSON.stringify({"error":{"message":"unrecognized rpc"}})); }); + if (fs.existsSync(state._ipc.path)) { + console.log("DEBUG ipc path unlink"); fs.unlinkSync(state._ipc.path); } // mask is so that processes owned by other users @@ -630,15 +658,18 @@ function serveControlsHelper() { , exclusive: false }; if ('socket' === state._ipc.type) { + console.log("DEBUG ipc path make"); require('mkdirp').sync(path.dirname(state._ipc.path)); } // https://nodejs.org/api/net.html#net_server_listen_options_callback // path is ignore if port is defined // https://git.coolaj86.com/coolaj86/telebit.js/issues/23#issuecomment-326 if (state._ipc.port) { + console.log("DEBUG ipc localhost"); serverOpts.host = 'localhost'; serverOpts.port = state._ipc.port; } else { + console.log("DEBUG ipc socket path"); serverOpts.path = state._ipc.path; } controlServer.listen(serverOpts, function () { @@ -655,25 +686,26 @@ function serveControlsHelper() { function serveControls() { if (state.config.disable) { console.info("[info] starting disabled"); + serveControlsHelper(); return; } - if (state.config.relay && (state.config.token || state.config.pretoken)) { - console.info("[info] connecting with stored token"); - rawTunnel(function (err, _tun) { - if (err) { throw err; } - if (_tun) { tun = _tun; } - setTimeout(function () { - // TODO attach handler to tunnel - serveControlsHelper(); - }, 150); - }); - return; - } else { + if (!(state.config.relay && (state.config.token || state.config.pretoken))) { console.info("[info] waiting for init/authentication (missing relay and/or token)"); + serveControlsHelper(); + return; } - serveControlsHelper(); + console.info("[info] connecting with stored token"); + startTelebitRemote(function (err/*, _tun*/) { + console.log("DEBUG going to serve controls soon..."); + if (err) { throw err; } + //if (_tun) { myRemote = _tun; } + setTimeout(function () { + // TODO attach handler to tunnel + serveControlsHelper(); + }, 150); + }); } function parseConfig(err, text) { @@ -735,7 +767,7 @@ function parseConfig(err, text) { } } -function rawTunnel(rawCb) { +function startTelebitRemote(rawCb) { if (state.config.disable || !state.config.relay || !(state.config.token || state.config.agreeTos)) { rawCb(null, null); return; @@ -752,11 +784,12 @@ function rawTunnel(rawCb) { return; } - if (tun) { - rawCb(null, tun); + if (myRemote) { + rawCb(null, myRemote); return; } + // get the wss url common.api.wss(state, function (err, wss) { if (err) { rawCb(err); return; } state.wss = wss; @@ -771,7 +804,7 @@ function rawTunnel(rawCb) { // TODO sortingHat.print(); ? // TODO Check undefined vs false for greenlock config - var remote = require('../'); + var TelebitRemote = require('../').TelebitRemote; state.greenlockConfig = { version: state.greenlockConf.version || 'draft-11' @@ -808,7 +841,7 @@ function rawTunnel(rawCb) { // { relay, config, servernames, ports, sortingHat, net, insecure, token, handlers, greenlockConfig } console.log("[DEBUG] token", typeof token, token); - tun = remote.connect({ + myRemote = TelebitRemote.createConnection({ relay: state.relay , wss: state.wss , config: state.config @@ -821,9 +854,20 @@ function rawTunnel(rawCb) { , ports: state.ports , handlers: state.handlers , greenlockConfig: state.greenlockConfig + }, function () { + rawCb(null, myRemote); + }); + myRemote.once('error', function (err) { + // Likely causes: + // * DNS lookup failed (no Internet) + // * Rejected (bad authn) + if ('function' === typeof rawCb) { + rawCb(err); + } else { + console.error('Unhandled TelebitRemote Error:'); + console.error(err); + } }); - - rawCb(null, tun); }); } @@ -891,8 +935,8 @@ function sigHandler() { // that prevents us from exitting, in which case we want the user to be able to send // the signal again and exit the way it normally would. process.removeListener('SIGINT', sigHandler); - if (tun) { - tun.end(); + if (myRemote) { + myRemote.end(); } if (controlServer) { controlServer.close(); @@ -914,6 +958,7 @@ state.net = state.net || { } }; +console.log('DEBUG parse config'); fs.readFile(confpath, 'utf8', parseConfig); }()); diff --git a/lib/remote.js b/lib/remote.js index bb6d582..8a34060 100644 --- a/lib/remote.js +++ b/lib/remote.js @@ -11,6 +11,7 @@ var WebSocket = require('ws'); var sni = require('sni'); var Packer = require('proxy-packer'); var os = require('os'); +var EventEmitter = require('events').EventEmitter; function timeoutPromise(duration) { return new PromiseA(function (resolve) { @@ -18,8 +19,15 @@ function timeoutPromise(duration) { }); } -function _connect(state) { +function TelebitRemote(state) { // jshint latedef:false + + if (!(this instanceof TelebitRemote)) { + return new TelebitRemote(state); + } + EventEmitter.call(this); + var me = this; + var defaultHttpTimeout = (2 * 60); var activityTimeout = state.activityTimeout || (defaultHttpTimeout - 5) * 1000; var pongTimeout = state.pongTimeout || 10*1000; @@ -315,7 +323,7 @@ function _connect(state) { if (clientHandlers.write(cid, tun)) { return; } - wstunneler.pause(); + wstunneler._socket.pause(); require(state.sortingHat).assign(state, tun, function (err, conn) { if (err) { err.message = err.message.replace(/:tun_id/, tun._id); @@ -324,7 +332,7 @@ function _connect(state) { } clientHandlers.add(conn, cid, tun); if (tun.data) { conn.write(tun.data); } - wstunneler.resume(); + wstunneler._socket.resume(); }); } @@ -512,7 +520,8 @@ function _connect(state) { } }; - function connect() { + var connPromise; + me.connect = function connect() { if (wstunneler) { console.warn('attempted to connect with connection already active'); return; @@ -540,7 +549,11 @@ function _connect(state) { console.info("[connect] '" + (state.wss || state.relay) + "'"); var tunnelUrl = (state.wss || state.relay).replace(/\/$/, '') + '/'; // + auth; wstunneler = new WebSocket(tunnelUrl, { rejectUnauthorized: !state.insecure }); - wstunneler.on('open', wsHandlers.onOpen); + // XXXXXX + wstunneler.on('open', function () { + me.emit('connect'); + wsHandlers.onOpen(); + }); wstunneler.on('close', wsHandlers.onClose); wstunneler.on('error', wsHandlers.onError); @@ -555,108 +568,117 @@ function _connect(state) { } machine.fns.addChunk(data, flags); }); - } - connect(); + }; + me.end = function() { + tokens.length = 0; + if (timeoutId) { + clearTimeout(timeoutId); + timeoutId = null; + } - var connPromise; - return { - end: function(cb) { - tokens.length = 0; + if (wstunneler) { + try { + wstunneler.close(1000, 're-connect'); + wstunneler.on('close', function () { + me.emit('end'); + }); + } catch(e) { + console.error("[error] wstunneler.close()"); + console.error(e); + } + } + }; + me.authz = me.append = function (token) { + if (!token) { + throw new Error("attempted to append empty token"); + } + if ('undefined' === token) { + throw new Error("attempted to append token as the string 'undefined'"); + } + if (tokens.indexOf(token) >= 0) { + return PromiseA.resolve(); + } + tokens.push(token); + var prom; + if (tokens.length === 1 && !wstunneler) { + // We just added the only token in the list, and the websocket connection isn't up + // so we need to restart the connection. if (timeoutId) { + // Handle the case were the last token was removed and this token added between + // reconnect attempts to make sure we don't try openning multiple connections. clearTimeout(timeoutId); timeoutId = null; } - if (wstunneler) { - try { - wstunneler.close(cb); - } catch(e) { - console.error("[error] wstunneler.close()"); - console.error(e); - } - } + // We want this case to behave as much like the other case as we can, but we don't have + // the same kind of reponses when we open brand new connections, so we have to rely on + // the 'hello' and the 'un-associated' error commands to determine if the token is good. + prom = connPromise = new PromiseA(function (resolve, reject) { + connCallback = function (err) { + connCallback = null; + connPromise = null; + if (err) { + reject(err); + } else { + resolve(); + } + }; + }); + connect(); } - , append: function (token) { - if (!token) { - throw new Error("attempted to append empty token"); - } - if ('undefined' === token) { - throw new Error("attempted to append token as the string 'undefined'"); - } - if (tokens.indexOf(token) >= 0) { + else if (connPromise) { + prom = connPromise.then(function () { + return sendCommand('add_token', token); + }); + } + else { + prom = sendCommand('add_token', token); + } + + prom.catch(function (err) { + console.error('adding token', token, 'failed:', err); + // Most probably an invalid token of some kind, so we don't really want to keep it. + tokens.splice(tokens.indexOf(token), 1); + }); + + return prom; + }; + me.clear = function (token) { + if (typeof token === 'undefined') { + token = '*'; + } + + if (token === '*') { + tokens.length = 0; + } else { + var index = tokens.indexOf(token); + if (index < 0) { return PromiseA.resolve(); } - tokens.push(token); - var prom; - if (tokens.length === 1 && !wstunneler) { - // We just added the only token in the list, and the websocket connection isn't up - // so we need to restart the connection. - if (timeoutId) { - // Handle the case were the last token was removed and this token added between - // reconnect attempts to make sure we don't try openning multiple connections. - clearTimeout(timeoutId); - timeoutId = null; - } - - // We want this case to behave as much like the other case as we can, but we don't have - // the same kind of reponses when we open brand new connections, so we have to rely on - // the 'hello' and the 'un-associated' error commands to determine if the token is good. - prom = connPromise = new PromiseA(function (resolve, reject) { - connCallback = function (err) { - connCallback = null; - connPromise = null; - if (err) { - reject(err); - } else { - resolve(); - } - }; - }); - connect(); - } - else if (connPromise) { - prom = connPromise.then(function () { - return sendCommand('add_token', token); - }); - } - else { - prom = sendCommand('add_token', token); - } - - prom.catch(function (err) { - console.error('adding token', token, 'failed:', err); - // Most probably an invalid token of some kind, so we don't really want to keep it. - tokens.splice(tokens.indexOf(token), 1); - }); - - return prom; + tokens.splice(index); } - , clear: function (token) { - if (typeof token === 'undefined') { - token = '*'; - } - if (token === '*') { - tokens.length = 0; - } else { - var index = tokens.indexOf(token); - if (index < 0) { - return PromiseA.resolve(); - } - tokens.splice(index); - } + var prom = sendCommand('delete_token', token); + prom.catch(function (err) { + console.error('clearing token', token, 'failed:', err); + }); - var prom = sendCommand('delete_token', token); - prom.catch(function (err) { - console.error('clearing token', token, 'failed:', err); - }); - - return prom; - } + return prom; }; } +TelebitRemote.prototype = EventEmitter.prototype; -module.exports.connect = _connect; -module.exports.createConnection = _connect; +TelebitRemote.create = function (opts) { + return new TelebitRemote(opts); +}; +TelebitRemote.createConnection = function (opts, cb) { + var tunnel = TelebitRemote.create(opts); + tunnel.connect(opts); + tunnel.once('connect', cb); + return tunnel; +}; +TelebitRemote.connect = TelebitRemote.createConnection; + +module.exports.TelebitRemote = TelebitRemote; }());