
In my program I have the following function (a Promise concurrency limiter, similar to pLimit):

async function promiseMapLimit(
  array,
  poolLimit,
  iteratorFn,
) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }

  return Promise.all(ret);
}

It works properly: if I pass it an array of numbers [0..99] and multiply each of them by 2, it gives the correct output [0, 2, 4, ..., 198].

const testArray = Array.from(Array(100).keys());

promiseMapLimit(testArray, 20, async (value) => value * 2).then((result) =>
  console.log(result)
);


But I can't understand its logic. While debugging, I noticed that it adds promises in chunks of 20 and only after that goes further.

For example, this block of code:

  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

iterates over only 20 items of the array (why not all 100???)

Same here:

if (poolLimit <= array.length) {
  const e = p.then(() => executing.splice(executing.indexOf(e), 1));
  executing.push(e);

It adds only 20 items to the executing array, and only after that does it step inside the if (executing.length >= poolLimit) block.

I'd be very grateful for an explanation of how this function works.

Karen
  • The answers to this question: https://stackoverflow.com/questions/37576685/using-async-await-with-a-foreach-loop might be able to help you. – Dr.Simplisist Dec 02 '21 at 07:20

5 Answers


Very interesting question! I think the important part of the code here is Promise.race(...), which resolves as soon as one of the promises resolves.

I have added a sleep function with a random delay (between 3 and 6 seconds) to better visualize how this works.

The expected behavior is that up to 20 promises run in parallel, and as soon as one finishes, the next one in the queue starts.

Visually, for a limit of 3 and 10 promises, it would look like the chart below. Notice that at every moment there are 3 active promises (except at the end, when fewer are left):

PromiseID  | Start                 End |
0          [====]
1          [==]
2          [======]
3             [==========]
4               [====]
5                 [================]
6                    [==]
7                       [====]
8                        [======]
9                            [========]
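The chart can also be checked numerically. This sketch reuses the question's promiseMapLimit unchanged and counts how many iteratorFn calls are running at the same time; trackedDouble, active and peak are illustrative names, and the fixed delay pattern stands in for a real async operation. With a limit of 3 and 10 items, the peak should be 3, as in the chart:

```javascript
// promiseMapLimit as posted in the question (behavior unchanged)
async function promiseMapLimit(array, poolLimit, iteratorFn) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);
    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }
  return Promise.all(ret);
}

const sleep = (ms) => new Promise((res) => setTimeout(res, ms));

// Illustrative instrumentation: how many iteratorFn calls are in flight at once
let active = 0;
let peak = 0;

async function trackedDouble(value) {
  active++;
  peak = Math.max(peak, active);
  // Fixed delay pattern (10, 25, 40 or 55 ms) instead of Math.random, for repeatable runs
  await sleep(10 + (value % 4) * 15);
  active--;
  return value * 2;
}

const demo = promiseMapLimit(Array.from(Array(10).keys()), 3, trackedDouble).then((result) => {
  console.log(result); // [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
  console.log(`peak concurrency: ${peak}`);
  return result;
});
```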

Here is the code with the random delay added:

// Create the utility sleep function
const sleep = x => new Promise(res => setTimeout(res, x))

async function promiseMapLimit(array, poolLimit, iteratorFn) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    console.log(ret.length)
    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        console.log(`Running batch of ${executing.length} promises.`);
        await Promise.race(executing);
        // As soon as one of the promises finishes, we continue the loop.
        console.log("Resolved one promise.")
      }
    }
  }

  return Promise.all(ret);
}

const testArray = Array.from(Array(100).keys());

promiseMapLimit(testArray, 20, async (value) => {
  // Log
  console.log(`Computing iterator fn for ${value}`)
  await sleep(3000 + Math.random() * 3000);
  return value * 2
}).then((result) =>
  console.log(result)
);

Regarding "will iterate over 20 items of an array (why not all 100???)":

At the start, as in the chart, it does not iterate over all 100 items. It iterates over the first 20, and then the loop is paused by await Promise.race(...), because executing.length >= poolLimit becomes true after 20 items.

Once a promise is fulfilled, it is removed from the executing array by executing.splice(executing.indexOf(e), 1).
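The pause is easy to see in a log. The sketch below is a copy of the question's function with one hypothetical log parameter added, run with a limit of 3 instead of 20 to keep the output short. No iteratorFn call starts while the loop is still pushing: the .then callbacks are queued as microtasks and only run once the loop reaches await Promise.race(...) and the caller's synchronous code has finished. This is most likely the chunking the debugger shows:

```javascript
// Copy of the question's promiseMapLimit with a hypothetical `log` parameter
async function promiseMapLimitLogged(array, poolLimit, iteratorFn, log) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);
    log.push(`pushed ${item}`);
    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }
  return Promise.all(ret);
}

const sleep = (ms) => new Promise((res) => setTimeout(res, ms));
const log = [];

const run = promiseMapLimitLogged(
  [0, 1, 2, 3, 4],
  3,
  async (value) => {
    log.push(`started ${value}`);
    await sleep(20);
    return value * 2;
  },
  log,
);

// Runs right after promiseMapLimitLogged reaches its first await
log.push("caller continues");

const logged = run.then((result) => {
  console.log(log.slice(0, 7));
  // [ 'pushed 0', 'pushed 1', 'pushed 2', 'caller continues',
  //   'started 0', 'started 1', 'started 2' ]
  console.log(result); // [ 0, 2, 4, 6, 8 ]
  return result;
});
```

Only the first seven entries are fixed by the microtask ordering; the later ones depend on when the timers fire.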

I think things become clearer with a delay (await sleep(...)) that simulates a real async operation (such as a database request).

Please let me know if there is anything else unclear.

Ionică Bizău

You have an await inside an async function. This works roughly as follows:

  • Execute the code line by line until the await keyword
  • Pause execution of this function
  • Once the awaited value resolves, continue until the next await keyword
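A minimal sketch of those three steps, independent of promiseMapLimit (loopWithAwait is an illustrative name): the loop stops at each await, control returns to the caller, and the loop resumes in a later microtask.

```javascript
async function loopWithAwait(log) {
  for (let i = 0; i < 3; i++) {
    log.push(`before await ${i}`);
    await null; // execution of this function pauses here
    log.push(`after await ${i}`);
  }
}

const log = [];
const done = loopWithAwait(log);
// The caller keeps running as soon as the async function hits its first await
log.push("caller continues");

done.then(() => console.log(log));
// [ 'before await 0', 'caller continues', 'after await 0',
//   'before await 1', 'after await 1', 'before await 2', 'after await 2' ]
```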

In your case, it iterates 20 times and then pauses the loop once you hit the limit. Then, once at least one promise inside executing has resolved, it proceeds.

The next thing that happens is that each promise, once resolved, removes itself from the executing array. But since almost everything in your example happens instantaneously, all 20 promises resolve before the loop resumes, and then the loop fills the pool with another 20. If you make your iteratorFn slower with random delays, you'll see that the pool fills up to 20 and then each freed slot is almost immediately taken by a new promise, as long as there are elements left.

Let's replace your iteratorFn with this and call it:

let iter = async (value) => {
  // randomly delay each calculation to 1, 2 or 3 seconds
  return new Promise(resolve => setTimeout(resolve, [1000, 2000, 3000][Math.floor(Math.random() * 3)], value * 2))
}

promiseMapLimit(testArray, 20, iter).then((result) =>
  console.log(result)
);

And let's log the number of elements in executing each time a promise resolves:

if (poolLimit <= array.length) {
  const e = p.then(() => {
    executing.splice(executing.indexOf(e), 1);
    // logging what is left
    console.log({l: executing.length});
  });
  executing.push(e);
  if (executing.length >= poolLimit) {
    await Promise.race(executing);
  }
}

This way, the console shows that logging starts with {l: 19}, since the pool was filled up and then one promise resolved. It continues like that until the very end, where the log counts down from 19 to 0.
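Both behaviors, the chunks with an instant iteratorFn and the constant refilling with a slow one, can be measured side by side. This sketch is a copy of the question's function with a hypothetical lengths array that records executing.length every time the loop resumes after Promise.race; the delays are arbitrary values chosen only so that the items finish at different times:

```javascript
// Copy of the question's function, plus a hypothetical `lengths` array that records
// executing.length each time the loop resumes after Promise.race
async function promiseMapLimitProbe(array, poolLimit, iteratorFn, lengths) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);
    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
        lengths.push(executing.length);
      }
    }
  }
  return Promise.all(ret);
}

const sleep = (ms) => new Promise((res) => setTimeout(res, ms));
const items = [0, 1, 2, 3, 4, 5];

// Instant iteratorFn, as in the question
const instantLengths = [];
const instant = promiseMapLimitProbe(items, 3, async (v) => v * 2, instantLengths);

// Slow iteratorFn; arbitrary delays so that items finish at different times
const delays = [20, 40, 60, 30, 40, 20];
const slowLengths = [];
const slow = promiseMapLimitProbe(items, 3, async (v) => {
  await sleep(delays[v]);
  return v * 2;
}, slowLengths);

const both = Promise.all([instant, slow]).then(() => {
  console.log(instantLengths); // [ 0, 0 ]: the whole batch finished before the loop resumed
  console.log(slowLengths);    // [ 2, 2, 2, 2 ]: one slot freed, refilled right away
});
```

With the instant function, the three promises in the pool resolve in the same microtask flush, before the await continuation runs, so the pool is empty when the loop resumes and the next items are pushed as a chunk. With delays, only one slot is free each time.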

Mr. Hedgehog

Mr. Hedgehog's answer does explain the important parts. I tried to explain the function using inline comments; maybe this helps.

async function promiseMapLimit(
    array,
    poolLimit,
    iteratorFn,
  ) {

    // array for storing results / ResultPromises
    const ret = [];

    // array holding the promises that are currently executing
    const executing = [];

    // iterate over array
    for (const item of array) {

      // Promise.resolve() creates a promise that is already resolved,
      // and .then(...) chains the iteratorFn call onto it. This is roughly like:
      // const p = new Promise((res, rej) => iteratorFn(item, array).then(res, rej))
      // but the version used here also allows iteratorFn to be NON ASYNC (return a plain value),
      // and a synchronous throw inside iteratorFn becomes a rejected promise.
      const p = Promise.resolve().then(

        // Since Promise.resolve() is already resolved, this callback is queued as a
        // microtask: it is not executed immediately, but as soon as the currently
        // running synchronous code has finished (here: when the loop reaches an await).
        // So this function body will, for example, be executed after `ret.push(p)`!
          () =>  iteratorFn(item, array)
        );

      // store the created promise in the results array anyways!!
      ret.push(p);
  

      // If the array holds fewer elements than poolLimit, nothing has to be done here and
      // all elements can be executed directly.
      if (poolLimit <= array.length) {

        // If we get in here, the array has at least as many elements as poolLimit
        // allows to run at once.

        // This line adds a success handler to the promise. It basically 
        // removes itself from the executing array at the point it finishes!.
        const e = p.then(() =>

            // here you see, it searches for itself, and removes itself from the array using splice!.
            executing.splice(executing.indexOf(e), 1)
        );


        // Add the promise to the currently executing ones. 
        // Note that this following line is executed before the "executing.splice(executing.indexOf(e), 1)"
        executing.push(e);

        // And the final, smart part!
        // If the executing array holds fewer elements than the maximum allowed,
        // do nothing and continue with the next item.
        if (executing.length >= poolLimit) {
            // We got here, so the executing queue is full (it holds poolLimit promises).
            // Promise.race() waits for the first promise in the given array to settle.
            // So the loop continues as soon as any executing promise finishes, and not before!
            // Since the promises remove themselves from the executing array, we know that
            // at least one promise has finished and the executing array has a free slot.
          await Promise.race(executing);
        }
      }
    }
  

    // since all promises were looped and added to results array it's now 
    // possible to await them all!.
    return Promise.all(ret);
  }
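The comment about NON ASYNC iterator functions can be checked directly. callDeferred below is a hypothetical helper that applies the same Promise.resolve().then(...) wrapping; a plain return value, a promise, and even a synchronous throw all come out as a promise:

```javascript
// Hypothetical helper: the same wrapping pattern promiseMapLimit uses for iteratorFn
const callDeferred = (fn, ...args) => Promise.resolve().then(() => fn(...args));

const syncFn = (x) => x * 2;        // returns a plain value
const asyncFn = async (x) => x * 2; // returns a promise
const throwingFn = () => {
  throw new Error("boom");
};

const checks = Promise.all([
  callDeferred(syncFn, 21),
  callDeferred(asyncFn, 21),
  // the synchronous throw becomes a rejected promise instead of escaping the caller
  callDeferred(throwingFn).catch((err) => err.message),
]).then((values) => {
  console.log(values); // [ 42, 42, 'boom' ]
  return values;
});
```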

To understand this topic, you may want to learn about the JavaScript event loop and the call stack: https://www.educative.io/edpresso/what-is-an-event-loop-in-javascript
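A minimal demonstration of the ordering that matters here: synchronous code runs first, then queued .then callbacks (microtasks), then timer callbacks (tasks). It is plain JavaScript and should behave the same in a browser or in Node.js:

```javascript
const order = [];

setTimeout(() => order.push("setTimeout callback (task queue)"), 0);
Promise.resolve().then(() => order.push("then callback (microtask queue)"));
order.push("synchronous code");

setTimeout(() => {
  console.log(order);
  // [ 'synchronous code', 'then callback (microtask queue)', 'setTimeout callback (task queue)' ]
}, 10);
```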

Silvan Bregy

This answer will be fun to write.

async function promiseMapLimit(
  array,
  poolLimit,
  iteratorFn,
) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }

  return Promise.all(ret);
}

So there are three things happening in this piece of code.

  1. All the array elements are wrapped in promises and put in the ret array
  2. A batch of promises is created that has to be executed first; the loop waits on it using Promise.race
  3. Promises in the execution queue are also chained to remove themselves from the execution queue.
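Point 3 can be tried in isolation. This sketch reproduces only the self-removal pattern with a single, already resolved promise; it shows that the callback can refer to e because it runs only after e has been assigned and pushed:

```javascript
const executing = [];
const p = Promise.resolve("done");

// Same pattern as in promiseMapLimit: the callback looks up its own promise `e`
const e = p.then(() => {
  const index = executing.indexOf(e); // e was assigned and pushed before this runs
  executing.splice(index, 1);
  return index;
});
executing.push(e);

console.log(executing.length); // 1: the callback has not run yet

const selfRemoval = e.then((index) => {
  console.log(index, executing.length); // 0 0
  return index;
});
```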

Ignore 2 for now and let's break down 1 and 3.

Let's run through a simple scenario.

promiseMapLimit([1, 2, 3], 1, async (i) => { return 2 * i });

This line calls the above function. Because of the line const p = Promise.resolve().then(() => iteratorFn(item, array));, ret ends up holding [Promise(2), Promise(4), Promise(6)].

Now, because poolLimit is less than array.length, we go into the if block.

const e = p.then(() => executing.splice(executing.indexOf(e), 1)); this line adds a then block which gets executed after the corresponding promise in the ret array resolves. In this then block, each promise searches the executing array, finds itself and removes itself.

It is equivalent to:

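const e = p.then(() => {
    // 1. e is a closure variable: by the time this callback runs (after p resolves),
    //    e has already been assigned and pushed to the executing array
    // 2. find that e in the executing array
    const index = executing.indexOf(e);
    // 3. remove it
    executing.splice(index, 1);
});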

Now that you understand 1 and 3, let's go to 2. The loop runs over all array.length elements, and whenever executing.length >= poolLimit it waits, because of the line await Promise.race(executing);, until the first promise in the executing queue settles.
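Since Promise.race comes up repeatedly in this thread, here is a small sketch of its semantics (fast, slow and delayed are illustrative names): the race settles with the first promise to settle, and the other promises keep running; nothing is cancelled.

```javascript
const settled = [];

// Resolves with `value` after `ms` milliseconds and records when it settled
const delayed = (ms, value) =>
  new Promise((resolve) =>
    setTimeout(() => {
      settled.push(value);
      resolve(value);
    }, ms),
  );

const fast = delayed(10, "fast");
const slow = delayed(50, "slow");

const raceDemo = Promise.race([fast, slow]).then(async (winner) => {
  console.log(winner);  // fast
  console.log(settled); // [ 'fast' ]: the race has resolved, slow is still pending
  await slow;           // the race did not cancel the slower promise
  console.log(settled); // [ 'fast', 'slow' ]
  return winner;
});
```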

Let me rewrite the function so it's easier to understand:

async function promiseMapLimit(
  array,
  poolLimit,
  iteratorFn,
) {
  // Wrap each element in a function that starts its work only when called.
  // Creating the promises here already would start every iteratorFn call at once.
  const tasks = array.map((item) => () => Promise.resolve().then(() => iteratorFn(item, array)));
  if (poolLimit > array.length) {
    // if poolLimit is greater than the array length, just start all the tasks; no need to worry about concurrency
    return Promise.all(tasks.map((task) => task()));
  }
  const ret = [];
  const executing = [];
  for (const task of tasks) {
    // Start the task and keep its promise for the final result
    const p = task();
    ret.push(p);
    // Add self-removal logic to each promise element
    const e = p.then(() => executing.splice(executing.indexOf(e), 1));
    // Put the promise element in the execution queue
    executing.push(e);
    // Whenever the execution queue is full, wait for a free slot:
    // Promise.race resolves as soon as the FIRST promise in the queue settles,
    // and because of the self-removal logic that promise has already left the queue
    if (executing.length >= poolLimit) {
      await Promise.race(executing);
    }
  }
  // Promise.all takes care of all the promises remaining in the last batch, for which await Promise.race(executing); was not executed.
  return Promise.all(ret);
}
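The [1, 2, 3] scenario with poolLimit 1 can also be run end to end. This sketch uses the question's original promiseMapLimit and adds a short sleep plus a hypothetical events log to the iterator function; with a limit of 1, each call should start only after the previous one has ended:

```javascript
// The question's promiseMapLimit, unchanged
async function promiseMapLimit(array, poolLimit, iteratorFn) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);
    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }
  return Promise.all(ret);
}

const sleep = (ms) => new Promise((res) => setTimeout(res, ms));
const events = [];

const scenario = promiseMapLimit([1, 2, 3], 1, async (i) => {
  events.push(`start ${i}`);
  await sleep(10);
  events.push(`end ${i}`);
  return 2 * i;
}).then((result) => {
  console.log(result); // [ 2, 4, 6 ]
  console.log(events); // [ 'start 1', 'end 1', 'start 2', 'end 2', 'start 3', 'end 3' ]
  return result;
});
```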

Some of the code isn't correct according to what I think you're trying to do.

if (poolLimit <= array.length) doesn't do anything; the code inside it will always execute. The exception is if you send it an array with fewer values than the poolLimit: then it won't execute anything at all and will just directly return an empty ret array promise. In your case you've sent it an array of 100, so everything inside that if will always execute.

if (executing.length >= poolLimit) is more correct according to what you're trying to do. But you've made a little mistake here: it should rather be executing.length <= poolLimit, so that it can fill up the execution array even if it's below 20. Right now it will execute only in chunks of 20 instead of filling up to 20. It will also break if you send it an array of, e.g., 101, since it will never execute that last promise; you just accidentally sent it an array whose length is divisible by the limit without a remainder.

However, you can replace both those checks and the loop with something like while (executing.length <= poolLimit && ret.length !== array.length), so that it constantly fills up the execution array until it's done.

Something else that's not quite right is the Promise.race. The race will execute only a single promise: if any one of them resolves, it will stop executing all of them, which is probably not what you want in a promise limiter. Instead, just let them resolve by themselves; there's no reason to race or await anything there.

The code you have here:

if (poolLimit <= array.length) {
  const e = p.then(() => executing.splice(executing.indexOf(e), 1));
  executing.push(e);
  if (executing.length >= poolLimit) {
    await Promise.race(executing);
  }
}

is all essentially dead code and doesn't really do anything: all of your array promises are immediately executing and being added to ret. It is successfully breaking them up into chunks of 20, but not in the way you intended.

Instead, you probably tried to do something like:

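const limiter = (array, limit, iteratorFn) =>
  new Promise((resolve, reject) => {
    const results = new Array(array.length)
    const executing = []
    let next = 0      // index of the next element to start
    let finished = 0  // number of elements that have resolved

    if (array.length === 0) return resolve(results)

    const fill = () => {
      // fill the execution array up to `limit` while elements remain
      while (executing.length < limit && next < array.length) {
        const index = next++
        const e = Promise.resolve()
          .then(() => iteratorFn(array[index], array))
          .then((value) => {
            results[index] = value                     // keep the input order
            executing.splice(executing.indexOf(e), 1)  // free the slot
            finished++
            if (finished === array.length) resolve(results)
            else fill()                                // start the next one(s)
          })
          .catch(reject)
        executing.push(e)
      }
    }

    fill()
  })

That's not perfect, just to give you an idea; you can alter it according to what exactly you're trying to do.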

A last suggestion: you don't have to keep the executing promises in an actual array. You can also just keep a counter of the promises that are currently being resolved, and increase and decrease that counter instead of constantly splicing the executing array.
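The counter idea can be sketched as follows. promiseMapLimitCounter is a hypothetical name, not code from the question; it keeps the question's loop and its final Promise.all, but replaces the executing array and Promise.race with a counter of in-flight calls and a single stored resolver that a finishing call invokes to wake the loop:

```javascript
// Hypothetical variant: a counter of in-flight calls replaces the executing array
async function promiseMapLimitCounter(array, poolLimit, iteratorFn) {
  const ret = [];
  let active = 0;  // number of iteratorFn calls currently running
  let wake = null; // resolver of the loop's pending wait, if any

  for (const item of array) {
    while (active >= poolLimit) {
      // Pause the loop until a running call finishes and frees a slot
      await new Promise((resolve) => {
        wake = resolve;
      });
    }
    active++;
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);
    // On success or failure: free the slot and wake the loop if it is waiting
    p.finally(() => {
      active--;
      if (wake) {
        const resume = wake;
        wake = null;
        resume();
      }
    }).catch(() => {}); // failures are still reported by Promise.all(ret)
  }
  return Promise.all(ret);
}

const sleep = (ms) => new Promise((res) => setTimeout(res, ms));
let running = 0;
let peak = 0;

const counterDemo = promiseMapLimitCounter([0, 1, 2, 3, 4, 5, 6, 7], 3, async (v) => {
  running++;
  peak = Math.max(peak, running);
  await sleep(5 + (v % 3) * 10);
  running--;
  return v * 2;
}).then((result) => {
  console.log(result); // [ 0, 2, 4, 6, 8, 10, 12, 14 ]
  console.log(`peak concurrency: ${peak}`);
  return result;
});
```

Because only the loop ever waits, a single stored resolver is enough; a call that finishes while the loop is not waiting just decrements the counter.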

Matriarx
  • The code works correctly. The `poolLimit` will only kick in if there are more elements in the array than the limit, that's correct, right? The code within `Promise.resolve().then(/**...here**/)` will be executed almost directly. – Silvan Bregy Nov 30 '21 at 13:52
  • Also, `Promise.race()` creates a new `Promise` which is resolved/rejected with the value of the first `Promise` in its argument array to settle. It does not affect the execution of the promises at all. https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/race – Silvan Bregy Nov 30 '21 at 13:53
  • No, it doesn't; it's broken in many ways. The poolLimit shouldn't only kick in at 20, it should kick in immediately; otherwise you're delaying everything until it reaches 20, which slows all the promises down. You want it to fill up to the limit, not ONLY execute at the limit. What's more, if you give it 24, those last 4 will never execute because it will never reach the limit. The code within the .then should be executed after the internal promise resolves; I've edited to show that. – Matriarx Nov 30 '21 at 14:30
  • I know exactly what promise.race is doing, but when you're building a promise limiter you don't want to only execute 1 of 20, you want to execute all of them, race is meaningless here. What's more is that using race on chunks of 20 makes no sense, if you do want to race them then you have to race all 100 of them. – Matriarx Nov 30 '21 at 14:31
  • The race stops execution of the loop once `executing.length` has reached `poolLimit` and resumes it when one of the elements in the array is done... It works. That's it. It's not about performance here. – Silvan Bregy Nov 30 '21 at 14:33
  • `poolLimit <= array.length` is also not doing anything: the poolLimit isn't changing and array.length is never changing, so that if is irrelevant. – Matriarx Nov 30 '21 at 14:37
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/239703/discussion-between-silvan-bregy-and-matriarx). – Silvan Bregy Nov 30 '21 at 14:46