Comments (17)
The problem with this is that you can shutdown the application with async events still being handled, and hence not completing the execution of the handler.
This is happening to me while using nestjs from the command line, my script closes before all the event handlers have been executed.
I think there should be a wait for you to wait for the completion of events. My events are events, I would prefer not to treat them as hooks.
from cqrs.
For anyone who runs into this issue: I wrote a simple Event
-like hooks module with a similar usage to the EventBus. https://github.com/NestPack/hooks
from cqrs.
I personally love this @Johnhhorton. Nice idea 👍
from cqrs.
@jarmokoivuniemi @theo-bittencourt I think I ended up using the library mentioned hooks, or something similar. It makes sense for the event bus to be async but not if the process can be stopped before all handers have run. The current implementation is not correct if it cannot warranty that all handlers are going to be run before the process stops.
I do not mean, that you have to be able to wait for them to be done, that is a different requirement and perhaps it is not necessary for a event bus.
from cqrs.
While the hook approach from @harryhorton is definitely more sound than what I'm about to suggest, I think this should be doable by including a resolve callback in the event constructor. My use case is batching some events for some bulk external API requests. So, in my case, I use this with sagas.
Here is a heavily stripped-down version of what I mean:
export class MyCustomEvent {
constructor(
public readonly remoteResolve: CallableFunction,
) {}
}
Then, when we publish events in a command handler:
// ...
await new Promise((resolve) => {
this.eventBus.publish(new MyCustomEvent(resolve));
}
// ...
And in the event handler/saga we could have something like this:
@EventsHandler(MyCustomEvent)
export class MyCustomEventHandler implements IEventHandler<MyCustomEventEvent> {
constructor() {}
handle(event: MyCustomEvent) {
// do logic
event.remoteResolve();
}
}
I can see why this would be violating some principles of coupling, but in my case, I didn't want to introduce a Redis queue with reducers.
from cqrs.
I'm with a similar problem faced by @leolara.
In my case, I need to somehow wait for handlers to finish before closing the app in the Jest's e2e testing.
from cqrs.
I'm with a similar problem faced by @leolara.
In my case, I need to somehow wait for handlers to finish before closing the app in the Jest's e2e testing.
Did you manage to solve this? I'm facing the same issue.
from cqrs.
It's annoying that this is closed with no real response. This is probably the biggest shortcoming of this module.
from cqrs.
Hi everyone. I've managed to solve this problem.
The TL;DR is don't use the NestJS CQRS EventBus
, instead use the EventEmitter @nestjs/event-emitter
.
Long Answer
@kamilmysliwiec is correct in saying that events by nature are async. This is the difference between eventual consistency and transactional consistency
By implementation, eventual consistency handles event downstream logic asynchronously long after the user has gotten their Http Response.
By implementation, transactional consistency is what most of you are looking for. Transaction concistency ensures that all the updates to the DB invoked by the system's commands handlers, and event handlers are all atomic. Meaning that you wrap your "command -> command handler -> event -> event handler -> command -> command handler -> ..." in one big transaction.
The difference between the two is that eventual consistency is in the name. The system will become consistent, eventually.
To comment on some of your concerns with error handling in the event handlers after the user has gotten their response is to adopt your own retry mechanism. There are many ways to do this, but generally speaking you'll have to adopt a message broker in between the publishing of events and consuming them. When consuming a message from the broker and an error occurs, the message will simply not be ACK'ed and the broker will allow retrying of the message.
Look here at this repo for a concrete example of using NestJs CQRS with a message broker to include resilience and error handling via a message broker (AWS SQS) nestjs-rest-cqrs-example
Now to solve everyone's issue with having NestJs CQRS module be transactionally consistent I'll provide some snippets of code to get you started with synchronous event handling
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Injectable } from '@nestjs/common';
@Injectable()
export class NestEventBusAdapter {
constructor(
private readonly eventEmitter: EventEmitter2,
) {
}
/**
* Synchronous event emitter using EventEmitter2 provided by @nestjs/event-emitter.
* @docs: Return the results of the listeners via Promise.all.
* @see: https://github.com/EventEmitter2/EventEmitter2#emitteremitasyncevent--eventns-arg1-arg2-
* @see https://docs.nestjs.com/techniques/events
*/
async sendEvent<TEvent extends DomainEvent>(events: TEvent[]): Promise<void> {
await Promise.all(
events.map(async (event) => {
try {
// FIXME: Fill in request-id
Logger.debug(`[request-id] "${event.constructor.name}" event published`);
return this.eventEmitter.emitAsync(event.constructor.name, event);
} catch (e) {
throw e;
}
}),
);
}
}
@Injectable()
@CommandHandler(RegisterUserCommand)
export class NestRegisterUserHandler implements ICommandHandler<RegisterUserCommand, User> {
constructor(
private readonly eventBus: NestEventBusAdapter,
) {
}
async execute(command: RegisterUserCommand): Promise<User> {
await this.eventBus.sendEvent(new UserCreatedEvent(...))
}
}
export abstract class DomainEvent {
public readonly id: string;
protected constructor() {
}
}
export class UserCreatedEvent extends DomainEvent {
constructor(
readonly id: string,
readonly email: string,
readonly firstName: string,
readonly lastName: string,
readonly role: string,
) {
}
}
// EventHandler.ts
import { DomainEvent } from '@libs/ddd/DomainEvent';
import { OnEvent } from '@nestjs/event-emitter';
/**
* Wrapper around OnEvent() that provides the following options:
* { suppressErrors: false, async: true, promisify: true }
*
* @see: https://docs.nestjs.com/techniques/events#listening-to-events
*/
export function EventHandler<T extends DomainEvent>(event: new (...args: never[]) => T): MethodDecorator {
return function(target: unknown, propertyKey: string | symbol, descriptor: PropertyDescriptor) {
OnEvent(event.name, { suppressErrors: false, async: true, promisify: true })(target, propertyKey, descriptor);
};
}
// NestUserCreatedHandler.ts
@Injectable()
export class NestUserCreatedHandler {
constructor(
) {}
@EventHandler(UserCreatedEvent)
async execute(event: UserCreatedEvent): Promise<string> {
[...]
}
}
from cqrs.
While it may not be possible with your event architecture, it would be a nice feature if async event handlers could be awaited from eventBus.publish, or if a workaround could be established without the overhead of the full CQRS featureset.
Events by nature have to asynchronous. If you - for some reason - want to delay an HTTP response, you can use RxJS Subjects and "signals/triggers" (subscribe to a subject from the controller and emit a signal from the event handler).
from cqrs.
Thanks for the advice! While using a subject would solve that problem, I'm looking for an easier to follow API. I've been looking for a way to avoid creating a hook system, which I'm totally willing to do, but I was hoping either an existing API or or approach to development would provide a solution. My ultimate goal is to write my modules in ways that allow them to be highly reusable as dynamic modules, while allowing modules that depend on them to inject custom business logic at specific events.
Thanks again, I'll keep trying approaches 👍
from cqrs.
Hi @kamilmysliwiec is there a chance to revisit this? I think it is a good idea to have this in NestJS. The equivalent concept that I can find is in Laravel Action library by Spatie https://github.com/spatie/laravel-queueable-action
It even has a retry mechanism.
from cqrs.
@cyoharry I recently forked the CQRS module and changed the AggregateRoot
, EventBus
and EventPublisher
classes to support async
methods (see here https://github.com/bradsheppard/nestjs-async-cqrs). I think this might help.
from cqrs.
@cyoharry I recently forked the CQRS module and changed the
AggregateRoot
,EventBus
andEventPublisher
classes to supportasync
methods (see here https://github.com/bradsheppard/nestjs-async-cqrs). I think this might help.
I tried your fork. But it still doesn't wait for the event(s) to finish. So my (cli) app still exits too soon.
from cqrs.
@sanderlissenburg If you can send me a reproducible example I'll take a look.
from cqrs.
@sanderlissenburg If you can send me a reproducible example I'll take a look.
Thank you for the kind offer. In the interest of time, I've chosen to drop the aggregate part and just call the repository from the command handler instead of the event handler. Because aggragateObject->commit()
is where the problem lies. That is still fire and forget, even with await
in front of it. Maybe if I have time left I revisit this part and come back to you with a simple example.
from cqrs.
While it may not be possible with your event architecture, it would be a nice feature if async event handlers could be awaited from eventBus.publish, or if a workaround could be established without the overhead of the full CQRS featureset.
Events by nature have to asynchronous. If you - for some reason - want to delay an HTTP response, you can use RxJS Subjects and "signals/triggers" (subscribe to a subject from the controller and emit a signal from the event handler).
Can u please let me know how exactly can I handle this ? for example I am triggering a command in controller and want to wait till event handler is not finished its work ? Any example would be appreciated.
Thanks
from cqrs.
Related Issues (20)
- Middleware chain for command and querybud HOT 2
- Saga events - request context HOT 1
- documentation request HOT 1
- problems constructing instance with Mikro ORM HOT 1
- Feat: Register Commands, Queries & Events programatically HOT 1
- Multiple execution of events based on how many times module is imported HOT 1
- Adjust EventsHandler typing to accept the type of class extending IEvent
- Double events when using microservice HOT 2
- Sagas stop working in case of exceptions HOT 2
- Command handler becomes unresponsive HOT 4
- Event Handler fails to Trigger After Error Occurs HOT 1
- Use a decorator to automatically merge event publisher into AggregateRoot HOT 1
- Http Exceptions thrown in certain situations cause an app crash. HOT 1
- App crashes if event handler triggers a command that throws HOT 1
- CommandHandleException with Command Name not command uuid HOT 2
- Commit method throws an error when trying to publish domain events HOT 1
- Patch update reflect-metadata HOT 1
- Request scope or an alternative for multitenancy, using CQRS module
- Use custom command publisher
- Support for Request Scoped/Durable Providers HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from cqrs.