Consider the following:

var fs = require('fs')
var JSONstream = require('JSONStream')

var asyncFunction = function(data, callback) {
  // doAsyncyThing stands in for any callback-style async operation
  doAsyncyThing(function(err, result) {
    // do some stuff
    return callback(err)
  })
}

fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
  .pipe(JSONstream.parse())
  .on('data', asyncFunction)   // <- how to let asyncFunction complete before continuing

How does the stream know when asyncFunction has completed? Is there any way to use asynchronous functions from within streams?

Fergie
  • I'm not sure how the stream will handle the callback since the documentation doesn't show two parameters in on('data', function). If you do want to do something fancy though, you can pause the stream, do your stuff, then resume the stream. – Dave Briand Apr 16 '16 at 11:01
  • @DaveBriand are you saying categorically that you cannot do this, or are you restating the question? :) – Fergie Apr 16 '16 at 13:47
  • Categorically you can't pass a two argument function to the data stream event. However, you can pause the stream on the data event, do some asynchronous processing, then resume the stream when your processing is complete. – Dave Briand Apr 18 '16 at 14:42
  • Great! Is there a cleanish way to code this? Could you give an example? – Fergie Apr 19 '16 at 20:44
  • Just for clarity: yes, `on('data', asyncFunction)` cannot deal with callbacks, since `asyncFunction` must be in the form `function(data)`. My point is: "how then do you deal with callbacks?" – Fergie Aug 10 '16 at 16:51

1 Answer

Check out transform streams. They give you the ability to run async code on a chunk, and then call a callback when you are finished. Here are the docs: https://nodejs.org/api/stream.html#transform_transformchunk-encoding-callback

As a simple example, you can do something like:

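const fs = require('fs')
const { Transform } = require('stream')
const JSONstream = require('JSONStream')

class WorkerThing extends Transform {
    _transform(chunk, encoding, cb) {
        // the next chunk is not delivered until cb is called
        asyncFunction(chunk, cb)
    }
}

// JSONStream emits parsed objects, so the transform must run in object mode
const workerThing = new WorkerThing({ objectMode: true })

fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
    .pipe(JSONstream.parse())
    .pipe(workerThing)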
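
If you want to try the idea without the data file or a JSON parser, here is a self-contained sketch. The names `createWorker` and `run` are hypothetical, `asyncFunction` is a stand-in that completes after a short timeout, and `Readable.from` plays the role of the parsed JSON stream. It also forwards each chunk downstream once the worker has finished, which the original `asyncFunction(chunk, cb)` call does not do.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream')

// Hypothetical stand-in for asyncFunction: succeeds after 10 ms.
function asyncFunction(data, callback) {
  setTimeout(() => callback(null), 10)
}

// Hypothetical factory: wraps a callback-style worker in an object-mode
// Transform. The chunk is passed on only after the worker has called back.
function createWorker(worker) {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      worker(chunk, (err) => callback(err, chunk))
    }
  })
}

// Hypothetical driver: pipes items through the worker and collects them.
function run(items, worker, done) {
  const results = []
  pipeline(
    Readable.from(items),
    createWorker(worker),
    new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        results.push(chunk)
        callback()
      }
    }),
    (err) => done(err, results)
  )
}

// Usage
run([{ id: 1 }, { id: 2 }], asyncFunction, (err, results) => {
  if (err) return console.error('pipeline failed:', err.message)
  console.log('processed', results.length, 'items')
})
```

Using `pipeline` instead of chained `.pipe()` calls is a choice made for this sketch: it reports an error from any stage through a single callback.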
t7tran
Ryan Quinn
  • I didn't really understand what you were saying at first, but yes, I see now that transform streams are probably the way forward. Will try this out. – Fergie Aug 10 '16 at 16:47