Background jobs
TIP
Requires @compas/store
to be installed
The queue system is based on 'static' units of work to be done in the background. It supports the following:
- Job priority's. Lower value means higher priority.
- Scheduling jobs at a set time
- Customizable handler timeouts
- Recurring job handling
- Concurrent workers pulling from the same queue
- Specific workers for a specific job
When to use which function of adding a job:
queueWorkerAddJob
: use the queue as background processing of defined units. Like converting a file to different formats, sending async or scheduled notifications. Jobs created will have a priority of '5'.queueWorkerRegisterCronJobs
: use the queue for scheduled recurring jobs based on the specificcronExpression
. Jos created will have a default priority of '4'.
Every job runs with a timeout. This timeout is enforced with an AbortSignal
on the event
argument passed to your handler and automatically checked with calls like eventStart
and newEventFromEvent
. It is determined in the following order:
- Timeout of the specific job, via
handlerTimeout
property. Should be used sporadically - Timeout of a specific handler as provided by the
handler
property. - The
handlerTimeout
property of the QueueWorker
Jobs are picked up if the following criteria are met:
- The job is not complete yet
- The job's 'scheduledAt' property is in the past
- The job's 'retryCount' value is lower than the
maxRetryCount
option.
Eligible jobs are sorted in the following order:
- By priority ascending, so a lower priority value job will run first
- By scheduledAt ascending, so an earlier scheduled job will be picked before a later scheduled job.
If a job fails, by throwing an error, other jobs may run first before any retries happen, based on the above ordering.
API
Provided by @compas/store
. A summary of the available functionality. See the docs on these functions for more information and their accepted arguments.
queueWorkerCreate
This function constructs a worker, applies the default options if no value is provided and returns a { start, stop }
synchronously. start
needs to be called before any jobs are picked up. If you need to shut down gracefully you can use await stop()
. This will finish all running jobs and prevent picking up new jobs. See the QueueWorkerOptions
as the second argument of this function for all available options and their defaults.
Some specific options include:
includeNames
/excludeNames
: let this queue worker only pick up specific jobs. This allows you to scale queue workers independently.deleteJobOnCompletion
: by default, the queue keeps history of the processed jobs. For high-volume queues, it is generally considered more efficient to delete jobs on completion. If you want to keep a history of jobs for a few days, you can usefalse
and instead usejobQueueCleanup
.unsafeIngoreSorting
: Ignore priority and scheduled based sorting. This is useful in combination withincludeNames
to create a higher throughput queue, with no guarantees of the order in which jobs are picked up.
queueWorkerAddJob
Add a new job to the queue. The name
option is mandatory. This function returns the id
of the inserted job.
queueWorkerRegisterCronJobs
Register cron jobs to the queue. Any existing cron job not in this definition will be removed from the queue, even if pending jobs exist. When the cron expression of a job is changed, it takes effect immediately. The system won't ever upgrade an existing normal job to a cron job. Note that your job may not be executed on time. Use job.data.cronLastCompletedAt
and job.data.cronExpression
to decide if you still need to execute your logic. The provided cronExpression
is evaluated in 'utc' mode.
cron-parser is used for parsing the cronExpression
. If you need a different type of scheduler, use queueWorkerAddJob
manually in your job handler.
jobQueueCleanup
A handler to remove jobs from the queue. The queue is the most performant when old completed jobs are cleaned up periodically. The advised way to use this job is the following:
- In
queueWorkerRegisterCronJobs
:{ name: "compas.queue.cleanup", cronExpression: "0 1 * * *" }
to run this job daily at 1 AM. - In your handler object:
{ "compas.queue.cleanup": jobQueueCleanup({ queueHistoryInDays: 5 }), }
jobQueueInsights
Get insights in the amount of jobs that are ready to be picked up (ie pending
) and how many jobs are scheduled at some time in the future. The advised way to use this job is the following:
- In
queueWorkerRegisterCronJobs
:{ name: "compas.queue.insights", cronExpression: "0 * * * *" }
to run this job every hour. - In your handler object:
{ "compas.queue.insights": jobQueueInsights(), }
Other @compas/store jobs
jobFileCleanup
When you delete a file via queries.fileDelete
the file is not removed from the underlying bucket. To do this syncDeletedFiles
is necessary. This job does that.
- In
queueWorkerRegisterCronJobs
:{ name: "compas.file.cleanup", cronExpression: "0 2 * * *" }
to run this job daily at 2 AM. - In your handler object:
{ "compas.file.cleanup": jobFileCleanup(s3Client, "bucketName"), }
jobFileGeneratePlaceholderImage
When you create a file via createOrUpdateFile
you have to the option to let it create a job to generate a placeholder image. This is a 10px wide JPEG that is stored on the file object, to support things like Next.js Image blurDataUrl
.
- In your handler object:
{ "compas.file.generatePlaceholderImage": jobFileGeneratePlaceholderImage(s3Client, "bucketName"), }
jobFileTransformImage
When you send files with fileSendTransformedImageResponse
, it adds this job when a not yet transformed option combination is found. This job transforms the image according to the requested options.
- In your handler object:
{ "compas.file.transformImage": jobFileTransformImage(s3Client), }
jobSessionStoreCleanup
Revoked and expired sessions of the session store are not automatically removed. This job does exactly that.
- In
queueWorkerRegisterCronJobs
:{ name: "compas.sessionStore.cleanup", cronExpression: "0 2 * * *" }
to run this job daily at 2 AM. - In your handler object:
{ "compas.sessionStore.cleanup": jobSessionStoreCleanup({ maxRevokedAgeInDays: 14 }), }
jobSessionStoreProcessLeakedSession
Process re ported leaked sessions. These jobs occur when the session store finds that refresh token is used multiple times. The job is able to either process the leaked session in to a report and log it via type: "sessionStore.leakedSession.report"
or is able to dump the raw session information via type: "sessionStore.leakedSession.dump"
- In your handler object:
{ "compas.sessionStore.potentialLeakedSession": jobSessionStoreCleanup({ maxRevokedAgeInDays: 14 }), }