CQRS
Le flux d’applications simples de CRUD (Créer, Lire, Mettre à jour et Supprimer) peut être décrit comme suit :
- La couche des contrôleurs gère les requêtes HTTP et délègue des tâches à la couche des services.
- La couche des services est là où la plupart de la logique métier vit.
- Les services utilisent des dépôts / DAOs pour changer / persister des entités.
- Les entités agissent comme des conteneurs pour les valeurs, avec des setters et des getters.
Bien que ce modèle soit généralement suffisant pour des applications petites et moyennes, il peut ne pas être le meilleur choix pour des applications plus grandes et plus complexes. Dans de tels cas, le modèle CQRS (Command and Query Responsibility Segregation) peut être plus approprié et évolutif (selon les exigences de l’application). Les avantages de ce modèle incluent :
- Séparation des préoccupations. Le modèle sépare les opérations de lecture et d’écriture en modèles distincts.
- Scalabilité. Les opérations de lecture et d’écriture peuvent être évoluées indépendamment.
- Flexibilité. Le modèle permet l’utilisation de différents magasins de données pour les opérations de lecture et d’écriture.
- Performance. Le modèle permet l’utilisation de différents magasins de données optimisés pour les opérations de lecture et d’écriture.
Pour faciliter ce modèle, Nest fournit un léger module CQRS. Ce chapitre décrit comment l’utiliser.
Installation
Tout d’abord, installez le paquet requis :
$ npm install --save @nestjs/cqrs
Commandes
Les commandes sont utilisées pour changer l’état de l’application. Elles doivent être basées sur des tâches, plutôt que centrées sur les données. Lorsqu’une commande est envoyée, elle est gérée par un Gestionnaire de Commandes correspondant. Le gestionnaire est responsable de la mise à jour de l’état de l’application.
heroes-game.service.ts
@Injectable()export class HeroesGameService { constructor(private commandBus: CommandBus) {}
async killDragon(heroId: string, killDragonDto: KillDragonDto) { return this.commandBus.execute( new KillDragonCommand(heroId, killDragonDto.dragonId), ); }}
Dans l’extrait de code ci-dessus, nous instancions la classe KillDragonCommand
et la passons à la méthode execute()
du CommandBus
. Voici la classe de commande démontrée :
kill-dragon.command.ts
export class KillDragonCommand { constructor( public readonly heroId: string, public readonly dragonId: string, ) {}}
Le CommandBus
représente un flux de commandes. Il est responsable de l’envoi des commandes aux gestionnaires appropriés. La méthode execute()
retourne une promesse, qui se résout à la valeur renvoyée par le gestionnaire.
Créons maintenant un gestionnaire pour la commande KillDragonCommand
.
kill-dragon.handler.ts
@CommandHandler(KillDragonCommand)export class KillDragonHandler implements ICommandHandler<KillDragonCommand> { constructor(private repository: HeroRepository) {}
async execute(command: KillDragonCommand) { const { heroId, dragonId } = command; const hero = await this.repository.findOneById(+heroId);
hero.killEnemy(dragonId); await this.repository.persist(hero); }}
Ce gestionnaire récupère l’entité Hero
du dépôt, appelle la méthode killEnemy()
, puis persiste les modifications. La classe KillDragonHandler
implémente l’interface ICommandHandler
, qui nécessite l’implémentation de la méthode execute()
. La méthode execute()
reçoit l’objet de commande en argument.
Requêtes
Les requêtes sont utilisées pour récupérer des données de l’état de l’application. Elles doivent être centrées sur les données, plutôt que basées sur des tâches. Lorsqu’une requête est envoyée, elle est gérée par un Gestionnaire de Requêtes correspondant. Le gestionnaire est responsable de la récupération des données.
Le QueryBus
suit le même modèle que le CommandBus
. Les gestionnaires de requêtes doivent implémenter l’interface IQueryHandler
et être annotés avec le décorateur @QueryHandler()
.
Événements
Les événements sont utilisés pour notifier d’autres parties de l’application des changements d’état. Ils sont envoyés par des modèles ou directement en utilisant le EventBus
. Lorsqu’un événement est envoyé, il est géré par des Gestionnaires d’Événements correspondants. Les gestionnaires peuvent alors, par exemple, mettre à jour le modèle de lecture.
Pour des fins de démonstration, créons une classe d’événement :
hero-killed-dragon.event.ts
export class HeroKilledDragonEvent { constructor( public readonly heroId: string, public readonly dragonId: string, ) {}}
Maintenant, bien que les événements puissent être envoyés directement en utilisant la méthode EventBus.publish()
, nous pouvons également les envoyer depuis le modèle. Mettons à jour le modèle Hero
pour envoyer l’événement HeroKilledDragonEvent
lorsque la méthode killEnemy()
est appelée.
hero.model.ts
export class Hero extends AggregateRoot { constructor(private id: string) { super(); this.autoCommit = true; }
killEnemy(enemyId: string) { // Logique métier this.apply(new HeroKilledDragonEvent(this.id, enemyId)); }}
La méthode apply()
est utilisée pour envoyer des événements. Elle accepte un objet d’événement comme argument. Cependant, comme notre modèle n’est pas conscient du EventBus
, nous devons le lier au modèle. Nous pouvons le faire en utilisant la classe EventPublisher
.
kill-dragon.handler.ts
@CommandHandler(KillDragonCommand)export class KillDragonHandler implements ICommandHandler<KillDragonCommand> { constructor( private repository: HeroRepository, private publisher: EventPublisher, ) {}
async execute(command: KillDragonCommand) { const { heroId, dragonId } = command; const hero = this.publisher.mergeObjectContext( await this.repository.findOneById(+heroId), ); hero.killEnemy(dragonId); hero.commit(); }}
Le EventPublisher#mergeObjectContext
fusionne le publish
d’événement dans l’objet fourni, ce qui signifie que l’objet pourra maintenant envoyer des événements au flux d’événements.
Notez que dans cet exemple, nous appelons également la méthode commit()
sur le modèle. Cette méthode est utilisée pour envoyer les événements en attente. Pour envoyer automatiquement les événements, nous pouvons définir la propriété autoCommit
à true
.
Dans le cas où nous voulons fusionner le publish
d’événements dans un objet non existant, mais plutôt dans une classe, nous pouvons utiliser la méthode EventPublisher#mergeClassContext
.
Exemple de gestionnaire d’événements
@EventsHandler(HeroKilledDragonEvent)export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEvent> { constructor(private repository: HeroRepository) {}
handle(event: HeroKilledDragonEvent) { // Logique métier }}
Chaque événement peut avoir plusieurs Gestionnaires d’Événements.
Sagas
Une saga est un processus longue durée qui écoute des événements et peut déclencher de nouvelles commandes. Elle est généralement utilisée pour gérer des flux de travail complexes dans l’application. Par exemple, lorsqu’un utilisateur s’inscrit, une saga peut écouter l’UserRegisteredEvent
et envoyer un e-mail de bienvenue à l’utilisateur.
Les sagas sont une fonctionnalité extrêmement puissante. Une seule saga peut écouter 1..* événements. En utilisant la bibliothèque RxJS, nous pouvons filtrer, mapper, forker et fusionner les flux d’événements pour créer des flux de travail sophistiqués. Chaque saga retourne un Observable qui produit une instance de commande. Cette commande est ensuite envoyée de manière asynchrone par le CommandBus
.
Créons une saga qui écoute l’HeroKilledDragonEvent
et envoie la commande DropAncientItemCommand
.
heroes-game.saga.ts
@Injectable()export class HeroesGameSagas { @Saga() dragonKilled = (events$: Observable<any>): Observable<ICommand> => { return events$.pipe( ofType(HeroKilledDragonEvent), map((event) => new DropAncientItemCommand(event.heroId, fakeItemID)), ); };}
Le décorateur @Saga()
marque la méthode comme une saga. L’argument events$
est un flux Observable de tous les événements. L’opérateur ofType
filtre le flux par le type d’événement spécifié. L’opérateur map
mappe l’événement à une nouvelle instance de commande.
Dans cet exemple, nous mappons l’HeroKilledDragonEvent
à la commande DropAncientItemCommand
. La commande DropAncientItemCommand
est ensuite automatiquement envoyée par le CommandBus
.
Configuration
Pour conclure, nous devons enregistrer tous les gestionnaires de commandes, gestionnaires d’événements et sagas dans le HeroesGameModule
:
heroes-game.module.ts
export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler];export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler];
@Module({ imports: [CqrsModule], controllers: [HeroesGameController], providers: [ HeroesGameService, HeroesGameSagas, ...CommandHandlers, ...EventHandlers, HeroRepository, ],})export class HeroesGameModule {}
Exceptions non gérées
Les gestionnaires d’événements sont exécutés de manière asynchrone. Cela signifie qu’ils doivent toujours gérer toutes les exceptions pour empêcher l’application de se retrouver dans un état incohérent. Cependant, si une exception n’est pas gérée, le EventBus
créera l’objet UnhandledExceptionInfo
et le poussera dans le flux UnhandledExceptionBus
. Ce flux est un Observable
qui peut être utilisé pour traiter les exceptions non gérées.
Traitement des exceptions non gérées
private destroy$ = new Subject<void>();
constructor(private unhandledExceptionsBus: UnhandledExceptionBus) { this.unhandledExceptionsBus .pipe(takeUntil(this.destroy$)) .subscribe((exceptionInfo) => { // Gérer l'exception ici // e.g. envoyez-le à un service externe, terminez le processus, ou publiez un nouvel événement });}
onModuleDestroy() { this.destroy$.next(); this.destroy$.complete();}
Pour filtrer les exceptions, nous pouvons utiliser l’opérateur ofType
, comme suit :
this.unhandledExceptionsBus .pipe( takeUntil(this.destroy$), UnhandledExceptionBus.ofType(TransactionNotAllowedException), ) .subscribe((exceptionInfo) => { // Gérer l'exception ici });
Où TransactionNotAllowedException
est l’exception que nous voulons filtrer.
L’objet UnhandledExceptionInfo
contient les propriétés suivantes :
export interface UnhandledExceptionInfo<Cause = IEvent | ICommand, Exception = any> { /** * L'exception qui a été lancée. */ exception: Exception; /** * La cause de l'exception (référence d'événement ou de commande). */ cause: Cause;}
Souscription à tous les événements
Le CommandBus
, QueryBus
et EventBus
sont tous des Observables. Cela signifie que nous pouvons nous abonner à l’ensemble du flux et, par exemple, traiter tous les événements. Par exemple, nous pouvons enregistrer tous les événements dans la console, ou les sauvegarder dans le magasin d’événements.
private destroy$ = new Subject<void>();
constructor(private eventBus: EventBus) { this.eventBus .pipe(takeUntil(this.destroy$)) .subscribe((event) => { // Sauvegarder les événements dans la base de données });}
onModuleDestroy() { this.destroy$.next(); this.destroy$.complete();}
Exemple
Un exemple fonctionnel est disponible ici.