Message Queue

Queues let operations run asynchronously. They are useful for background tasks such as sending a welcome email when a user registers. Each use case has its own queue class that extends MessageQueueServiceBase.

The queue currently supports two drivers, selected with the MESSAGE_QUEUE_TYPE environment variable:

  1. bull-mq: this is the default driver, which uses bull-mq under the hood.
  2. pg-boss: this uses pg-boss under the hood.
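As a rough illustration of how a driver could be selected from MESSAGE_QUEUE_TYPE, here is a minimal sketch. The function name resolveDriverType, the type MessageQueueDriverType, and the choice to throw on an unknown value are assumptions made for this example, not the project's actual code. Only the two driver names and the bull-mq default come from the text above.

```typescript
// Hypothetical helper, for illustration only.
type MessageQueueDriverType = 'bull-mq' | 'pg-boss';

// bull-mq is documented as the default driver.
const DEFAULT_DRIVER: MessageQueueDriverType = 'bull-mq';

function resolveDriverType(
  env: Record<string, string | undefined>,
): MessageQueueDriverType {
  const raw = env.MESSAGE_QUEUE_TYPE;

  // Fall back to the default when the variable is unset or empty.
  if (raw === undefined || raw === '') {
    return DEFAULT_DRIVER;
  }

  if (raw === 'bull-mq' || raw === 'pg-boss') {
    return raw;
  }

  // Assumption: an unsupported value is treated as a configuration error.
  throw new Error(`Unsupported MESSAGE_QUEUE_TYPE: ${raw}`);
}
```

In a Node.js process this could be called as resolveDriverType(process.env) during startup.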

Steps to create and use a new queue

  1. Add a queue name for your new queue under enum MESSAGE_QUEUES.
  2. Provide the factory implementation of the queue with the queue name as the dependency token.
  3. Inject the queue that you created in the required module/service with the queue name as the dependency token.
  4. Add a worker class that uses token-based injection, just like the producer.
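The four steps can be sketched without any framework to show why the queue name works as a shared dependency token. Everything here is a simplified stand-in: InMemoryQueue, Container, demo, and the 'custom-queue' value are hypothetical names for this example, and the real project relies on its own queue service and dependency injection rather than this hand-rolled container.

```typescript
// Step 1: declare the queue name (the value is an assumption).
enum MESSAGE_QUEUES {
  custom = 'custom-queue',
}

interface Job<T> {
  id: string;
  data: T;
}
type Handler<T> = (job: Job<T>) => Promise<void>;

// Simplified in-memory stand-in for a queue service.
// Jobs added before any handler is registered are dropped.
class InMemoryQueue<T> {
  private handlers: Handler<T>[] = [];
  private nextId = 1;

  constructor(public readonly name: string) {}

  async add(data: T): Promise<void> {
    const job: Job<T> = { id: String(this.nextId++), data };
    for (const handler of this.handlers) {
      await handler(job);
    }
  }

  async work(handler: Handler<T>): Promise<void> {
    this.handlers.push(handler);
  }
}

// Tiny token-based container: one cached instance per token.
class Container {
  private factories = new Map<string, () => unknown>();
  private instances = new Map<string, unknown>();

  register<T>(token: string, factory: () => T): void {
    this.factories.set(token, factory);
  }

  resolve<T>(token: string): T {
    if (!this.instances.has(token)) {
      const factory = this.factories.get(token);
      if (!factory) {
        throw new Error(`No provider for token: ${token}`);
      }
      this.instances.set(token, factory());
    }
    return this.instances.get(token) as T;
  }
}

type WelcomeEmailJob = { email: string };

// Step 2: provide a factory under the queue name token.
const container = new Container();
container.register(
  MESSAGE_QUEUES.custom,
  () => new InMemoryQueue<WelcomeEmailJob>(MESSAGE_QUEUES.custom),
);

const received: string[] = [];

async function demo(): Promise<void> {
  // Step 4: the worker resolves the queue by token and registers a handler.
  const workerQueue = container.resolve<InMemoryQueue<WelcomeEmailJob>>(
    MESSAGE_QUEUES.custom,
  );
  await workerQueue.work(async ({ id, data }) => {
    received.push(`${id}:${data.email}`);
  });

  // Step 3: the producer resolves the same token and enqueues a job.
  const producerQueue = container.resolve<InMemoryQueue<WelcomeEmailJob>>(
    MESSAGE_QUEUES.custom,
  );
  await producerQueue.add({ email: 'new.user@example.com' });
}
```

Calling demo() registers the worker and then enqueues one job. Because both sides resolve the same token, they share a single queue instance, which is the property the token-based injection in the steps above depends on.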

Example usage

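// Assumes the @Inject decorator from NestJS.
import { Inject } from '@nestjs/common';
// MESSAGE_QUEUES and MessageQueueService come from the message queue module.

// Producer
class Resolver {
  constructor(@Inject(MESSAGE_QUEUES.custom) private queue: MessageQueueService) {}

  async onSomeAction() {
    // Business logic
    await this.queue.add(someData); // someData is the job payload
  }
}

// Async worker
class CustomWorker {
  constructor(@Inject(MESSAGE_QUEUES.custom) private queue: MessageQueueService) {
    // Not awaited: a constructor cannot await, so initWorker runs in the background.
    this.initWorker();
  }

  async initWorker() {
    await this.queue.work(async ({ id, data }) => {
      // Worker logic
    });
  }
}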
