106

https://stackoverflow.com/a/18658613/779159 gives an example of how to calculate the md5 of a file using the built-in crypto library and streams (the snippet below uses sha1).

var fs = require('fs');
var crypto = require('crypto');

// the file you want to get the hash of
var fd = fs.createReadStream('/some/file/name.txt');
var hash = crypto.createHash('sha1');
hash.setEncoding('hex');

fd.on('end', function() {
    hash.end();
    console.log(hash.read()); // the desired sha1sum
});

// read all file and pipe it (write it) to the hash object
fd.pipe(hash);

But is it possible to convert this to use ES8 async/await instead of the callback seen above, while still keeping the efficiency of streams?

Felix Kling
user779159
  • 3
    `async/await` is nothing but syntax-level support for promises. If you can put this code inside a promise, then you are done. – Felix Kling Nov 08 '15 at 22:27
  • 1
    Node.js 10.x supports using `for-await-of` to read streams (https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_streams_compatibility_with_async_generators_and_async_iterators) but I think it's not the right concept for your question here. Leaving it as a note for others who might come here facing a situation where it would help. – SimonSimCity Jul 01 '20 at 11:03

6 Answers

136

async/await only works with promises, not with streams. There are ideas for an extra stream-like data type with its own syntax, but those are highly experimental, if they exist at all, and I won't go into details.

Anyway, your callback is only waiting for the end of the stream, which is a perfect fit for a promise. You'd just have to wrap the stream:

var fd = fs.createReadStream('/some/file/name.txt');
var hash = crypto.createHash('sha1');
hash.setEncoding('hex');
// read all file and pipe it (write it) to the hash object
fd.pipe(hash);

var end = new Promise(function(resolve, reject) {
    hash.on('end', () => resolve(hash.read()));
    fd.on('error', reject); // or something like that. might need to close `hash`
});

Now you can await that promise:

(async function() {
    let sha1sum = await end;
    console.log(sha1sum);
}());
Bergi
  • 4
    Congratulations! Now it is no longer the only right answer — you can use an async loop with streams: http://2ality.com/2018/04/async-iter-nodejs.html. Unfortunately, it is an experimental feature for now. – MiF Aug 03 '18 at 10:05
  • 13
    @MiF So you mean the answer is wrong, because it's not "highly experimental" but just "experimental"? :-D – Bergi Aug 03 '18 at 15:53
  • JS is moving to promises and async/await in all async cases. It is strange that this feature wasn't released to production. I think the syntax is good now and will not change. – MiF Aug 04 '18 at 19:52
  • 1
    Why are you waiting for the ReadStream to `end`? Wouldn't the Promise resolve before the hash function has a chance to perform on the data? Or, at the very least, wouldn't it create a race condition? I would assume you would want to store the value of `fd.pipe(hash)` in a variable and listen for the `finish` event as that would signal that the WriteableStream (hashing) has completed. – adam-beck Dec 18 '18 at 20:01
  • @adam-beck Sounds like you're right. Why I did it this way? Because OP did that in their question as well. Feel free to suggest an edit. – Bergi Dec 18 '18 at 20:32
  • Well I did some digging. And now I'm not so sure: nodejs.org/api/stream.html#stream_events_finish_and_end – adam-beck Dec 19 '18 at 03:33
85

If you are using node version >= v10.0.0 then you can use stream.pipeline and util.promisify.

const fs = require('fs');
const crypto = require('crypto');
const util = require('util');
const stream = require('stream');

const pipeline = util.promisify(stream.pipeline);

const hash = crypto.createHash('sha1');
hash.setEncoding('hex');

async function run() {
  await pipeline(
    fs.createReadStream('/some/file/name.txt'),
    hash
  );
  console.log('Pipeline succeeded');
}

run().catch(console.error);
shusson
44

Node v15 now ships a promisified pipeline in stream/promises. This is the cleanest and most official way.

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

We should all appreciate how much work is done here:

  • Capture errors in all the streams.
  • Destroy unfinished streams when an error is raised.
  • Only return when the last writable stream has finished.

This pipe mechanism is one of the most powerful features Node.js has. Making it fully async was not easy. Now we have it.

Jason Ching
17

Something like this works:

for (const res of fetchResponses) { // node-fetch package responses
    const dest = fs.createWriteStream(filePath, { flags: 'a' });
    totalBytes += Number(res.headers.get('content-length'));
    await new Promise((resolve, reject) => {
        res.body.pipe(dest);
        res.body.on("error", reject);
        dest.on("finish", resolve);
    });
}
Ronnie Royston
  • 1
    Great explanation, I used this promise to wrap an https request I am making using NodeJS built in https module. Thank you! – Alex Rindone Apr 08 '20 at 21:15
7

2021 Update:

New example from Node documentation:

async function print(readable) {
  readable.setEncoding('utf8');
  let data = '';
  for await (const chunk of readable) {
    data += chunk;
  }
  console.log(data);
}

see https://nodejs.org/api/stream.html#stream_readable_symbol_asynciterator

Dmitri R117
3

I would comment, but don't have enough reputation.

A WORD OF CAUTION: If you have an application that is passing streams around AND doing async/await, be VERY CAREFUL to connect ALL pipes before you await. You can end up with streams not containing what you thought they did. Here's a minimal example:

const { PassThrough } = require('stream');

async function main() {
    const initialStream = new PassThrough();

    const otherStream = new PassThrough();
    const data = [];
    otherStream.on('data', dat => data.push(dat));
    const resultOtherStreamPromise = new Promise(resolve => otherStream.on('end', () => { resolve(Buffer.concat(data)) }));

    const yetAnotherStream = new PassThrough();
    const data2 = [];
    yetAnotherStream.on('data', dat => data2.push(dat));
    const resultYetAnotherStreamPromise = new Promise(resolve => yetAnotherStream.on('end', () => { resolve(Buffer.concat(data2)) }));

    initialStream.pipe(otherStream);
    initialStream.write('some ');

    await Promise.resolve(); // Completely unrelated await

    initialStream.pipe(yetAnotherStream);
    initialStream.end('data');
    const [resultOtherStream, resultYetAnotherStream] = await Promise.all([
        resultOtherStreamPromise,
        resultYetAnotherStreamPromise,
    ]);

    console.log('other stream:', resultOtherStream.toString()); // other stream: some data
    console.log('yet another stream:', resultYetAnotherStream.toString()); // yet another stream: data
}
main();