Web Streams Transfer

Using Web Streams together with Piscina, you can build a data-processing pipeline that offloads per-chunk work to a pool of worker threads. This is useful for real-time data transformation or analysis.

In the main script below, we set up three key components of the Web Streams API:

  • A ReadableStream that generates a sequence of numbers from 1 to 9, and then 0.
  • A WritableStream that simply logs each chunk it receives to the console.
  • A TransformStream that processes each chunk using a Piscina worker pool.

The TransformStream's transform method uses pool.run() to process each chunk on a worker thread, keeping the heavy work off the main thread while the stream's backpressure keeps chunks flowing in order.

Finally, the script sets up the pipeline with the pipeThrough() and pipeTo() methods, connecting the ReadableStream to the TransformStream and then to the WritableStream.

worker.mjs
// Runs on a worker thread: repeats 'ABC' num * num times and returns the result.
export default async function (num) {
  return 'ABC'.repeat(num * num);
}
index.mjs
import Piscina from 'piscina';
import {
  ReadableStream,
  TransformStream,
  WritableStream
} from 'node:stream/web';

const pool = new Piscina({
  filename: new URL('./worker.mjs', import.meta.url).href
});

// Produces the digits 1 to 9 followed by 0, closing once the last chunk is enqueued.
const readable = new ReadableStream({
  start () {
    this.chunks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
  },

  pull (controller) {
    const chunk = this.chunks.shift();
    controller.enqueue(chunk);
    if (this.chunks.length === 0) {
      controller.close();
    }
  }
});

// Logs each transformed chunk it receives.
const writable = new WritableStream({
  write (chunk) {
    console.log(chunk);
  }
});

// Hands each chunk to the worker pool and forwards the result downstream.
const transform = new TransformStream({
  async transform (chunk, controller) {
    controller.enqueue(await pool.run(chunk));
  }
});

readable.pipeThrough(transform).pipeTo(writable);
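
Because pipeTo() returns a promise that settles once the pipeline has finished (or errored), a small variation of the last line of index.mjs is to await it and then shut the pool down so the process can exit cleanly. This is a sketch, not part of the original example; it reuses the readable, transform, writable, and pool defined above and relies on Piscina's destroy() method, which terminates the pool's worker threads:

// Sketch: replaces the final line of index.mjs above.
// Waits for the whole pipeline to finish, then releases the worker threads.
try {
  await readable.pipeThrough(transform).pipeTo(writable);
} finally {
  await pool.destroy();
}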

You can also check out this example on GitHub.
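
The worker in this example returns a string, which is structured-cloned when it crosses the thread boundary. For large binary results, a hypothetical variation of worker.mjs could use Piscina's move() utility so the underlying ArrayBuffer is transferred rather than copied; the buffer size below is purely illustrative and this file is not part of the example above:

// Hypothetical variation of worker.mjs: return a transferable buffer.
import { move } from 'piscina';

export default async function (num) {
  // move() marks the ArrayBuffer as transferable, so it is moved to the
  // main thread instead of being cloned.
  return move(new ArrayBuffer(num * num));
}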