Web Streams Transfer
Using the Web Streams API together with Piscina, you can build a data-processing pipeline whose transform step runs on a pool of worker threads. This is useful for cases such as real-time data transformation or analysis.
In the main script below, we set up three key components of the Web Streams API:
- A `ReadableStream` that generates a sequence of numbers from 1 to 9, and then 0.
- A `WritableStream` that simply logs each chunk it receives to the console.
- A `TransformStream` that processes each chunk using a Piscina worker pool.
The `TransformStream`'s `transform()` method uses `pool.run()` to process each chunk in a worker thread. This allows the stream's data to be processed in parallel by the pool's workers.
Finally, the script sets up a pipeline using the `pipeThrough()` and `pipeTo()` methods, connecting the `ReadableStream` to the `TransformStream`, and then to the `WritableStream`.
JavaScript
worker.mjs
// Repeat 'ABC' num * num times for each incoming chunk.
export default async function (num) {
  return 'ABC'.repeat(num * num);
}
index.mjs
import Piscina from 'piscina';
import {
  ReadableStream,
  TransformStream,
  WritableStream
} from 'node:stream/web';

const pool = new Piscina({
  filename: new URL('./worker.mjs', import.meta.url).href
});

const readable = new ReadableStream({
  start () {
    this.chunks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
  },
  pull (controller) {
    const chunk = this.chunks.shift();
    controller.enqueue(chunk);
    if (this.chunks.length === 0) {
      controller.close();
    }
  }
});

const writable = new WritableStream({
  write (chunk) {
    console.log(chunk);
  }
});

const transform = new TransformStream({
  async transform (chunk, controller) {
    controller.enqueue(await pool.run(chunk));
  }
});
readable.pipeThrough(transform).pipeTo(writable);
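Because the streams preserve chunk order, the script logs `'ABC'` repeated 1, 4, 9, and so on up to 81 times, and finally an empty string for the chunk `0`. Note that `pipeTo()` returns a promise that settles once the pipeline has drained; as a minimal sketch (assuming you also want to shut the pool down afterwards, using Piscina's `destroy()` method and top-level `await`, which `.mjs` modules support), the last line could instead be:

await readable.pipeThrough(transform).pipeTo(writable);
// Once every chunk has been written, terminate the pool's threads.
await pool.destroy();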
TypeScript

worker.ts
export default async function (num: number): Promise<string> {
  return 'ABC'.repeat(num * num);
}

// Expose this file's path so the wrapper knows which module to load.
export const filename = __filename;
index.ts
import { resolve } from 'path';
import Piscina from 'piscina';
import {
  ReadableStream,
  TransformStream,
  WritableStream
} from 'node:stream/web';
import { filename } from './worker';

const pool = new Piscina({
  filename: resolve(__dirname, 'workerWrapper.js'),
  workerData: { fullpath: filename }
});
const readable = new ReadableStream({
  start () {
    this.chunks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
  },
  pull (controller) {
    const chunk = this.chunks.shift();
    controller.enqueue(chunk);
    if (this.chunks.length === 0) {
      controller.close();
    }
  }
});

const writable = new WritableStream({
  write (chunk) {
    console.log(chunk);
  }
});

const transform = new TransformStream({
  async transform (chunk, controller) {
    controller.enqueue(await pool.run(chunk));
  }
});
readable.pipeThrough(transform).pipeTo(writable);
workerWrapper.js
const { workerData } = require('worker_threads');

// If the target worker is TypeScript source, register ts-node so the
// require() below can compile it on the fly.
if (workerData.fullpath.endsWith('.ts')) {
  require('ts-node').register();
}

module.exports = require(workerData.fullpath);
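The wrapper keeps the pool's `filename` pointed at plain JavaScript: the actual worker's path arrives through `workerData`, and `ts-node` (which must be installed for this example to work) is registered only when that path ends in `.ts`. The same wrapper therefore loads either compiled `.js` workers or `.ts` sources unchanged. To run the example itself, you would likewise execute index.ts through `ts-node` or compile it first.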
You can also check out this example on GitHub.