This is my first project working with Node streams, so I also have some general questions about how streams behave.
Goal: Stream large files (100 million+ rows) into MongoDB in parallel batches.
Ideal solution: Stream file into batches, run insertMany on batches without awaiting.
Research:
- Bufferizing data from stream in nodeJS for perfoming bulk insert
- https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback (the Transform stream, which I think I need; rough sketch of what I mean right after this list)
- Node.js: can you use asynchronous functions from within streams? (Ryan Quinn's answer)
- Parsing huge logfiles in Node.js - read in line-by-line (the accepted answer)
- https://nodejs.org/es/docs/guides/backpressuring-in-streams/
- https://csv.js.org/transform/handler/
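From the Transform docs above, this is roughly how I picture the batching step (just a sketch in object mode; batchSize is a number I would pass in, and flush pushes whatever is left over at the end):

```
const { Transform } = require('stream');

// Groups individual parsed rows into arrays of `batchSize` rows.
// flush() pushes whatever partial batch remains when the source ends.
function batchRows(batchSize) {
  let batch = [];
  return new Transform({
    objectMode: true,
    transform(row, _encoding, callback) {
      batch.push(row);
      if (batch.length >= batchSize) {
        this.push(batch);
        batch = [];
      }
      callback();
    },
    flush(callback) {
      if (batch.length) this.push(batch);
      callback();
    }
  });
}
```

Piping csv.parse() into this would turn a stream of rows into a stream of batches, but then I am stuck on how the insert side should look.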
Current Issues
I run out of heap memory when running against a test file of 5 million rows. I believe this is where the transform stream with flush comes into play. Does this happen because I do not await each insertMany call to finish?
The Node process never exits (the terminal cursor just keeps blinking) because client.close() is never called. If I do call it, I get errors that MongoDB cannot connect.
The csv package has a transform API that can run handlers in parallel. I want to use it, but I am not sure how to apply it to whole batches; it looks like it processes one record at a time.
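For the first two issues (memory and client.close()), is the fix something like capping how many insertMany calls are in flight and awaiting all of them before closing the client? A rough sketch of what I mean (maxInFlight is a number I made up, doWork is my existing insertMany wrapper, and I realize I would still need to pause the read stream while waiting):

```
// Sketch: cap the number of unawaited insertMany calls and
// wait for all of them to settle before closing the client.
const pending = new Set();
const maxInFlight = 5; // made-up limit

async function queueInsert(batch, batchId) {
  const p = doWork(batch, batchId).finally(() => pending.delete(p));
  pending.add(p);
  // once too many inserts are running, wait for one of them to finish
  if (pending.size >= maxInFlight) {
    await Promise.race(pending);
  }
}

// after the stream has ended:
// await Promise.all(pending);
// await client.close();
```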
Questions
Should I be using a Transform stream?
Do I need a writable stream for the insertMany, or can it stay inside the read stream's data handler?
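For the second question, this is what I imagine the writable side looking like if insertMany belongs in its own stream rather than in the data handler. Again just a sketch: it assumes the batching Transform above, my existing doWork function, pipeline from stream/promises (Node 15+), and the fileName/headers variables from my code below. I also realize that awaiting each insert inside write() runs one batch at a time, which is exactly why I am asking about the parallel option:

```
const { Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Sink that inserts each incoming batch and only signals completion
// once insertMany has finished, so the pipeline applies backpressure.
function insertBatches() {
  let batchId = 0;
  return new Writable({
    objectMode: true,
    write(batch, _encoding, callback) {
      batchId++;
      doWork(batch, batchId)
        .then(() => callback())
        .catch(callback);
    }
  });
}

// Wiring it up would then be roughly:
// await client.connect();
// await pipeline(
//   fs.createReadStream(fileName),
//   csv.parse({ headers, skipRows: 1 }),
//   batchRows(10000),
//   insertBatches()
// );
// await client.close();
```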
Code:

```
//Packages
const fs = require('fs');
const csv = require('fast-csv');

//MongoDB
const { MongoClient } = require('mongodb');
const uri = `...`;
const client = new MongoClient(uri, { useNewUrlParser: true, useUnifiedTopology: true });

// insertMany wrapper
async function doWork(batch, batchId) {
  await client.db("targetDB").collection("targetCollection").insertMany(batch, {ordered: false});
  console.log(`Finished processing batchId ${batchId} with batch size of ${batch.length}`);
}

let lineNumber = 0;

const process = async (rowLimit, batchSize) => {
  // connect to MongoDB once, otherwise errors get thrown
  await client.connect();

  const readStream = fs.createReadStream(fileName);
  let batch = [];
  let batchId = 1;

  readStream
    // set custom headers to deal with duplicate header names and skip the first row (which has the headers)
    .pipe(csv.parse({headers: headers, skipRows: 1}))
    .on('data', row => {
      batch.push(row);
      lineNumber++;

      // if the batch is full, insert it into the database
      if (batch.length >= batchSize) {
        // do not await the insert, let it work in the background
        doWork(batch, batchId);
        batch = [];
        batchId++;
      }

      // only read the first n rows of the large file
      if (lineNumber >= rowLimit) {
        readStream.pause();
        // this will continue to log some rows after the limit, buffer still picks up data after pause?
        console.log(`Read stream has been paused at line ${lineNumber} at ${new Date().toLocaleTimeString()}`);
        //readStream.destroy() // doesn't trigger on close or finish, is it because of the doWork async function?
      }
    })
    .on('error', function(err){
      console.log('Error while reading file.', err);
    })
    .on('end', function(){
      console.log(`Read ${lineNumber} lines`);
      // check if anything is left over (odd lots) in batch
      console.log(`left in batch: ${batch.length}`);
      if (batch.length) doWork(batch, batchId);
    })
    .on('finish', function(){
      console.log(`on Finish`);
      // check if anything is left over (odd lots) in batch
      console.log(`left in batch: ${batch.length}`);
      if (batch.length) doWork(batch, batchId);
    })
    .on('close', function(){
      console.log(`on Close`);
      // check if anything is left over (odd lots) in batch
      console.log(`left in batch: ${batch.length}`);
      if (batch.length) doWork(batch, batchId);
    });
};

process(1000000, 10000);
```