From 482040d6ec97ee5a8cc8dff231df000adb182a09 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Thu, 13 Jun 2019 17:36:25 -0600 Subject: [PATCH] for batching async tasks --- batchasync.js | 71 ++++++++++++++++++++++++++++++++ package.json | 25 ++++++++++++ test.js | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 206 insertions(+) create mode 100644 batchasync.js create mode 100644 package.json create mode 100644 test.js diff --git a/batchasync.js b/batchasync.js new file mode 100644 index 0000000..75e7e11 --- /dev/null +++ b/batchasync.js @@ -0,0 +1,71 @@ +(function (exports) { +'use strict'; + +exports.batchAsync = function (limit, arr, doStuff) { + arr = arr.slice(0); + return new Promise(function(resolve, reject) { + var total = arr.length; + var active = 0; + var results = []; + var error; + + function doMoreStuff() { + // Don't take on any more tasks if we've errored, + // or if too many are already in progress + if (error || active > limit) { + return; + } + + // If there are no more tasks to start, return + if (!arr.length) { + // If everything is also *finished*, resolve + if (active < 1) { + resolve(results); + } + return; + } + + // We need to dequeue the task here so the index is correct + // (keep in mind we want to support sync and async) + var index = total - arr.length; + var task = arr.shift(); + active += 1; + + // Spawn another task immediately, + // which will be stopped if we're at the limit + doMoreStuff(); + + var p; + try { + p = doStuff(task); + } catch (e) { + // we need to handle, and bubble, synchronous errors + error = e; + reject(e); + throw e; + } + // Do stuff and then decrease the active counter when done + // add support for sync by rapping in a promise + Promise.resolve(p) + .then(function(result) { + if ('undefined' === typeof result) { + throw new Error( + "result was 'undefined'. Please return 'null' to signal that you didn't just forget to return another promise." + ); + } + active -= 1; + results[index] = result; + }) + .then(doMoreStuff) + .catch(function(e) { + // handle async errors + error = e; + reject(e); + }); + } + + doMoreStuff(); + }); +}; + +}('undefined' !== typeof window ? window : module.exports)); diff --git a/package.json b/package.json new file mode 100644 index 0000000..91ac565 --- /dev/null +++ b/package.json @@ -0,0 +1,25 @@ +{ + "name": "batchasync", + "version": "1.0.0", + "description": "Like forEachAsync, or Promise.all(), but handling a bounded number of items at any given time.", + "main": "batchasync.js", + "scripts": { + "test": "node ./test.js" + }, + "repository": { + "type": "git", + "url": "https://git.coolaj86.com/coolaj86/batchasync.js.git" + }, + "keywords": [ + "batch", + "forEach", + "async", + "forEachAsync", + "Promise.all", + "Promise", + "task", + "queue" + ], + "author": "AJ ONeal (https://coolaj86.com/)", + "license": "MPL-2.0" +} diff --git a/test.js b/test.js new file mode 100644 index 0000000..61cd57f --- /dev/null +++ b/test.js @@ -0,0 +1,110 @@ +(function (exports) { +'use strict'; + +var batchAsync = exports.batchAsync || require('./batchasync.js').batchAsync; + +function testBatch() { + var timeouts = [100, 80, 20, 500, 50, 30, 200, 300]; + var tasks = timeouts.map(function(timeout, i) { + return function() { + return promiseTimeout(timeout).then(function() { + console.log('task:', i, timeouts[i]); + return i; + }); + }; + }); + var len = tasks.length; + + return batchAsync(4, tasks, function(task) { + return task(); + }) + .then(function(results) { + console.info('results:', results); + if (len !== results.length) { + throw new Error('result set too small'); + } + if (results.join(' ') !== results.sort().join(' ')) { + throw new Error('result set out-of-order'); + } + }) + .then(function() { + return batchAsync(4, [], 'not a function').then(function() { + console.info('Handled ZERO tasks correctly.'); + }); + }) + .then(function() { + return batchAsync(4, timeouts, function(x) { + return x; + }).then(function(results) { + if (results.join(' ') !== timeouts.join(' ')) { + console.error(results); + throw new Error('sync result set out-of-order'); + } + console.info('Handled sync tasks correctly.'); + }); + }) + .then(function() { + return batchAsync(4, tasks, function(task) { + if (0 === Math.floor(Math.random() * 2) % 2) { + throw new Error('any async error will do'); + } + return task(); + }) + .then(function(results) { + console.log(results); + var e = new Error('async rejection should not pass!'); + e.FAIL = true; + throw e; + }) + .catch(function(e) { + if (e.FAIL) { + throw e; + } + console.info('Pass: Exception thrown when expected'); + }); + }) + .then(function() { + return batchAsync(4, timeouts, function() { + if (0 === Math.floor(Math.random() * 2) % 2) { + throw new Error('any sync error will do'); + } + return null; + }) + .then(function(results) { + var e = new Error('should not pass sync exception!'); + e.FAIL = true; + throw e; + }) + .catch(function(e) { + if (e.FAIL) { + throw e; + } + }) + .then(function() { + // wait for the tasks the error left dangling to print their message + console.info('Pass: Promise rejected when expected'); + return promiseTimeout(1000); + }); + }); +} + +function promiseTimeout(timeout) { + return new Promise(function(resolve, reject) { + setTimeout(resolve, timeout); + }); +} + +testBatch() + .then(function() { + console.info('PROBABLY PASSED'); + console.info( + 'We tested what could be tested without knowing Passed what could be tested Do the results make sense?' + ); + }) + .catch(function(e) { + console.error('FAIL!'); + console.error(e); + process.exit(500); + }); + +}('undefined' !== typeof window ? window : module.exports));