Task Queue
You can extend Piscina's functionality by implementing a custom task queue. Instead of the default FIFO (First-In-First-Out) queue, it uses a priority queue to manage tasks.
The main script (index.js
) defines a PriorityTaskQueue
class that wraps the shuffled-priority-queue
package. This class implements the necessary methods (push
, remove
, shift
) that Piscina expects from a task queue. The queue prioritizes tasks based on a priority
value specified in the task options.
Task priorities only affect queued tasks. If a worker is immediately available when a task is submitted, it will be processed regardless of its priority. The priority queue comes into play when there are more tasks than available workers.
- Javascript
- Typescript
'use strict';
const spq = require('shuffled-priority-queue');
const Piscina = require('piscina');
const { resolve } = require('path');
const kItem = Symbol('item');
class PriorityTaskQueue {
queue = spq();
get size () { return this.queue.length; }
push (value) {
const queueOptions = value[Piscina.queueOptionsSymbol];
const priority = queueOptions ? (queueOptions.priority || 0) : 0;
value[kItem] = this.queue.add({ priority, value });
}
remove (value) {
this.queue.remove(value[kItem]);
}
shift () {
return this.queue.shift().value;
}
}
const pool = new Piscina({
filename: resolve(__dirname, 'worker.js'),
taskQueue: new PriorityTaskQueue(),
maxThreads: 4
});
function makeTask (task, priority) {
return { ...task, [Piscina.queueOptionsSymbol]: { priority } };
}
(async () => {
// Submit enough tasks to ensure that at least some are queued
console.log(await Promise.all([
pool.run(makeTask({ priority: 1 }, 1)),
pool.run(makeTask({ priority: 2 }, 2)),
pool.run(makeTask({ priority: 3 }, 3)),
pool.run(makeTask({ priority: 4 }, 4)),
pool.run(makeTask({ priority: 5 }, 5)),
pool.run(makeTask({ priority: 6 }, 6)),
pool.run(makeTask({ priority: 7 }, 7)),
pool.run(makeTask({ priority: 8 }, 8))
]));
})();
import { resolve } from 'path';
import Piscina from 'piscina';
const spq = require('shuffled-priority-queue');
import { filename } from './worker';
const kItem = Symbol('item');
class PriorityTaskQueue {
queue = spq();
get size() {
return this.queue.length;
}
push(value: any) {
const queueOptions = value[Piscina.queueOptionsSymbol];
const priority = queueOptions ? queueOptions.priority || 0 : 0;
value[kItem] = this.queue.add({ priority, value });
}
remove(value: any) {
this.queue.remove(value[kItem]);
}
shift() {
return this.queue.shift().value;
}
}
const pool = new Piscina({
filename: resolve(__dirname, 'workerWrapper.js'),
workerData: { fullpath: filename },
taskQueue: new PriorityTaskQueue(),
maxThreads: 4
});
function makeTask(task: { priority: number }, priority: number) {
return { ...task, [Piscina.queueOptionsSymbol]: { priority } };
}
(async () => {
// Submit enough tasks to ensure that at least some are queued
try {
console.log(
await Promise.all([
pool.run(makeTask({ priority: 1 }, 1)),
pool.run(makeTask({ priority: 2 }, 2)),
pool.run(makeTask({ priority: 3 }, 3)),
pool.run(makeTask({ priority: 4 }, 4)),
pool.run(makeTask({ priority: 5 }, 5)),
pool.run(makeTask({ priority: 6 }, 6)),
pool.run(makeTask({ priority: 7 }, 7)),
pool.run(makeTask({ priority: 8 }, 8))
])
);
} catch (error) {
console.log(error);
}
})();
const { workerData } = require('worker_threads');
if (workerData.fullpath.endsWith(".ts")) {
require("ts-node").register();
}
module.exports = require(workerData.fullpath);
The worker script (worker.js
) simulates some work by sleeping for 100ms, then logs and returns the priority of the task it processed. This allows us to observe the order in which tasks are executed.
- Javascript
- Typescript
'use strict';
const { promisify } = require('util');
const sleep = promisify(setTimeout);
module.exports = async ({ priority }) => {
await sleep(100);
process._rawDebug(`${priority}`);
return priority;
};
import { promisify } from 'util';
const sleep = promisify(setTimeout);
export default async ({ priority }: { priority: number }): Promise<number> => {
await sleep(100);
process._rawDebug(`${priority}`);
return priority;
};
export const filename = __filename;
You can also check out this example on github.