add onmessage, onend, onerror

This commit is contained in:
AJ ONeal 2016-09-30 00:25:54 -04:00
parent 46f3b57751
commit 48d50210fb
1 changed files with 76 additions and 8 deletions

View File

@ -1,8 +1,19 @@
'use strict';
module.exports.create = function (opts) {
var machine;
if (!opts.onMessage && !opts.onmessage) {
machine = new (require('events').EventEmitter)();
}
machine.onMessage = opts.onmessage || opts.onMessage;
machine.onmessage = opts.onmessage || opts.onMessage;
machine.onError = opts.onerror || opts.onError;
machine.onerror = opts.onerror || opts.onError;
machine.onEnd = opts.onend || opts.onEnd;
machine.onend = opts.onend || opts.onEnd;
var machine = { onMessage: opts.onMessage };
machine._version = 1;
machine.state = 0;
machine.states = { 0: 'version', 1: 'headerLength', 2: 'header', 3: 'data'/*, 4: 'error'*/ };
@ -95,6 +106,7 @@ module.exports.create = function (opts) {
//console.log('curSize:', curSize);
//console.log('bodyLen:', machine.bodyLen, typeof machine.bodyLen);
var partLen = 0;
var msg;
partLen = Math.min(machine.bodyLen - machine.bufIndex, chunk.length - machine.chunkIndex);
@ -125,13 +137,69 @@ module.exports.create = function (opts) {
}
machine.bufIndex += partLen;
machine.onMessage({
family: machine.family
, address: machine.address
, port: machine.port
, data: machine.buf.slice(0, machine.bufIndex)
, service: machine.service
});
machine.service = machine.service || machine.type;
machine.type = machine.type || machine.service;
//
// data, end, error
//
if ('end' === machine.type) {
msg = {};
msg.family = machine.family;
msg.address = machine.address;
msg.port = machine.port;
msg.service = 'end';
msg.type = msg.type || 'end';
if (machine.emit) {
machine.emit('tunnelEnd', msg);
}
else {
(machine.onend||machine.onmessage)(msg);
}
}
else if ('error' === machine.type) {
try {
msg = JSON.parse(machine.data.toString());
} catch(e) {
msg = new Error('unknown error');
}
msg.family = machine.family;
msg.address = machine.address;
msg.port = machine.port;
msg.service = 'error';
msg.type = msg.type || 'error';
if (machine.emit) {
machine.emit('tunnelError', msg);
}
else {
(machine.onerror||machine.onError)(msg);
}
}
else {
msg = {
data: machine.buf.slice(0, machine.bufIndex)
, service: machine.service
, type: machine.type
};
msg.family = machine.family;
msg.address = machine.address;
msg.port = machine.port;
msg.service = machine.service;
msg.type = machine.type || 'data';
if (machine.emit) {
machine.emit('tunnelData', msg);
}
else {
machine.onmessage(msg);
}
}
machine.chunkIndex += partLen; // === chunk.length
machine.buf = null; // reset to null