Comments (22)

henry-hz commented on August 23, 2024

If pointing a command at a process manager sounds too much like heresy, we can simply pipe the command through a third entity that starts the PM, saga, and scheduler, and then forwards the same command directly to the desired aggregate. We can label this command with a 'starter' tag, and this entity (maybe call it a 'bootstrapper') will take care of starting all the players involved in the process.

Milan's answer about narrow subscriptions: exponentially/extreme#18 (comment)

from commanded.

gregoryyoung commented on August 23, 2024

Why would it be an issue to send a command to a process manager? It is just a message.

henry-hz commented on August 23, 2024

Welcome @gregoryyoung, it's an honor to have you around :)
Commanded follows some purity rules to keep the design very simple and "purely functional". The aggregate only receives commands and emits events (we even made a cool monad in the Monadex lib for fast business modeling).

Process managers were also designed with a limitation: they only receive events and dispatch commands, focusing purely on "orchestrating" aggregates. Sending commands to them would make them look like they hold "business logic", instead of purely reacting between aggregates. We chatted about it, and I argued that this is too limiting, especially when changing the subscription strategy from "subscribe to everything per process manager" to "subscribe to what you want". Also, conceptually, business processes in BPMN have a 'start' point. So I like the idea of having commands in PMs.

Another point. To make a simpler PM, it would be nice to subscribe to events before the stream exists, so that before starting the PM I could set up all the necessary subscriptions, like in RabbitMQ. Why does Eventstore limit subscriptions to existing streams? Is it right to have the ES do the MQ's job?

About the process manager data, do you think the right design is a pure FSM (as posted above), without persisting any data inside? Or would holding some data beyond the machine state (especially common data, such as the stream IDs, that is reusable by several states) also be a good design?

gregoryyoung commented on August 23, 2024

I think the pattern you are looking for here is missing a CorrelationId. Process managers generally subscribe to a correlation id, not to specific streams.

henry-hz commented on August 23, 2024

@slashdotdash can we subscribe to correlation ids? Is it here? https://github.com/slashdotdash/eventstore/blob/282a7ddd56b842886d93575d5da43f36d84bc1d5/lib/event_store/subscriptions/stream_subscription.ex

slashdotdash commented on August 23, 2024

@henry-hz The identifier returned by the process manager's interested?/1 function can be thought of as the correlation id.

This is used as the process manager instance's identity and to filter relevant events from the event subscription.

defmodule TransferMoneyProcessManager do
  @behaviour Commanded.ProcessManagers.ProcessManager

  def interested?(%MoneyTransferRequested{transfer_uuid: transfer_uuid}), do: {:start, transfer_uuid}
  def interested?(%MoneyWithdrawn{transfer_uuid: transfer_uuid}), do: {:continue, transfer_uuid}
  def interested?(%MoneyDeposited{transfer_uuid: transfer_uuid}), do: {:continue, transfer_uuid}
  def interested?(_event), do: false
end

gregoryyoung commented on August 23, 2024

@henry-hz write the following projection (assumes correlation id is in the metadata)

fromAll().when({
    $any: function (s, e) {
        if (e.metadata.correlationId) {
            linkTo(e.metadata.correlationId, e);
        }
    }
})

This will give you a stream per correlation id. Then it's just a matter of listening to that stream. Alternatively, I would implement this with a fromAll subscription, as you will be routing to your process managers anyway.
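The second option, a single subscription to all events that routes to process managers, might look roughly like the sketch below. It is plain in-memory JavaScript, not Event Store or Commanded API: `CorrelationRouter`, `register`, and `route` are hypothetical names, and events are assumed to carry `metadata.correlationId` as in the projection above.

```javascript
// Hypothetical in-memory router: one subscriber sees every event and
// forwards it to the handler registered for the event's correlation id.
class CorrelationRouter {
  constructor() {
    this.handlers = new Map(); // correlationId -> handler function
  }

  // A process manager instance registers interest in one correlation id.
  register(correlationId, handler) {
    this.handlers.set(correlationId, handler);
  }

  // Called for every event coming from the "all events" subscription.
  // Returns true if the event was delivered, false if it was skipped.
  route(event) {
    const correlationId = event.metadata && event.metadata.correlationId;
    if (!correlationId) return false;
    const handler = this.handlers.get(correlationId);
    if (!handler) return false;
    handler(event);
    return true;
  }
}

// Usage: only the event correlated with "transfer-1" reaches the handler.
const router = new CorrelationRouter();
const received = [];
router.register("transfer-1", (event) => received.push(event.type));

router.route({ type: "MoneyTransferRequested", metadata: { correlationId: "transfer-1" } });
router.route({ type: "MoneyWithdrawn", metadata: { correlationId: "transfer-2" } });
router.route({ type: "AccountOpened", metadata: null });
```

Events without a correlation id, or with one nobody registered for, are simply skipped here; a real router would also have to decide when to start a new process manager instance.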

henry-hz commented on August 23, 2024

@gregoryyoung got the first idea.
The second idea would be to subscribe to all events in a single router and use it to filter by correlation-id, as in the query above? And this router would 'know' beforehand about every process manager GenServer that has been created, and forward all messages to them?

@slashdotdash sure, but it seems that one router for all PMs could be a better design, along with officially adding correlation-id and causation-id as Greg posted here. @gregoryyoung do you still hold to this approach?

henry-hz commented on August 23, 2024

About the process manager persistence, I found a good answer here: "It's a read model. You can rebuild the process manager from the history of events each time you need it; or you can treat it like a snapshot that you update."

Are PMs, coded to support correlation-id, really a read model? If yes, we have both options above.
It seems that the best design option depends on another question: should we send commands to PMs? If yes, commands are not persisted, so PMs wouldn't be a read model, and we would need to persist them.

This topic talks about it. @gregoryyoung do you still hold to the strategy posted there?

Make something called "context" when you go to write your events 
take corr/cause id from "context" and put them on the metadata of all 
events you are emitting. Projections does this internally btw 
for all events you write from a projection if the metadata is on the 
event you are responding to.
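
As a rough illustration of the "context" idea quoted above, the sketch below copies the ids into the metadata of outgoing events. The helper names `contextFrom` and `withContext` are hypothetical, and it assumes the common convention that the correlation id is carried over unchanged while the id of the message being handled becomes the causation id.

```javascript
// Build a "context" from the message being handled (hypothetical helper).
// Assumed convention: the correlation id is carried over unchanged (falling
// back to the message's own id when none is set), and the handled message's
// id becomes the causation id of whatever it produces.
function contextFrom(message) {
  const incoming = message.metadata && message.metadata.correlationId;
  return {
    correlationId: incoming || message.id,
    causationId: message.id,
  };
}

// Copy the ids from the context into the metadata of an outgoing event,
// without mutating the event that was passed in.
function withContext(context, event) {
  return {
    ...event,
    metadata: {
      ...event.metadata,
      correlationId: context.correlationId,
      causationId: context.causationId,
    },
  };
}

// Usage: an event produced while handling a command inherits its ids.
const command = { id: "msg-1", type: "PlaceOrder", metadata: { correlationId: "corr-2" } };
const context = contextFrom(command);
const orderPlaced = withContext(context, { id: "msg-7", type: "OrderPlaced", metadata: {} });
```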

slashdotdash commented on August 23, 2024

I like the idea of process managers subscribing to events using the correlation Id.

Rebuilding their state will then exactly follow the approach used by aggregates. Replay all the events with the correlation Id they are interested in and mutate state using the apply/2 function.

It would require the hosting infrastructure (e.g. Commanded) to correctly apply the correlation and causation Ids to persisted events. I'm not sure if this could be completely automated and hidden from the user.
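
The replay described above can be sketched as a filter followed by a fold. This is only an illustration in JavaScript, not Commanded code: the event names are borrowed from the TransferMoneyProcessManager example earlier in the thread, while the `status` values and the `rebuild` helper are assumptions.

```javascript
// Pure state transition, playing the role of apply/2.
// The status values are invented for this sketch.
function apply(state, event) {
  switch (event.type) {
    case "MoneyTransferRequested":
      return { ...state, status: "withdrawing" };
    case "MoneyWithdrawn":
      return { ...state, status: "depositing" };
    case "MoneyDeposited":
      return { ...state, status: "completed" };
    default:
      return state;
  }
}

// Rebuild one process manager's state: keep only the events that carry its
// correlation id, then fold them through apply in order.
function rebuild(correlationId, events) {
  return events
    .filter((e) => e.metadata && e.metadata.correlationId === correlationId)
    .reduce(apply, { correlationId, status: "not_started" });
}

// Usage: two transfers interleaved in the same history.
const history = [
  { type: "MoneyTransferRequested", metadata: { correlationId: "t-1" } },
  { type: "MoneyTransferRequested", metadata: { correlationId: "t-2" } },
  { type: "MoneyWithdrawn", metadata: { correlationId: "t-1" } },
  { type: "AccountOpened", metadata: null },
];
const transferOne = rebuild("t-1", history);
```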

henry-hz commented on August 23, 2024

thanks @gregoryyoung
@slashdotdash below is a first draft (you can plot it here).
Maybe the command router could be the "context" @gregoryyoung mentioned above, transparently injecting the necessary metadata there.
I did it very fast, I'm super busy deploying this week and next...
Please, @slashdotdash, let me know if you can move this forward. Maybe using my closed PR to continue the DB abstraction?
The idea of using metadata could elegantly clean up the "interested?" subscriptions in PMs.
Since today we use a flag in the command router to specify the aggregate ID as identity, a boolean flag could be used to signal that a command boots a PM, and from there the metadata takes care of the flow.
Note that we have more players on the scene: aggregate-id, process-manager-id, message-id, correlation-id, causation-id. I used X-Request-ID only to illustrate a causation ID coming from external services.

"For instance, the process manager writes its own id as the correlationId in the command; the events produced by a copy the correlation id of the command, and your process manager subscribes to all events that have its own correlationId." from this link

title CQRS Eventsourcing
Client            -> REST API         : PUT(/orders/123/, X-Request-ID=99)
REST API          -> command router   : dispatch(PlaceOrder{order: 123, meta: [msg_id: 1, co_id: 2, ca_id: 99]})
command router    -> process manager  : start(pm_id: 2)
process manager   --> bus             : subscribe(co_id: 2)
command router    -> registry         : open_aggregate(order, ord_id: 123)
registry          -> order aggregate  : start_aggregate(order, ord_id: 123, co_id: 2)
order aggregate   -> eventstore       : persist_events(order_placed)
eventstore        -> bus              : publish(all_events)
bus               -> process manager  : publish(order_placed, agg_id: 123, co_id: 2)
process manager   -> registry         : dispatch(make_seat_reservation[id = 1])
process manager   -> process manager  : awaiting reservation
registry          -> reservation agg  : start(make_seat_reservation[id = 1])
reservation agg   -> eventstore       : persist(reservation_accepted, agg_id: 343, co_id: 2)
eventstore        --> process manager : publish(reservation_accepted, co_id: 2)
process manager   -> command router   : dispatch(mark_order_as_booked[id = 1])
process manager   -> command router   : dispatch(expire_order[id = 1])
command router    -> registry         : open_aggregate(mark_order_as_booked[id = 1])
command router    -> registry         : open_saga(expire_order[id = 1])
process manager   -> process manager  : awaiting payment
registry          -> order aggregate  : start(mark_order_as_booked[id = 1])
registry          -> scheduler        : start(expire_order[id = 1])

henry-hz commented on August 23, 2024

@slashdotdash any idea on how to abstract the subscriptions? I thought the simplest would be to send the process manager instance's PID to the subscription GenServer and get updates from there.

About the correlation-id, let's go: we flag the command that is the 'cause' of all consequent events that are correlated with this command. The process manager receives the first event (which is also flagged, since it comes from the flagged command) and says: "Oho, you, event, are you the one that will cause all the commands in this business process? Then let's copy your id into the correlation-id metadata of all commands emitted here, and consequently the events on aggregates created by these commands will also include this correlation-id."
[Approach 01] Meanwhile, the process manager has already subscribed the "query" on the first flagged id, since we knew beforehand that this id is the future correlation-id. A "projection" was started to listen for that id, and it is used to feed the process manager's subscription.
[Approach 02] Meanwhile, when the application started, a GenServer subscribed to all events from the eventstore. It waits for the process manager to subscribe the "query" on the first flagged id, since we knew beforehand that this id is the future correlation-id. The process manager is then subscribed to receive messages from this GenServer, filtered by correlation-id.
I personally like the first approach. On extreme (Greg's eventstore) we can use this general query to create one stream per correlation-id, as described by @gregoryyoung

fromAll().when({
    $any: function (s, e) {
        if (e.metadata.correlationId) {
            linkTo(e.metadata.correlationId, e);
        }
    }
})

The question is how we should proceed to use the same approach with PostgreSQL (with the eventstore adapter). What do you suggest for implementing the subscription/publication abstraction?
The second approach seems more pragmatic and easier to implement, since for every adapter a "find all" events query is enough.
Another issue: extreme has a "catch up" message to notify that, when replaying, the events up to a certain point were already processed, so that from that point on the subscriber can tell that incoming events are really new and haven't been processed yet. I haven't yet been able to study how you implemented it, so if it's similar, the abstraction should be smoother.
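
To make the catch-up concern concrete, here is a minimal sketch of a subscriber that tells replayed events apart from live ones. It does not reproduce extreme's or eventstore's actual messages: the `{ kind: "caught_up" }` marker, the `number` field, and the `CatchUpSubscriber` class are all assumptions made for illustration.

```javascript
// Hypothetical subscriber that distinguishes replayed events from live ones.
// `lastProcessed` is a checkpoint the subscriber is assumed to have persisted.
class CatchUpSubscriber {
  constructor(lastProcessed) {
    this.lastProcessed = lastProcessed;
    this.live = false; // becomes true once the replay has finished
    this.handled = [];
  }

  // Messages are either { kind: "event", number, type } or { kind: "caught_up" }.
  receive(message) {
    if (message.kind === "caught_up") {
      this.live = true; // everything after this marker is a new event
      return;
    }
    if (message.number <= this.lastProcessed) return; // already processed: skip
    this.handled.push({ type: message.type, replayed: !this.live });
    this.lastProcessed = message.number;
  }
}

// Usage: events 1 and 2 were processed before the restart.
const subscriber = new CatchUpSubscriber(2);
subscriber.receive({ kind: "event", number: 1, type: "order_placed" });
subscriber.receive({ kind: "event", number: 2, type: "reservation_accepted" });
subscriber.receive({ kind: "event", number: 3, type: "order_booked" });
subscriber.receive({ kind: "caught_up" });
subscriber.receive({ kind: "event", number: 4, type: "order_expired" });
```

Whatever shape the real notification takes, the abstraction needs some equivalent of both pieces: a checkpoint to skip what was already handled, and a marker for the switch from replay to live.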

drozzy commented on August 23, 2024

Dumb question: How do you decide on correlation id? Can't there be multiple processes flowing through a set of events?

@henry-hz when you say:

About the correlation-id, let's go: we flag the command that is the 'cause' of all consequent events that are correlated with this command.

It looks more like causation-id than correlation-id. Again, I don't know what I'm talking about so feel free to ignore this.

P.S.: What about starting a process manager based on subscription to a stream as opposed to an aggregate id? You are free to write your own projections in event store, e.g. for correlation-id 111, named order-process-111, and then your process manager can declare that it subscribes to category $ce-order-process (here category as in event store category).

henry-hz commented on August 23, 2024

For me it was difficult to create a PR: too many changes, especially around the metadata, so I started a new project. It's not active at the moment because I am busy with other issues. I'd like @slashdotdash to review this idea :) . In fact, it seems it could be a complete refactor.
https://github.com/work-capital/workflow/tree/master

henry-hz commented on August 23, 2024

@gregoryyoung this is ok, I got it. My difficulty was how to implement it in commanded itself, so I started a POC with a different approach.

henry-hz commented on August 23, 2024

Sure. The commanded lib uses callbacks to abstract a purely functional structure like this, and encapsulates side effects in the process manager instance. To implement the correlation_id in this model, abstracting it into the process_manager_instance, we would have to pass it on every handle call and store it as metadata for every event. Since there is no metadata support in the basic event structure, adding it would need a deeper refactoring. I would like to implement it here, but for me it was easier to start a new POC, where I abstracted the persistence to be used by process managers, sagas (schedulers) and aggregates. To implement the whole metadata flow in a way that lets devs choose which metadata should flow under the hood, pipelines (as implemented for commands in the commanded lib) should be an elegant solution.

drozzy commented on August 23, 2024

I think this should be closed and a more focused issue should be opened. It's a really old issue!

slashdotdash commented on August 23, 2024

Closing, replaced by new issue: "Correlation and causation ids" (#70).
