commanded / commanded Goto Github PK
View Code? Open in Web Editor NEWUse Commanded to build Elixir CQRS/ES applications
License: MIT License
Use Commanded to build Elixir CQRS/ES applications
License: MIT License
I tried using the Supervisor setup as described in the README, but the app then crashes on startup:
** (Mix) Could not start application my_app: MyApp.Application.start(:normal, []) returned an error: shutdown: failed to start child: MyApp.Supervisor
** (EXIT) shutdown: failed to start child: Commanded.Supervisor
** (EXIT) shutdown: failed to start child: Registry
** (EXIT) already started: #PID<0.226.0>
It seems that the example setup does not work anymore with Elixir 1.4+, due to the Application inference changes, causing commanded
to be automatically started. Adding Commanded.Supervisor
to the supervision tree causes the Registry
to be started twice under the same process name.
I'm still new to Elixir and not entirely sure how this could be fixed, but wanted to open this issue to let you know. I'd be happy to help out with fixing the README and/or testing the fixed example supervision setup, if needed.
Implement rolling snapshots for aggregate state. This uses the memento pattern to serialize an aggregate's state. The serialized state is stored in the Event Store as snapshot data.
Instead of loading every event for an aggregate when rebuilding its state you load the snapshot and then replay every event since it was created.
As an example, assume the snapshot was taken after persisting an event for the aggregate at version 100. When the aggregate process is restarted we load and deserialize the snapshot data as the aggregate's state. Then we fetch and replay the aggregate's events from version 100.
This is a performance optimisation for aggregate's that have a long lifetime or raise a large number of events.
Configure snapshotting per aggregate within the app configuration. Define how frequently snapshots are taken (after every x events) and an optional snapshot version.
# config/config.exs
config :commanded, ExampleAggregate
snapshot_every: 100,
snapshot_version: 1
Incrementing the snapshot version would ignore any previous snapshot taken at an earlier version during aggregate state rebuild. This allows changing the aggregate structure without requiring a snapshot data migration.
Create an "examples" folder inside the Command root folder, like the ecto application, with two examples, the bank account and the conference reservation system (from Microsoft book).
The supported event stores provide storing metadata per event appended to a stream. It would be useful to allow this metadata to be provided during commanded dispatch.
An example usage would be to include the identity of the user invoking the command:
command = %OpenAccount{account_number: "ACC123", initial_balance: 1_000}
metadata = %{"user_id" => "user-12345"}
:ok = BankRouter.dispatch(command, metadata: metdata)
The optional metadata would be recorded against any persisted events.
Replace String.to_atom
with String.to_existing_atom
in the Commanded.Event.JsonSerializer.deserialize/2
function.
This is to prevent Erlang atom exhaustion.
Process managers handle events and return commands to be dispatched in response (they may return no commands, as nil
or []
).
Currently, the commands must be successfully dispatched or the process manager will crash.
To allow you to configure how command dispatch errors are handled a new error/3
callback is proposed for the Commanded.ProcessManagers.ProcessManager
behaviour. This callback function is used to handle any errors returned from command dispatch. You can use pattern matching on the error and/or command to explicitly handle certain errors or commands.
The proposed error handling options are:
{:retry, context}
- retry the failed command, provide a context map to provide state to subsequent failures. This could be used to count the number of retries, failing after too many attempts.{:retry, delay, context}
- retry the failed command, after sleeping for the requested delay (in milliseconds). Context is a map, the same as {:retry, context}
above.{:skip, :discard_pending}
- discard the failed command and any pending commands.{:skip, :continue_pending}
- skip the failed command and continue dispatching any pending commands.{:continue, commands, context}
- continue dispatching the given commands. This allows you to retry the failed command, modify it and retry, drop it, or drop all pending commands by passing an empty list []
.{:stop, reason}
- stop the process manager with the given reason.The default behaviour will be to stop the process manager using the error reason returned from the failed command dispatch.
defmodule ExampleProcessManager do
use Commanded.ProcessManagers.ProcessManager,
name: "ExampleProcessManager",
router: ExampleRouter
# stop process manager after third failed attempt
def error({:error, _failure}, _failed_command, _pending_commands, %{attempts: attempts})
when attempts >= 2
do
{:stop, :too_many_attempts}
end
# retry command, record attempt count in context map
def error({:error, _failure}, _failed_command, _pending_commands, context) do
context = Map.update(context, :attempts, 1, fn attempts -> attempts + 1 end)
{:retry, context}
end
end
Configuring command dispatch using the Commanded.Commands.Router
macro forces you to implement a command handler per command.
defmodule ExampleRouter do
use Commanded.Commands.Router
dispatch OpenAccount, to: OpenAccountHandler, aggregate: BankAccount, identity: :account_number
dispatch DepositMoney, to: DepositMoneyHandler, aggregate: BankAccount, identity: :account_number
end
Often the command handler will just delegate to the aggregate root. It does nothing more and adds no value.
defmodule Commanded.ExampleDomain.DepositMoneyHandler do
alias Commanded.ExampleDomain.BankAccount
alias Commanded.ExampleDomain.BankAccount.Commands.DepositMoney
@behaviour Commanded.Commands.Handler
def handle(%BankAccount{} = aggregate, %DepositMoney{} = deposit_money) do
aggregate
|> BankAccount.deposit(deposit_money)
end
end
To reduce boilerplate code the command handlers should be optional. The default behaviour will be to dispatch the command to the aggregate.
A proposal for routing without a command handler. The to:
key is the aggregate root itself and the aggregate:
key is not required.
defmodule ExampleRouter do
use Commanded.Commands.Router
dispatch OpenAccount, to: BankAccount, identity: :account_number
dispatch DepositMoney, to: BankAccount, identity: :account_number
end
By default the command will be dispatched to an aggregate root function named execute
. Pattern matching will ensure that it invokes the correct function for a given command struct.
Consider making the command to function mapping configurable. As an example the function could be selected by the name of the command. So a command named OpenAccount
maps to the function open_account/2
.
It must still be possible to configure a command handler for the cases where they are required. An example would be if you need to request additional data for the command from elsewhere (e.g. external HTTP request).
A function that creates an aggregate root is currently the same as any function expecting an existing aggregate instance.
You must use some field from the aggregate's state to determine whether it is a new (default state) or existing (populated from the first raised event) instance.
Here's an example using the bank account aggregate in the unit tests. It uses pattern matching on the bank account's account_number
field checking that it is nil
to ensure a new account is being opened
%BankAccount{}
stateThe execute/2
function receives default state when no events exist for the aggregate:
defmodule Commanded.ExampleDomain.BankAccount do
defstruct [
account_number: nil,
balance: 0,
]
def execute(%BankAccount{account_number: nil}, %OpenAccount{account_number: account_number, initial_balance: initial_balance})
when is_number(initial_balance) and initial_balance > 0
do
%BankAccountOpened{account_number: account_number, initial_balance: initial_balance}
end
end
An execute/1
function would be called when there are no events for the given aggregate (i.e. we are creating a new instance):
defmodule Commanded.ExampleDomain.BankAccount do
defstruct [
account_number: nil,
balance: 0,
]
def execute(%OpenAccount{account_number: account_number, initial_balance: initial_balance})
when is_number(initial_balance) and initial_balance > 0
do
%BankAccountOpened{account_number: account_number, initial_balance: initial_balance}
end
end
Alternatively, rename the execute
function to create
when executing a command against a new aggregate instance:
defmodule Commanded.ExampleDomain.BankAccount do
defstruct [
account_number: nil,
balance: 0,
]
def create(%OpenAccount{account_number: account_number, initial_balance: initial_balance})
when is_number(initial_balance) and initial_balance > 0
do
%BankAccountOpened{account_number: account_number, initial_balance: initial_balance}
end
end
This makes it explicit that certain aggregate function(s) expect to create a new instance. Pattern matching will cause an exception when attempting to create more than one instance (e.g. open an account for the same account number).
I'm working on an open-source p2p application that I want to structure with your library, but don't want to require users to have to install anything other than BEAM and Elixir. I'm aware of the guidance you provided here with regards to custom adapters, but they seem relatively simple (presumably because your eventstore library is doing most of the heavy lifting).
Is my only option for using mnesia as the event store to write a library similar to eventstore and an adapter? If so, do you have any useful resources to point me towards, as I've only been writing in Elixir for a few months, and haven't yet messed with mnesia or amnesia?
Any help would be greatly appreciated, thank you!
PS: can't wait for the conduit book to come out!
It would be useful to be able to define a prefix on a per-aggregate basis. This would reduce risk of collisions when using a lot of aggregates with generated IDs and ease separation of streams on a business logic level.
Let's assume we have a general User aggregate which uses UUIDs as identification. For our business logic we need to relate additional data to this user, as such we add additional aggregates which also use the user ids as identification for ease of use. While being related but separate entities this would result in a single stream containing all events from all aggregates per user.
Similar to process managers or event handlers the name of an aggregate could be defined in a use clause. This could internally define a @behaviour
conformance with a callback providing the aggregate name.
To allow backwards compatibility the Aggregate
module could use function_exported?/3
to check for the behaviour conformance in advance and fall back to the current logic. The old behaviour could be deprecated if wished for and removed in a later version.
What do you think of this?
Port the event subscription strategy from router to process_manager_instances, and completely remove process manager router. The idea is to start the process manager that is part of a specific transaction directly from the command router, based on the aggregate_id that is supposed to generate events for that PM. Note that because we are starting the process manager using an event, we are sending the first command (place order) to process manager to let it subscribe dynamically, but another option would be to officially support commands in the process manager following the diagram posted here. In the flow below, the scheduler(saga) will be inactive, but it is interesting how PM would react in the case it publishes the event "order expired"... because it's an FSM, sending a "cancel expiration" is not needed, once the FSM state should be 'completed' when receiving a 'payment received' before the 'order expired' event. (fix: first send a command, wait for : ok (so stream was created) , subscribe and afterwards change state.
I need urgent help with this issue, once the multi-db #37 implementation depends on this simplification.
Advantages:
If you want to edit a new sequence diagram, copy the script below to www.websequencediagrams.com
title CQRS Eventsourcing
REST -> command router: place_order[id = 1]
command router -> process manager: start(place_order[id = 1])
process manager --> eventstore : subscribe(order_placed[id = 1])
command router -> registry: open_aggregate(place_order[id = 1])
registry -> order aggregate: start(place_order[id = 1])
order aggregate -> eventstore: persist(order_placed[id = 1])
eventstore -> process manager: publish(order_placed[id = 1])
process manager --> eventstore : subscribe(reservation_accepted[id = 1])
process manager --> eventstore : subscribe(reservation_rejected[id = 1])
process manager -> registry: dispatch(make_seat_reservation[id = 1])
process manager -> process manager: awaiting reservation
registry -> reservation aggregate: start(make_seat_reservation[id = 1])
reservation aggregate -> eventstore: persist(reservation_accepted[id = 1])
eventstore --> process manager: publish(reservation_accepted[id = 1])
process manager --> eventstore: subscribe(order_expired[id = 1])
process manager --> eventstore: subscribe(payment_received[id = 1])
process manager -> command router : dispatch(mark_order_as_booked[id = 1])
process manager -> command router : dispatch(expire_order[id = 1])
command router -> registry: open_aggregate(mark_order_as_booked[id = 1])
command router -> registry: open_saga(expire_order[id = 1])
process manager -> process manager: awaiting payment
registry -> order aggregate: start(mark_order_as_booked[id = 1])
registry -> scheduller: start(expire_order[id = 1])
payment -> eventstore: persist(payment_received[id = 1])
eventstore --> process manager: publish(payment_received[id = 1])
process manager -> command router: dispatch(commit_reservation[id = 1])
process manager -> process manager: completed
command router -> registry: open_aggregate(commit_reservation[id = 1])
registry -> order aggregate: start(commit_reservation[id = 1])
order aggregate -> eventstore: persist(reservation_commited[id = 1])
As described above, using the Command Dispatcher, that is not a gen_server, we can start the process manager and the first subscription, but we still have to figure out how to continue.
Let me try to draft a solution to have only one gen_server: because we know beforehand which commands are causing which events, i.e. which causes are causing which effects, we do have a list of cause and effect. This is in relation on commands and event types, but the UUID's are known in runtime only. If we let the process manager knows this list, and dynamically store UUID's, we will be able to subscribe to specific streams dynamically, always before sending a command, requesting a subscription for the 'effects' of that command. In the FSM it's even more explicitly, once we clearly defines them. Immediately before we send a %MakeSeatReservation{}, we subscribe for %ReservationAccepted and %ReservationRejected events. It's also performance friendly, once we allocate resource dynamically, and even, after leaving states, we can unsubscribe.
defstate non_started do
defevent on(%OrderPlaced{uuid: uuid} = order) do
next_state(:awaiting_reservation, [%MakeSeatReservation{uuid: uuid}])
end
defevent _, do: respond(:error)
end
# AWAITING RESERVATION CONFIRMATION
defstate awaiting_reservation do
defevent on(%ReservationAccepted{uuid: id} = reservation) do
next_state(:awaiting_payment, [%MarkOrderAsBooked{}, %ExpireOrder{}])
end
defevent on(%ReservationRejected{} = reservation) do
next_state(:completed, %RejectOrder{})
end
defevent _, do: respond(:error)
end
# AWAITING PAYMENT
defstate awaiting_payment do
defevent on(%OrderExpired{} = order) do
next_state(:completed, [%CancelSeatReservation{}, %RejectOrder{}])
end
defevent on(%PaymentReceived{} = payment) do
next_state(:completed, [%CommitSeatReservation{}])
end
defevent _, do: respond(:error)
end
# COMPLETED
defstate completed do
defevent _, do: respond(:error)
end
Add a module that defines the behaviour expected for an event store to be used with Commanded.
Currently only the PostgreSQL-based eventstore is supported. By explicitly defining the interface between an Commanded and an event store we can support additional storage via individual adapters.
This has been done in #47.
Create a GenServer based in-memory adapter to be used in unit tests.
Follow the integration test approach adopted by ecto to support test cases for multiple adapters.
Dispatching a command returns an :ok
response when successfully handled by the registered command handler or aggregate.
:ok = Router.dispatch(%RegisterAccount{uuid: UUID.uuid4(), username: "slashdotdash"})
This provides a strong consistency guarantee for the write side (in a CQRS application). You are guaranteed that the write succeeded once you receive the :ok
.
Event handlers in Commanded are asynchronous when processing persisted events. This allows parallelisation of handlers for both performance and independence so that one slow handler doesn't negatively affect any other. The downside of this approach is eventual consistency for your read models; after successful command dispatch you have no way of knowing when the handlers have processed the events created by the command .
The current solution to eventual consistency in the read model is to use one of the following approaches: "fake" the response by using data from the command; poll the read model until it has been updated; use pub/sub from the read model to notify subscribers when the event has been processed.
This requires you to write boilerplate code for every command dispatch that needs to subsequently query the read model that will be updated by events created from the command. Most web apps use the POST/Redirect/GET pattern that is affected by this problem. As an example, after publishing a blog post the user should be redirected to view their new post.
The proposed solution is to allow "synchronous" command dispatch and allow you to configure which event handlers should run with strong, not eventual, consistency.
Add a consistency
option to the command dispatch
function:
:ok = Router.dispatch(%RegisterAccount{uuid: UUID.uuid4(), username: "slashdotdash"}, consistency: :strong)
The supported consistency options would be :eventual
(the current behaviour and default) and :strong
.
For each event handler, and process manager, you would be able to specify its consistency guarantee.
Article read model projection (an event handler) requests :strong
consistency:
defmodule ArticleProjector do
use Commanded.Event.Handler,
name: "UserProjector",
consistency: :strong
def handle(%ArticlePublished{title: title, body: body}) do
# ... update read model
end
end
An email sending event handler can be configured for :eventual
consistency since it can safely run async:
defmodule SendWelcomeEmail do
use Commanded.Event.Handler,
name: "SendWelcomeEmail",
consistency: :eventual
def handle(%UserRegistered{name: name, email: email}) do
# ... send welcome email async
end
end
It would also be possible to define the consistency for process managers in exactly the same manner.
When dispatching a command using consistency: :strong
the dispatch will block until all of the strongly consistent event handlers and process managers have handled all events created by the command. This guarantees that when you receive the :ok
response from dispatch your strongly consistent read models will have been updated and can safely be queried.
This proposal would be backwards compatible with any existing usage. The default consistency guarantee will be eventual, as it already is. But you will now be allowed to opt-in to strong consistency when dispatching a command.
Dear @slashdotdash , let me ask you why all the routing structure for handling commands and events, wouldn't be easier to use the GenEvent framework instead ?
thanks
Using the behaviour defined in #49. Migrate the adapter in #47 into its own project & Git repo.
This will add support for Greg Young's geteventstore via the extreme client.
Is it possible to have metadata available in command handler? My use-case is I need to attach some extra information like user IP address, user agent when calling third party APIs in command handlers.
The eventstore
API has been extended to support creating a subscription from a specified position. You can choose to subscribe to the all stream's origin, its current position, or a given event id.
When registering a new event handler or process manager the subscription it initially creates should be created from the position provided to the start_link
function. This allows the user to determine which events it receives. By default this should be the origin position so that all events are received.
You can choose to start a subscription from the current event store position. Use this when you don't want newly created event handlers or process managers to go through all previous events (e.g. event handler to send transactional emails added to a deployed system).
An instance of a process manager persists its state after processing an event using the EventStore's snapshot facility. It should record the id of the event it has processed as part of the snapshot metadata.
This can be used to ensure that events are not handled more than once. The EventStore guarantees at least once delivery of each event to a subscriber. An event may be received more than once. So the process manager instance must ignore duplicates.
Hey!
I was trying out a sample app with commanded and changed my backing eventstore - but Commanded kept coming back to the InMemory store - which wasn't what my config set. Reinstalling + compiling fixed this.
I assume the problem is https://github.com/slashdotdash/commanded/blob/master/lib/commanded/event_store/event_store.ex#L13
When a command is dispatched it is routed to the single registered command handler by the configured router. The aggregate is loaded, or created as necessary. The handler executes a function on the aggregate passing in the received aggregate state.
The aggregate and command handler both return the aggregate state on success. The pending events are persisted to the event store.
Should the command fail, an {:error, ... }
tuple should be returned. This would prevent the aggregate state and pending events from being persisted.
Add support for command format validation to support basic data validation rules.
Used to check the different data types within each command for compliance with criteria. Some example validation rules include check for attribute presence, absence, inclusion, exclusion, format, length.
Using a library such as vex would provide a set of rules and validator.
Validation failures should be returned as an error from the call to dispatch the command. An aggregate may also reject a command due to business rule failures. These should be treated in the same way and returned to the command dispatcher as errors.
At some point after deployment of a process manager you may realise that you need additional data in its state, available from events already seen/ignored.
Greg Young describes a number of strategies for versioning process managers:
Another approach is to event source process manager state allowing it to be rebuilt whenever required (#127). This can be combined with state snapshotting as a performance optimisation.
To support upcasting of state, Commanded could: replay a process manager's events to allow each instance's state to be rebuilt (and not dispatch commands during replay); or provide an upgrade
function that is called to mutate the process manager's state to a new version; or event source its state.
When manually sending TransferMoney commands, like the one below, and checking it's creation in the Observer, I noted that after the process is finished, it continues there pending. After reaching the final state, the process could suicide, and a TTL also would be suitable for high loading, and cache only what is in real use. Please, let me ask you to comment how commanded is in production, some benchmarks.
thanks :)
:ok = Bank.Router.dispatch(%TransferMoney{source_account: "ACC333", target_account: "ACC123", amount: 5})
Hi @slashdotdash,
really interesting project, I already trying to apply it, for some tests on my existing project
I found some issues, for example problem when I fetch dependencies via hex.pm
looks like you didn't bumped up version, but you made already some changes (fixes)
I switched from hex to github and now its working.
Would be good to wire commanded, eventsourced and eventsource via version on hex, instead of github, but probably its to early for it.
I will send you soon some feedback about using it on my project, I would love to discuss validations topics.
Next thing is handling commands with ack, in my case I have splited CQRS on http level:
POST /command
GET /query
WS /updates
I'm thinking also about adding:
GET /command
then we could handle commands as: fire & forget or fire & wait for ack.
This will require probably passing PID of web process to command handler, just idea.
If you are interested in feedback and discussion we can make skype call
Dear @slashdotdash , we are a fintech startup company, and we would like to use Commanded as our CQRS framework, but there are some critical differences that are pushing us to fork it and adapt for our needs. We noticed that you simplified (cleaned macros) the way you keep state on aggregates and process managers and it gave a new look in the overall code 👍 Below is a list of proposals we could contribute and finally have only one source code, please, let us know if you are interested in adding them, and helping to deploy as soon as possible. For a while, me and Shmuel, our CTO would start this process together, and if you have time available.
FSM support for process managers
https://github.com/work-capital/engine/blob/develop/lib/engine/process_manager/process_manager.ex
https://github.com/work-capital/engine/blob/develop/test/engine/process_manager/process_manager_test.exs
Keep state out of Process Manager
use process manager only to orchestrate between aggregates (see how we solve that, keeping target and source fields on account)
https://github.com/work-capital/engine/blob/develop/test/engine/example/account/account.ex
Snapshoting and Separete the Data Persistence Layer
to easily track bugs, and have 'one point' for database side efects, we created a 'Container' module, that also could have the 'replay' and other functions inside it.
https://github.com/work-capital/engine/blob/develop/lib/engine/container.ex
Support Eventstore database
create a 'plug-in' style, with a 'common message' between postgre and eventstore drivers:
https://github.com/exponentially/extreme
https://github.com/slashdotdash/eventstore
Monads support to test Aggregates and Process Managers without side-efects
We need to create several tests, and divide and train our developers to develop business features without side-effects. With Eventsourced, it was relatively easy, but still have some difficulties on piping, here an idea from Rob Brown: rob-brown/MonadEx#6 .
Differentiate between Domain error and Command Validation.
We need to track 'Exceptional' Events when an error is a 'Business Error'.
http://danielwhittaker.me/2016/04/20/how-to-validate-commands-in-a-cqrs-application/
Add excwption events
Solve the pending bugs as soon as possible
We have a high priority in supporting the Eventstore database, implement a good FSM support for process managers, and Monads for free testing environment, and we have a full day time to write, once we need to be in production in 2 weeks, so please, let us know if you are interested in implement these features and guide us to write it as soon as possible.
best
Henry & Shmuel
Include the following unit test helpers that allow verification of raised events.
https://gist.github.com/slashdotdash/c1cdc2e58683d897b862c527ec9d0d8f
@gregoryyoung @slashdotdash read an interesting article about event sourcing and how the pieces fit together and would very much appreciate your opinion on what it proposes.
It describes the familiar aggregates, process managers and projections in terms of their inputs and outputs, but also "fills in the gaps" with new terms and modules currently unaddressed (at least by my own understanding) by traditional event sourcing architectures.
My questions are:
commanded
?commanded
into a commanded
-based event sourcing system?How would you recommend I go about implementing these gateways - with some of these new patterns/modules or with a different pattern altogether?
Thanks for your time and your help with the other issue I opened!
Port this idea to commanded: https://github.com/work-capital/engine/blob/develop/lib/engine/storage/storage.ex
Let me suggest to have a "storage" abstraction here, and use https://github.com/slashdotdash/eventstore as Postgre dependency to connect to Postgree and https://github.com/exponentially/extreme to connect to 'GetEventStore'
Once #49 is done, the existing eventstore integration should be moved into its own project & Git repo.
Included in #47.
To allow long running process managers to handle timeouts.
A process manager may request to be notified after a given period or at an exact date and time.
When the time arrives the process manager is notified and it may dispatch commands in response.
Requires consideration of how to persist these time-based triggers to survive application restarts and process crashes.
We can start with tinyMQ
https://github.com/ChicagoBoss/tinymq
Allow a command router to define middleware modules that are executed before, after and on failure of each command dispatch.
This provides an extension point to add in command validation, authorization, logging, etc. Typical behaviour that you want to be called for every command the router dispatches.
It will follow the [middleware approach taken by Exq]https://github.com/akira/exq#middleware-support].
defmodule BankingRouter do
use Commanded.Commands.Router
middleware Commanded.Logger
middleware MyCommandValidator
middleware AuthorizeCommand
dispatch OpenAccount, to: OpenAccountHandler, aggregate: BankAccount, identity: :account_number
dispatch DepositMoney, to: DepositMoneyHandler, aggregate: BankAccount, identity: :account_number
end
The middleware modules will be executed in the order they’ve been defined. They will receive a struct containing the command being dispatched.
defmodule MyMiddleware do
@behaviour Commanded.Commands.Middleware
def before_dispatch(%{command: command} = pipeline) do
pipeline
end
def after_dispatch(%{command: command} = pipeline) do
pipeline
end
def after_failure(%{} = pipeline) do
pipeline
end
end
Router.dispatch/1
doesn't allow to retrieve the events produced by the aggregate in response to the command.
It could either be added as a breaking change to unify the API:
# Would have previously returned :ok
iex> Router.dispatch(command)
{
:ok,
%ExecutionResult{
aggregate_uuid: aggregate_uuid,
aggregate_version: aggregate_version,
events: events,
metadata: metadata,
}
}
Or as a non-breaking change, using a new option
iex> Router.dispatch(command)
:ok
iex> Router.dispatch(command, include_aggregate_version: true)
{:ok, aggregate_version}
iex> Router.dispatch(command, include_execution_result: true)
{
:ok,
%ExecutionResult{
aggregate_uuid: aggregate_uuid,
aggregate_version: aggregate_version,
events: events,
metadata: metadata,
}
}
How can we make our process react when things go wrong?
Before I start, I'd like to say that I'm posting this because of the idea of distributed sagas. You can check about that in the videos below:
Distributed Sagas: A Protocol for Coordinating Microservices - Caitie McCaffrey - JOTB17
Using sagas to maintain data consistency in a microservice architecture by Chris Richardson
Lets say we are modeling a Travel process which need to get reservation for hotel, car and flight.
Each aggregate (hotel, car and flight) has some kind of command ReserveX{} and a CancelXReservation{} and events XReserved and ReservationXCanceled. However, if it is not possible to reserve (no seats in the flight) the aggregate will return something like {:error, :no_available_seats}.
Now lets think about the TravelProcessManager, basically it listen to TravelCreated events (on the travel aggregate) and when that happens it dispatches three commands, ReserveHotel, ReserveCar, ReserveFlight.
In the happy path, it waits for the XReserved events. However, whenever it receives a {:error, :no_available_seats} it should cancel all other reservations that may have been made.
How can we handle that?
One idea is to make aggregates emit events for failure as well.
Maybe that is not the resposability of a process manager and I should implement that thing myself, and create some lib to implement that protocol directly. (Its basically a writeahead log and a rollback procedure using some DAG stuff). Anyway, I think that maybe something nice to add to the commanded lib.
I'd like to know what you guys think about it,
Cheers,
Bernardo
Greg Young explains correlation and causation ids succinctly as:
Every message has 3 ids, MessageId, CorrelationId, CausationId.
When you are responding to a message (either a command or an event) you copy the CorrelationId of the message you are responding to to your message. The CausationId of your message is the MessageId of the message you are responding to.
Currently, Commanded does not set causation id, and correlation id is set as a generated UUID when appending events. It should follow the approach outlined by Greg above.
It seems like the event data is being sent as a string and not as json to event-store.
For JSON you need to set data_content_type
of extreme lib to "1" (it is set to 0 by default):
https://github.com/exponentially/extreme/blob/5bc3b34a94bb8530964b674261b978910339c77c/lib/extreme.ex#L137
I'm not sure where you set this.
Here is my events in Gregg's event store sent by commanded (pay attention to the "data" element):
{
"correlationId": "ccf88746-06ba-459f-b93a-9ccce1cebdf1",
"readerPosition": {
"$s": {
"$ce-account": 6
}
},
"events": [
{
"eventStreamId": "account-333-121-568-3245",
"eventType": "Elixir.Bank.Events.AccountOpened",
"data": "{\"timestamp_utc\":1506096118,\"initial_balance\":0,\"client_id\":\"3324-john.oliver\",\"account_id\":\"333-121-568-3245\"}",
"metadata": "{\"$correlationId\":\"7e083e9e-bcf6-4cc5-8cc8-47d84f14ec50\"}",
"readerPosition": {
"$s": {
"$ce-account": 0
}
}
},
{
"eventStreamId": "account-333-121-568-3245",
"eventNumber": 1,
"eventType": "Elixir.Bank.Events.MoneyDeposited",
"data": "{\"timestamp_utc\":1506096512,\"amount\":1000,\"account_id\":\"333-121-568-3245\"}",
"metadata": "{\"$correlationId\":\"fc725dea-c9d9-4cb2-aa47-dcc0cc8ca59c\"}",
"readerPosition": {
"$s": {
"$ce-account": 1
}
}
},
{
"eventStreamId": "account-333-121-568-3245",
"eventNumber": 2,
"eventType": "Elixir.Bank.Events.MoneyWithdrawn",
"data": "{\"timestamp_utc\":1506096582,\"amount\":25,\"account_id\":\"333-121-568-3245\"}",
"metadata": "{\"$correlationId\":\"87c5ffdf-d424-48df-878f-ce276265487a\"}",
"readerPosition": {
"$s": {
"$ce-account": 2
}
}
},
{
"eventStreamId": "account-333-121-568-3245",
"eventNumber": 3,
"eventType": "Elixir.Bank.Events.MoneyDeposited",
"data": "{\"timestamp_utc\":1506096699,\"amount\":33,\"account_id\":\"333-121-568-3245\"}",
"metadata": "{\"$correlationId\":\"da047c54-7349-400b-a065-a9313413716c\"}",
"readerPosition": {
"$s": {
"$ce-account": 3
}
}
},
{
"eventStreamId": "account-333-121-568-3245",
"eventNumber": 4,
"eventType": "Elixir.Bank.Events.MoneyWithdrawn",
"data": "{\"timestamp_utc\":1506096699,\"amount\":661,\"account_id\":\"333-121-568-3245\"}",
"metadata": "{\"$correlationId\":\"61df179b-03ef-40f5-90e7-e9852a0aa526\"}",
"readerPosition": {
"$s": {
"$ce-account": 4
}
}
},
{
"eventStreamId": "account-333-121-568-3245",
"eventNumber": 5,
"eventType": "Elixir.Bank.Events.MoneyDeposited",
"data": "{\"timestamp_utc\":1506096699,\"amount\":500,\"account_id\":\"333-121-568-3245\"}",
"metadata": "{\"$correlationId\":\"2292148d-70ea-4885-b4cb-968070c2f2d2\"}",
"readerPosition": {
"$s": {
"$ce-account": 5
}
}
},
{
"eventStreamId": "account-333-121-568-3245",
"eventNumber": 6,
"eventType": "Elixir.Bank.Events.MoneyDeposited",
"data": "{\"timestamp_utc\":1506096699,\"amount\":22,\"account_id\":\"333-121-568-3245\"}",
"metadata": "{\"$correlationId\":\"fd6ac525-a7e6-450d-9781-f280245800f6\"}",
"readerPosition": {
"$s": {
"$ce-account": 6
}
}
}
]
}
Any tips on how to hack it to work?
Hey!
I'm really just getting started along the process of evaluating CQRS as a pattern for us, and I'm curious if you can speak to how this works in a clustered environment. Using something like Postgres for the actual event log solves the issue of having a consistent view of events but how does that work for any in memory state held?
We're looking at an upcoming project that seems to be a good fit for a CQRS/ES approach but I confess I'm still at the point where it makes sense as a diagram on someone's slide deck but I am unclear about what a project's code actually looks like.
I'll just leave this here, as I personally would find this useful. Feel free to close it if it's irrelevant.
It would be nice to support a simple catch-up-subscription, per "category", with client persistent state.
For example, I would like to project all events from "$ce-cars" category.
Using the current competing consumers, it is hard to make commanded integrate with other non-commanded systems using ES.
For example, I would like system A to publish events to "cars-123" stream. System A is using HTTP API. Then system B (with commanded) would subscribe to "$ce-cars" and project all the cars into a sql table.
I guess I'm not sure what this will involve, because using Extreme is already sufficient, but this is just a thought. Maybe you can make an even nicer abstraction.
When using the Commanded.Commands.Router
to configure command dispatch, it would be helpful to validate that the command, handler, and aggregate modules exist.
In the following example, if the two module alias
statements are forgotten then command dispatch will fail (UndefinedFunctionError
module is not available).
defmodule MyApp.BankRouter do
use Commanded.Commands.Router
alias MyApp.OpenAccount
alias MyApp.BankAccount
dispatch OpenAccount, to: BankAccount, identity: :account_number
end
Instead, this should cause a compilation failure, prompting the user that the modules cannot be found.
Commanded is currently restricted to run on a single node only.
Using a library such as Swarm would help to run on a cluster of nodes.
Must consider how to deal with split brain scenarios.
When we start a new aggregate, and kill it, supervisor immediately restart it with the last state, replaying all the events. But when sending a new Command to the same aggregate UUID, a new aggregate is created, and updated, so we end up with two aggregates on memory, one unusable (and consumed resources to be restarted) and a new fresh one, that was started by the last command request. Using :observer.start can help to track their state and see what is going on. Pls, let me know if you need more details to reproduce it.
Event handlers and process managers are expected to return :ok
on successful handling of an event. They return {:error, reason}
on failure. In this case the event handler should be retried.
Potential error handling options: retry, skip, ignore, stop, or park event.
An aggregate process that has been idle for a configured period of time should be shutdown.
GenServer
s have built in support for timeouts. You include a timeout value when replying to a handle_call
or handle_cast
request. If no subsequent requests are made to your GenServer in the timeout period it sends :timeout
to handle_info
. You can then shutdown the process.
Allow an aggregate to return a tagged success tuple - {:ok, events}
- containing the resultant events from a command function.
Remove dependency on eventsourced
library and allow Commanded to use any Elixir module that follows these conventions.
Command functions accept the aggregate root state and the command. They return the events that have resulted. This may be zero (nil
or []
), one, or more.
defmodule Account do
defstruct [account_number: nil, balance: 0]
defmodule Deposit, do: defstruct [:account_number, :amount]
defmodule MoneyDeposited, do: defstruct [:account_number, :amount]
# command function
def execute(%Account{}, %Deposit{amount: amount}) when amount > 0 do
%MoneyDeposited{amount: amount}
end
# state mutator
def apply(%Account{balance: balance} = account, %MoneyDeposited{amount: amount}) do
%Account{account | balance: balance + amount}
end
end
Tracking raised events, aggregate version, and calling the apply
state mutator will be handled by theCommanded.Aggregates.Aggregate
process.
The expected execute
and apply
functions could be specified as a behaviour (e.g. Commanded.Aggregates.AggregateRoot
).
The current command router dispatch
registration can be repetitive when multiple commands are handled by the same command handler.
defmodule Router do
use Commanded.Commands.Router
dispatch MyApp.Commands.Foo, to: CommandHandler, aggregate: Aggregate, identity: :uuid
dispatch MyApp.Commands.Bar, to: CommandHandler, aggregate: Aggregate, identity: :uuid
dispatch MyApp.Commands.Baz, to: CommandHandler, aggregate: Aggregate, identity: :uuid
end
Following the Elixir convention for alias/import/require/use to reference multiple modules at once the registration would become more succinct.
defmodule Router do
use Commanded.Commands.Router
alias MyApp.Commands.{Foo, Bar, Baz}
dispatch [Foo, Bar, Baz],
to: CommandHandler, aggregate: Aggregate, identity: :uuid
end
Replace registry.ex with the new Registry module in Elixir 1.4.
Ensure that the field on a command to uniquely identify the aggregate instance is a string.
Use inspect
to convert the given identity field, from a command, to a string.
First of all, I wanted to say that this library is great. Thank you for your hard work. It took my codebase from over 800 LOC to about 100!
I see that a namespaced event is serialized like so:
Elixir.Foo.Events.BurgerRequested
Let's say I have a different node, with a module called Bar
which has a re-difinition of that event in the module Bar.Events.BurgerRequested
.
In this case, if I were to start commanded on that second note, would it expect Elixir.Bar.Events.BurgerRequested
?
If yes, is there a way to specify how these things are mapped?
P.S.: If this is not possible, it's not a big deal to parse these things myself. Aggregates are really the biggest PITA to implement, so this would just be a cherry on top for PMs or Projections.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.