Passer au contenu

Kafka

Kafka est une plateforme de streaming distribuée open source qui possède trois capacités clés :

  • Publier et s’abonner aux flux d’enregistrements, similaire à une file de messages ou un système de messagerie d’entreprise.
  • Stocker des flux d’enregistrements de manière durable et tolérante aux pannes.
  • Traiter les flux d’enregistrements au fur et à mesure de leur arrivée.

Le projet Kafka vise à fournir une plateforme unifiée, à haut débit et à faible latence pour traiter des flux de données en temps réel. Il s’intègre très bien avec Apache Storm et Spark pour l’analyse des données de streaming en temps réel.

Installation

Pour commencer à créer des microservices basés sur Kafka, installez d’abord le package requis :

Fenêtre de terminal
$ npm i --save kafkajs

Vue d’ensemble

Comme d’autres implémentations de couches de transport de microservices de Nest, vous sélectionnez le mécanisme de transport Kafka à l’aide de la propriété transport de l’objet options passé à la méthode createMicroservice(), ainsi qu’une propriété options optionnelle, comme montré ci-dessous :

main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
},
});

Options

La propriété options est spécifique au transporteur choisi. Le transporteur Kafka expose les propriétés décrites ci-dessous.

clientOptions de configuration du client (en savoir plus ici)
consumerOptions de configuration du consommateur (en savoir plus ici)
runOptions de configuration d’exécution (en savoir plus ici)
subscribeOptions de configuration d’abonnement (en savoir plus ici)
producerOptions de configuration du producteur (en savoir plus ici)
sendOptions de configuration d’envoi (en savoir plus ici)
producerOnlyModeDrapeau de fonction pour ignorer l’enregistrement du groupe de consommateurs et agir uniquement en tant que producteur (boolean)
postfixIdChanger le suffixe de la valeur clientId (string)

Client

Il y a une petite différence dans Kafka par rapport à d’autres transporteurs de microservices. Au lieu de la classe ClientProxy, nous utilisons la classe ClientKafka.

Comme d’autres transporteurs de microservices, vous avez plusieurs options pour créer une instance de ClientKafka.

Une méthode pour créer une instance est d’utiliser le ClientsModule. Pour créer une instance de client avec le ClientsModule, importez-le et utilisez la méthode register() pour passer un objet options avec les mêmes propriétés que celles montrées ci-dessus dans la méthode createMicroservice(), ainsi qu’une propriété name à utiliser comme jeton d’injection. En savoir plus sur le ClientsModule ici.

heroes.controller.ts
@Module({
imports: [
ClientsModule.register([
{
name: 'HERO_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
},
},
]),
],
})

Les messages sortants peuvent également être clés en passant un objet avec les propriétés key et value. Clés des messages est important pour respecter l’ exigence de co-partition.

heroes.controller.ts
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const dragonId = message.dragonId;
const items = [
{ id: 1, name: 'Épée mythique' },
{ id: 2, name: 'Clé du donjon' },
];
return items;
}
}
heroes.controller.ts
@Controller()
export class HeroesController {
@Client()
client: ClientKafka;
@MessagePattern('hero.kill.dragon')
async killDragon(
@Payload() message: KillDragonMessage,
@Ctx() context: KafkaContext,
) {
const originalMessage = context.getMessage();
const partition = context.getPartition();
const { headers, timestamp } = originalMessage;
}
}

Exceptions rétrouvables

Chaque cas où vous pouvez vouloir contourner cette mécanique et laisser les exceptions être consommées par le driver kafkajs. Lancer une exception lors du traitement d’un message demande à kafkajs de réessayer (redélivrer), ce qui signifie que même si le gestionnaire de messages (ou d’événements) a été déclenché, l’offset ne sera pas engagé dans Kafka.

Voici comment utiliser une classe dédiée appelée KafkaRetriableException :

throw new KafkaRetriableException('...');

Engagement des offsets

L’engagement des offsets est essentiel lors du travail avec Kafka. Par défaut, les messages seront automatiquement engagés après un certain temps. Pour plus d’informations, visitez la documentation de KafkaJS.