NB this is long and needs lots of editing.
Storage model (see source)
Batches
Events are stored in immutable batches consisting of:
- `p`artitionKey: `string` // stream identifier
- `i`ndex: `int64` // base index of this batch (`0` for first event in a stream)
- `id`: `string` // same as `i` (CosmosDb forces every doc to have one, and it must be a `string`)
- `e`vents: `Event[]` // (see next section) typically there is one item in the array (can be many if events are small, for RU and perf-efficiency reasons)
- `ts` // CosmosDb-intrinsic last updated date for this record (changes when replicated etc, hence see `t` below)
Events
Per `Event`, we have the following:
- `c`ase - the case of this union in the Discriminated Union of events this stream bears (aka Event Type)
- `d`ata - json data (CosmosDb maintains it as actual json; it can be indexed and queried if desired)
- `m`etadata - carries ancillary information for an event
- `t` - creation timestamp
Tip Batch
The tip is readable via a point-read, as the `id` has a fixed known value (`-1`). It uses the same base layout as an Event-Batch, but adds the following:
- `_etag`: CosmosDb-managed field updated per-touch (facilitates `NotModified` result, see below)
- `id`: always `-1` so one can reference it in a point-read GET request and not pay the cost and latency associated with a full query
- `u`: Array of _unfold_ed events based on a point-in-time state (see State, Snapshots, Events and Unfolds, Unfolded Events and `unfold` in the programming model section)
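As a rough illustration of the two document shapes described above, they could be modelled in F# as follows. This is only a sketch: the type names `Event`, `Batch`, `Unfold` and `Tip`, the helper `mkBatch`, and the use of `string` for json payloads are assumptions made for this example, not the definitions used by `Equinox.Cosmos`. The single-letter field names follow the listing above.

```fsharp
open System

// Hypothetical shapes for illustration only; not the types Equinox.Cosmos defines.
type Event =
    { c: string            // case: the Event Type
      d: string            // data: json, kept as a raw string in this sketch
      m: string            // metadata: ancillary json ("" when absent)
      t: DateTimeOffset }  // creation timestamp

type Batch =
    { p: string            // partitionKey: the stream identifier
      id: string           // same as i, rendered as a string
      i: int64             // base index of this batch
      e: Event[] }         // the events held in this batch

type Unfold =
    { i: int64             // index the unfolded event was generated at
      c: string            // case of the unfolded event
      d: string }          // compressed json (a plain string placeholder here)

type Tip =
    { p: string
      id: string           // always "-1"
      i: int64
      e: Event[]
      u: Unfold[]          // unfolded events based on a point-in-time state
      _etag: string }      // managed by CosmosDb, updated per touch

// The fixed document id that makes the tip addressable via a point read
let tipId = "-1"

// Builds a regular (non-tip) batch; its id mirrors its base index
let mkBatch (stream: string) (index: int64) (events: Event[]) : Batch =
    { p = stream; id = string index; i = index; e = events }
```

Keeping `id` derivable from `i` for regular batches, and fixed for the tip, is what lets a reader address the tip without first running a query.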
State, Snapshots, Events and Unfolds
In an Event Sourced system, we typically distinguish between the following basic elements:
- Events - Domain Events representing actual real world events that have occurred, reflecting the domain as understood by domain experts - see Event Storming. Examples: the customer favorited the item, the customer saved SKU Y for later, $200 got charged with transaction id X.
- State - derived representations established from Events. A given set of code in an environment will, in service of some decision making process, interpret those events as implying a particular state in a model. If the code changes slightly or gains a field, one would not necessarily expect a version of the code from a year ago to generate an equivalent state that can simply be loaded into the object model as-is. (It is, however, easy to hold a copy in memory for as long as the process runs.)
- Snapshots - A snapshot is an intentionally roundtrippable version of a State, which can be saved and restored. Typically one would do this to save the cost of loading all the Events in a long running sequence of Events to re-establish the State. The EventStore folks have a great walkthrough on Rolling Snapshots.
- Projections - the term projection is heavily overloaded, meaning anything from the proceeds of a SELECT statement, the result of a `map` operation, an EventStore projection, or an event being propagated via Kafka (no, further examples are not required).
... and:
- Unfolds - the term `unfold` is based on the FP function of that name, bearing the signature `'state -> 'event seq`. When using `Equinox.Cosmos`, the `unfold` produces projections, represented as _event_s to snapshot the state at a position in the stream.
Generating and saving `unfold`ed events
Periodically, along with writing the events that a decision function yields to represent the implications of a command given the present state, we also `unfold` the resulting `state'` and supply the unfolded events to the `sync` function too. The `unfold` function takes the `state` and projects one or more snapshot-events which can be used to reestablish the same state we have thus far derived from watching the events on the stream. Unlike normal events, `unfold`ed events do not get replicated to other systems, and can also be thrown away at will (we also compress them rather than storing them as fully expanded json).
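The write-side sequence described above (decide, fold the new events into `state'`, then unfold `state'`) can be sketched as below. The function name `prepareSync` and the string-based events are hypothetical, chosen only to keep the example self-contained; the actual call to the `sync` stored procedure and the compression of unfolds are not shown.

```fsharp
// A sketch only: gathers what a writer would hand to sync.
// 'prepareSync' is a hypothetical helper, not an Equinox API.
let prepareSync
        (fold: 'state -> 'event list -> 'state)
        (unfold: 'state -> 'event seq)
        (decide: 'state -> 'command -> 'event list)
        (state: 'state)
        (command: 'command) =
    let events = decide state command        // the events the decision yields
    let state' = fold state events           // the state those events imply
    let unfolds = List.ofSeq (unfold state') // snapshot-events for state'
    events, unfolds, state'

// Toy model: the state is the list of names seen so far, events are the names
let foldNames (state: string list) (events: string list) = state @ events
let unfoldNames (state: string list) = Seq.singleton ("snapshot:" + String.concat "," state)
let decideAdd (state: string list) (name: string) =
    if List.contains name state then [] else [name]

let events, unfolds, state' = prepareSync foldNames unfoldNames decideAdd ["a"] "b"
```

Note that the unfolds are derived from `state'` rather than from the events, which is why they can be discarded and regenerated at will.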
Reading from the Storage Model
Most reads request the tip with an `IfNoneMatch` precondition citing the etag it bore when we last saw it, which, when combined with a cache, means one of the following happens when a reader is trying to establish the state of a stream prior to processing a Command:
- `NotModified` (depending on workload, can be the dominant case) - for `1` RU, minimal latency and close-to-`0` network bandwidth, we know the present state
- `NotFound` (there's nothing in the stream) - for equivalently low cost, we know the state is `initial`
- `Found` (if there are multiple writers and/or we don't have a cached version) - for the minimal possible cost (a point read, not a query), we have all we need to establish the state:
  - `i`: a version number
  - `e`: events since that version number
  - `u`: unfolded auxiliary events computed at the same time as the batch of events was sent (aka projections/snapshots) - (these enable us to establish the `state` without further queries or roundtrips to load and fold all preceding events)
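A reader's handling of those three outcomes might look roughly like the following. The types `ReadResult` and `Cached` and the function `establish` are hypothetical names introduced for this sketch, and it assumes that any unfold accepted by `isStart` reflects the state after all events in the tip. The conditional point read itself is not shown.

```fsharp
// Hypothetical outcome of a conditional point read of the tip (not an Equinox type)
type ReadResult<'event> =
    | NotModified   // the etag we cited still matches
    | NotFound      // there is no tip: the stream is empty
    | Found of i: int64 * e: 'event list * u: 'event list * etag: string

// What a cache might retain per stream (an assumption for this sketch)
type Cached<'state> = { version: int64; state: 'state; etag: string }

// Establishes the present state from the read outcome and the cache entry (if any)
let establish
        (fold: 'state -> 'event seq -> 'state)
        (isStart: 'event -> bool)
        (initial: 'state)
        (cached: Cached<'state> option)
        (result: ReadResult<'event>) : Cached<'state> =
    match result, cached with
    | NotModified, Some c -> c
    | NotModified, None -> failwith "NotModified requires a cached state and etag"
    | NotFound, _ -> { version = 0L; state = initial; etag = "" }
    | Found (i, e, u, etag), _ ->
        let version = i + int64 (List.length e)
        match cached, List.tryFind isStart u with
        | Some c, _ when c.version = i ->
            // the cached state lines up with the base of the tip: fold in its events
            { version = version; state = fold c.state (List.toSeq e); etag = etag }
        | _, Some snapshot ->
            // an unfold can act as the starting point: no preceding events needed
            { version = version; state = fold initial (Seq.singleton snapshot); etag = etag }
        | _ ->
            failwith "no usable starting point: a query for earlier batches would be needed"
```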
Building a state from the Storage Model and/or the Cache
Given a stream with:
{ id:0, i:0, e: [{c:c1, d:d1}]},
{ id:1, i:1, e: [{c:c2, d:d2}]},
{ id:2, i:2, e: [{c:c2, d:d3}]},
{ id:3, i:3, e: [{c:c1, d:d4}]},
{ id:-1,
i:4,
e: [{i:4, c:c3, d:d5}],
u: [{i:4, c:s1, d:s5Compressed}, {i:3, c:s2, d:s4Compressed}],
_etag: "etagXYZ"
}
If we have `state4` based on the events up to `{i:3, c:c1, d: d4}` and the tip document, we can produce the `state` by folding in a variety of ways:
- `fold initial [ C1 d1; C2 d2; C2 d3; C1 d4; C3 d5 ]` (but would need a query to load the 4 preceding batches, with associated RUs and roundtrips)
- `fold state4 [ C3 d5 ]` (only need to pay to transport the tip document as a point read)
- (if `isStart (S1 s5)` = `true`): `fold initial [S1 s5]` (point read + transport + decompress `s5`)
- (if `isStart (S2 s4)` = `true`): `fold initial [S2 s4; C3 d5]` (only need to pay to transport the tip document as a point read and decompress `s4`)
If we have `state3` based on the events up to `{i:2, c:c2, d: d3}`, we can produce the `state` by folding in a variety of ways:
- `fold initial [ C1 d1; C2 d2; C2 d3; C1 d4; C3 d5 ]` (but query, roundtrips)
- `fold state3 [ C1 d4; C3 d5 ]` (`d4` lives in batch `3` rather than in the tip, so that batch must be loaded in addition to the point read)
- (if `isStart (S1 s5)` = `true`): `fold initial [S1 s5]` (point read + transport + decompress `s5`)
- (if `isStart (S2 s4)` = `true`): `fold initial [S2 s4; C3 d5]` (only need to pay to transport the tip document as a point read and decompress `s4`)
If we have `state5` based on the events up to `C3 d5`, and (being the writer, or a recent reader), have the etag: `etagXYZ`, we can do a `HTTP GET` with `If-None-Match: etagXYZ`, which will return `304 Not Modified` with < 1K of data, and a charge of `1.00` RU, allowing us to keep using the cached `state5` as the current state.
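To make the equivalence of these routes concrete, here is a toy model in which the state is a running total. It is a sketch under stated assumptions: the integer payloads stand in for `d1`..`d5`, the snapshot events simply carry the total (no compression is modelled), and the event cases follow the `c` values in the stream listing above.

```fsharp
// Toy events: C1..C3 are regular events, S1/S2 are unfolded snapshot events
type Event =
    | C1 of int | C2 of int | C3 of int
    | S1 of int | S2 of int

let initial = 0

// Regular events add their payload; snapshot events replace the state outright
let evolve state event =
    match event with
    | C1 d | C2 d | C3 d -> state + d
    | S1 s | S2 s -> s

let fold state events = Seq.fold evolve state events

let d1, d2, d3, d4, d5 = 1, 2, 3, 4, 5

// The state after the four non-tip batches, and after the tip's event
let state4 = fold initial [C1 d1; C2 d2; C2 d3; C1 d4]
let state5 = fold state4 [C3 d5]

// Four routes to the same state
let viaQuery  = fold initial [C1 d1; C2 d2; C2 d3; C1 d4; C3 d5] // load everything
let viaCache  = fold state4 [C3 d5]                              // cached state + tip events
let viaLatest = fold initial [S1 state5]                         // snapshot taken after d5
let viaOlder  = fold initial [S2 state4; C3 d5]                  // older snapshot + tip events
```

All four bindings evaluate to the same total, which is the property that lets a reader pick whichever route is cheapest given what it has cached.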
Programming model
In F#, the Equinox programming model involves, per aggregation of events on a given category of stream:
- `'state`: the state required to support the decision or query being supported (not serializable or stored; can be held in a .NET `MemoryCache`)
- `initial: 'state`: the implied state of an empty stream
- `'event`: a discriminated union representing all the possible Events from which a state can be `evolve`d (see `e` and `u` in the data model). Typically the mapping of the json to an `'event` `c`ase is driven by a `UnionContractEncoder`
- `fold : 'state -> 'event seq -> 'state`: function used to fold events (real ones and/or unfolded ones) into the running `'state`
- `evolve: 'state -> 'event -> 'state`: the `folder` function from which `fold` is built, representing the application of the delta the `'event` implies for the model to the `state`
- `decide: 'state -> 'command -> 'event list`: responsible for (in an idempotent manner) interpreting a `command` in the context of a `state` as the `events` that should be written to the stream to record the decision
When using the `Equinox.Cosmos` adapter, one will typically implement two further functions so that building the `'state` does not require every `'event` in the stream to be loaded and processed (versus a single cheap point read from CosmosDb to read the tip):
- `unfold: 'state -> 'event seq`: function used to render events representing the state, which facilitate quickly re-establishing a `state` without needing to go back to the first event that occurred on a stream
- `isStart: 'event -> bool`: predicate indicating whether a given `'event` is sufficient as a starting point, i.e. whether the `state` can be established from it without any preceding events
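The relationship between `unfold`, `isStart` and `evolve` can be illustrated with a small counter model. This is a sketch, not how the library is implemented: the `Event` cases and the helper `foldFromLatestStart` are hypothetical names for this example. The helper skips everything before the latest event that `isStart` accepts, which is only safe because `evolve` ignores the prior state for such events.

```fsharp
// Toy model: a counter with one regular event and one snapshot event
type Event =
    | Incremented
    | Snapshotted of int

let initial = 0

let evolve state event =
    match event with
    | Incremented -> state + 1
    | Snapshotted total -> total   // replaces whatever state came before

// Renders the state as a single snapshot event
let unfold (state: int) : Event seq = Seq.singleton (Snapshotted state)

// A snapshot carries the full state, so it can act as a starting point
let isStart event =
    match event with
    | Snapshotted _ -> true
    | Incremented -> false

// Hypothetical helper: fold only from the latest usable starting point
let foldFromLatestStart isStart evolve initial (events: 'event list) =
    let relevant =
        match List.tryFindIndexBack isStart events with
        | Some index -> List.skip index events
        | None -> events
    List.fold evolve initial relevant

let full =
    foldFromLatestStart isStart evolve initial
        [Incremented; Incremented; Snapshotted 2; Incremented]

// Unfolding a state and folding the result back should reproduce that state
let roundTripped = foldFromLatestStart isStart evolve initial (List.ofSeq (unfold 7))
```

The round trip in the last line is the contract that matters: whatever `unfold` emits must be enough, on its own, for `fold` to rebuild the state it was generated from.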
High level Command Processing flow
When running a decision process, we thus have the following stages:
1. establish a known `'state` (based on a given position in the stream of Events)
2. let the `decide` function look at the request/command and yield a set of events (or none) that represent the effect of that decision
3. update the stream _contingent on the stream still being in the same State/Position it was in step 1_
   3a. if there is no conflict (nobody else decided anything since we decided what we'd do), append the events to the stream (record the new position and etag)
   3b. if there is a conflict, take the conflicting events that other writers have produced since step 1, `fold` them into our state, and go back to step 2 (the CosmosDb stored procedure returns them as part of its response, so no further roundtrip is needed)
4. if it makes sense for our scenario, hold the state, position and etag in our cache. When a reader comes along, do a point-read of the tip and jump straight to step 2 if nothing has been modified.
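The retry loop formed by these stages can be sketched against an in-memory stand-in for a stream. Everything here is hypothetical and simplified: the `Stream` class, its `Load` and `TrySync` members and the `transact` function are names invented for this example, events are held in a plain list (so `fold` takes a list rather than a seq), and caching and etags are left out.

```fsharp
// In-memory stand-in for a stream; the position is simply the number of events
type Stream<'event>() =
    let mutable events : 'event list = []
    member this.Load() = events.Length, events
    // Appends only if nothing was written since expectedVersion;
    // otherwise hands back the events the caller has not yet seen
    member this.TrySync(expectedVersion: int, newEvents: 'event list) =
        if events.Length = expectedVersion then
            events <- events @ newEvents
            Ok events.Length
        else
            Error (List.skip expectedVersion events)

let transact
        (fold: 'state -> 'event list -> 'state)
        (decide: 'state -> 'command -> 'event list)
        (initial: 'state)
        (stream: Stream<'event>)
        (command: 'command) =
    let rec attempt version state =
        match decide state command with            // step 2
        | [] -> version, state                     // nothing to write
        | events ->
            match stream.TrySync(version, events) with
            | Ok version' -> version', fold state events      // step 3a
            | Error conflicting ->                            // step 3b
                attempt (version + List.length conflicting) (fold state conflicting)
    let version, events = stream.Load()            // step 1
    attempt version (fold initial events)
```

Because `decide` is re-run against the updated state after a conflict, an idempotent `decide` naturally yields no events if a competing writer already recorded the same decision.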
Sync stored procedure high level flow (see source)
The `sync` stored procedure takes a document as input which is almost identical to the format of the tip batch (in fact, if the stream is found to be empty, it forms the template for the first document created in the stream). The request includes the following elements:
- `expectedVersion`: the position the requestor is basing their proposed batch of events on (no, an `etag` would not be relevant)
- `e`: array of Events (see Event, above) to append if the `expectedVersion` check is fulfilled
- `u`: array of `unfold`ed events which supersede items with equivalent `c`ase values (aka snapshots, projections)
- `maxEvents`: the maximum number of events to record in an individual batch. For example:
  - if `e` contains 2 events, the tip document's `e` has 2 events and the `maxEvents` is `5`, the events get merged into the tip
  - if `maxEvents` is `1`, the tip gets frozen as a `Batch`, and the new request becomes the tip (as an atomic transaction on the server side)
- (PROPOSAL/FUTURE) `thirdPartyUnfoldRetention`: how many events to keep before the base (`i`) of the batch if required by lagging `u`nfolds which would otherwise fall out of scope as a result of the appends in this batch (this will default to `0`, so for example if a writer says `maxEvents 10` and there is an `u`nfold based on an event more than `10` old it will be removed as part of the appending process)
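The merge-or-freeze decision can be modelled in F# as below. This is one reading of the description above, not a port of the stored procedure: the names `Doc`, `SyncResult`, `mergeUnfolds` and `sync` are hypothetical, events are plain strings, unfolds are `(case, data)` pairs, and the rule "merge when the combined event count fits within `maxEvents`, otherwise freeze" is an assumption inferred from the two examples.

```fsharp
// Simplified document: base index, events, and unfolds as (case, data) pairs
type Doc = { i: int64; e: string list; u: (string * string) list }

type SyncResult =
    | Written of frozen: Doc option * tip: Doc   // frozen is Some when the old tip became a Batch
    | Conflict of currentVersion: int64

// Incoming unfolds supersede existing ones bearing the same case
let mergeUnfolds (existing: (string * string) list) (incoming: (string * string) list) =
    let incomingCases = incoming |> List.map fst |> Set.ofList
    let retained = existing |> List.filter (fun (c, _) -> not (Set.contains c incomingCases))
    retained @ incoming

let sync (tip: Doc) (expectedVersion: int64) (e: string list) (u: (string * string) list) (maxEvents: int) =
    let version = tip.i + int64 tip.e.Length
    let unfolds = mergeUnfolds tip.u u
    if version <> expectedVersion then
        Conflict version
    elif tip.e.Length + e.Length <= maxEvents then
        // everything fits: merge the new events into the existing tip
        Written (None, { tip with e = tip.e @ e; u = unfolds })
    else
        // freeze the old tip as a regular batch; the request becomes the new tip
        Written (Some { tip with u = [] }, { i = version; e = e; u = unfolds })
```

In the real system both documents are written in a single server-side transaction; here that atomicity is simply implied by returning both from one function.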
Example
The following example is a minimal version of the Favorites model, with shortcuts for brevity (yes, and imperfect performance characteristics):
open System

(* Event schemas *)
type Item = { id: int; name: string; added: DateTimeOffset }
type Event =
    | Added of Item
    | Removed of itemId: int
    | Compacted of items: Item[]

(* State types *)
type State = Item list
let contains (state: State) id = state |> List.exists (fun x -> x.id = id)

(* Folding functions to build state from events *)
let evolve state event =
    match event with
    | Compacted items -> List.ofArray items
    | Added item -> item :: state
    | Removed id -> state |> List.filter (fun x -> x.id <> id)
let fold state events = Seq.fold evolve state events

(* Decision Processing *)
type Command =
    | Add of Item
    | Remove of itemId: int
let decide command state =
    match command with
    | Add item ->
        // idempotent: adding an item that is already present yields no events
        if contains state item.id then [] else [Added item]
    | Remove id ->
        if contains state id then [Removed id] else []

(* Equinox.Cosmos Unfold Functions to allow loading without queries *)
let unfold state =
    [Event.Compacted (Array.ofList state)]
// isOrigin plays the role of the isStart predicate in the programming model section
let isOrigin = function
    | Compacted _ -> true
    | _ -> false