Queues
Les queues sont un modèle de conception puissant qui vous aide à faire face aux défis de scalabilité et de performance courants dans les applications. Voici quelques exemples de problèmes que les queues peuvent vous aider à résoudre :
- Lisser les pics de traitement. Par exemple, si les utilisateurs peuvent initier des tâches gourmandes en ressources à des moments arbitraires, vous pouvez ajouter ces tâches à une queue au lieu de les exécuter de manière synchrone. Vous pouvez ensuite avoir des processus de travail qui extraient des tâches de la queue de manière contrôlée. Vous pouvez facilement ajouter de nouveaux consommateurs de queue pour augmenter la gestion des tâches en arrière-plan à mesure que l’application se développe.
- Décomposer des tâches monolithiques qui pourraient autrement bloquer la boucle d’événements Node.js. Par exemple, si une requête utilisateur nécessite un travail intensif en CPU comme la transcodage audio, vous pouvez déléguer cette tâche à d’autres processus, libérant ainsi des processus face à l’utilisateur pour rester réactifs.
- Fournir un canal de communication fiable entre divers services. Par exemple, vous pouvez mettre en queue des tâches (jobs) dans un processus ou un service, et les consommer dans un autre. Vous pouvez être notifié (en écoutant les événements de statut) lors de l’achèvement, d’un erreur ou d’autres changements d’état dans le cycle de vie du job depuis n’importe quel processus ou service. Lorsque les producteurs ou consommateurs de queue échouent, leur état est préservé et le traitement des tâches peut redémarrer automatiquement lorsque les nœuds sont redémarrés.
Nest fournit le paquet @nestjs/bullmq
pour l’intégration BullMQ et le paquet @nestjs/bull
pour l’intégration Bull. Les deux paquets sont des abstractions/enveloppes au-dessus de leurs bibliothèques respectives, qui ont été développées par la même équipe. Bull est actuellement en mode maintenance, l’équipe se concentrant sur la correction des bogues, tandis que BullMQ est activement développé, avec une mise en œuvre moderne en TypeScript et un ensemble de fonctionnalités différentes. Si Bull répond à vos exigences, il reste un choix fiable et éprouvé. Les paquets Nest facilitent l’intégration de BullMQ ou de Bull dans votre application Nest de manière conviviale.
Les deux, BullMQ et Bull, utilisent Redis pour persister les données des jobs, donc vous aurez besoin d’avoir Redis installé sur votre système. Comme ils sont soutenus par Redis, votre architecture de queue peut être totalement distribuée et indépendante de la plateforme. Par exemple, vous pouvez avoir des producteurs, consommateurs et écouteurs de queue exécutés dans Nest sur un (ou plusieurs) nœuds, et d’autres producteurs, consommateurs et écouteurs s’exécutant sur d’autres plateformes Node.js sur d’autres nœuds réseau.
Ce chapitre couvre les paquets @nestjs/bullmq
et @nestjs/bull
. Nous vous recommandons également de lire la documentation BullMQ et la documentation Bull pour plus de contexte et des détails spécifiques sur l’implémentation.
Installation de BullMQ
Pour commencer à utiliser BullMQ, nous installons d’abord les dépendances requises.
$ npm install --save @nestjs/bullmq bullmq
Une fois le processus d’installation terminé, nous pouvons importer le BullModule
dans le module racine AppModule
.
import { Module } from '@nestjs/common';import { BullModule } from '@nestjs/bullmq';
@Module({ imports: [ BullModule.forRoot({ connection: { host: 'localhost', port: 6379, }, }), ],})export class AppModule {}
La méthode forRoot()
est utilisée pour enregistrer un objet de configuration du paquet bullmq
qui sera utilisé par toutes les queues enregistrées dans l’application (à moins d’indications contraires). Pour votre référence, voici quelques-unes des propriétés d’un objet de configuration :
connection: ConnectionOptions
- Options pour configurer la connexion Redis. Voir Connections pour plus d’informations. Facultatif.prefix: string
- Préfixe pour toutes les clés de queue. Facultatif.defaultJobOptions: JobOpts
- Options pour contrôler les paramètres par défaut des nouveaux jobs. Voir JobOpts pour plus d’informations. Facultatif.settings: AdvancedSettings
- Paramètres de configuration avancés de la queue. Ceux-ci ne devraient généralement pas être modifiés. Voir AdvancedSettings pour plus d’informations. Facultatif.
Toutes les options sont facultatives, fournissant un contrôle détaillé sur le comportement de la queue. Celles-ci sont passées directement au constructeur Queue
de BullMQ. Lisez plus sur ces options et d’autres options ici.
Pour enregistrer une queue, importez le module dynamique BullModule.registerQueue()
, comme suit :
BullModule.registerQueue({ name: 'audio',});
La méthode registerQueue()
est utilisée pour instancier et/ou enregistrer des queues. Les queues sont partagées entre les modules et les processus qui se connectent à la même base de données Redis sous-jacente avec les mêmes identifiants. Chaque queue est unique par sa propriété nom. Un nom de queue est utilisé à la fois comme un jeton d’injection (pour injecter la queue dans les contrôleurs/fournisseurs), et comme argument pour les décorateurs d’associer des classes consommateurs et des écouteurs aux queues.
Vous pouvez également remplacer certaines des options pré-configurées pour une queue spécifique, comme suit :
BullModule.registerQueue({ name: 'audio', connection: { port: 6380, },});
BullMQ prend également en charge les relations parent-enfant entre les jobs. Cette fonctionnalité permet la création de flux où les jobs sont les nœuds d’arbres de profondeur arbitraire. Pour en savoir plus à leur sujet, consultez ici.
Pour ajouter un flux, vous pouvez faire ce qui suit :
BullModule.registerFlowProducer({ name: 'flowProducerName',});
Étant donné que les jobs sont persistés dans Redis, chaque fois qu’une queue nommée spécifique est instanciée (par exemple, lorsqu’une application est démarrée/redémarrée), elle tente de traiter tous les anciens jobs qui peuvent exister d’une session précédente non terminée.
Chaque queue peut avoir un ou plusieurs producteurs, consommateurs et écouteurs. Les consommateurs récupèrent des jobs dans la queue dans un ordre spécifique : FIFO (le défaut), LIFO, ou selon les priorités. Le contrôle de l’ordre de traitement des queues est discuté ici.
Configuration asynchrone
Vous souhaiterez peut-être passer les options de bullmq
de manière asynchrone plutôt que statique. Dans ce cas, utilisez la méthode forRootAsync()
qui fournit plusieurs moyens de gérer la configuration asynchrone. De même, si vous souhaitez passer les options de queue de manière asynchrone, utilisez la méthode registerQueueAsync()
.
Une approche consiste à utiliser une fonction d’usine :
BullModule.forRootAsync({ useFactory: () => ({ connection: { host: 'localhost', port: 6379, }, }),});
Notre usine se comporte comme tout autre fournisseur asynchrone (par exemple, elle peut être async
et elle est capable d’injecter des dépendances via inject
).
Alternativez-vous avec la syntaxe useClass
:
BullModule.forRootAsync({ useClass: BullConfigService,});