Web Streams

Piscina works with modern Web APIs such as Web Streams. Passing streams to a worker lets you process streaming data off the main thread. This is most useful for large datasets or real-time data, where the per-chunk work is heavy enough to benefit from multi-threading.

In the main script (index.mjs), we create a Piscina instance and set up a ReadableStream that generates a sequence of numbers. We also create a WritableStream that simply logs each chunk it receives. The script then runs a task in the worker pool, passing both streams as arguments.

The worker script (worker.mjs) defines an async function that reads from the provided ReadableStream and writes to the WritableStream. It uses a for await...of loop to iterate over the chunks in the ReadableStream, writes each chunk to the WritableStream, and closes the writer once the ReadableStream is exhausted.

worker.mjs
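export default async function ({ readable, writable }) {
  // Acquire a writer, which locks the WritableStream to this task.
  const writer = writable.getWriter();
  // ReadableStream is async-iterable, so chunks can be consumed with for await.
  for await (const chunk of readable) {
    await writer.write(chunk);
  }
  // Await the close so the task resolves only after every chunk is flushed.
  await writer.close();
}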

index.mjs
import Piscina from 'piscina';
import {
  ReadableStream,
  WritableStream
} from 'node:stream/web';

const pool = new Piscina({
  filename: new URL('./worker.mjs', import.meta.url).href
});

// Source stream: emits one queued number per pull, then closes.
const readable = new ReadableStream({
  start () {
    // `this` is the underlying source object, so the queue is stored on it.
    this.chunks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
  },

  pull (controller) {
    const chunk = this.chunks.shift();
    controller.enqueue(chunk);
    if (this.chunks.length === 0) {
      controller.close();
    }
  }
});

// Sink stream: logs every chunk the worker writes.
const writable = new WritableStream({
  write (chunk) {
    console.log(chunk);
  }
});

(async function () {
  // List both streams in transferList so they are transferred to the worker.
  await pool.run({ readable, writable }, { transferList: [readable, writable] });
})();

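A key aspect of this example is the transferList option passed to pool.run(). Stream objects cannot be copied to another thread, so listing the ReadableStream and WritableStream in transferList transfers them to the worker instead of attempting to clone them. The worker receives streams that stay connected to the originals: each chunk it reads is pulled from the source defined in index.mjs, and each chunk it writes reaches the write() callback there, so the console.log calls run on the main thread.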
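
The transfer step can be tried on its own, without a worker pool. The following is a minimal sketch, assuming Node.js 18 or later, where ReadableStream and MessageChannel are available as globals. It posts a stream through a MessageChannel with a transfer list, which is the kind of argument that transferList represents. The function name transferDemo and the sample chunks are illustrative only and are not part of Piscina.

```javascript
// Assumes Node.js 18+, where ReadableStream and MessageChannel are globals.
// transferDemo is a hypothetical helper written for this illustration.
async function transferDemo() {
  const readable = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.close();
    }
  });

  const { port1, port2 } = new MessageChannel();

  // Resolve with the stream once it arrives on the receiving port.
  const arrived = new Promise((resolve) => {
    port2.onmessage = (event) => resolve(event.data);
  });

  // Listing the stream in the second argument transfers it
  // instead of attempting to clone it.
  port1.postMessage(readable, [readable]);

  // The original stream is locked after the transfer, so the sender
  // can no longer read from it directly.
  const lockedAfterTransfer = readable.locked;

  // The receiving side gets a ReadableStream fed by the original source.
  const received = await arrived;
  const chunks = [];
  for await (const chunk of received) {
    chunks.push(chunk);
  }

  port1.close(); // closing one end closes the whole channel
  return { chunks, lockedAfterTransfer };
}

transferDemo().then((result) => console.log(result));
```

In the Piscina example the receiving side is the worker thread rather than a second port in the same thread, but the ownership rule is the same: once a stream has been transferred, the sending side should treat it as handed over.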

You can also check out this example on GitHub.