Skip to main content

Stream

This example explains how Piscina can be used for more complex scenarios involving stream processing. Piscina allows for efficient data transfer between the main thread and worker threads using MessagePort.

The main script (index.mjs) sets up a pipeline that reads a file, processes its contents through a worker thread (converting text to uppercase), and outputs the result.

index.mjs
import Piscina from 'piscina';
import { MessagePortDuplex } from './stream.mjs';
import { createReadStream } from 'fs';
import { pipeline } from 'stream';

const pool = new Piscina({
filename: new URL('./worker.mjs', import.meta.url).href
});

const { port1, port2 } = new MessageChannel();

pool.run(port2, { transferList: [port2] });

const duplex = new MessagePortDuplex(port1);
pipeline(
createReadStream(new URL('./index.mjs', import.meta.url).pathname),
duplex,
process.stdout,
(err) => { if (err) throw err; });

The stream file defines custom stream classes that extend standard Node.js streams to work with MessagePort. These classes enable efficient communication between the main thread and worker threads.

stream.mjs
import {
Writable,
Readable,
Duplex
} from 'stream';

const kPort = Symbol('kPort');

export class MessagePortWritable extends Writable {
constructor (port, options) {
super(options);
this[kPort] = port;
}

_write (buf, _, cb) {
this[kPort].postMessage(buf, [buf.buffer]);
cb();
}

_writev (data, cb) {
const chunks = new Array(data.length);
const transfers = new Array(data.length);
for (let n = 0; n < data.length; n++) {
chunks[n] = data[n].chunk;
transfers[n] = data[n].chunk.buffs;
}
this[kPort].postMessage(chunks, transfers);
cb();
}

_final (cb) {
this[kPort].postMessage(null);
cb();
}

_destroy (err, cb) {
this[kPort].close(() => cb(err));
}

unref () { this[kPort].unref(); return this; }
ref () { this[kPort].ref(); return this; }
}

export class MessagePortReadable extends Readable {
constructor (port, options) {
super(options);
this[kPort] = port;
port.onmessage = ({ data }) => this.push(data);
}

_read () {
this[kPort].start();
}

_destroy (err, cb) {
this[kPort].close(() => {
this[kPort].onmessage = undefined;
cb(err);
});
}

unref () { this[kPort].unref(); return this; }
ref () { this[kPort].ref(); return this; }
}

export class MessagePortDuplex extends Duplex {
constructor (port, options) {
super(options);
this[kPort] = port;
port.onmessage = ({ data }) => this.push(data);
}

_read () {
this[kPort].start();
}

_write (buf, _, cb) {
this[kPort].postMessage(buf, [buf.buffer]);
cb();
}

_writev (data, cb) {
const chunks = new Array(data.length);
const transfers = new Array(data.length);
for (let n = 0; n < data.length; n++) {
chunks[n] = data[n].chunk;
transfers[n] = data[n].chunk.buffs;
}
this[kPort].postMessage(chunks, transfers);
cb();
}

_final (cb) {
this[kPort].postMessage(null);
cb();
}

_destroy (err, cb) {
this[kPort].close(() => {
this[kPort].onmessage = undefined;
cb(err);
});
}

unref () { this[kPort].unref(); return this; }
ref () { this[kPort].ref(); return this; }
}

The worker script defines a function that processes data through a MessagePortDuplex stream, converting incoming text to uppercase and writing it back.

worker.mjs
import { MessagePortDuplex } from './stream.mjs';

export default function (port) {
let res;
const ret = new Promise((resolve) => {
res = resolve;
});
const duplex = new MessagePortDuplex(port);
duplex.setEncoding('utf8');
duplex.on('data', (chunk) => duplex.write(chunk.toUpperCase()));
duplex.on('end', () => {
duplex.end();
res();
});
return ret;
}

You can also check out this example on github.