
This is my first project working with Node streams, so I also have some general questions about how streams behave.

Goal: Stream large files (100 million+ rows) into MongoDB in parallel batches.

Ideal solution: Stream the file into batches and run insertMany on each batch without awaiting it.

Research:

Current Issues

  • Running out of heap memory when run on a test file of 5 million rows. I believe this is where a transform pipe with flush comes into play. Does this happen because I do not await each insertMany to finish?

  • The process never exits (the terminal cursor keeps blinking) because client.close() is never called. If I do call it, I run into errors that MongoDB cannot connect.

  • The csv package has a transform API where you can run transforms in parallel. I want to use this, but I am not sure how to implement it on my batch sizes; it looks like it wants to run one row at a time? (A rough sketch of the bounded-parallel version I have in mind is below the code.)

Questions

  • Should I be using the transform stream?

  • Do I need a writable stream for the insertMany, or can it be part of the read stream? (Rough sketch of the writable approach directly below.)
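
For reference, here is a minimal, untested sketch of what I think the writable-stream version would look like: batching happens inside a Writable, and because its callback only fires after insertMany resolves, the csv parser gets paused automatically whenever the database falls behind. `makeBatchWriter` is just a name I made up, and `client`, `headers`, and `fileName` are the same ones used in my script below.

    const fs = require('fs');
    const csv = require('fast-csv');
    const { Writable, pipeline } = require('stream');
    const { promisify } = require('util');
    const pipelineAsync = promisify(pipeline);

    // Writable that buffers rows and awaits insertMany whenever a batch fills up.
    // Because the callback only fires after the insert resolves, the csv parser
    // is paused automatically (backpressure) whenever the database falls behind.
    function makeBatchWriter(collection, batchSize) {
        let batch = [];
        const flush = () => {
            const toInsert = batch;
            batch = [];
            return collection.insertMany(toInsert, { ordered: false });
        };
        return new Writable({
            objectMode: true,
            write(row, _encoding, callback) {
                batch.push(row);
                if (batch.length < batchSize) return callback();
                flush().then(() => callback(), callback);
            },
            final(callback) {
                // flush whatever is left over (odd lots) when the parser ends
                if (!batch.length) return callback();
                flush().then(() => callback(), callback);
            },
        });
    }

    const run = async (batchSize) => {
        await client.connect();
        try {
            const collection = client.db("targetDB").collection("targetCollection");
            await pipelineAsync(
                fs.createReadStream(fileName),
                csv.parse({ headers: headers, skipRows: 1 }),
                makeBatchWriter(collection, batchSize)
            );
        } finally {
            await client.close(); // the process can exit cleanly now
        }
    };

    run(10000).catch(console.error);

What appeals to me about this shape is that backpressure and the leftover (odd-lot) flush are handled by the stream machinery instead of by hand, but it also serializes the inserts, which is why I keep coming back to the parallel idea.

Here is my current code: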

    //Packages
    const fs = require('fs');
    const csv = require('fast-csv');

    //MongoDB
    const { MongoClient } = require('mongodb');
    const uri = `...`;
    const client = new MongoClient(uri, { useNewUrlParser: true, useUnifiedTopology: true });

    // insertMany a batch into the db
    async function doWork(batch, batchId) {
        await client.db("targetDB").collection("targetCollection").insertMany(batch, { ordered: false });
        console.log(`Finished processing batchId ${batchId} with batch size of ${batch.length}`);
    }

    let lineNumber = 0;

    const process = async (rowLimit, batchSize) => {
        // connect to mongoDB once, otherwise errors will get thrown
        await client.connect();
        const readStream = fs.createReadStream(fileName);
        let batch = [];
        let batchId = 1;
        readStream
            // set custom headers to deal with duplicate header names and skip the first row (which has the headers)
            .pipe(csv.parse({ headers: headers, skipRows: 1 }))
            .on('data', row => {
                batch.push(row);
                lineNumber++;
                // if the batch is full, insert it into the database
                if (batch.length >= batchSize) {
                    // do not await the insert, let it work in the background
                    doWork(batch, batchId);
                    batch = [];
                    batchId++;
                }
                // only grab the first n rows of the large file
                if (lineNumber >= rowLimit) {
                    readStream.pause();
                    // this will continue to log some rows after the limit, buffer still picks up data after pause?
                    console.log(`Read stream has been paused at line ${lineNumber} at ${new Date().toLocaleTimeString()}`);

                    //readStream.destroy() // doesn't trigger on close or finish, is it because of the doWork async function?
                }
            })
            .on('error', function (err) {
                console.log('Error while reading file.', err);
            })
            .on('end', function () {
                console.log(`Read ${lineNumber} lines`);

                // check if anything is left over (odd lots) in batch
                console.log(`left in batch: ${batch.length}`);
                if (batch.length) doWork(batch, batchId);
            })
            .on('finish', function () {
                console.log(`on Finish`);

                // check if anything is left over (odd lots) in batch
                console.log(`left in batch: ${batch.length}`);
                if (batch.length) doWork(batch, batchId);
            })
            .on('close', function () {
                console.log(`on Close`);

                // check if anything is left over (odd lots) in batch
                console.log(`left in batch: ${batch.length}`);
                if (batch.length) doWork(batch, batchId);
            });
    };

    process(1000000, 10000);
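
And here is a rough sketch of the bounded-parallel version I mentioned in the issues above: several insertMany calls run at once, but only up to a cap, the stragglers are awaited at the end, and client.close() is only called after everything has settled so the process can exit. `processFile`, `maxParallel`, and `inFlight` are just illustrative names, `doWork`, `client`, `headers`, and `fileName` are the same as above, and I dropped the rowLimit testing logic to keep it short. The for await loop relies on the fast-csv parser being an ordinary readable stream, so it is async iterable and is not read faster than the loop body finishes.

    // Rough sketch only: cap the number of un-awaited insertMany calls instead
    // of letting them pile up without limit, then drain and close at the end.
    const processFile = async (file, batchSize, maxParallel) => {
        await client.connect();
        const parser = fs.createReadStream(file)
            .pipe(csv.parse({ headers: headers, skipRows: 1 }));

        let batch = [];
        let batchId = 0;
        let inFlight = [];

        // for await gives backpressure for free: the parser is not read again
        // until the loop body (including any awaits) has finished.
        for await (const row of parser) {
            batch.push(row);
            if (batch.length >= batchSize) {
                inFlight.push(doWork(batch, ++batchId));
                batch = [];
                if (inFlight.length >= maxParallel) {
                    await Promise.all(inFlight); // wait for the window to drain
                    inFlight = [];
                }
            }
        }
        if (batch.length) inFlight.push(doWork(batch, ++batchId)); // odd lot
        await Promise.all(inFlight);
        await client.close(); // only after every insert has settled
    };

    processFile(fileName, 10000, 5).catch(console.error);

Is something along these lines the right direction, or should the batching and inserting live in a transform/writable stream instead?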
    