Worker & Job Queue
The Vendure Worker is a Node.js process responsible for running computationally intensive or otherwise long-running tasks in the background, such as updating a search index or sending emails. Running such tasks in the background keeps the server responsive, since a response can be returned immediately without waiting for the slower tasks to complete.
Put another way, the Worker executes jobs which have been placed in the job queue.
The worker
The worker is started by calling the bootstrapWorker() function with the same
configuration as is passed to the main server bootstrap(). In a standard Vendure installation, this call is found
in the index-worker.ts file.
Underlying architecture
The Worker is a NestJS standalone application. This means it is almost identical to the main server app,
but does not have any network layer listening for requests. The server communicates with the worker
via a “job queue” architecture. The exact implementation of the job queue depends on the
configured JobQueueStrategy, but in a standard installation (which uses the DefaultJobQueuePlugin)
the worker polls the database for new jobs.
Multiple workers
It is possible to run multiple workers in parallel to better handle heavy loads. Using the
JobQueueOptions.activeQueues configuration, it is even possible to have particular workers dedicated
to one or more specific types of jobs. For example, if your application does video transcoding,
you might want to set up a dedicated worker just for that task:
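The idea of dedicating workers to particular queues can be sketched without any Vendure imports. In the following simplified model, only the activeQueues option name comes from the text above. The queue names, the WorkerQueueOptions type, and the queuesHandledBy() helper are hypothetical, and the rule that an empty list means "all queues" is an assumption that should be checked against the JobQueueOptions documentation.

```typescript
// Simplified stand-in for the relevant part of a worker's configuration.
// Only `activeQueues` is named in the text; the rest is illustrative.
interface WorkerQueueOptions {
    activeQueues?: string[];
}

// Hypothetical queue names used only in this sketch.
const ALL_QUEUES = ['transcode-video', 'send-email', 'update-search-index'];

// Assumption: an empty or missing list means "process every queue".
function queuesHandledBy(options: WorkerQueueOptions, allQueues: string[]): string[] {
    const active = options.activeQueues ?? [];
    return active.length === 0
        ? [...allQueues]
        : allQueues.filter(queueName => active.includes(queueName));
}

// A dedicated worker that only picks up transcoding jobs...
const transcodingWorker: WorkerQueueOptions = {
    activeQueues: ['transcode-video'],
};

// ...and a general-purpose worker that handles every other queue.
const generalWorker: WorkerQueueOptions = {
    activeQueues: ALL_QUEUES.filter(queueName => queueName !== 'transcode-video'),
};

console.log(queuesHandledBy(transcodingWorker, ALL_QUEUES));
console.log(queuesHandledBy(generalWorker, ALL_QUEUES));
```

In a real project each of these option objects would form part of the configuration passed to its own worker process, so that slow transcoding jobs cannot hold up emails or search index updates.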
Running jobs on the main process
It is possible to run jobs from the Job Queue on the main server. This is mainly used for testing and automated tasks, and is not advised for production use, since it negates the benefits of running long tasks off the main process. To do so, manually start the JobQueueService by calling its start() method once the server has bootstrapped.
ProcessContext
Sometimes your code may need to be aware of whether it is being run as part of a server or worker process.
In this case you can inject the ProcessContext provider and query it like this:
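As a rough illustration, the sketch below branches on the two flags exposed by ProcessContext. To stay runnable on its own it uses a local ProcessContextLike interface instead of the real provider, and the ReportService class is hypothetical. In an actual plugin the class would be an injectable NestJS provider and the context would arrive through constructor injection.

```typescript
// Local stand-in for the two boolean flags on Vendure's ProcessContext provider.
interface ProcessContextLike {
    isServer: boolean;
    isWorker: boolean;
}

// Hypothetical service used only for this sketch.
class ReportService {
    private readonly processContext: ProcessContextLike;

    constructor(processContext: ProcessContextLike) {
        this.processContext = processContext;
    }

    // Returns a description of what this process is expected to do.
    describeRole(): string {
        if (this.processContext.isServer) {
            return 'server: accept requests and add jobs to the queue';
        }
        if (this.processContext.isWorker) {
            return 'worker: process jobs from the queue';
        }
        return 'unknown process type';
    }
}

const onServer = new ReportService({ isServer: true, isWorker: false });
const onWorker = new ReportService({ isServer: false, isWorker: true });

console.log(onServer.describeRole());
console.log(onWorker.describeRole());
```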
The job queue
Vendure uses a job queue to handle the processing of certain tasks which are typically too slow to run in the normal request-response cycle. A normal request-response looks like this:
In the normal request-response cycle, all intermediate tasks (looking up data in the database, performing business logic, etc.) occur before the response can be returned. For most operations this is fine, since those intermediate tasks are very fast.
Some operations, however, need to perform much longer-running tasks. For example, updating the search index on thousands of products could take a minute or more. In this case, we certainly don’t want to delay the response until that processing has completed. That’s where a job queue comes in:
What does Vendure use the job queue for?
By default, Vendure uses the job queue for the following tasks:
- Re-building the search index
- Updating the search index when changes are made to Products, ProductVariants, Assets etc.
- Updating the contents of Collections
- Sending transactional emails
How does the Job Queue work?
This diagram illustrates the job queue mechanism:
The server adds jobs to the queue. The worker then picks up these jobs from the queue and processes them in sequence, one by one. Job queue throughput can be increased by running multiple workers or by increasing the concurrency of a single worker.
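The mechanism can be modelled in a few lines. The following is a deliberately tiny in-memory sketch, not Vendure's implementation: TinyJobQueue, QueuedJob, and the 'send-email' queue name are all hypothetical. The "server" side only enqueues, while the "worker" side processes pending jobs one at a time in the order they were added.

```typescript
// A single queued job in this simplified model.
interface QueuedJob<T> {
    id: number;
    queueName: string;
    data: T;
    state: 'PENDING' | 'COMPLETED';
}

class TinyJobQueue<T> {
    private jobs: QueuedJob<T>[] = [];
    private nextId = 1;
    private readonly queueName: string;

    constructor(queueName: string) {
        this.queueName = queueName;
    }

    // "Server" side: store the job and return immediately.
    add(data: T): QueuedJob<T> {
        const job: QueuedJob<T> = {
            id: this.nextId++,
            queueName: this.queueName,
            data,
            state: 'PENDING',
        };
        this.jobs.push(job);
        return job;
    }

    // "Worker" side: process pending jobs in sequence, one by one.
    // Returns the number of jobs processed in this pass.
    processAll(processFn: (job: QueuedJob<T>) => void): number {
        let processed = 0;
        for (const job of this.jobs) {
            if (job.state !== 'PENDING') {
                continue;
            }
            processFn(job);
            job.state = 'COMPLETED';
            processed++;
        }
        return processed;
    }
}

const emailQueue = new TinyJobQueue<{ recipient: string }>('send-email');
emailQueue.add({ recipient: 'a@example.com' });
emailQueue.add({ recipient: 'b@example.com' });

const handled: string[] = [];
const processedCount = emailQueue.processAll(job => handled.push(job.data.recipient));
console.log(processedCount, handled);
```

Because this model keeps its jobs in a plain array, everything is lost when the process exits, which is the same weakness the in-memory strategy has in production.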
JobQueueStrategy
The actual queue part is defined by the configured JobQueueStrategy.
If no strategy is defined, Vendure uses an in-memory store of the contents of each queue. While this has the advantage of requiring no external dependencies, it is not suitable for production because when the server is stopped, the entire queue will be lost and any pending jobs will never be processed. Moreover, it cannot be used when running the worker as a separate process.
A better alternative is to use the DefaultJobQueuePlugin
(which will be used in a standard @vendure/create installation), which configures Vendure to use the SqlJobQueueStrategy.
This strategy uses the database as a queue, which means that even if the Vendure server stops, pending jobs are persisted and will be processed on restart.
It is also possible to implement your own JobQueueStrategy to take advantage of other technologies. Examples include RabbitMQ, Google Cloud Pub/Sub, and Amazon SQS. It may make sense to implement a custom strategy based on one of these if the default database-based approach does not meet your performance requirements.
Job Queue Performance
It is common for larger Vendure projects to define multiple custom job queues. When using the DefaultJobQueuePlugin
with many queues, performance may be impacted. This is because the SqlJobQueueStrategy uses polling to check for
new jobs in the database. Each queue will (by default) query the database every 200ms. So if there are 10 queues,
this will result in a constant 50 queries/second.
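The arithmetic behind that figure is simple: each queue issues 1000 / pollInterval queries per second, and the total scales linearly with the number of queues. The helper below is a hypothetical illustration of that calculation, not part of Vendure.

```typescript
// Estimates the steady-state polling load on the database.
// queueCount: number of job queues being polled
// pollIntervalMs: how often each queue polls, in milliseconds
function pollingQueriesPerSecond(queueCount: number, pollIntervalMs: number): number {
    if (pollIntervalMs <= 0) {
        throw new RangeError('pollIntervalMs must be positive');
    }
    return queueCount * (1000 / pollIntervalMs);
}

// The scenario from the text: 10 queues, each polling every 200ms.
console.log(pollingQueriesPerSecond(10, 200)); // 50

// The same 10 queues with a longer, 1000ms interval.
console.log(pollingQueriesPerSecond(10, 1000)); // 10
```

A longer interval reduces the query load, at the cost of jobs waiting longer before they are picked up.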
In this case it is recommended to try the BullMQJobQueuePlugin, which uses an efficient push-based strategy built on Redis.
Using Job Queues in a plugin
If your plugin involves long-running tasks, you can also make use of the job queue.
A real example of this can be seen in the EmailPlugin source.
Let's say you are building a plugin which allows a video URL to be specified, and then that video gets transcoded into a format suitable for streaming on the storefront. This is a long-running task which should not block the main thread, so we will use the job queue to run the task on the worker.
First we'll add a new mutation to the Admin API schema:
The resolver looks like this:
The resolver just defines how to handle the new addVideoToProduct mutation, delegating the actual work to the ProductVideoService.
Creating a job queue
Use npx vendure add to easily add a job queue to a service.
The JobQueueService creates and manages job queues. The queue is created when the
application starts up (see NestJS lifecycle events), and then we can use the add() method to add jobs to the queue.
Notice that the JobQueue takes a generic type parameter describing the shape of the job data.
This means that when we call jobQueue.add() we must pass in an object of this type. This data will then be available in the process function as the job.data property.
The data passed to jobQueue.add() must be JSON-serializable, because it gets serialized into a string when stored in the job queue. Therefore you should
avoid passing in complex objects such as Date instances, Buffers, etc.
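To see why this matters, the sketch below sends a Date through a JSON round trip, which approximates what happens when job data is stored and later read back. The TranscodeJobData shape and its field names are hypothetical. The Date comes back as a plain string, so a safer pattern is to pass an ISO string explicitly and rebuild the Date inside the process function.

```typescript
// Hypothetical job data that (unwisely) contains a Date instance.
interface TranscodeJobData {
    productId: string;
    videoUrl: string;
    requestedAt: Date;
}

const originalData: TranscodeJobData = {
    productId: '42',
    videoUrl: 'https://example.com/video.mp4',
    requestedAt: new Date('2024-01-01T00:00:00.000Z'),
};

// Approximates storing the job data and reading it back.
const restoredData = JSON.parse(JSON.stringify(originalData));

console.log(originalData.requestedAt instanceof Date); // true
console.log(typeof restoredData.requestedAt); // "string"

// Safer: pass only JSON-friendly values and rebuild rich objects on the other side.
const safeData = {
    productId: originalData.productId,
    videoUrl: originalData.videoUrl,
    requestedAt: originalData.requestedAt.toISOString(),
};
const roundTripped = JSON.parse(JSON.stringify(safeData));
const rebuiltDate = new Date(roundTripped.requestedAt);

console.log(rebuiltDate.getTime() === originalData.requestedAt.getTime()); // true
```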
The ProductVideoService is in charge of setting up the JobQueue and adding jobs to that queue. Calling
its transcodeForProduct() method will add a transcoding job to the queue.
Plugin code typically gets executed on both the server and the worker. Therefore, you sometimes need to explicitly check what context you are in. This can be done with the ProcessContext provider.
Finally, the ProductVideoPlugin brings it all together, extending the GraphQL API, defining the required CustomField to store the transcoded video URL, and registering our service and resolver. The PluginCommonModule is imported as it exports the JobQueueService.
Passing the RequestContext
Sometimes you need to pass the RequestContext object to the process function of a job, since ctx is required by many Vendure
service methods that you may be using inside your process function. However, the RequestContext object itself is not serializable,
so it cannot be passed directly to the JobQueue.add() method. Instead, you can serialize the RequestContext using the ctx.serialize() method,
and then deserialize it in the process function using the static RequestContext.deserialize() method.
Serializing the RequestContext should be done with caution, since it is a relatively large object and will significantly increase the size of the job data.
When jobs are created in large quantities (hundreds or thousands of jobs per day), this can lead to performance issues. This is a particular concern with the BullMQJobQueuePlugin, which stores the job data in Redis: large job data can consume so much memory that the Redis server crashes.
Instead of serializing the entire RequestContext, consider passing only the data you need and then reconstructing the RequestContext in the process function.
Handling job cancellation
It is possible for an administrator to cancel a running job. Doing so will cause the configured job queue strategy to mark the job as cancelled, but
on its own this will not stop the job from running. This is because the job queue itself has no direct control over the process function once
it has been started.
It is up to the process function to check for cancellation and stop processing if the job has been cancelled. This can be done by checking the
job.state property, and if the job is cancelled, the process function can throw an error to indicate that the job was interrupted
by early cancellation:
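The pattern can be sketched with simplified stand-ins for the job types. In the code below, SketchJob, SketchJobState, and processFrames() are hypothetical. In a real process function you would check the job.state property of the Vendure job against the cancelled state instead. The important point is that the check happens between units of work, so a cancellation is noticed at the next step.

```typescript
// Minimal stand-ins for the job and its state; the real types come from Vendure.
type SketchJobState = 'RUNNING' | 'CANCELLED' | 'COMPLETED';

interface SketchJob {
    state: SketchJobState;
    data: { frames: number[] };
}

// Processes frames one at a time, checking for cancellation before each one.
function processFrames(job: SketchJob, onFrame: (frame: number) => void): number {
    let done = 0;
    for (const frame of job.data.frames) {
        if (job.state === 'CANCELLED') {
            // Throwing signals that the job was interrupted by cancellation.
            throw new Error('Job was cancelled');
        }
        onFrame(frame);
        done++;
    }
    return done;
}

const transcodeJob: SketchJob = { state: 'RUNNING', data: { frames: [1, 2, 3, 4] } };
const seenFrames: number[] = [];
let cancelMessage = '';

try {
    processFrames(transcodeJob, frame => {
        seenFrames.push(frame);
        if (frame === 2) {
            // Simulate an administrator cancelling the job mid-run.
            transcodeJob.state = 'CANCELLED';
        }
    });
} catch (e) {
    cancelMessage = e instanceof Error ? e.message : String(e);
}

console.log(seenFrames, cancelMessage); // [ 1, 2 ] Job was cancelled
```

How often to check is a trade-off: checking before every small unit of work makes cancellation responsive, while checking rarely means a cancelled job may keep running for a long time.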
Subscribing to job updates
When creating a new job via JobQueue.add(), it is possible to subscribe to updates to that Job (progress and status changes). This allows you, for example, to create resolvers which are able to return the results of a given Job.
In the video transcoding example above, we could modify the transcodeForProduct() call to look like this:
If you prefer to work with Promises rather than RxJS Observables, you can also convert the updates to a promise:



