Stream-In

Piscina can be used effectively in data processing pipelines, handling large volumes of data by offloading work to worker threads and applying proper flow control.

The main script in the example below sets up a Piscina pool and a CSV stream, then processes each line of the CSV file through a worker thread.

We implement flow control by pausing the stream when the Piscina queue reaches its maximum size and resuming it when the queue drains. This ensures that memory usage remains under control even when processing very large files.

index.js
'use strict';

// node index [maxQueue]
// example: node index
// defaults to 100
// example: node index 100
// example: node index 500

const { resolve } = require('path');
const csv = require('csvtojson');
const Pool = require('piscina');
const { performance, PerformanceObserver } = require('perf_hooks');
const Progress = require('./progress');

const p = new PerformanceObserver((entries) => {
  console.log(entries.getEntries()[0].duration);
});
p.observe({ entryTypes: ['measure'] });

const maxQueue = Math.max(parseInt(process.argv[2] || 100), 50);

const pool = new Pool({
  filename: resolve(__dirname, 'worker.js'),
  maxQueue
});

const stream = csv().fromFile('./data.csv');

// Piscina emits 'drain' when its task queue empties; resume reading if we paused.
pool.on('drain', () => {
  if (stream.isPaused()) {
    console.log('resuming...', pool.queueSize);
    stream.resume();
  }
});

const progress = new Progress();
progress.on('finished', () => {
  console.log(progress.message);
});

performance.mark('A');
stream
  .on('data', (data) => {
    const line = data.toString('utf8');
    progress.incSubmitted();
    pool.run(line)
      .then(() => {
        progress.incCompleted();
      })
      .catch((err) => {
        progress.incFailed();
        stream.destroy(err);
      });
    // Back-pressure: once the task queue is full, stop reading until it drains.
    if (pool.queueSize === maxQueue) {
      console.log('pausing...', pool.queueSize, pool.utilization);
      stream.pause();
    }
  })
  .on('error', (err) => {
    console.log(err.message);
    console.log('run: `node generate` to generate the sample data');
  })
  .on('end', () => {
    // We are done submitting tasks
    progress.done();
    performance.mark('B');
    performance.measure('A to B', 'A', 'B');
  });

process.on('exit', () => {
  console.log('Mean Wait Time:', pool.waitTime.mean, 'ms');
  console.log('Mean Run Time:', pool.runTime.mean, 'ms');
});
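
Stripped of the progress tracking and timing code, the flow control reduces to a small pattern: stop reading when Piscina's queue is full, start again when it drains. Here is a minimal sketch of just that part (the `rows` stream name is a placeholder; the rest mirrors index.js above):

rows.on('data', (row) => {
  rows.pause; // noop reference removed in sketch below
});

rows.on('data', (row) => {
  pool.run(row).catch((err) => rows.destroy(err));
  if (pool.queueSize === maxQueue) {
    // The task queue is full: stop reading until Piscina drains it.
    rows.pause();
  }
});

pool.on('drain', () => {
  if (rows.isPaused()) {
    // The queue has emptied: safe to read more rows.
    rows.resume();
  }
});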

The worker (worker.js) is a simple function that takes a line of data and returns its JSON stringified version.

worker.js
module.exports = (data) => {
  return JSON.stringify(data);
};
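
In a real pipeline the worker is where the expensive per-row work would live. A hypothetical variant (the parsing and validation below are illustrative only, not part of the example) might look like this:

module.exports = (line) => {
  // csvtojson emits each row as a JSON string, so parse it back into an object.
  const row = JSON.parse(line);
  const values = Object.values(row).map(Number);
  if (values.some(Number.isNaN)) {
    throw new Error('row contains a non-numeric field');
  }
  // Return a per-row result; Piscina sends it back to the main thread.
  return values.reduce((sum, value) => sum + value, 0);
};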

A custom Progress class (progress.js) tracks how many tasks have been submitted, completed, and failed; it emits a 'finished' event once everything has settled and provides a status message.

progress.js
'use strict';
const { EventEmitter } = require('events');

class Progress extends EventEmitter {
  #tasksSubmitted = 0;
  #tasksCompleted = 0;
  #tasksFailed = 0;
  #done = false;

  done () {
    this.#done = true;
  }

  incSubmitted () {
    this.#tasksSubmitted++;
  }

  incCompleted () {
    this.#tasksCompleted++;
    if (this.#done && this.completed === this.#tasksSubmitted) {
      process.nextTick(() => this.emit('finished'));
    }
  }

  incFailed () {
    this.#tasksFailed++;
  }

  get completed () {
    return this.#tasksCompleted + this.#tasksFailed;
  }

  get message () {
    return `${this.#tasksCompleted} of ${this.#tasksSubmitted} completed` +
      ` ${((this.#tasksCompleted / this.#tasksSubmitted) * 100).toFixed(2)}%` +
      ` [${this.#tasksFailed} failed]`;
  }
}

module.exports = Progress;
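
Note that 'finished' only fires once done() has been called and every submitted task has settled, which is why index.js marks submission as done in the stream's 'end' handler while tasks may still be in flight. A standalone usage sketch (the values are illustrative):

const progress = new Progress();
progress.on('finished', () => console.log(progress.message));

progress.incSubmitted();
progress.incSubmitted();
progress.incCompleted();
progress.done();         // all tasks submitted; one still outstanding
progress.incCompleted(); // emits 'finished' on the next tick: "2 of 2 completed 100.00% [0 failed]"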

A separate script (generate.js) is provided to generate sample CSV data for testing purposes.

generate.js
'use strict';

const { createWriteStream } = require('fs');

const out = createWriteStream('./data.csv');

const count = parseInt(process.argv[2] || '5000') || 5000;

out.write('a,b,c,d,e,f,g\n');

for (let n = 0; n < count; n++) {
  out.write('1,2,3,4,5,6,7\n');
}

out.end();
console.log('done');
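
To try the example locally, generate the sample data first with `node generate` (optionally passing a row count, e.g. `node generate 100000`), then run the pipeline with `node index`, or `node index 500` to allow a larger task queue.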

You can also check out this example on GitHub.