## Class: `Piscina`
Piscina works by creating a pool of Node.js Worker Threads to which one or more tasks may be dispatched. Each worker thread executes a single exported function defined in a separate file. Whenever a task is dispatched to a worker, the worker invokes the exported function and reports the return value back to Piscina when the function completes.
This class extends `EventEmitter` from Node.js.
### Constructor: `new Piscina([options])`
The following optional configuration is supported:
* `filename`: (`string | null`) Provides the default source for the code that runs the tasks on Worker threads. This should be an absolute path or an absolute `file://` URL to a file that exports a JavaScript `function` or `async function` as its default export or `module.exports`. ES modules are supported.
* `name`: (`string | null`) Provides the name of the default exported worker function. The default is `'default'`, indicating the default export of the worker module.
* `minThreads`: (`number`) Sets the minimum number of threads that are always running for this thread pool. The default is the number provided by `os.availableParallelism`.
* `maxThreads`: (`number`) Sets the maximum number of threads that are running for this thread pool. The default is the number provided by `os.availableParallelism` * 1.5.
* `idleTimeout`: (`number`) A timeout in milliseconds that specifies how long a `Worker` is allowed to be idle, i.e. not handling any tasks, before it is shut down. By default, this is immediate.
  :::info
  The default `idleTimeout` can lead to some performance loss in the application because of the overhead involved with stopping and starting new worker threads. To improve performance, try setting the `idleTimeout` explicitly.
  :::
* `maxQueue`: (`number` | `string`) The maximum number of tasks that may be scheduled to run, but not yet running due to lack of available threads, at a given time. By default, there is no limit. The special value `'auto'` may be used to have Piscina calculate the maximum as the square of `maxThreads`. When `'auto'` is used, the calculated `maxQueue` value may be found by checking the `options.maxQueue` property.
* `concurrentTasksPerWorker`: (`number`) Specifies how many tasks can share a single Worker thread simultaneously. The default is `1`. This generally only makes sense to specify if there is some kind of asynchronous component to the task. Keep in mind that Worker threads are generally not built for handling I/O in parallel.
* `atomics`: (`sync` | `async` | `disabled`) Use the `Atomics` API for faster communication between threads. This is on by default. You can disable `Atomics` globally by setting the environment variable `PISCINA_DISABLE_ATOMICS` to `1`. If `atomics` is `sync`, threads are paused (stopping all execution) between tasks. Ideally, threads should wait for all operations to finish before returning control to the main thread (avoid having open handles within a thread). If you still want the possibility of having open handles or of handling asynchronous tasks, you can set the environment variable `PISCINA_ENABLE_ASYNC_ATOMICS` to `1` or set `options.atomics` to `async`.
  :::info
  The `async` mode comes with performance penalties and can lead to undesired behaviour if open handles are not tracked correctly. Workers should be designed to wait for all operations to finish before returning control to the main thread; if any background operations are still running (e.g. for cache warming), `async` can be of help.
  :::
* `resourceLimits`: (`object`) See Node.js new Worker options.
  * `maxOldGenerationSizeMb`: (`number`) The maximum size of each worker thread's main heap in MB.
  * `maxYoungGenerationSizeMb`: (`number`) The maximum size of a heap space for recently created objects.
  * `codeRangeSizeMb`: (`number`) The size of a pre-allocated memory range used for generated code.
  * `stackSizeMb`: (`number`) The default maximum stack size for the thread. Small values may lead to unusable Worker instances. Default: 4
* `env`: (`object`) If set, specifies the initial value of `process.env` inside the worker threads. See Node.js new Worker options for details.
* `argv`: (`any[]`) List of arguments that will be stringified and appended to `process.argv` in the worker. See Node.js new Worker options for details.
* `execArgv`: (`string[]`) List of Node.js CLI options passed to the worker. See Node.js new Worker options for details.
* `workerData`: (`any`) Any JavaScript value that can be cloned and made available as `require('piscina').workerData`. See Node.js new Worker options for details. Unlike regular Node.js Worker Threads, `workerData` must not specify any value requiring a `transferList`. This is because the `workerData` will be cloned for each pooled worker.
* `taskQueue`: (`TaskQueue`) By default, Piscina uses a first-in-first-out queue for submitted tasks. The `taskQueue` option can be used to provide an alternative implementation. See Custom Task Queues for additional detail.
* `niceIncrement`: (`number`) An optional value that decreases priority for the individual threads, i.e. the higher the value, the lower the priority of the Worker threads. This value is used on Unix/Windows and requires the optional `@napi-rs/nice` module to be installed. See `nice(2)` and `SetThreadPriority` for more details.
* `trackUnmanagedFds`: (`boolean`) An optional setting that, when `true`, will cause Workers to track file descriptors managed using `fs.open()` and `fs.close()`, and will close them automatically when the Worker exits. Defaults to `true`. (This option is only supported on Node.js 12.19+ and all Node.js versions higher than 14.6.0.)
* `closeTimeout`: (`number`) An optional time (in milliseconds) to wait for the pool to complete all in-flight tasks when `close()` is called. The default is `30000`.
* `recordTiming`: (`boolean`) By default, run and wait time will be recorded for the pool. To disable, set to `false`.
* `workerHistogram`: (`boolean`) By default `false`. When `true`, it hints the Worker pool to record statistics for each individual Worker.
* `loadBalancer`: (`PiscinaLoadBalancer`) By default, Piscina uses a least-busy algorithm. The `loadBalancer` option can be used to provide an alternative implementation. See Custom Load Balancers for additional detail.
Use caution when setting resource limits. Setting limits that are too low may result in the `Piscina` worker threads being unusable.
### `PiscinaLoadBalancer`

The `PiscinaLoadBalancer` interface is used to implement a custom load-balancing algorithm that determines which worker thread should be assigned a task.

For more information, see Custom Load Balancers.
#### Interface: `PiscinaLoadBalancer`

```ts
type PiscinaLoadBalancer = (
  task: PiscinaTask, // Task to be distributed
  workers: PiscinaWorker[] // Array of Worker instances
) => PiscinaWorker | null; // Worker instance to be assigned the task
```
If the `PiscinaLoadBalancer` returns `null`, `Piscina` will attempt to spawn a new worker; otherwise, the task will be queued until a worker is available.
#### Interface: `PiscinaTask`

```ts
interface PiscinaTask {
  taskId: number; // Unique identifier for the task
  filename: string; // Filename of the worker module
  name: string; // Name of the worker function
  created: number; // Timestamp when the task was created
  isAbortable: boolean; // Indicates if the task can be aborted through an AbortSignal
}
```
#### Interface: `PiscinaWorker`

```ts
interface PiscinaWorker {
  id: number; // Unique identifier for the worker
  currentUsage: number; // Number of tasks currently running on the worker
  isRunningAbortableTask: boolean; // Indicates if the worker is running an abortable task
  histogram: HistogramSummary | null; // Worker histogram
  terminating: boolean; // Indicates if the worker is terminating
  destroyed: boolean; // Indicates if the worker has been destroyed
}
```
#### Example: Custom Load Balancer

JavaScript

```js
const path = require('node:path');
const { Piscina } = require('piscina');

function LeastBusyBalancer(opts) {
  const { maximumUsage } = opts;

  return (task, workers) => {
    let candidate = null;
    let checkpoint = maximumUsage;
    for (const worker of workers) {
      if (worker.currentUsage === 0) {
        candidate = worker;
        break;
      }

      if (worker.isRunningAbortableTask) continue;

      if (!task.isAbortable && worker.currentUsage < checkpoint) {
        candidate = worker;
        checkpoint = worker.currentUsage;
      }
    }

    return candidate;
  };
}

const piscina = new Piscina({
  loadBalancer: LeastBusyBalancer({ maximumUsage: 2 }),
});

piscina
  // { hello: 'world' } is an example payload; worker.js is a placeholder task module
  .run({ hello: 'world' }, { filename: path.resolve(__dirname, 'worker.js'), name: 'default' })
  .then((result) => console.log(result))
  .catch((err) => console.error(err));
```
TypeScript

```typescript
import { resolve } from 'node:path';
import { Piscina } from 'piscina';

interface LeastBusyBalancerOptions {
  maximumUsage: number;
}

function LeastBusyBalancer(
  opts: LeastBusyBalancerOptions
): PiscinaLoadBalancer {
  const { maximumUsage } = opts;

  return (task, workers) => {
    let candidate: PiscinaWorker | null = null;
    let checkpoint = maximumUsage;
    for (const worker of workers) {
      if (worker.currentUsage === 0) {
        candidate = worker;
        break;
      }

      if (worker.isRunningAbortableTask) continue;

      if (!task.isAbortable && worker.currentUsage < checkpoint) {
        candidate = worker;
        checkpoint = worker.currentUsage;
      }
    }

    return candidate;
  };
}

const piscina = new Piscina({
  loadBalancer: LeastBusyBalancer({ maximumUsage: 2 }),
});

piscina
  // { hello: 'world' } is an example payload; worker.js is a placeholder task module
  .run({ hello: 'world' }, { filename: resolve(__dirname, 'worker.js'), name: 'default' })
  .then((result) => console.log(result))
  .catch((err) => console.error(err));
```