Passer au contenu

CQRS

Le flux d’applications simples de CRUD (Créer, Lire, Mettre à jour et Supprimer) peut être décrit comme suit :

  1. La couche des contrôleurs gère les requêtes HTTP et délègue des tâches à la couche des services.
  2. La couche des services est là où la plupart de la logique métier vit.
  3. Les services utilisent des dépôts / DAOs pour changer / persister des entités.
  4. Les entités agissent comme des conteneurs pour les valeurs, avec des setters et des getters.

Bien que ce modèle soit généralement suffisant pour des applications petites et moyennes, il peut ne pas être le meilleur choix pour des applications plus grandes et plus complexes. Dans de tels cas, le modèle CQRS (Command and Query Responsibility Segregation) peut être plus approprié et évolutif (selon les exigences de l’application). Les avantages de ce modèle incluent :

  • Séparation des préoccupations. Le modèle sépare les opérations de lecture et d’écriture en modèles distincts.
  • Scalabilité. Les opérations de lecture et d’écriture peuvent être évoluées indépendamment.
  • Flexibilité. Le modèle permet l’utilisation de différents magasins de données pour les opérations de lecture et d’écriture.
  • Performance. Le modèle permet l’utilisation de différents magasins de données optimisés pour les opérations de lecture et d’écriture.

Pour faciliter ce modèle, Nest fournit un léger module CQRS. Ce chapitre décrit comment l’utiliser.

Installation

Tout d’abord, installez le paquet requis :

Fenêtre de terminal
$ npm install --save @nestjs/cqrs

Commandes

Les commandes sont utilisées pour changer l’état de l’application. Elles doivent être basées sur des tâches, plutôt que centrées sur les données. Lorsqu’une commande est envoyée, elle est gérée par un Gestionnaire de Commandes correspondant. Le gestionnaire est responsable de la mise à jour de l’état de l’application.

heroes-game.service.ts

Service du jeu des héros
@Injectable()
export class HeroesGameService {
constructor(private commandBus: CommandBus) {}
async killDragon(heroId: string, killDragonDto: KillDragonDto) {
return this.commandBus.execute(
new KillDragonCommand(heroId, killDragonDto.dragonId),
);
}
}

Dans l’extrait de code ci-dessus, nous instancions la classe KillDragonCommand et la passons à la méthode execute() du CommandBus. Voici la classe de commande démontrée :

kill-dragon.command.ts

Commande pour tuer le dragon
export class KillDragonCommand {
constructor(
public readonly heroId: string,
public readonly dragonId: string,
) {}
}

Le CommandBus représente un flux de commandes. Il est responsable de l’envoi des commandes aux gestionnaires appropriés. La méthode execute() retourne une promesse, qui se résout à la valeur renvoyée par le gestionnaire.

Créons maintenant un gestionnaire pour la commande KillDragonCommand.

kill-dragon.handler.ts

Gestionnaire de la commande tuer le dragon
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
constructor(private repository: HeroRepository) {}
async execute(command: KillDragonCommand) {
const { heroId, dragonId } = command;
const hero = await this.repository.findOneById(+heroId);
hero.killEnemy(dragonId);
await this.repository.persist(hero);
}
}

Ce gestionnaire récupère l’entité Hero du dépôt, appelle la méthode killEnemy(), puis persiste les modifications. La classe KillDragonHandler implémente l’interface ICommandHandler, qui nécessite l’implémentation de la méthode execute(). La méthode execute() reçoit l’objet de commande en argument.

Requêtes

Les requêtes sont utilisées pour récupérer des données de l’état de l’application. Elles doivent être centrées sur les données, plutôt que basées sur des tâches. Lorsqu’une requête est envoyée, elle est gérée par un Gestionnaire de Requêtes correspondant. Le gestionnaire est responsable de la récupération des données.

Le QueryBus suit le même modèle que le CommandBus. Les gestionnaires de requêtes doivent implémenter l’interface IQueryHandler et être annotés avec le décorateur @QueryHandler().

Événements

Les événements sont utilisés pour notifier d’autres parties de l’application des changements d’état. Ils sont envoyés par des modèles ou directement en utilisant le EventBus. Lorsqu’un événement est envoyé, il est géré par des Gestionnaires d’Événements correspondants. Les gestionnaires peuvent alors, par exemple, mettre à jour le modèle de lecture.

Pour des fins de démonstration, créons une classe d’événement :

hero-killed-dragon.event.ts

Événement héros a tué un dragon
export class HeroKilledDragonEvent {
constructor(
public readonly heroId: string,
public readonly dragonId: string,
) {}
}

Maintenant, bien que les événements puissent être envoyés directement en utilisant la méthode EventBus.publish(), nous pouvons également les envoyer depuis le modèle. Mettons à jour le modèle Hero pour envoyer l’événement HeroKilledDragonEvent lorsque la méthode killEnemy() est appelée.

hero.model.ts

Modèle héro
export class Hero extends AggregateRoot {
constructor(private id: string) {
super();
this.autoCommit = true;
}
killEnemy(enemyId: string) {
// Logique métier
this.apply(new HeroKilledDragonEvent(this.id, enemyId));
}
}

La méthode apply() est utilisée pour envoyer des événements. Elle accepte un objet d’événement comme argument. Cependant, comme notre modèle n’est pas conscient du EventBus, nous devons le lier au modèle. Nous pouvons le faire en utilisant la classe EventPublisher.

kill-dragon.handler.ts

Gestionnaire de la commande tuer le dragon amélioré
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
constructor(
private repository: HeroRepository,
private publisher: EventPublisher,
) {}
async execute(command: KillDragonCommand) {
const { heroId, dragonId } = command;
const hero = this.publisher.mergeObjectContext(
await this.repository.findOneById(+heroId),
);
hero.killEnemy(dragonId);
hero.commit();
}
}

Le EventPublisher#mergeObjectContext fusionne le publish d’événement dans l’objet fourni, ce qui signifie que l’objet pourra maintenant envoyer des événements au flux d’événements.

Notez que dans cet exemple, nous appelons également la méthode commit() sur le modèle. Cette méthode est utilisée pour envoyer les événements en attente. Pour envoyer automatiquement les événements, nous pouvons définir la propriété autoCommit à true.

Dans le cas où nous voulons fusionner le publish d’événements dans un objet non existant, mais plutôt dans une classe, nous pouvons utiliser la méthode EventPublisher#mergeClassContext.

Exemple de gestionnaire d’événements

Gestionnaire d'événements héro a tué un dragon
@EventsHandler(HeroKilledDragonEvent)
export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEvent> {
constructor(private repository: HeroRepository) {}
handle(event: HeroKilledDragonEvent) {
// Logique métier
}
}

Chaque événement peut avoir plusieurs Gestionnaires d’Événements.

Sagas

Une saga est un processus longue durée qui écoute des événements et peut déclencher de nouvelles commandes. Elle est généralement utilisée pour gérer des flux de travail complexes dans l’application. Par exemple, lorsqu’un utilisateur s’inscrit, une saga peut écouter l’UserRegisteredEvent et envoyer un e-mail de bienvenue à l’utilisateur.

Les sagas sont une fonctionnalité extrêmement puissante. Une seule saga peut écouter 1..* événements. En utilisant la bibliothèque RxJS, nous pouvons filtrer, mapper, forker et fusionner les flux d’événements pour créer des flux de travail sophistiqués. Chaque saga retourne un Observable qui produit une instance de commande. Cette commande est ensuite envoyée de manière asynchrone par le CommandBus.

Créons une saga qui écoute l’HeroKilledDragonEvent et envoie la commande DropAncientItemCommand.

heroes-game.saga.ts

Saga du jeu des héros
@Injectable()
export class HeroesGameSagas {
@Saga()
dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(HeroKilledDragonEvent),
map((event) => new DropAncientItemCommand(event.heroId, fakeItemID)),
);
};
}

Le décorateur @Saga() marque la méthode comme une saga. L’argument events$ est un flux Observable de tous les événements. L’opérateur ofType filtre le flux par le type d’événement spécifié. L’opérateur map mappe l’événement à une nouvelle instance de commande.

Dans cet exemple, nous mappons l’HeroKilledDragonEvent à la commande DropAncientItemCommand. La commande DropAncientItemCommand est ensuite automatiquement envoyée par le CommandBus.

Configuration

Pour conclure, nous devons enregistrer tous les gestionnaires de commandes, gestionnaires d’événements et sagas dans le HeroesGameModule :

heroes-game.module.ts

Module du jeu des héros
export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler];
export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler];
@Module({
imports: [CqrsModule],
controllers: [HeroesGameController],
providers: [
HeroesGameService,
HeroesGameSagas,
...CommandHandlers,
...EventHandlers,
HeroRepository,
],
})
export class HeroesGameModule {}

Exceptions non gérées

Les gestionnaires d’événements sont exécutés de manière asynchrone. Cela signifie qu’ils doivent toujours gérer toutes les exceptions pour empêcher l’application de se retrouver dans un état incohérent. Cependant, si une exception n’est pas gérée, le EventBus créera l’objet UnhandledExceptionInfo et le poussera dans le flux UnhandledExceptionBus. Ce flux est un Observable qui peut être utilisé pour traiter les exceptions non gérées.

Traitement des exceptions non gérées

Gestion des exceptions non gérées
private destroy$ = new Subject<void>();
constructor(private unhandledExceptionsBus: UnhandledExceptionBus) {
this.unhandledExceptionsBus
.pipe(takeUntil(this.destroy$))
.subscribe((exceptionInfo) => {
// Gérer l'exception ici
// e.g. envoyez-le à un service externe, terminez le processus, ou publiez un nouvel événement
});
}
onModuleDestroy() {
this.destroy$.next();
this.destroy$.complete();
}

Pour filtrer les exceptions, nous pouvons utiliser l’opérateur ofType, comme suit :

Filtre des exceptions
this.unhandledExceptionsBus
.pipe(
takeUntil(this.destroy$),
UnhandledExceptionBus.ofType(TransactionNotAllowedException),
)
.subscribe((exceptionInfo) => {
// Gérer l'exception ici
});

TransactionNotAllowedException est l’exception que nous voulons filtrer.

L’objet UnhandledExceptionInfo contient les propriétés suivantes :

Interface d'informations sur les exceptions non gérées
export interface UnhandledExceptionInfo<Cause = IEvent | ICommand, Exception = any> {
/**
* L'exception qui a été lancée.
*/
exception: Exception;
/**
* La cause de l'exception (référence d'événement ou de commande).
*/
cause: Cause;
}

Souscription à tous les événements

Le CommandBus, QueryBus et EventBus sont tous des Observables. Cela signifie que nous pouvons nous abonner à l’ensemble du flux et, par exemple, traiter tous les événements. Par exemple, nous pouvons enregistrer tous les événements dans la console, ou les sauvegarder dans le magasin d’événements.

Souscription aux événements
private destroy$ = new Subject<void>();
constructor(private eventBus: EventBus) {
this.eventBus
.pipe(takeUntil(this.destroy$))
.subscribe((event) => {
// Sauvegarder les événements dans la base de données
});
}
onModuleDestroy() {
this.destroy$.next();
this.destroy$.complete();
}

Exemple

Un exemple fonctionnel est disponible ici.