Comments (17)

leolara commented on May 7, 2024 7

The problem with this is that you can shut down the application while async events are still being handled, so the handlers never finish executing.

This is happening to me while using NestJS from the command line: my script exits before all the event handlers have been executed.

I think there should be a way to wait for the completion of events. My events are events; I would prefer not to treat them as hooks.

from cqrs.

harryhorton commented on May 7, 2024 2

For anyone who runs into this issue: I wrote a simple event-like hooks module whose usage is similar to the EventBus. https://github.com/NestPack/hooks

kamilmysliwiec commented on May 7, 2024 2

I personally love this @Johnhhorton. Nice idea 👍

leolara commented on May 7, 2024 2

@jarmokoivuniemi @theo-bittencourt I think I ended up using the hooks library mentioned above, or something similar. It makes sense for the event bus to be async, but not if the process can be stopped before all handlers have run. The current implementation is not correct if it cannot guarantee that all handlers will run before the process stops.

I do not mean that you have to be able to wait for them to be done; that is a different requirement, and perhaps it is not necessary for an event bus.
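
A minimal sketch of that guarantee, written without NestJS so it runs on its own: the bus remembers the promise of every handler it starts, and the script drains them before exiting. `TrackingEventBus`, `subscribe`, and `drain` are hypothetical names for illustration and are not part of @nestjs/cqrs.

```typescript
// Hypothetical stand-in for an event bus; not the @nestjs/cqrs API.
type Handler<E> = (event: E) => Promise<void> | void;

class TrackingEventBus<E> {
  private readonly handlers: Handler<E>[] = [];
  private readonly inFlight = new Set<Promise<void>>();

  subscribe(handler: Handler<E>): void {
    this.handlers.push(handler);
  }

  // Fire-and-forget for the caller, but every handler run is remembered.
  publish(event: E): void {
    for (const handler of this.handlers) {
      const run: Promise<void> = Promise.resolve()
        .then(() => handler(event))
        .catch((err) => console.error('handler failed', err))
        .finally(() => this.inFlight.delete(run));
      this.inFlight.add(run);
    }
  }

  // Resolves once no handler is running, including handlers started
  // by events that were published while draining.
  async drain(): Promise<void> {
    while (this.inFlight.size > 0) {
      await Promise.all([...this.inFlight]);
    }
  }
}
```

A command-line script would call `await bus.drain()` as its last step before closing the application. Handler errors are only logged here; a real implementation would need to decide whether a failed handler should fail the shutdown.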

ugudango commented on May 7, 2024 2

While the hook approach from @harryhorton is definitely more sound than what I'm about to suggest, I think this should be doable by including a resolve callback in the event constructor. My use case is batching some events for some bulk external API requests. So, in my case, I use this with sagas.

Here is a heavily stripped-down version of what I mean:

export class MyCustomEvent {
  constructor(
    public readonly remoteResolve: CallableFunction,
  ) {}
}

Then, when we publish events in a command handler:

// ...
await new Promise<void>((resolve) => {
  this.eventBus.publish(new MyCustomEvent(resolve));
});
// ...

And in the event handler/saga we could have something like this:

@EventsHandler(MyCustomEvent)
export class MyCustomEventHandler implements IEventHandler<MyCustomEvent> {
  constructor() {}

  handle(event: MyCustomEvent) {
    // do logic
    event.remoteResolve();
  }
}

I can see why this would be violating some principles of coupling, but in my case, I didn't want to introduce a Redis queue with reducers.
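
For reference, a self-contained sketch of the same resolve-callback idea that runs without NestJS. `TinyBus`, `BatchRequested`, `handleBatch`, and `publishAndWait` are hypothetical names, and the `reject` callback is an addition that the snippets above do not have; it is there so a failing handler does not leave the publisher waiting forever.

```typescript
// Event that carries the callbacks of the promise the publisher awaits.
class BatchRequested {
  constructor(
    public readonly items: number[],
    public readonly resolve: (sum: number) => void,
    public readonly reject: (reason: unknown) => void,
  ) {}
}

type BatchHandler = (event: BatchRequested) => Promise<void>;

// Hypothetical minimal bus: publish() returns immediately (fire-and-forget).
class TinyBus {
  private readonly handlers: BatchHandler[] = [];

  register(handler: BatchHandler): void {
    this.handlers.push(handler);
  }

  publish(event: BatchRequested): void {
    for (const handler of this.handlers) void handler(event);
  }
}

// Handler side: settle the publisher's promise when the work is finished.
async function handleBatch(event: BatchRequested): Promise<void> {
  try {
    await new Promise((r) => setTimeout(r, 10)); // simulated async work
    event.resolve(event.items.reduce((a, b) => a + b, 0));
  } catch (err) {
    event.reject(err);
  }
}

// Publisher side: wrap publish() in a promise and await it.
function publishAndWait(bus: TinyBus, items: number[]): Promise<number> {
  return new Promise<number>((resolve, reject) => {
    bus.publish(new BatchRequested(items, resolve, reject));
  });
}
```

One caveat with this pattern: if no handler is registered for the event, the promise never settles, so a timeout around `publishAndWait` is probably worth adding.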

theo-bittencourt commented on May 7, 2024 1

I'm facing a problem similar to @leolara's.

In my case, I need to somehow wait for handlers to finish before closing the app in Jest e2e tests.

jarmokoivuniemi commented on May 7, 2024 1

> I'm facing a problem similar to @leolara's.
>
> In my case, I need to somehow wait for handlers to finish before closing the app in Jest e2e tests.

Did you manage to solve this? I'm facing the same issue.

drewish commented on May 7, 2024 1

It's annoying that this is closed with no real response. This is probably the biggest shortcoming of this module.

Adrianvdh commented on May 7, 2024 1

Hi everyone. I've managed to solve this problem.

TL;DR: don't use the NestJS CQRS EventBus; use the EventEmitter from @nestjs/event-emitter instead.

Long Answer

@kamilmysliwiec is correct in saying that events by nature are async. This is the difference between eventual consistency and transactional consistency.

With eventual consistency, the logic downstream of an event runs asynchronously, possibly long after the user has received their HTTP response.

Transactional consistency is what most of you are looking for. It ensures that all the DB updates made by the system's command handlers and event handlers are atomic, meaning that you wrap the whole chain "command -> command handler -> event -> event handler -> command -> command handler -> ..." in one big transaction.

The defining property of eventual consistency is in the name: the system will become consistent, eventually.

As for the concerns about handling errors in event handlers after the user has already received their response: adopt your own retry mechanism. There are many ways to do this, but generally speaking you'll have to put a message broker between publishing events and consuming them. If an error occurs while consuming a message from the broker, the message is simply not ACKed and the broker makes it available for a retry.
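
To make the ACK-and-retry idea concrete, here is a rough in-memory sketch. `InMemoryBroker`, `maxAttempts`, and `deadLetters` are hypothetical names chosen for illustration; a real broker such as SQS handles redelivery itself and adds visibility timeouts, backoff, and dead-letter queues.

```typescript
// Hypothetical in-memory stand-in for a message broker.
interface Message<T> {
  body: T;
  attempts: number;
}

class InMemoryBroker<T> {
  private readonly queue: Message<T>[] = [];
  // Messages that still failed after maxAttempts deliveries.
  readonly deadLetters: Message<T>[] = [];

  constructor(private readonly maxAttempts = 3) {}

  send(body: T): void {
    this.queue.push({ body, attempts: 0 });
  }

  // Delivers queued messages one at a time. A message is dropped from the
  // queue (ACKed) only if the consumer completes without throwing;
  // otherwise it is re-queued until maxAttempts is reached.
  async consume(consumer: (body: T) => Promise<void>): Promise<void> {
    let message: Message<T> | undefined;
    while ((message = this.queue.shift()) !== undefined) {
      message.attempts += 1;
      try {
        await consumer(message.body);
      } catch {
        if (message.attempts < this.maxAttempts) {
          this.queue.push(message);
        } else {
          this.deadLetters.push(message);
        }
      }
    }
  }
}
```

Because a message can be delivered more than once, consumers in this kind of setup generally need to be idempotent.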

See this repo for a concrete example of using NestJS CQRS with a message broker (AWS SQS) to add resilience and error handling: nestjs-rest-cqrs-example

Now, to solve everyone's issue of making the NestJS CQRS module transactionally consistent, here are some snippets of code to get you started with synchronous event handling:

import { EventEmitter2 } from '@nestjs/event-emitter';
import { Injectable, Logger } from '@nestjs/common';
import { DomainEvent } from '@libs/ddd/DomainEvent';

@Injectable()
export class NestEventBusAdapter {
    constructor(
        private readonly eventEmitter: EventEmitter2,
    ) {
    }

    /**
     * Publishes the events and waits for every listener to finish, using
     * EventEmitter2.emitAsync (provided by @nestjs/event-emitter), which
     * returns the results of the listeners via Promise.all.
     * @see https://github.com/EventEmitter2/EventEmitter2#emitteremitasyncevent--eventns-arg1-arg2-
     * @see https://docs.nestjs.com/techniques/events
     */
    async sendEvent<TEvent extends DomainEvent>(events: TEvent[]): Promise<void> {
        await Promise.all(
            events.map((event) => {
                // FIXME: Fill in request-id
                Logger.debug(`[request-id] "${event.constructor.name}" event published`);
                // A rejection from any listener propagates to the caller.
                return this.eventEmitter.emitAsync(event.constructor.name, event);
            }),
        );
    }
}

@Injectable()
@CommandHandler(RegisterUserCommand)
export class NestRegisterUserHandler implements ICommandHandler<RegisterUserCommand, User> {

    constructor(
        private readonly eventBus: NestEventBusAdapter,
    ) {
    }

    async execute(command: RegisterUserCommand): Promise<User> {
        // sendEvent expects an array of events.
        await this.eventBus.sendEvent([new UserCreatedEvent(...)]);
    }
}




export abstract class DomainEvent {
    // Each concrete event supplies its own id (e.g. via a constructor parameter).
    public abstract readonly id: string;

    protected constructor() {
    }
}

export class UserCreatedEvent extends DomainEvent {
    constructor(
        readonly id: string,
        readonly email: string,
        readonly firstName: string,
        readonly lastName: string,
        readonly role: string,
    ) {
        super();
    }
}

// EventHandler.ts
import { DomainEvent } from '@libs/ddd/DomainEvent';
import { OnEvent } from '@nestjs/event-emitter';

/**
 * Wrapper around OnEvent() that provides the following options:
 * { suppressErrors: false, async: true, promisify: true }
 *
 * @see: https://docs.nestjs.com/techniques/events#listening-to-events
 */
export function EventHandler<T extends DomainEvent>(event: new (...args: never[]) => T): MethodDecorator {
    return function(target: object, propertyKey: string | symbol, descriptor: PropertyDescriptor) {
        OnEvent(event.name, { suppressErrors: false, async: true, promisify: true })(target, propertyKey, descriptor);
    };
}

// NestUserCreatedHandler.ts

@Injectable()
export class NestUserCreatedHandler {

    constructor(

    ) {}

    @EventHandler(UserCreatedEvent)
    async execute(event: UserCreatedEvent): Promise<string> {
        [...]
    }
}

kamilmysliwiec commented on May 7, 2024

> While it may not be possible with your event architecture, it would be a nice feature if async event handlers could be awaited from eventBus.publish, or if a workaround could be established without the overhead of the full CQRS featureset.

Events by nature have to be asynchronous. If you - for some reason - want to delay an HTTP response, you can use RxJS Subjects and "signals/triggers" (subscribe to a subject from the controller and emit a signal from the event handler).

cyoharry commented on May 7, 2024

Thanks for the advice! While using a subject would solve that problem, I'm looking for an easier-to-follow API. I've been looking for a way to avoid creating a hook system, which I'm totally willing to do, but I was hoping either an existing API or an approach to development would provide a solution. My ultimate goal is to write my modules in ways that allow them to be highly reusable as dynamic modules, while allowing modules that depend on them to inject custom business logic at specific events.

Thanks again, I'll keep trying approaches 👍

rhzs commented on May 7, 2024

Hi @kamilmysliwiec, is there a chance to revisit this? I think it is a good idea to have this in NestJS. The closest equivalent I can find is the Laravel Action library by Spatie: https://github.com/spatie/laravel-queueable-action

It even has a retry mechanism.

bradsheppard commented on May 7, 2024

@cyoharry I recently forked the CQRS module and changed the AggregateRoot, EventBus and EventPublisher classes to support async methods (see here https://github.com/bradsheppard/nestjs-async-cqrs). I think this might help.

sanderlissenburg commented on May 7, 2024

> @cyoharry I recently forked the CQRS module and changed the AggregateRoot, EventBus and EventPublisher classes to support async methods (see here https://github.com/bradsheppard/nestjs-async-cqrs). I think this might help.

I tried your fork, but it still doesn't wait for the event(s) to finish, so my (CLI) app still exits too soon.

bradsheppard commented on May 7, 2024

@sanderlissenburg If you can send me a reproducible example I'll take a look.

sanderlissenburg commented on May 7, 2024

> @sanderlissenburg If you can send me a reproducible example I'll take a look.

Thank you for the kind offer. In the interest of time, I've chosen to drop the aggregate part and just call the repository from the command handler instead of the event handler, because aggregateObject.commit() is where the problem lies. That call is still fire-and-forget, even with await in front of it. If I have time left, I may revisit this part and come back to you with a simple example.
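
A small standalone sketch of why putting `await` in front of such a call does not help: if a method returns void and drops the promise of the work it starts, `await` has nothing to wait on. The function names here are hypothetical and only mimic the shape of the problem; this is not the actual AggregateRoot code.

```typescript
const log: string[] = [];
const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

// Simulated async persistence done by an event handler.
async function saveToRepository(label: string): Promise<void> {
  await sleep(10);
  log.push(`saved ${label}`);
}

// Returns void: the promise from saveToRepository() is dropped, so callers
// have nothing to wait on. This mirrors a fire-and-forget commit().
function commitFireAndForget(): void {
  void saveToRepository('first');
}

// Returns the promise, so `await` at the call site actually waits.
function commitAwaitable(): Promise<void> {
  return saveToRepository('second');
}

async function demo(): Promise<string[]> {
  await commitFireAndForget(); // awaiting undefined resumes almost immediately
  log.push('after fire-and-forget');
  await commitAwaitable();
  log.push('after awaitable');
  return log;
}
```

In `demo()`, 'after fire-and-forget' is logged before 'saved first', while 'after awaitable' is logged only after 'saved second'. For the await to be meaningful, every layer between the caller and the handler has to return its promise.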

rharutyunyan commented on May 7, 2024

> While it may not be possible with your event architecture, it would be a nice feature if async event handlers could be awaited from eventBus.publish, or if a workaround could be established without the overhead of the full CQRS featureset.

> Events by nature have to be asynchronous. If you - for some reason - want to delay an HTTP response, you can use RxJS Subjects and "signals/triggers" (subscribe to a subject from the controller and emit a signal from the event handler).

Can you please let me know how exactly I can handle this? For example, I am triggering a command in a controller and want to wait until the event handler has finished its work. Any example would be appreciated.

Thanks
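
A rough sketch of the signal idea quoted above, for anyone with the same question. To stay dependency-free it uses a small hand-written `Signal` class as a stand-in for an RxJS Subject; `Signal`, `OrderProcessed`, `onOrderPlaced`, `placeOrder`, and the timeout value are all hypothetical and only illustrate the shape of the approach.

```typescript
// Minimal stand-in for an RxJS Subject: listeners are notified on emit().
class Signal<T> {
  private readonly listeners = new Set<(value: T) => void>();

  emit(value: T): void {
    for (const listener of [...this.listeners]) listener(value);
  }

  // Resolves with the first emitted value that matches, or rejects on timeout.
  waitFor(match: (value: T) => boolean, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const listener = (value: T): void => {
        if (!match(value)) return;
        clearTimeout(timer);
        this.listeners.delete(listener);
        resolve(value);
      };
      const timer = setTimeout(() => {
        this.listeners.delete(listener);
        reject(new Error('timed out waiting for handler'));
      }, timeoutMs);
      this.listeners.add(listener);
    });
  }
}

interface OrderProcessed {
  orderId: string;
}

const processed = new Signal<OrderProcessed>();

// Event handler side: emit the signal when the work is done.
async function onOrderPlaced(orderId: string): Promise<void> {
  await new Promise((r) => setTimeout(r, 10)); // simulated work
  processed.emit({ orderId });
}

// Controller side: subscribe BEFORE triggering the work, then await the signal.
async function placeOrder(orderId: string): Promise<OrderProcessed> {
  const finished = processed.waitFor((e) => e.orderId === orderId, 1000);
  void onOrderPlaced(orderId); // stands in for command -> event -> handler
  return finished;
}
```

Matching on an id keeps concurrent requests from resolving each other, and the timeout keeps the HTTP request from hanging if the handler fails before emitting.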
