diff --git a/index.js b/index.js index b5cbba2..b7a27ad 100644 --- a/index.js +++ b/index.js @@ -11,20 +11,56 @@ Packer.create = function (opts) { machine = {}; } - machine.onMessage = opts.onmessage || opts.onMessage; machine.onmessage = opts.onmessage || opts.onMessage; - machine.onError = opts.onerror || opts.onError; + machine.oncontrol = opts.oncontrol || opts.onControl; machine.onerror = opts.onerror || opts.onError; - machine.onEnd = opts.onend || opts.onEnd; machine.onend = opts.onend || opts.onEnd; machine._version = 1; - machine.state = 0; - machine.states = { 0: 'version', 1: 'headerLength', 2: 'header', 3: 'data'/*, 4: 'error'*/ }; - machine.states_length = Object.keys(machine.states).length; - machine.chunkIndex = 0; machine.fns = {}; + machine.chunkIndex = 0; + machine.buf = null; + machine.bufIndex = 0; + machine.fns.collectData = function (chunk, size) { + var chunkLeft = chunk.length - machine.chunkIndex; + + if (size <= 0) { + return Buffer.alloc(0); + } + + // First handle case where we don't have all the data we need yet. We need to save + // what we have in a buffer, and increment the index for both the buffer and the chunk. + if (machine.bufIndex + chunkLeft < size) { + if (!machine.buf) { + machine.buf = Buffer.alloc(size); + } + chunk.copy(machine.buf, machine.bufIndex, machine.chunkIndex); + machine.bufIndex += chunkLeft; + machine.chunkIndex += chunkLeft; + + return null; + } + + // Read and mark as read however much data we need from the chunk to complete our buffer. + var partLen = size - machine.bufIndex; + var part = chunk.slice(machine.chunkIndex, machine.chunkIndex+partLen); + machine.chunkIndex += partLen; + + // If we had nothing buffered than the part of the chunk we just read is all we need. + if (!machine.buf) { + return part; + } + + // Otherwise we need to copy the new data into the buffer. + part.copy(machine.buf, machine.bufIndex); + // Before returning the buffer we need to clear our reference to it. + var buf = machine.buf; + machine.buf = null; + machine.bufIndex = 0; + return buf; + }; + machine.fns.version = function (chunk) { //console.log(''); //console.log('[version]'); @@ -48,118 +84,58 @@ Packer.create = function (opts) { return true; }; - - machine.buf = null; - machine.bufIndex = 0; - //var buf = Buffer.alloc(4096); machine.fns.header = function (chunk) { //console.log(''); //console.log('[header]'); - var curSize = machine.bufIndex + (chunk.length - machine.chunkIndex); - var partLen = 0; - var str = ''; - var part; - - if (curSize < machine.headerLen) { - // I still don't have the whole header, - // so just create a large enough buffer, - // write these bits, and wait for the next chunk. - if (!machine.buf) { - machine.buf = Buffer.alloc(machine.headerLen); - } - - // partLen should be no more than the available size - partLen = Math.min(machine.headerLen - machine.bufIndex, chunk.length - machine.chunkIndex); - part = chunk.slice(machine.chunkIndex, machine.chunkIndex + partLen); - chunk.copy(machine.buf, machine.bufIndex, machine.chunkIndex, machine.chunkIndex + partLen); - machine.chunkIndex += partLen; // this MUST be chunk.length - machine.bufIndex += partLen; + var header = machine.fns.collectData(chunk, machine.headerLen); + // We don't have the entire header yet so return false. + if (!header) { return false; } - else { - // it's now ready to discover the whole header - if (machine.buf) { - str += machine.buf.slice(0, machine.bufIndex).toString(); - } - partLen = machine.headerLen - str.length; - part = chunk.slice(machine.chunkIndex, machine.chunkIndex + partLen); - str += part.toString(); + machine._headers = header.toString().split(/,/g); - machine.chunkIndex += partLen; - machine.buf = null; // back to null - machine.bufIndex = 0; // back to 0 + machine.family = machine._headers[0]; + machine.address = machine._headers[1]; + machine.port = machine._headers[2]; + machine.bodyLen = parseInt(machine._headers[3], 10) || 0; + machine.service = machine._headers[4]; + //console.log('machine.service', machine.service); - machine._headers = str.split(/,/g); - - machine.family = machine._headers[0]; - machine.address = machine._headers[1]; - machine.port = machine._headers[2]; - machine.bodyLen = parseInt(machine._headers[3], 10) || -1; - machine.service = machine._headers[4]; - //console.log('machine.service', machine.service); - - return true; - } + return true; }; machine.fns.data = function (chunk) { //console.log(''); //console.log('[data]'); - var curSize = machine.bufIndex + (chunk.length - machine.chunkIndex); - //console.log('curSize:', curSize); - //console.log('bodyLen:', machine.bodyLen, typeof machine.bodyLen); - var partLen = 0; - var msg; - var data; - - partLen = Math.min(machine.bodyLen - machine.bufIndex, chunk.length - machine.chunkIndex); - - if (curSize < machine.bodyLen) { - //console.log('curSize < bodyLen'); - - // I still don't have the whole header, - // so just create a large enough buffer, - // write these bits, and wait for the next chunk. - if (!machine.buf) { - machine.buf = Buffer.alloc(machine.bodyLen); - } - - chunk.copy(machine.buf, machine.bufIndex, machine.chunkIndex, machine.chunkIndex + partLen); - machine.chunkIndex += partLen; // this MUST be chunk.length - machine.bufIndex += partLen; + var data = machine.fns.collectData(chunk, machine.bodyLen); + // We don't have the entire body yet so return false. + if (!data) { return false; } - if (machine.bufIndex > 0) { - // the completing remainder of the body is in the current slice - chunk.copy(machine.buf, machine.bufIndex, machine.chunkIndex, machine.chunkIndex + partLen); - } - else { - // the whole body is in the current slice - machine.buf = chunk.slice(machine.chunkIndex, machine.chunkIndex + partLen); - } - machine.bufIndex += partLen; - - machine.service = machine.service; - data = machine.buf.slice(0, machine.bufIndex); - //console.log('machine.service', machine.service); - - // // data, end, error // + var msg = {}; + if ('error' === machine.service) { + try { + msg = JSON.parse(data.toString()); + } catch(e) { + msg.message = data.toString(); + msg.code = 'E_UNKNOWN_ERR'; + } + } + + msg.family = machine.family; + msg.address = machine.address; + msg.port = machine.port; + msg.service = machine.service; + msg.data = data; + if ('end' === machine.service) { - msg = {}; - - msg.family = machine.family; - msg.address = machine.address; - msg.port = machine.port; - msg.service = 'end'; - msg.data = data; - if (machine.emit) { machine.emit('tunnelEnd', msg); } @@ -168,18 +144,6 @@ Packer.create = function (opts) { } } else if ('error' === machine.service) { - try { - msg = JSON.parse(machine.data.toString()); - } catch(e) { - msg = new Error('unknown error'); - } - - msg.family = machine.family; - msg.address = machine.address; - msg.port = machine.port; - msg.service = 'error'; - msg.data = data; - if (machine.emit) { machine.emit('tunnelError', msg); } @@ -187,15 +151,15 @@ Packer.create = function (opts) { (machine.onerror||machine.onmessage)(msg); } } + else if ('control' === machine.service) { + if (machine.emit) { + machine.emit('tunnelControl', msg); + } + else { + (machine.oncontrol||machine.onmessage)(msg); + } + } else { - msg = {}; - - msg.family = machine.family; - msg.address = machine.address; - msg.port = machine.port; - msg.service = machine.service; - msg.data = data; - if (machine.emit) { machine.emit('tunnelData', msg); } @@ -204,12 +168,11 @@ Packer.create = function (opts) { } } - machine.chunkIndex += partLen; // === chunk.length - machine.buf = null; // reset to null - machine.bufIndex = 0; // reset to 0 - return true; }; + + machine.state = 0; + machine.states = ['version', 'headerLength', 'header', 'data']; machine.fns.addChunk = function (chunk) { //console.log(''); //console.log('[addChunk]'); @@ -219,17 +182,19 @@ Packer.create = function (opts) { if (true === machine.fns[machine.states[machine.state]](chunk)) { machine.state += 1; - machine.state %= machine.states_length; + machine.state %= machine.states.length; } } }; return machine; - }; Packer.pack = function (address, data, service) { data = data || Buffer.alloc(1); + if (!Buffer.isBuffer(data)) { + data = new Buffer(JSON.stringify(data)); + } if (!data.byteLength) { data = Buffer.alloc(1); } @@ -242,16 +207,21 @@ Packer.pack = function (address, data, service) { } var version = 1; - var header = Buffer.from([ - /*servername,*/ address.family, address.address, address.port, data.byteLength - , (address.service || '') - ].join(',')); + var header; + if (service === 'control') { + header = Buffer.from(['', '', '', data.byteLength, service].join(',')); + } + else { + header = Buffer.from([ + address.family, address.address, address.port, data.byteLength, (address.service || '') + ].join(',')); + } var meta = Buffer.from([ 255 - version, header.length ]); var buf = Buffer.alloc(meta.byteLength + header.byteLength + data.byteLength); - meta.copy(buf, 0, 0, meta.byteLength); - header.copy(buf, 2, 0, header.byteLength); - data.copy(buf, 2 + header.byteLength, 0, data.byteLength); + meta.copy(buf, 0); + header.copy(buf, 2); + data.copy(buf, 2 + header.byteLength); return buf; }; @@ -262,22 +232,18 @@ Packer.socketToAddr = function (socket) { // tlsSocket.remoteAddress = remoteAddress; // causes core dump // console.log(tlsSocket.remoteAddress); + function extractValue(name) { + return socket[name] + || socket['_'+name] + || socket._handle._parentWrap[name] + || socket._handle._parentWrap._handle.owner.stream[name] + ; + } + return { - family: - socket.remoteFamily - || socket._remoteFamily - || socket._handle._parentWrap.remoteFamily - || socket._handle._parentWrap._handle.owner.stream.remoteFamily - , address: - socket.remoteAddress - || socket._remoteAddress - || socket._handle._parentWrap.remoteAddress - || socket._handle._parentWrap._handle.owner.stream.remoteAddress - , port: - socket.remotePort - || socket._remotePort - || socket._handle._parentWrap.remotePort - || socket._handle._parentWrap._handle.owner.stream.remotePort + family: extractValue('remoteFamily') + , address: extractValue('remoteAddress') + , port: extractValue('remotePort') }; }; diff --git a/package.json b/package.json index 6d2907e..b06e77f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "tunnel-packer", - "version": "1.1.0", + "version": "1.2.0", "description": "A strategy for packing and unpacking a proxy stream (i.e. packets through a tunnel). Handles multiplexed and tls connections. Used by telebit and telebitd.", "main": "index.js", "scripts": {