As others in this thread have pointed out, Promise.all() won't do the right thing if you need to limit concurrency. But ideally you shouldn't even want to wait until all of the Promises are done before processing them.
Instead, you want to process each result as soon as it becomes available, so you don't have to wait for the very last promise to finish before you start iterating over the results.
So, here's a code sample that does just that, based partly on the answer by Endless and also on this answer by T.J. Crowder.
async function* raceAsyncIterators(iterators) {
    async function queueNext(iteratorResult) {
        delete iteratorResult.result; // Release previous result ASAP
        iteratorResult.result = await iteratorResult.iterator.next();
        return iteratorResult;
    }
    const iteratorResults = new Map(iterators.map(iterator => [
        iterator,
        queueNext({ iterator })
    ]));
    while (iteratorResults.size) {
        const winner = await Promise.race(iteratorResults.values());
        if (winner.result.done) {
            iteratorResults.delete(winner.iterator);
        } else {
            const { value } = winner.result;
            iteratorResults.set(winner.iterator, queueNext(winner));
            yield value;
        }
    }
}

async function* runTasks(maxConcurrency, iterator) {
    // Each worker is an async generator that polls for tasks
    // from the shared iterator.
    // Sharing the iterator ensures that each worker gets unique tasks.
    const workers = new Array(maxConcurrency);
    for (let i = 0; i < maxConcurrency; i++) {
        workers[i] = (async function*() {
            for (const task of iterator) yield await task();
        })();
    }
    yield* raceAsyncIterators(workers);
}

// example tasks that sleep and return a number
function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }

const tasks = [];
for (let i = 0; i < 20; i++) {
    tasks.push(async () => {
        console.log(`start ${i}`);
        await sleep(Math.random() * 1000);
        console.log(`end ${i}`);
        return i;
    });
}

(async () => {
    for await (const value of runTasks(3, tasks.values())) {
        console.log(`output ${value}`);
    }
})();
There's a lot of magic in here; let me explain.
We start with an async generator function (an async function*()) called raceAsyncIterators(). raceAsyncIterators() is like Promise.race() but with N iterators of Promises instead of just N Promises; it returns an async iterator that yields the results of resolved Promises.
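If async generators are new to you, here's a minimal toy example (not part of the answer's code) showing the basic mechanics: each yield produces a value that a consumer can pull with a for await loop.

```javascript
// A minimal async generator: each `yield` produces one value for
// the consumer's `for await` loop to pull.
async function* countTo(n) {
    for (let i = 1; i <= n; i++) {
        yield i; // the generator could also `await` something between yields
    }
}

(async () => {
    const out = [];
    for await (const v of countTo(3)) out.push(v);
    console.log(out); // [ 1, 2, 3 ]
})();
```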
raceAsyncIterators() keeps an iteratorResults map, mapping from iterators (as keys) to Promises of iteratorResult objects; each iteratorResult has an iterator property and a result property (the result of awaiting the iterator's next() Promise).
raceAsyncIterators() calls Promise.race() to let the iteratorResult Promises race to complete their tasks. When the winning iteratorResult says that its iterator is completely done, we remove it from the map; otherwise, we replace its Promise in the iteratorResults map with the iterator's next() Promise and yield the result value.
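To see raceAsyncIterators() in isolation, here's a small standalone demo; the two generators and their delays are made up purely for illustration, chosen so their outputs interleave predictably.

```javascript
// raceAsyncIterators() as defined above, repeated so the demo runs standalone.
async function* raceAsyncIterators(iterators) {
    async function queueNext(iteratorResult) {
        delete iteratorResult.result; // Release previous result ASAP
        iteratorResult.result = await iteratorResult.iterator.next();
        return iteratorResult;
    }
    const iteratorResults = new Map(iterators.map(iterator => [
        iterator,
        queueNext({ iterator })
    ]));
    while (iteratorResults.size) {
        const winner = await Promise.race(iteratorResults.values());
        if (winner.result.done) {
            iteratorResults.delete(winner.iterator);
        } else {
            const { value } = winner.result;
            iteratorResults.set(winner.iterator, queueNext(winner));
            yield value;
        }
    }
}

const sleep = ms => new Promise(r => setTimeout(r, ms));

// Two async generators yielding on different schedules.
async function* slow() { await sleep(150); yield "slow-1"; await sleep(150); yield "slow-2"; }
async function* fast() { await sleep(50); yield "fast-1"; await sleep(50); yield "fast-2"; }

(async () => {
    // Values arrive in completion order, not generator order:
    // fast-1, fast-2, slow-1, slow-2
    for await (const v of raceAsyncIterators([slow(), fast()])) {
        console.log(v);
    }
})();
```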
With that out of the way, we can now define our runTasks() function.
runTasks() accepts an iterator parameter, a non-async iterator of "tasks" to perform. Each task is an async function (a regular async function(), not an async generator async function*()). It also accepts a number maxConcurrency, the number of workers we'll run.
// Each worker is an async iterator that polls for tasks
// from the shared iterator.
// Sharing the iterator ensures that each worker gets unique tasks.
const workers = new Array(maxConcurrency);
for (let i = 0; i < maxConcurrency; i++) {
    workers[i] = (async function*() {
        for (const task of iterator) yield await task();
    })();
}
Note that the workers are initially defined as async generator functions, but we immediately invoke each function, and store each resulting async iterator in the workers array.
If we had just one worker iterator, we could call for await (let result of worker()) to get a stream of results.
But, since we have N worker iterators, we want to race them with raceAsyncIterators(), processing results from whichever worker iterator yields results first.
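For instance, the one-worker case can be sketched like this (the task list here is made up for illustration); with a single worker, tasks run strictly one at a time, in order.

```javascript
// A shared iterator of tasks; each task is an async function.
const tasks = [async () => "a", async () => "b"].values();

// A single worker: pulls one task at a time and yields its result.
const worker = (async function*() {
    for (const task of tasks) yield await task();
})();

(async () => {
    for await (const result of worker) console.log(result); // "a", then "b"
})();
```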
The last line of runTasks() is:
yield* raceAsyncIterators(workers);
yield* is a somewhat uncommon JS expression that lets one generator delegate to another, yielding everything the inner generator yields. This yield* line yields whichever results win the race.
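A tiny standalone illustration of yield* delegation (the values are arbitrary, just for demonstration):

```javascript
function* inner() { yield 1; yield 2; }

function* outer() {
    yield 0;
    yield* inner(); // delegate: outer yields everything inner yields
    yield 3;
}

console.log([...outer()]); // [ 0, 1, 2, 3 ]
```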
With runTasks(), we can use a for await loop, like this:
for await (const value of runTasks(3, tasks.values())) {
console.log(`output ${value}`);
}
This yields each task's value in the order in which the tasks resolve.
In the example, we generate an array of 20 async tasks that sleep for a random amount of time and return a number. (In real life, you'd probably make an array of async functions that fetch URLs or something.)
The example calls runTasks with 3 concurrent workers, so no more than 3 tasks are launched at the same time. When any task completes, we immediately queue up the next task. (This is superior to "batching", where you do 3 tasks at once, await all three of them, and don't start the next batch of three until the entire previous batch has finished.)
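For contrast, the "batching" approach that last paragraph warns about might look like this sketch (runInBatches is a hypothetical name, not part of the answer's code); note how every batch stalls on its slowest member.

```javascript
// Inferior "batching": each batch waits for its slowest task to finish
// before the next batch starts, so workers sit idle in the meantime.
async function runInBatches(batchSize, taskFns) {
    const results = [];
    for (let i = 0; i < taskFns.length; i += batchSize) {
        const batch = taskFns.slice(i, i + batchSize).map(fn => fn());
        results.push(...await Promise.all(batch)); // idle until the slowest finishes
    }
    return results;
}

const sleep = ms => new Promise(r => setTimeout(r, ms));
const taskFns = [50, 10, 10, 10].map(ms => async () => { await sleep(ms); return ms; });

// With batchSize 2, the second batch of 10 ms tasks can't start until the
// 50 ms task's batch finishes; a worker pool would have started them sooner.
runInBatches(2, taskFns).then(results => console.log(results)); // [ 50, 10, 10, 10 ]
```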