
How can I efficiently buffer events from a stream in Node.js and bulk insert them into MongoDB, instead of doing one insert per record received from the stream? Here's the pseudo code I have in mind:

// Open MongoDB connection

mystream.on('data', (record) => {
   // bufferize data into an array
   // if the buffer is full (1000 records)
   // bulk insert into MongoDB and empty buffer
})

mystream.on('end', () => {
   // close connection
})

Does this look realistic? Is there any possible optimization? Are there existing libraries that facilitate this?

dbrrt
  • The Node.js native `stream` API sounds like the perfect fit; you should look into using a Writable. The size of the buffer can be controlled by setting the highWaterMark. The Writable class has a `final()` function, which is called once the stream has completed; this could be used to close the DB connection (sketched below). – jorgenkg Nov 09 '20 at 05:02
  • Thanks for your answer. I considered that option too; it's probably the best way to solve this issue: the more data comes in, the more data goes into the buffer, and the data received in the buffer automatically populates the MongoDB database. I assume this way you can also control the data flow and have the incoming data automatically destroyed. I'm planning to use this approach with small to quite large datasets (a few kB to 5-10 GB of streamed data). – dbrrt Nov 09 '20 at 05:23
  • Both MongoDB's native driver (and the Mongoose API) expose a DB cursor interface that can be encapsulated as a stream.Readable (`stream.Readable.from()`) and then piped into the buffering Writable. Thus, the script won't fetch more data than it can store in its writable buffer. – jorgenkg Nov 09 '20 at 12:01
  • This example is very close to what I'm looking for: https://github.com/sorribas/mongo-write-stream/blob/master/index.js – dbrrt Nov 09 '20 at 12:05
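
To make the Writable suggestion from the comments concrete, here is a minimal sketch (not part of the original thread) of a sink whose `final()` hook closes the MongoDB connection once the source ends. The connection URI, database and collection names, and the in-memory source standing in for `mystream` are all placeholders:

const { Readable, Writable, pipeline } = require('stream');
const { MongoClient } = require('mongodb');

async function run() {
  // Hypothetical connection details
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const collection = client.db('my_db').collection('my_collection');

  // Stand-in for the question's `mystream`; any object-mode Readable works,
  // e.g. a MongoDB cursor wrapped with Readable.from() as suggested above.
  const mystream = Readable.from([{ a: 1 }, { a: 2 }, { a: 3 }]);

  const mongoSink = new Writable({
    objectMode: true,
    highWaterMark: 1000, // buffer at most ~1000 records before applying backpressure
    writev: (chunks, next) => {
      // Bulk insert whatever is currently buffered (1..1000 documents)
      collection
        .insertMany(chunks.map(({ chunk }) => chunk), { ordered: false })
        .then(() => next(), next);
    },
    final: (done) => {
      // Runs after the source has ended and all buffered writes have flushed
      client.close().then(() => done(), done);
    }
  });

  pipeline(mystream, mongoSink, (err) => {
    if (err) console.error(err);
  });
}

run().catch(console.error);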

2 Answers


Using NodeJS' stream library, this can be concisely and efficiently implemented as:

const stream = require('stream');
const util = require('util');
const { MongoClient } = require('mongodb');

let streamSource; // A stream of objects from somewhere

// Establish DB connection (top-level await assumes an async/ESM context)
const client = new MongoClient("uri");
await client.connect();

// The specific collection to store our documents
const collection = client.db("my_db").collection("my_collection");

// Pipe the source into a Writable sink. With objectMode and a highWaterMark
// of 1000, up to 1000 records are buffered before backpressure kicks in, and
// writev() flushes everything currently buffered as a single bulk insert.
await util.promisify(stream.pipeline)(
  streamSource,
  new stream.Writable({
    objectMode: true,
    highWaterMark: 1000,
    writev: async (chunks, next) => {
      try {
        const documents = chunks.map(({chunk}) => chunk);

        await collection.insertMany(documents, {ordered: false});

        next();
      }
      catch( error ){
        next( error );
      }
    }
  })
);
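
For a quick local test, the `streamSource` placeholder above can be any object-mode Readable, e.g. built from an in-memory array (the sample documents below are made up):

streamSource = stream.Readable.from([
  { name: 'doc-1' },
  { name: 'doc-2' },
  { name: 'doc-3' }
]); // Readable.from() yields an object-mode stream by default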
jorgenkg
  • Again, thank you so much for your help; it might be a drop-in solution for my problem. – dbrrt Nov 09 '20 at 14:35
  • I'm relatively new to Node.js streams (in a more complex use-case). This answer has helped me A LOT! Thanks heaps. – victorkurauchi Jul 26 '21 at 07:17
  • It's important to note that this method doesn't guarantee that `writev` always gets 1000 chunks. It can be anything between 1 and 1000. – morten.c Mar 10 '22 at 18:53
  • Yes, *at most* 1000 chunks/documents, depending on how many objects have been added to the write buffer between the previous call to `writev` and calling `next` (see the sketch below). – jorgenkg Mar 10 '22 at 20:48
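
To observe this behaviour, one could swap in a throwaway sink that only reports how many chunks each `writev()` call receives (a debugging sketch, not part of the original answer):

const { Writable } = require('stream');

// Logs the size of every flushed batch instead of inserting it
const sizeLogger = new Writable({
  objectMode: true,
  highWaterMark: 1000,
  writev: (chunks, next) => {
    console.log(`writev received ${chunks.length} buffered documents`);
    next();
  }
});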

I ended up with a no-dependency solution (apart from the MongoDB driver itself).

const { MongoClient } = require("mongodb");

const url = process.env.MONGO_URI || "mongodb://localhost:27019";
const connection = MongoClient.connect(url, { useNewUrlParser: true, useUnifiedTopology: true });

connection
    .then((db) => {
        const dbName = "databaseName";
        const collection = "collection";
        const dbo = db.db(dbName);

        let buffer = [];

        stream.on("data", (row) => {
            buffer.push(row);
            // Flush the buffer as one bulk insert every 10000 records
            if (buffer.length > 10000) {
                dbo.collection(collection).insertMany(buffer, { ordered: false });
                buffer = [];
            }
        });

        stream.on("end", () => {
            // Insert the last (partial) chunk, then close the connection
            dbo.collection(collection).insertMany(buffer, { ordered: false })
                .then(() => {
                    console.log("Done!");
                    db.close();
                });
        });

        stream.on("error", (err) => console.log(err));
    })
    .catch((err) => {
        console.log(err);
    });
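
Note that `stream` in the snippet above stands for the incoming record stream; for a self-contained test it could be stubbed with an in-memory Readable (the sample rows below are made up):

const { Readable } = require("stream");

// Hypothetical stand-in for the real record source
const stream = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);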
dbrrt