86

I wrote a pretty simple function that downloads an image from a given URL, resizes it, and uploads it to S3 (using 'gm' and 'knox'). I have no idea if I'm reading the stream into a buffer correctly. (Everything is working, but is it the correct way?)

Also, I want to understand something about the event loop: how do I know that one invocation of the function won't leak anything or change the 'buf' variable of another invocation that is already running (or is this scenario impossible because the callbacks are anonymous functions)?

var http = require('http');
var https = require('https');
var s3 = require('./s3');
var gm = require('gm');

module.exports.processImageUrl = function(imageUrl, filename, callback) {
    var client = http;
    if (imageUrl.substr(0, 5) == 'https') { client = https; }

    client.get(imageUrl, function(res) {
        if (res.statusCode != 200) {
            return callback(new Error('HTTP Response code ' + res.statusCode));
        }

        gm(res)
            .geometry(1024, 768, '>')
            .stream('jpg', function(err, stdout, stderr) {
                if (!err) {
                    var buf = new Buffer(0);
                    stdout.on('data', function(d) {
                        buf = Buffer.concat([buf, d]);
                    });

                    stdout.on('end', function() {
                        var headers = {
                            'Content-Length': buf.length
                            , 'Content-Type': 'Image/jpeg'
                            , 'x-amz-acl': 'public-read'
                        };

                        s3.putBuffer(buf, '/img/d/' + filename + '.jpg', headers, function(err, res) {
                            if (err) {
                                return callback(err);
                            } else {
                                return callback(null, res.client._httpMessage.url);
                            }
                        });
                    });
                } else {
                    callback(err);
                }
            });
    }).on('error', function(err) {
        callback(err);
    });
};
Gal Ben-Haim

9 Answers

97

Overall I don't see anything that would break in your code.

Two suggestions:

The way you are combining Buffer objects is suboptimal, because it has to copy all the pre-existing data on every 'data' event. It would be better to push the chunks into an array and concat them all once at the end.

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var buf = Buffer.concat(bufs);
})

For performance, I would look into whether the S3 library you are using supports streams. Ideally you wouldn't need to create one large buffer at all, and instead just pass the stdout stream directly to the S3 library.

As for the second part of your question, that isn't possible. When a function is called, it is allocated its own private context, and everything defined inside of that will only be accessible from other items defined inside that function.
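For illustration, here is a minimal sketch (my addition, not part of the original answer) of that isolation: each call gets its own variables, just as each call to processImageUrl gets its own buf.

function makeCounter() {
    var count = 0;                 // private to this particular invocation
    return function() { count += 1; return count; };
}

var first = makeCounter();
var second = makeCounter();
first();  // 1
first();  // 2
second(); // 1 -- its count is untouched by the calls to first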

Update

Dumping the file to the filesystem would probably mean less memory usage per request, but file IO can be pretty slow so it might not be worth it. I'd say that you shouldn't optimize too much until you can profile and stress-test this function. If the garbage collector is doing its job you may be overoptimizing.

With all that said, there are better ways anyway, so don't use files. Since all you want is the length, you can calculate that without needing to append all of the buffers together, so then you don't need to allocate a new Buffer at all.

var pause_stream = require('pause-stream');

// Your other code.

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var contentLength = bufs.reduce(function(sum, buf){
    return sum + buf.length;
  }, 0);

  // Create a stream that will emit your chunks when resumed.
  var stream = pause_stream();
  stream.pause();
  while (bufs.length) stream.write(bufs.shift());
  stream.end();

  var headers = {
      'Content-Length': contentLength,
      // ...
  };

  s3.putStream(stream, ....);
});
loganfsmyth
  • it supports streams, but I need to know the Content-Length for the S3 headers and it's impossible with streams – Gal Ben-Haim Jan 11 '13 at 00:14
  • btw - what about the second part of the question? – Gal Ben-Haim Jan 11 '13 at 00:15
  • 1
    Is it a better practice to pipe the stream from 'gm' to a file and then open a stream from that file and upload to S3, using the file size as Content-Length? As far as I understand, this eliminates loading the entire file into memory like I'm doing now – Gal Ben-Haim Jan 11 '13 at 07:53
  • just want to mention that the `bufs.pop()` call should be `bufs.unshift()`, or even easier just replace the entire while loop with a simple for loop. – Erhhung Mar 07 '16 at 22:07
  • In the on('data') handler you can just do bytes += data.length instead of reducing the array at the end. – Bergur Feb 07 '18 at 00:44
  • 1
    @Bergur True, but then you have to maintain two separate accumulator variables. I prefer maintaining the single one and calculating the length later. I'm not convinced it would make an appreciable difference in performance or anything. – loganfsmyth Feb 07 '18 at 01:01
  • Great... It really helped, @loganfsmyth – Dibish Mar 08 '19 at 06:22
31

JavaScript snippet

function stream2buffer(stream) {

    return new Promise((resolve, reject) => {
        
        const _buf = [];

        stream.on("data", (chunk) => _buf.push(chunk));
        stream.on("end", () => resolve(Buffer.concat(_buf)));
        stream.on("error", (err) => reject(err));

    });
} 

TypeScript snippet

import { Stream } from "stream";

async function stream2buffer(stream: Stream): Promise<Buffer> {

    return new Promise<Buffer>((resolve, reject) => {

        const _buf = Array<any>();

        stream.on("data", chunk => _buf.push(chunk));
        stream.on("end", () => resolve(Buffer.concat(_buf)));
        stream.on("error", err => reject(`error converting stream - ${err}`));

    });
}
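A usage sketch for either snippet (the file path is only an example; any readable stream works):

var fs = require('fs');

stream2buffer(fs.createReadStream('./some-file.jpg'))
    .then(function(buffer) { console.log(buffer.length); })
    .catch(function(err) { console.error(err); });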
bsorrentino
  • 1
    this works incredibly well... this is the MVP right here – Jan Jun 05 '21 at 19:57
  • exactly what i was looking for! thank you! – Fedcomp Jun 09 '21 at 22:12
  • Actually for me JS version of `stream2buffer()` doesn't return a proper value. – Mike Jun 30 '21 at 20:10
  • 1
    Hi @MikeB, as you see the code is very simple (also to debug). Could you give us more details concerning "doesn't return a proper value"? – bsorrentino Jul 01 '21 at 15:50
  • @bsorrentino, I think the problem was with the way I return the value. In my case `const pdfBuffered = \`data:application/pdf;base64, ${Buffer.concat(chunks).toString("base64")}\`;` worked OK. So, not just `Buffer.concat(_buf))` but `Buffer.concat(chunks).toString("base64")`. – Mike Jul 01 '21 at 18:09
9

You can easily do this using node-fetch if you are pulling from http(s) URIs.

From the readme:

fetch('https://assets-cdn.github.com/images/modules/logos_page/Octocat.png')
    .then(res => res.buffer())
    .then(buffer => console.log(buffer));
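Note: on newer versions of node-fetch (v3, which is ESM-only), res.buffer() is deprecated in favour of the standard arrayBuffer() method. A minimal sketch assuming that version (the urlToBuffer helper name is mine):

import fetch from 'node-fetch';

async function urlToBuffer(url) {
    const res = await fetch(url);
    // arrayBuffer() is part of the standard fetch Body interface
    return Buffer.from(await res.arrayBuffer());
}

urlToBuffer('https://assets-cdn.github.com/images/modules/logos_page/Octocat.png')
    .then(buffer => console.log(buffer));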
Tiberiu-Ionuț Stan
5

I suggest loganfsmyth's method, using an array to hold the data.

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var buf = Buffer.concat(bufs);
});

In my current working example, I am working with GridFS and npm's Jimp.

var bucket = new GridFSBucket(getDBReference(), { bucketName: 'images' });
var dwnldStream = bucket.openDownloadStream(info[0]._id); // original size
var data = [];

dwnldStream.on('data', function(chunk) {
    data.push(chunk);
});

dwnldStream.on('end', function() {
    var buff = Buffer.concat(data);
    console.log("buffer: ", buff);
    jimp.read(buff)
        .then(image => {
            console.log("read the image!");
            IMAGE_SIZES.forEach((size) => {
                resize(image, size);
            });
        });
});

I did some other research with a string method, but that did not work, perhaps because I was reading from an image file; the array method did work, though.

const DISCLAIMER = "DONT DO THIS";
var data = "";
stdout.on('data', function(d) {
    data += d;
});
stdout.on('end', function() {
    var buf = Buffer.from(data);
    // do work with the buffer here
});

When I did the string method, I got this error from npm's Jimp:

buffer:  <Buffer 00 00 00 00 00>
{ Error: Could not find MIME for Buffer <null>

Basically, I think the type coercion from binary to string didn't work so well.

Maddocks
4

You can convert your readable stream to a buffer and integrate it into your code in an asynchronous way like this.

async function streamToBuffer (stream) {
    return new Promise((resolve, reject) => {
      const data = [];

      stream.on('data', (chunk) => {
        data.push(chunk);
      });

      stream.on('end', () => {
        resolve(Buffer.concat(data));
      });

      stream.on('error', (err) => {
        reject(err);
      });
    });
}

The usage would be as simple as:

// usage, given `myStream` (your readable stream)
const buffer = await streamToBuffer(myStream); // this is a buffer
  • Doesn't work with `process.stdin` on Windows - if no input has been piped into the command, neither `end` nor `error` appears to fire. – mindplay.dk Sep 15 '21 at 12:08
1

I suggest having an array of buffers and concatenating into the resulting buffer only once, at the end. It's easy to do manually, or one could use node-buffers.
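A minimal sketch of the manual approach (assuming `stream` is your readable stream):

var chunks = [];
stream.on('data', function(chunk) { chunks.push(chunk); });
stream.on('end', function() {
    // a single copy, performed once at the end
    var buf = Buffer.concat(chunks);
    // use buf here
});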

Andrey Sidorov
1

I just want to post my solution. The previous answers were pretty helpful for my research. I use length-stream to get the size of the stream, but the problem here is that the callback is fired near the end of the stream, so I also use stream-cache to cache the stream and pipe it to the res object once I know the content-length. In case of an error, it is passed straight to the callback from the stream's 'error' handler.

var StreamCache = require('stream-cache');
var lengthStream = require('length-stream');

var _streamFile = function(res, stream, cb) {
    var cache = new StreamCache();

    var lstream = lengthStream(function(length) {
        res.header("Content-Length", length);
        cache.pipe(res);
    });

    stream.on('error', function(err){
        return cb(err);
    });

    stream.on('end', function(){
        return cb(null, true);
    });

    return stream.pipe(lstream).pipe(cache);
}
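A usage sketch, assuming this runs in an Express app (the res.header call above suggests Express); the route and file path below are made up for illustration:

var express = require('express');
var fs = require('fs');
var app = express();

app.get('/files/:name', function(req, res) {
    // stream a file to the response with a Content-Length header
    var fileStream = fs.createReadStream('/data/' + req.params.name);
    _streamFile(res, fileStream, function(err) {
        if (err) { console.error('streaming failed', err); }
    });
});

app.listen(3000);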
1

In TypeScript, `[].push(bufferPart)` is not type-compatible;

so:

getBufferFromStream(stream: Part | null): Promise<Buffer> {
    if (!stream) {
        throw 'FILE_STREAM_EMPTY';
    }
    return new Promise(
        (r, j) => {
            let buffer = Buffer.from([]);
            stream.on('data', buf => {
               buffer = Buffer.concat([buffer, buf]);
            });
            stream.on('end', () => r(buffer));
            stream.on('error', j);
        }
    );
}
afcfzf
0

You can check the "content-length" header in res.headers. It will give you the length of the content you will receive (how many bytes of data the server will send).
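For example, a small sketch based on the question's client.get callback (note that the header can be missing, e.g. with chunked transfer encoding):

client.get(imageUrl, function(res) {
    // the header arrives as a string; it may be absent for chunked responses
    var contentLength = parseInt(res.headers['content-length'], 10);
    if (!isNaN(contentLength)) {
        console.log('expecting ' + contentLength + ' bytes');
    }
});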

faBri_CI-o