RabbitMQ
RabbitMQ est un courtier de messages open-source et léger qui prend en charge plusieurs protocoles de messagerie. Il peut être déployé dans des configurations distribuées et fédérées pour répondre aux exigences de grande échelle et de haute disponibilité. De plus, c’est le courtier de messages le plus largement déployé, utilisé dans le monde entier par des petites startups et de grandes entreprises.
Installation
Pour commencer à construire des microservices basés sur RabbitMQ, installez d’abord les packages requis :
$ npm i --save amqplib amqp-connection-manager
Overview
Pour utiliser le transporteur RabbitMQ, passez l’objet d’options suivant à la méthode createMicroservice()
:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, { transport: Transport.RMQ, options: { urls: ['amqp://localhost:5672'], queue: 'cats_queue', queueOptions: { durable: false }, },});
Options
La propriété options
est spécifique au transporteur choisi. Le transporteur RabbitMQ expose les propriétés décrites ci-dessous.
Propriété | Description |
---|---|
urls | URLs de connexion |
queue | Nom de la queue à laquelle votre serveur écoutera |
prefetchCount | Définit le nombre de préferais pour le canal |
isGlobalPrefetchCount | Active le préfetching par canal |
noAck | Si false , le mode d’accusation manuel est activé |
consumerTag | Identifiant du tag du consommateur (lisez plus ici) |
queueOptions | Options de queue supplémentaires (lisez plus ici) |
socketOptions | Options de socket supplémentaires (lisez plus ici) |
headers | En-têtes à envoyer avec chaque message |
Client
Comme d’autres transporteurs de microservices, vous avez plusieurs options pour créer une instance de ClientProxy
RabbitMQ.
Une méthode pour créer une instance consiste à utiliser le ClientsModule
. Pour créer une instance cliente avec le ClientsModule
, importez-le et utilisez la méthode register()
pour passer un objet d’options avec les mêmes propriétés que celles montrées ci-dessus dans la méthode createMicroservice()
, ainsi qu’une propriété name
à utiliser comme token d’injection. Lisez plus sur ClientsModule
ici.
@Module({ imports: [ ClientsModule.register([ { name: 'MATH_SERVICE', transport: Transport.RMQ, options: { urls: ['amqp://localhost:5672'], queue: 'cats_queue', queueOptions: { durable: false, }, }, }, ]), ],})
D’autres options pour créer un client (soit ClientProxyFactory
soit @Client()
) peuvent également être utilisées. Vous pouvez lire à leur sujet ici.
Context
Dans des scénarios plus sophistiqués, vous pouvez vouloir accéder à plus d’informations sur la requête entrante. Lors de l’utilisation du transporteur RabbitMQ, vous pouvez accéder à l’objet RmqContext
.
@MessagePattern('notifications')getNotifications( @Payload() data: number[], @Ctx() context: RmqContext,) { console.log(`Pattern: ${context.getPattern()}`);}
Pour accéder au message RabbitMQ d’origine (avec les properties
, fields
et content
), utilisez la méthode getMessage()
de l’objet RmqContext
, comme suit :
@MessagePattern('notifications')getNotifications( @Payload() data: number[], @Ctx() context: RmqContext,) { const originalMsg = context.getMessage(); console.log(originalMsg);}
Pour récupérer une référence au channel RabbitMQ, utilisez la méthode getChannelRef
de l’objet RmqContext
, comme suit :
@MessagePattern('notifications')getNotifications( @Payload() data: number[], @Ctx() context: RmqContext,) { const channel = context.getChannelRef(); console.log(channel);}
Message acknowledgement
Pour s’assurer qu’un message n’est jamais perdu, RabbitMQ prend en charge les accusés de réception de message. Un accusé de réception est envoyé par le consommateur pour indiquer à RabbitMQ qu’un message particulier a été reçu, traité et que RabbitMQ peut le supprimer. Si un consommateur meurt (son canal est fermé, la connexion est fermée ou la connexion TCP est perdue) sans envoyer un accusé, RabbitMQ comprendra qu’un message n’a pas été entièrement traité et le remettra en queue.
Pour activer le mode d’accusation manuelle, définissez la propriété noAck
sur false
:
options: { urls: ['amqp://localhost:5672'], queue: 'cats_queue', noAck: false, queueOptions: { durable: false, },}
Lorsqu’il est activé, nous devons envoyer un accusé approprié depuis le worker pour signaler que nous avons terminé une tâche.
@MessagePattern('notifications')getNotifications( @Payload() data: number[], @Ctx() context: RmqContext,) { const channel = context.getChannelRef(); const originalMsg = context.getMessage();
channel.ack(originalMsg);}
Record builders
Pour configurer les options de message, vous pouvez utiliser la classe RmqRecordBuilder
(note : cela est faisable pour des flux basés sur des événements également). Par exemple, pour définir les propriétés headers
et priority
, utilisez la méthode setOptions
, comme suit :
const message = ':cat:';const record = new RmqRecordBuilder(message) .setOptions({ headers: { ['x-version']: '1.0.0', }, priority: 3, }) .build();
this.client.send('replace-emoji', record).subscribe(...);
Et vous pouvez lire ces valeurs également côté serveur, en accédant à RmqContext
, comme suit :
@MessagePattern('replace-emoji')replaceEmoji( @Payload() data: string, @Ctx() context: RmqContext,): string { const { properties: { headers } } = context.getMessage(); return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';}