Passer au contenu

RabbitMQ

RabbitMQ est un courtier de messages open-source et léger qui prend en charge plusieurs protocoles de messagerie. Il peut être déployé dans des configurations distribuées et fédérées pour répondre aux exigences de grande échelle et de haute disponibilité. De plus, c’est le courtier de messages le plus largement déployé, utilisé dans le monde entier par des petites startups et de grandes entreprises.

Installation

Pour commencer à construire des microservices basés sur RabbitMQ, installez d’abord les packages requis :

Fenêtre de terminal
$ npm i --save amqplib amqp-connection-manager

Overview

Pour utiliser le transporteur RabbitMQ, passez l’objet d’options suivant à la méthode createMicroservice() :

main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
});

Options

La propriété options est spécifique au transporteur choisi. Le transporteur RabbitMQ expose les propriétés décrites ci-dessous.

PropriétéDescription
urlsURLs de connexion
queueNom de la queue à laquelle votre serveur écoutera
prefetchCountDéfinit le nombre de préferais pour le canal
isGlobalPrefetchCountActive le préfetching par canal
noAckSi false, le mode d’accusation manuel est activé
consumerTagIdentifiant du tag du consommateur (lisez plus ici)
queueOptionsOptions de queue supplémentaires (lisez plus ici)
socketOptionsOptions de socket supplémentaires (lisez plus ici)
headersEn-têtes à envoyer avec chaque message

Client

Comme d’autres transporteurs de microservices, vous avez plusieurs options pour créer une instance de ClientProxy RabbitMQ.

Une méthode pour créer une instance consiste à utiliser le ClientsModule. Pour créer une instance cliente avec le ClientsModule, importez-le et utilisez la méthode register() pour passer un objet d’options avec les mêmes propriétés que celles montrées ci-dessus dans la méthode createMicroservice(), ainsi qu’une propriété name à utiliser comme token d’injection. Lisez plus sur ClientsModule ici.

ClientsModule
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false,
},
},
},
]),
],
})

D’autres options pour créer un client (soit ClientProxyFactory soit @Client()) peuvent également être utilisées. Vous pouvez lire à leur sujet ici.

Context

Dans des scénarios plus sophistiqués, vous pouvez vouloir accéder à plus d’informations sur la requête entrante. Lors de l’utilisation du transporteur RabbitMQ, vous pouvez accéder à l’objet RmqContext.

RmqContext
@MessagePattern('notifications')
getNotifications(
@Payload() data: number[],
@Ctx() context: RmqContext,
) {
console.log(`Pattern: ${context.getPattern()}`);
}

Pour accéder au message RabbitMQ d’origine (avec les properties, fields et content), utilisez la méthode getMessage() de l’objet RmqContext, comme suit :

RmqContext
@MessagePattern('notifications')
getNotifications(
@Payload() data: number[],
@Ctx() context: RmqContext,
) {
const originalMsg = context.getMessage();
console.log(originalMsg);
}

Pour récupérer une référence au channel RabbitMQ, utilisez la méthode getChannelRef de l’objet RmqContext, comme suit :

RmqContext
@MessagePattern('notifications')
getNotifications(
@Payload() data: number[],
@Ctx() context: RmqContext,
) {
const channel = context.getChannelRef();
console.log(channel);
}

Message acknowledgement

Pour s’assurer qu’un message n’est jamais perdu, RabbitMQ prend en charge les accusés de réception de message. Un accusé de réception est envoyé par le consommateur pour indiquer à RabbitMQ qu’un message particulier a été reçu, traité et que RabbitMQ peut le supprimer. Si un consommateur meurt (son canal est fermé, la connexion est fermée ou la connexion TCP est perdue) sans envoyer un accusé, RabbitMQ comprendra qu’un message n’a pas été entièrement traité et le remettra en queue.

Pour activer le mode d’accusation manuelle, définissez la propriété noAck sur false :

Options
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
noAck: false,
queueOptions: {
durable: false,
},
}

Lorsqu’il est activé, nous devons envoyer un accusé approprié depuis le worker pour signaler que nous avons terminé une tâche.

Ack
@MessagePattern('notifications')
getNotifications(
@Payload() data: number[],
@Ctx() context: RmqContext,
) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
channel.ack(originalMsg);
}

Record builders

Pour configurer les options de message, vous pouvez utiliser la classe RmqRecordBuilder (note : cela est faisable pour des flux basés sur des événements également). Par exemple, pour définir les propriétés headers et priority, utilisez la méthode setOptions, comme suit :

RecordBuilder
const message = ':cat:';
const record = new RmqRecordBuilder(message)
.setOptions({
headers: {
['x-version']: '1.0.0',
},
priority: 3,
})
.build();
this.client.send('replace-emoji', record).subscribe(...);

Et vous pouvez lire ces valeurs également côté serveur, en accédant à RmqContext, comme suit :

RmqContext
@MessagePattern('replace-emoji')
replaceEmoji(
@Payload() data: string,
@Ctx() context: RmqContext,
): string {
const { properties: { headers } } = context.getMessage();
return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}