fix(worker): wait fetched jobs to be processed when closing (#3059)
manast authored Feb 7, 2025
1 parent a5c1dea commit d4de2f5
Showing 3 changed files with 99 additions and 97 deletions.
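
In practice, the change means that a graceful Worker#close() now also waits for the worker's main fetching loop, so jobs the worker has already fetched are processed before the close promise resolves. A minimal sketch of that behavior, assuming a local Redis instance; the queue name, job payload, and timings are illustrative, not taken from the commit:

import { Queue, Worker } from 'bullmq';

const connection = { host: 'localhost', port: 6379 };

async function main() {
  const queue = new Queue('example', { connection });
  await queue.add('task', { n: 1 });

  const worker = new Worker(
    'example',
    async job => {
      // Simulate some work so the job is still in flight when close() is called.
      await new Promise(resolve => setTimeout(resolve, 500));
      return job.data;
    },
    { connection },
  );

  // Wait until a job has actually been picked up.
  await new Promise<void>(resolve => worker.once('active', () => resolve()));

  // Per this fix, a graceful close awaits the main loop, so the fetched job is
  // processed before this promise resolves (a force close, close(true), does not wait).
  await worker.close();
  await queue.close();
}

main().catch(console.error);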
173 changes: 89 additions & 84 deletions src/classes/worker.ts
@@ -195,6 +195,7 @@ export class Worker<
   protected paused: boolean;
   protected processFn: Processor<DataType, ResultType, NameType>;
   protected running = false;
+  protected mainLoopRunning: Promise<void> | null = null;
 
   static RateLimitError(): Error {
     return new RateLimitError();
@@ -447,100 +448,108 @@
 
       await this.startStalledCheckTimer();
 
-      const jobsInProgress = new Set<{ job: Job; ts: number }>();
-      this.startLockExtenderTimer(jobsInProgress);
-
-      const asyncFifoQueue = (this.asyncFifoQueue =
-        new AsyncFifoQueue<void | Job<DataType, ResultType, NameType>>());
-
-      let tokenPostfix = 0;
-
-      const client = await this.client;
-      const bclient = await this.blockingConnection.client;
-
-      /**
-       * This is the main loop in BullMQ. Its goals are to fetch jobs from the queue
-       * as efficiently as possible, providing concurrency and minimal unnecessary calls
-       * to Redis.
-       */
-      while (!(this.closing || this.paused)) {
-        let numTotal = asyncFifoQueue.numTotal();
-
-        /**
-         * This inner loop tries to fetch jobs concurrently, but if we are waiting for a job
-         * to arrive at the queue we should not try to fetch more jobs (as it would be pointless)
-         */
-        while (
-          !this.waiting &&
-          !this.paused &&
-          numTotal < this._concurrency &&
-          (!this.limitUntil || numTotal == 0)
-        ) {
-          const token = `${this.id}:${tokenPostfix++}`;
-
-          const fetchedJob = this.retryIfFailed<void | Job<
-            DataType,
-            ResultType,
-            NameType
-          >>(
-            () => this._getNextJob(client, bclient, token, { block: true }),
-            this.opts.runRetryDelay,
-          );
-          asyncFifoQueue.add(fetchedJob);
-
-          numTotal = asyncFifoQueue.numTotal();
-
-          if (this.waiting && numTotal > 1) {
-            // We are waiting for jobs but we have others that we could start processing already
-            break;
-          }
-
-          // We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls
-          // to Redis in high concurrency scenarios.
-          const job = await fetchedJob;
-
-          // No more jobs waiting but we have others that could start processing already
-          if (!job && numTotal > 1) {
-            break;
-          }
-
-          // If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting
-          // for processing this job.
-          if (this.blockUntil) {
-            break;
-          }
-        }
-
-        // Since there can be undefined jobs in the queue (when a job fails or queue is empty)
-        // we iterate until we find a job.
-        let job: Job<DataType, ResultType, NameType> | void;
-        do {
-          job = await asyncFifoQueue.fetch();
-        } while (!job && asyncFifoQueue.numQueued() > 0);
-
-        if (job) {
-          const token = job.token;
-          asyncFifoQueue.add(
-            this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
-              () =>
-                this.processJob(
-                  <Job<DataType, ResultType, NameType>>job,
-                  token,
-                  () => asyncFifoQueue.numTotal() <= this._concurrency,
-                  jobsInProgress,
-                ),
-              this.opts.runRetryDelay,
-            ),
-          );
-        }
-      }
-
-      this.running = false;
-      return await asyncFifoQueue.waitAll();
-    } catch (error) {
-      this.running = false;
-      throw error;
+      const client = await this.client;
+      const bclient = await this.blockingConnection.client;
+
+      this.mainLoopRunning = this.mainLoop(client, bclient);
+
+      // We must await here or finally will be called too early.
+      await this.mainLoopRunning;
+    } finally {
+      this.running = false;
     }
   }
 
+  /**
+   * This is the main loop in BullMQ. Its goals are to fetch jobs from the queue
+   * as efficiently as possible, providing concurrency and minimal unnecessary calls
+   * to Redis.
+   */
+  private async mainLoop(client: RedisClient, bclient: RedisClient) {
+    const asyncFifoQueue = (this.asyncFifoQueue = new AsyncFifoQueue<void | Job<
+      DataType,
+      ResultType,
+      NameType
+    >>());
+    const jobsInProgress = new Set<{ job: Job; ts: number }>();
+    this.startLockExtenderTimer(jobsInProgress);
+
+    let tokenPostfix = 0;
+
+    while (!this.closing && !this.paused) {
+      let numTotal = asyncFifoQueue.numTotal();
+
+      /**
+       * This inner loop tries to fetch jobs concurrently, but if we are waiting for a job
+       * to arrive at the queue we should not try to fetch more jobs (as it would be pointless)
+       */
+      while (
+        !this.closing &&
+        !this.paused &&
+        !this.waiting &&
+        numTotal < this._concurrency &&
+        (!this.limitUntil || numTotal == 0)
+      ) {
+        const token = `${this.id}:${tokenPostfix++}`;
+
+        const fetchedJob = this.retryIfFailed<void | Job<
+          DataType,
+          ResultType,
+          NameType
+        >>(
+          () => this._getNextJob(client, bclient, token, { block: true }),
+          this.opts.runRetryDelay,
+        );
+        asyncFifoQueue.add(fetchedJob);
+
+        numTotal = asyncFifoQueue.numTotal();
+
+        if (this.waiting && numTotal > 1) {
+          // We are waiting for jobs but we have others that we could start processing already
+          break;
+        }
+
+        // We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls
+        // to Redis in high concurrency scenarios.
+        const job = await fetchedJob;
+
+        // No more jobs waiting but we have others that could start processing already
+        if (!job && numTotal > 1) {
+          break;
+        }
+
+        // If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting
+        // for processing this job.
+        if (this.blockUntil) {
+          break;
+        }
+      }
+
+      // Since there can be undefined jobs in the queue (when a job fails or queue is empty)
+      // we iterate until we find a job.
+      let job: Job<DataType, ResultType, NameType> | void;
+      do {
+        job = await asyncFifoQueue.fetch();
+      } while (!job && asyncFifoQueue.numQueued() > 0);
+
+      if (job) {
+        const token = job.token;
+        asyncFifoQueue.add(
+          this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
+            () =>
+              this.processJob(
+                <Job<DataType, ResultType, NameType>>job,
+                token,
+                () => asyncFifoQueue.numTotal() <= this._concurrency,
+                jobsInProgress,
+              ),
+            this.opts.runRetryDelay,
+          ),
+        );
+      }
+    }
+
+    return asyncFifoQueue.waitAll();
+  }
 
   /**
@@ -815,10 +824,6 @@ will never work with more accuracy than 1ms. */
     fetchNextCallback = () => true,
     jobsInProgress: Set<{ job: Job; ts: number }>,
   ): Promise<void | Job<DataType, ResultType, NameType>> {
-    if (!job || this.closing) {
-      return;
-    }
-
     const srcPropagationMedatada = job.opts?.telemetry?.metadata;
 
     return this.trace<void | Job<DataType, ResultType, NameType>>(
@@ -1160,8 +1165,8 @@
       reconnect = false;
     }
 
-    if (this.asyncFifoQueue) {
-      await this.asyncFifoQueue.waitAll();
+    if (this.mainLoopRunning) {
+      await this.mainLoopRunning;
     }
 
     reconnect && (await this.blockingConnection.reconnect());
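The core of the worker.ts change: the fetch/process loop now lives in its own mainLoop() method, its promise is stored on the instance as mainLoopRunning, and close() awaits that promise instead of asyncFifoQueue.waitAll(). A stripped-down sketch of the same shutdown pattern follows; PollingWorker, fetchNext, and handle are hypothetical names for illustration, not BullMQ internals:

class PollingWorker<T> {
  private closing = false;
  private mainLoopRunning: Promise<void> | null = null;

  constructor(
    private fetchNext: () => Promise<T | undefined>,
    private handle: (item: T) => Promise<void>,
  ) {}

  start(): void {
    // Keep a handle to the loop's promise so close() can wait for in-flight work.
    this.mainLoopRunning = this.mainLoop();
  }

  private async mainLoop(): Promise<void> {
    while (!this.closing) {
      const item = await this.fetchNext();
      if (item !== undefined) {
        // Anything already fetched is processed even if close() was requested meanwhile.
        await this.handle(item);
      }
    }
  }

  async close(): Promise<void> {
    this.closing = true;
    if (this.mainLoopRunning) {
      await this.mainLoopRunning;
    }
  }
}

The sketch omits details a real worker needs, such as unblocking a pending blocking fetch when close() is requested; it only shows why keeping the loop's promise around lets close() wait for already-fetched work.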
2 changes: 1 addition & 1 deletion tests/test_bulk.ts
@@ -162,7 +162,7 @@ describe('bulk jobs', () => {
     await worker.close();
     await worker2.close();
     await queueEvents.close();
-  });
+  }).timeout(10_000);
 
   it('should process jobs with custom ids', async () => {
     const name = 'test';
21 changes: 9 additions & 12 deletions tests/test_worker.ts
@@ -2404,18 +2404,6 @@ describe('workers', function () {
       const worker = new Worker(
         queueName,
         async () => {
-          try {
-            if (++i === 4) {
-              // Pause when all 4 works are processing
-              await worker.pause();
-
-              // Wait for all the active jobs to finalize.
-              expect(nbJobFinish).to.be.equal(3);
-            }
-          } catch (err) {
-            console.error(err);
-          }
-
           // 100 - i*20 is to force to finish job n°4 before lower jobs that will wait longer
           await delay(100 - i * 10);
           nbJobFinish++;
@@ -2434,6 +2422,15 @@
       );
       await worker.waitUntilReady();
 
+      worker.on('active', async () => {
+        if (++i === 4) {
+          // Pause when all 4 works are processing
+          await worker.pause();
+          // Wait for all the active jobs to finalize.
+          expect(nbJobFinish).to.be.equal(3);
+        }
+      });
+
       const waiting = new Promise((resolve, reject) => {
         const cb = after(8, resolve);
         worker.on('completed', cb);
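The test change moves the pause trigger out of the job processor and into the worker's 'active' event listener. A minimal sketch of that event-driven pattern, assuming standard BullMQ worker events; the helper name and counter are illustrative:

import { Worker } from 'bullmq';

// Illustrative helper, not part of the test suite: pause the worker once `n` jobs
// have become active. By default, worker.pause() waits for currently active jobs
// to finish before it resolves.
export function pauseAfterNActive(worker: Worker, n: number): void {
  let active = 0;
  worker.on('active', async () => {
    if (++active === n) {
      await worker.pause();
    }
  });
}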
