Managing Worker Threads

Delaying Availability of Workers

A worker thread will not be made available to process tasks until Piscina determines that it is "ready". By default, a worker is ready as soon as Piscina loads it and acquires a reference to the exported handler function.

Sometimes a worker needs more time before it can accept tasks, for example while it initializes resources it depends on. To support this case, the worker module may export a Promise that resolves to the handler function instead of exporting the function directly:

worker.js
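async function initialize() {
  // Placeholder for any asynchronous setup the worker needs
  await someAsyncInitializationActivity();
  // The resolved value is the handler Piscina will call for each task
  return ({ a, b }) => a + b;
}

module.exports = initialize();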

Piscina will await the resolution of the exported Promise before marking the worker thread available.
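
As a slightly fuller sketch of the same pattern, the worker below waits for a slow resource before exposing its handler. The `openConnection` helper and its `query` method are hypothetical stand-ins for whatever asynchronous setup a worker might need; they are not part of Piscina.

```javascript
"use strict";

const { setTimeout: sleep } = require("timers/promises");

// Hypothetical stand-in for slow setup such as opening a database connection.
async function openConnection() {
  await sleep(50);
  return { query: async (value) => value * 2 };
}

async function initialize() {
  const connection = await openConnection();
  // The handler closes over the initialized resource.
  return async ({ value }) => connection.query(value);
}

// Exporting the Promise (not the function) delays the worker's availability.
const ready = initialize();
module.exports = ready;
```

Because the handler is created only after `openConnection` resolves, every task the worker receives can assume the connection already exists.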

Managing Task Overload with Backpressure

When the maxQueue option is set and the Piscina queue is full, no additional tasks may be submitted until the queue size falls below the limit. The 'drain' event may be used to receive notification when the queue is empty and all tasks have been submitted to workers for processing.

The example below shows how to use a Node.js stream to feed a Piscina worker pool:

"use strict";

const { resolve } = require("path");
const Pool = require("piscina");

const pool = new Pool({
  filename: resolve(__dirname, "worker.js"),
  maxQueue: "auto",
});

// getStreamSomehow() is a placeholder for any readable stream
const stream = getStreamSomehow();
stream.setEncoding("utf8");

// Number of chunks submitted so far (used for logging only)
let counter = 0;

pool.on("drain", () => {
  // The queue is empty again, so let the stream flow
  if (stream.isPaused()) {
    console.log("resuming...", counter, pool.queueSize);
    stream.resume();
  }
});

stream
  .on("data", (data) => {
    counter++;
    pool.run(data).catch(console.error);
    // Stop reading once the queue reaches its limit
    if (pool.queueSize === pool.options.maxQueue) {
      console.log("pausing...", counter, pool.queueSize);
      stream.pause();
    }
  })
  .on("error", console.error)
  .on("end", () => {
    console.log("done");
  });
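
To see the pause and resume cycle without a worker file, the sketch below replaces the pool with a small in-memory stand-in. `MockPool` and `feed` are hypothetical names introduced only for illustration: `MockPool` is not Piscina's implementation, it merely runs one task at a time, queues the rest, and emits "drain" when its queue empties. The stream side follows the same logic as the example above.

```javascript
"use strict";

const { EventEmitter } = require("events");
const { Readable } = require("stream");

// Hypothetical stand-in for a pool (NOT Piscina's implementation): it runs one
// task at a time, queues the rest, and emits "drain" when the queue empties.
class MockPool extends EventEmitter {
  constructor(maxQueue) {
    super();
    this.maxQueue = maxQueue;
    this.queue = [];
    this.busy = false;
  }

  get queueSize() {
    return this.queue.length;
  }

  run(value) {
    return new Promise((resolve) => {
      this.queue.push({ value, resolve });
      this.next();
    });
  }

  next() {
    if (this.busy || this.queue.length === 0) return;
    this.busy = true;
    const { value, resolve } = this.queue.shift();
    if (this.queue.length === 0) this.emit("drain");
    // Simulate 5 ms of work per task.
    setTimeout(() => {
      this.busy = false;
      resolve(value * 2);
      this.next();
    }, 5);
  }
}

// Feeds items into the mock pool, pausing the stream whenever the queue is full.
function feed(items, maxQueue) {
  const pool = new MockPool(maxQueue);
  const stream = Readable.from(items);
  const pending = [];
  let pauses = 0;

  pool.on("drain", () => {
    if (stream.isPaused()) stream.resume();
  });

  return new Promise((resolve, reject) => {
    stream
      .on("data", (item) => {
        pending.push(pool.run(item));
        if (pool.queueSize === pool.maxQueue) {
          pauses += 1;
          stream.pause();
        }
      })
      .on("error", reject)
      .on("end", () => {
        // Wait for every submitted task before reporting.
        Promise.all(pending).then((results) => resolve({ results, pauses }), reject);
      });
  });
}

feed([1, 2, 3, 4, 5, 6], 2).then(({ results, pauses }) => {
  console.log(results, "paused", pauses, "times");
});
```

Collecting the promises returned by `run` and awaiting them in the "end" handler is a design choice of this sketch: the stream can finish while tasks are still queued, so "end" alone does not mean all work is done.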

Out of Scope Asynchronous Code

A worker thread is active only until it returns a result, either a synchronous return value or a Promise that is later fulfilled or rejected. Once the result is available, Piscina waits for stdout and stderr to be flushed and then pauses the worker's event loop until the next call. Piscina has no way of detecting asynchronous code that was scheduled but not awaited before the handler returned, so that code resumes only on the next call. It is therefore highly recommended to handle all asynchronous work before returning a result; otherwise your code may behave unpredictably.

For example:

const { setTimeout } = require("timers/promises");

module.exports = ({ a, b }) => {
  // This promise should be awaited
  setTimeout(1000).then(() => {
    console.log("Working"); // This will **not** run during the same worker call
  });

  return a + b;
};
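
One way to avoid the problem, sketched here under the assumption that the timer work must finish before the result is delivered, is to make the handler async and await the promise. The name `add` is introduced only for illustration.

```javascript
"use strict";

const { setTimeout } = require("timers/promises");

// Async handler: the returned Promise settles only after the timer work is done.
const add = async ({ a, b }) => {
  await setTimeout(1000);
  console.log("Working"); // Runs during the same worker call, before the result

  return a + b;
};

module.exports = add;
```

The trade-off is that each call now takes at least as long as the awaited work, but no scheduled code is left behind when the worker's event loop is paused.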