Stream-In
Piscina works well in data processing pipelines that push large volumes of data through worker threads, as long as the pipeline applies proper flow control.
The main script in the example below sets up a Piscina pool and a CSV stream, then dispatches each line of the CSV file to a worker thread.
Flow control is implemented by pausing the stream whenever the Piscina task queue reaches its maximum size and resuming it once the queue drains. This keeps memory usage bounded even when processing very large files.
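Stripped to its essentials, the pattern looks like the following sketch (illustrative only; stream, pool, and maxQueue refer to the objects created in the full example below):

// Stop reading once Piscina's internal queue is full...
stream.on('data', (chunk) => {
  pool.run(chunk).catch((err) => stream.destroy(err));
  if (pool.queueSize === maxQueue) {
    stream.pause();
  }
});

// ...and start reading again once the queue has drained.
pool.on('drain', () => {
  if (stream.isPaused()) {
    stream.resume();
  }
});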
- Javascript
- Typescript
'use strict';

// node index [maxQueue]
// example: node index
//          defaults to 100
// example: node index 100
// example: node index 500

const { resolve } = require('path');
const csv = require('csvtojson');
const Pool = require('piscina');
const { performance, PerformanceObserver } = require('perf_hooks');
const Progress = require('./progress');

const p = new PerformanceObserver((entries) => {
  console.log(entries.getEntries()[0].duration);
});
p.observe({ entryTypes: ['measure'] });

const maxQueue = Math.max(parseInt(process.argv[2] || '100', 10), 50);

const pool = new Pool({
  filename: resolve(__dirname, 'worker.js'),
  maxQueue
});

const stream = csv().fromFile('./data.csv');

// Resume reading once the pool's task queue has drained.
pool.on('drain', () => {
  if (stream.isPaused()) {
    console.log('resuming...', pool.queueSize);
    stream.resume();
  }
});

const progress = new Progress();
progress.on('finished', () => {
  console.log(progress.message);
});

performance.mark('A');
stream
  .on('data', (data) => {
    const line = data.toString('utf8');
    progress.incSubmitted();
    pool.run(line)
      .then(() => {
        progress.incCompleted();
      })
      .catch((err) => {
        progress.incFailed();
        stream.destroy(err);
      });
    // Pause the stream when the task queue is full so the whole
    // file is not buffered in memory.
    if (pool.queueSize === maxQueue) {
      console.log('pausing...', pool.queueSize, pool.utilization);
      stream.pause();
    }
  })
  .on('error', (err) => {
    console.log(err.message);
    console.log('run: `node generate` to generate the sample data');
  })
  .on('end', () => {
    // We are done submitting tasks
    progress.done();
    performance.mark('B');
    performance.measure('A to B', 'A', 'B');
  });

process.on('exit', () => {
  console.log('Mean Wait Time:', pool.waitTime.mean, 'ms');
  console.log('Mean Run Time:', pool.runTime.mean, 'ms');
});
// node index [maxQueue]
// example: node index
// defaults to 100
// example: node index 100
// example: node index 500
// node index [maxQueue]
// example: node index
//          defaults to 100
// example: node index 100
// example: node index 500

import { resolve } from 'path';
import csv from 'csvtojson';
import Pool from 'piscina';
import { performance, PerformanceObserver } from 'perf_hooks';
import { filename } from './worker';
import Progress from './progress';

const p = new PerformanceObserver((entries) => {
  console.log(entries.getEntries()[0].duration);
});
p.observe({ entryTypes: ['measure'] });

const maxQueue = Math.max(parseInt(process.argv[2] || '100', 10), 50);

const pool = new Pool({
  filename: resolve(__dirname, 'workerWrapper.js'),
  workerData: { fullpath: filename },
  maxQueue,
});

const stream = csv().fromFile('./data.csv');

// Resume reading once the pool's task queue has drained.
pool.on('drain', () => {
  if (stream.isPaused()) {
    console.log('resuming...', pool.queueSize);
    stream.resume();
  }
});

const progress = new Progress();
progress.on('finished', () => {
  console.log(progress.message);
});

performance.mark('A');
stream
  .on('data', (data) => {
    const line = data.toString('utf8');
    progress.incSubmitted();
    pool.run(line)
      .then(() => {
        progress.incCompleted();
      })
      .catch((err) => {
        progress.incFailed();
        stream.destroy(err);
      });
    // Pause the stream when the task queue is full so the whole
    // file is not buffered in memory.
    if (pool.queueSize === maxQueue) {
      console.log('pausing...', pool.queueSize, pool.utilization);
      stream.pause();
    }
  })
  .on('error', (err) => {
    console.log(err.message);
    console.log('run: `node generate` to generate the sample data');
  })
  .on('end', () => {
    // We are done submitting tasks
    progress.done();
    performance.mark('B');
    performance.measure('A to B', 'A', 'B');
  });

process.on('exit', () => {
  console.log('Mean Wait Time:', pool.waitTime.mean, 'ms');
  console.log('Mean Run Time:', pool.runTime.mean, 'ms');
});
The TypeScript variant points the pool at a small wrapper script (workerWrapper.js). The worker exports its own absolute path as filename, which the main script forwards via workerData so the wrapper can load the TypeScript worker directly:

const { workerData } = require('worker_threads');

// If the target worker is a TypeScript file, register ts-node so it
// can be loaded without a separate build step.
if (workerData.fullpath.endsWith('.ts')) {
  require('ts-node').register();
}

module.exports = require(workerData.fullpath);
The worker (worker.js) is a simple function that takes a line of data and returns its JSON-stringified version.
- Javascript
- Typescript
module.exports = (data) => {
  return JSON.stringify(data);
};
export default function toJson(data: any): string {
  return JSON.stringify(data);
}

export const filename = __filename;
A custom Progress class (progress.js) is used to track the progress of the submitted tasks, emitting events and providing status messages.
- Javascript
- Typescript
'use strict';

const { EventEmitter } = require('events');

class Progress extends EventEmitter {
  #tasksSubmitted = 0;
  #tasksCompleted = 0;
  #tasksFailed = 0;
  #done = false;

  done () {
    this.#done = true;
  }

  incSubmitted () {
    this.#tasksSubmitted++;
  }

  incCompleted () {
    this.#tasksCompleted++;
    if (this.#done && this.completed === this.#tasksSubmitted) {
      process.nextTick(() => this.emit('finished'));
    }
  }

  incFailed () {
    this.#tasksFailed++;
  }

  get completed () {
    return this.#tasksCompleted + this.#tasksFailed;
  }

  get message () {
    return `${this.#tasksCompleted} of ${this.#tasksSubmitted} completed` +
      ` ${((this.#tasksCompleted / this.#tasksSubmitted) * 100).toFixed(2)}%` +
      ` [${this.#tasksFailed} failed]`;
  }
}

module.exports = Progress;
import { EventEmitter } from 'events';

class Progress extends EventEmitter {
  #tasksSubmitted = 0;
  #tasksCompleted = 0;
  #tasksFailed = 0;
  #done = false;

  done() {
    this.#done = true;
  }

  incSubmitted() {
    this.#tasksSubmitted++;
  }

  incCompleted() {
    this.#tasksCompleted++;
    if (this.#done && this.completed === this.#tasksSubmitted) {
      process.nextTick(() => this.emit('finished'));
    }
  }

  incFailed() {
    this.#tasksFailed++;
  }

  get completed() {
    return this.#tasksCompleted + this.#tasksFailed;
  }

  get message() {
    return `${this.#tasksCompleted} of ${this.#tasksSubmitted} completed` +
      ` ${((this.#tasksCompleted / this.#tasksSubmitted) * 100).toFixed(2)}%` +
      ` [${this.#tasksFailed} failed]`;
  }
}

export default Progress;
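As a quick illustration of how the tracker is driven (a standalone sketch, not part of the example itself):

const Progress = require('./progress');

const progress = new Progress();
progress.on('finished', () => console.log(progress.message));

progress.incSubmitted();  // a task has been handed to the pool
progress.done();          // no more tasks will be submitted
progress.incCompleted();  // the last outstanding task finished
// 'finished' is emitted on the next tick and prints:
// "1 of 1 completed 100.00% [0 failed]"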
A separate script (generate.js) is provided to generate sample CSV data for testing purposes.
- Javascript
- Typescript
'use strict';

const { createWriteStream } = require('fs');

const out = createWriteStream('./data.csv');
const count = parseInt(process.argv[2] || '5000') || 5000;

out.write('a,b,c,d,e,f,g\n');
for (let n = 0; n < count; n++) {
  out.write('1,2,3,4,5,6,7\n');
}
out.end();
console.log('done');
import { createWriteStream } from 'fs';

const out = createWriteStream('./data.csv');
const count = parseInt(process.argv[2] || '5000') || 5000;

out.write('a,b,c,d,e,f,g\n');
for (let n = 0; n < count; n++) {
  out.write('1,2,3,4,5,6,7\n');
}
out.end();
console.log('done');
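To try the example locally, generate the sample data first and then run the main script, optionally passing a custom queue size:

node generate
node index
node index 500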
You can also check out this example on GitHub.