
Comments (22)

nanov commented on June 25, 2024

Because the ordering of events across different aggregates (and therefore partitions) is not important, denormalizers can read them in parallel.

But it is: read models can be cross-aggregate and even cross-context, so losing the order here may be critical, depending on the design. The same applies to sagas.

EDIT

We need to think about how to avoid such situations, agreed, that's a good question. Probably we need to save the state to Redis and only if it was saved successfully send the event to Kafka, but if sending failed, we should undo the changes in Redis...

That's similar to what happens now with the published field, except that right now publishing does not block the aggregate (worker) in any way; the aggregate does not depend on the event being published, as it will get the stream out of the single source of truth with the next command.

nanov commented on June 25, 2024

this is what I meant. :)

albe commented on June 25, 2024

Don't use Kafka as an EventStore. Kafka does not provide consistency guarantees (i.e. via an optimistic concurrency check). It will completely mess up your data if you have multiple concurrent processes rebuilding an aggregate and creating new events (which you will if you just process HTTP requests). See https://dyllanwli.github.io/2019/02/10/Using-Kafka-as-a-Event-Store/#:~:text=Kafka%20does%20not%20support%20optimistic,writer%20at%20the%20application%20level.
Adding an additional (shared) snapshot per event will only lead to you having to implement a distributed transaction protocol to keep both in sync. This is super hard. Save yourself the pain.
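
For illustration, a minimal in-memory sketch (plain JavaScript, not any particular library's API) of the optimistic concurrency check a real event store performs atomically on append, which is exactly the guarantee Kafka lacks:

// Minimal in-memory illustration of an optimistic concurrency check.
// A real event store performs this check atomically on append.
const streams = new Map(); // streamId -> array of committed events

function append(streamId, expectedVersion, newEvents) {
  const stream = streams.get(streamId) || [];
  if (stream.length !== expectedVersion) {
    // A concurrent writer got there first: the command must be retried
    // against the freshly rebuilt, up-to-date aggregate state.
    throw new Error('ConcurrencyError: expected version ' + expectedVersion +
      ', actual ' + stream.length);
  }
  streams.set(streamId, stream.concat(newEvents));
}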

The same is btw true for the current "published" flag on the event and the concept of publishing (as @nanov mentioned), with the difference that you only end up with duplicated messages (at-least-once) that you need to handle, i.e. via idempotency. Don't use it: it overcomplicates things for the sake of consistent consumers, which you do want anyway. Have your consumers query the EventStore themselves, from their own last persisted position (pull). See https://twitter.com/ih8nickfinding/status/1285490013854208000
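
A sketch of that pull model; store and checkpoints are assumed interfaces, not a concrete library:

// Hypothetical pull-based consumer: no publishing step, no "published" flag.
async function runProjection(name, store, checkpoints, project) {
  let position = await checkpoints.load(name); // own last persisted position
  for (;;) {
    const events = await store.getEventsSince(position, 100); // pull, don't push
    for (const event of events) {
      await project(event);                   // update the read model
      position = event.position;
      await checkpoints.save(name, position); // ideally atomic with the projection
    }
    if (events.length === 0) {
      await new Promise((resolve) => setTimeout(resolve, 500)); // idle poll
    }
  }
}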

why can't we create partitions per aggregateId?

You can, and it's the best way to do something like that in Kafka (don't), but to rebuild a single aggregate (or a projection thereof), you need to fetch all events of the same aggregate type (= topic) and filter only the ones matching your aggregateId. This will quickly become a bottleneck or a PITA.
See https://dyllanwli.github.io/2019/02/10/Using-Kafka-as-a-Event-Store/#:~:text=Lack%20of%20entity%20isolation
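
A sketch of that cost, assuming a hypothetical consumeAllEvents helper that reads a topic from offset 0:

// Rebuilding ONE aggregate from a per-type topic means scanning the events
// of ALL instances of that type and discarding most of them.
async function rebuildFromTopic(consumeAllEvents, topic, aggregateId, applyEvent) {
  let state = null;
  for await (const event of consumeAllEvents(topic)) { // starts at offset 0
    if (event.aggregateId !== aggregateId) continue;   // wasted work, grows with the topic
    state = applyEvent(state, event);
  }
  return state;
}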

We need to think about how to avoid such situations, agreed, that's a good question. Probably we need to save the state to Redis and only if it was saved successfully send the event to Kafka, but if sending failed, we should undo the changes in Redis...

You noticed the issue and are unintentionally starting to implement a distributed transaction protocol (wrongly at first, of course). You would need to do this at every single arrow in your architecture plan where data moves outside a single thread of execution, and your architecture becomes much more complex wherever you deal with it. Keep data single-sourced and immutable (= read-only). That's the whole idea of EventSourcing, really.

adrai commented on June 25, 2024

Not right now. But feel free to submit a PR :-)

imissyouso commented on June 25, 2024

The idea of using Kafka as an event store seems really good. @adrai what is your personal opinion about this feature? Maybe you see obvious pitfalls?

adrai commented on June 25, 2024

Never used it 🤷‍♂️

nanov commented on June 25, 2024

Using Kafka as an event store is a rather awful idea.

Each aggregate (not an aggregate type, but a single aggregate instance, i.e. one id) is a separate ordered stream that needs to be replayed in order to rebuild the aggregate state (i.e. before each command is executed). Solving this with Kafka is not impossible, but it is highly complex and unperformant: you have two options, one partition for all events, which means streaming and filtering all of them on each command, or one partition per aggregate instance, which will become unmanageable and resource-intensive over time. Most importantly, Kafka is not designed for this.
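
For readers unfamiliar with that flow, a minimal sketch of what classical event sourcing does on every command; eventStore, decide and applyEvent are assumed interfaces, not this library's exact API:

// Classical event sourcing: every command handler starts from the aggregate's
// own ordered stream and folds it into fresh state before deciding anything.
async function handleCommand(eventStore, command, decide, applyEvent) {
  const history = await eventStore.getEvents(command.aggregateId); // one ordered stream per id
  const state = history.reduce(applyEvent, null);                  // rebuild the state
  const newEvents = decide(state, command);                        // apply business rules
  // append with an optimistic concurrency check against the stream length
  await eventStore.append(command.aggregateId, history.length, newEvents);
}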

That does not mean that Kafka is a bad choice for the messaging layer between the parts of your system (which is out of scope for this library).

imissyouso commented on June 25, 2024

@nanov hi,
we don't need to rebuild the aggregate state on every command; we can cache it in an internal DB, as the Kafka community suggests doing with Kafka Streams (RocksDB) or something similar.

Kafka as an event store gives us the possibility to use it as a bus and as a single event store at the same time. We no longer need to replay events at the cold start of new services (for example, by requesting them from the bus from domain services); we just read events from Kafka starting at offset 0.
More advantages are described here https://www.confluent.io/blog/event-sourcing-cqrs-stream-processing-apache-kafka-whats-connection/

Also see https://blog.softwaremill.com/event-sourcing-using-kafka-53dfd72ad45d
and a sceptical article https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c - the comments are the most important part there.

Furthermore, I have already implemented the very first version of a Kafka event store for the cqrs-domain lib, and it seems to work.

[architecture diagram]

nanov commented on June 25, 2024

I do get the advantages regarding the other services (saga/denormalizer).
What you are basically doing is taking a snapshot on each event; this could work, and depending on the use case it may even be a good idea. Can you assure exactly-once semantics this way?

imissyouso commented on June 25, 2024

@nanov
Kafka provides exactly-once semantics between producer and broker since version 0.11 out of the box (https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/), meaning a message will be saved in the storage exactly once, with no duplication and no lost events.
As I understand it, to provide full exactly-once semantics between producer and consumer we would also have to use the Transactions API on the consumer side; it is fully implemented under the hood of the Kafka Streams Java library, and I don't think I will find a JS port of that (I don't trust https://github.com/nodefluent/kafka-streams :) ).
Summarizing: in our scenario we can be sure that Kafka will save our event exactly once, but our consumers may receive it at least once.
But is that really bad?

  1. Duplication of events in the Kafka event store will never harm the final aggregate state (which is cached in Redis in my implementation), since events are correctly ordered (Kafka guarantees this within each partition).
  2. On the denormalizer/saga side we use the revisionGuard mechanism for deduplication.

Does that mean at-least-once totally fits our requirements?
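
A sketch of the deduplication from point 2, in the spirit of revisionGuard (not the actual implementation): the consumer tracks the last applied revision per aggregate and drops anything it has already seen:

// Simplified revision-guard-style deduplication on the consumer side.
// In practice the last applied revisions live in the denormalizer's storage.
const lastRevisions = new Map(); // aggregateId -> last applied revision

function handleAtLeastOnce(event, apply) {
  const last = lastRevisions.get(event.aggregateId) || 0;
  if (event.revision <= last) return; // duplicate delivery: already applied
  if (event.revision > last + 1) {
    throw new Error('revision gap - wait for or re-fetch the missing events');
  }
  apply(event); // exactly-once effect despite at-least-once delivery
  lastRevisions.set(event.aggregateId, event.revision);
}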

imissyouso commented on June 25, 2024

Proof of concept: https://github.com/imissyouso/domain-cqrs-kafka-eventstore
I am going to finish all the todo steps listed in the readme in the near future.

nanov commented on June 25, 2024

Again, all of that is known; still, you are losing the main idea of event streams here and compensating for it with something like a snapshot storage.

What happens if you want to review some events?
What happens if you want to change how event handlers behave?

All of this is pretty trivial right now and comes out of the box; in your case you would have to write some custom aggregate rebuild logic.

I definitely think that Kafka is suitable as a broker here (especially for delivering commands to aggregates, as it allows a huge amount of safe, ordered parallelism), but as an event store, in most cases, I would stick to something built for that.

imissyouso commented on June 25, 2024

@nanov
thank you for the reply, but I feel that I didn't understand this part:

What happens if you want to change how event handlers behave?

Could you provide an example?
Thanks in advance for your time.

nanov commented on June 25, 2024

It's pretty simple. Let's say you have an event ProductDescriptionChanged. Your first handler implementation just changes the description property on the aggregate's state; after some time you decide that you also want to record the timestamp of the change in the aggregate state in order to do some other business-logic validation.

The simple approach would be to just change the way the handler behaves, and with the next rebuild you would have everything in the state; with your approach you would have to run some custom rebuild script on the aggregate in order to update your DB.
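
In code, the change might look like this (hypothetical handlers): with classical ES the next rebuild applies the new handler to all of history for free, while a persisted-state design has to migrate the stored state explicitly.

// v1: only the description is projected into the aggregate state.
function onProductDescriptionChangedV1(state, event) {
  return { ...state, description: event.payload.description };
}

// v2: the timestamp of the change is now also needed for validation.
// Classical ES picks this up for every aggregate on the next rebuild;
// a persisted state store needs a custom migration/rebuild script.
function onProductDescriptionChangedV2(state, event) {
  return {
    ...state,
    description: event.payload.description,
    descriptionChangedAt: event.timestamp,
  };
}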

imissyouso commented on June 25, 2024

@nanov
thanks for your reply again. I think you did not understand my idea, so to clarify I drew this image:
[architecture diagram]
I still do not see any fundamental logical contradictions which would hinder implementing these points:

What happens if you want to review some events?
What happens if you want to change how event handlers behave?

imissyouso commented on June 25, 2024

Also, such an approach solves our issue thenativeweb/node-cqrs-domain#153.
Here we can simply use the Redis aggregate state store to validate unique fields at the 'preCondition' stage, because updating the aggregate state in Redis happens within the aggregate lock as the last step of command processing.

nanov commented on June 25, 2024

I think you don't get how the current, or any classical CQRS/ES, implementation works.

The fundamental difference here is that you don't have an aggregate state store; state is recreated before each new command. Welcome to event sourcing.

I am not saying that your implementation cannot work or has no uses, but it is different: you remove one component (the event store) and introduce two new ones (Kafka and an aggregate state store), and additionally you have to be absolutely sure that both are 100% synchronized before handling the next command on the same aggregate.

Once you get it right, you will benefit from faster writes (which is not what CQRS/ES is about) and lose many of the benefits that rebuilding the state from the stream before handling a new command gives you.

Additionally, introducing such a heavy stack raises the question of whether getEventStore isn't a better fit here, as it gives you everything from all worlds.

imissyouso commented on June 25, 2024

@nanov

Additionally, introducing such a heavy stack raises the question of whether getEventStore isn't a better fit here, as it gives you everything from all worlds.

yes, I already use getEventStore to inject my own event store implementation:

var domain = require('cqrs-domain')({
	// ...
	// createKafkaEventStore is the proof of concept from
	// https://github.com/imissyouso/domain-cqrs-kafka-eventstore
	eventStore: createKafkaEventStore({
		// When we save events to Kafka, we wait until we receive them back in the
		// consumer; only then is the save considered complete. If the consumer
		// has not received them within this timeout, we throw an exception.
		ackTimeout: 3000,
		client: { kafkaHost: 'kafka:9092' },
		sources: [
			// offset: 0
			{ topic: 'events', partition: 0 }
		],
		// See https://github.com/SOHU-Co/kafka-node for parameter descriptions
		consumer: {
			groupId: 'test', // consumer group id, default `kafka-node-group`
			// Auto-commit config
			autoCommit: true,
			autoCommitIntervalMs: 5000,
			// The maximum amount of time in milliseconds to block waiting if
			// insufficient data is available at the time the request is issued,
			// default 100ms
			fetchMaxWaitMs: 100,
			// The minimum number of bytes of messages that must be available
			// to give a response, default 1 byte
			fetchMinBytes: 1,
			// The maximum bytes to include in the message set for this partition.
			// This helps bound the size of the response.
			fetchMaxBytes: 1024 * 1024,
			// If set to true, the consumer will fetch messages from the given
			// offset in the payloads
			fromOffset: true,
			// If set to 'buffer', values will be returned as raw buffer objects
			encoding: 'utf8',
			keyEncoding: 'utf8'
		}
	})
	// ...
});

dmitry-textmagic commented on June 25, 2024

The fundamental difference here is that you don't have an aggregate state store; state is recreated before each new command. Welcome to event sourcing.

but isn't it technically the same? What's the difference between:
a) recreating the aggregate state by picking up the last aggregate snapshot plus all events that went into the event store after that snapshot;
and b) recreating the aggregate state by loading it from the aggregate state store?

a) and b) look exactly the same to me, except that the "snapshots" are taken continuously, on each event.

Events do end up stored in Kafka in this case, of course. It's not a bus; it's an event store.
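
Spelled out, with loadSnapshot, eventsAfter and applyEvent as assumed interfaces, a) is a fold over the last snapshot plus the tail of newer events, and b) is the degenerate case where the tail is always empty:

// a) classical rebuild: last snapshot plus the events recorded after it.
async function loadAggregate(aggregateId, loadSnapshot, eventsAfter, applyEvent) {
  const snapshot = await loadSnapshot(aggregateId); // may be null
  const tail = await eventsAfter(aggregateId, snapshot ? snapshot.revision : 0);
  return tail.reduce(applyEvent, snapshot ? snapshot.state : null);
}
// b) is the same fold with an always-empty tail, because the "snapshot" is
// refreshed on every single event - at the price of making it mandatory and
// of having to keep it in sync with the log.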

imissyouso commented on June 25, 2024

Also, Kafka can "compact" events, which works similarly to snapshots (...almost: it actually keeps only the latest record per key rather than making a real snapshot of the aggregate) https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262
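
For reference, compaction is a per-topic setting along these lines (topic name illustrative; older Kafka CLIs take --zookeeper instead of --bootstrap-server), and it retains only the latest record per message key, so the key would have to be the aggregateId for this to resemble a snapshot:

kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --topic aggregate-state --partitions 10 --replication-factor 1 \
  --config cleanup.policy=compact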

nanov commented on June 25, 2024

@dmitry-textmagic

but isn't it technically the same?

Well, it's not the same (snapshots are optional, configurable, and generally avoidable): take them out of the picture and you still recreate state by reapplying the event handlers before each command. It really is anything but the same.

Plus, with snapshots you really don't care whether they are created or not; you don't have to wait for them or handle failures with them. In your approach, by contrast, it is vital that the state store is updated before you can handle the next command - otherwise you'll get really unexpected results.

At the end of the day, Kafka is not an event store for such long-lived CQRS/ES use cases; it is great at streaming data in a partitioned manner, which is what it is made for.
Sure, you can work it out to behave like that, but I see neither the gain nor a real need: you don't solve the atomicity or complexity problems, you just move them somewhere else. You either need one partition for all events or a partition per aggregateId; both approaches are suboptimal and would have to scale dynamically. What if I want an ordered stream of ALL events for read-model purposes? In that case you would need to store those events in the same partition, which results in a non-scalable solution.

imissyouso commented on June 25, 2024

Plus, with snapshots you really don't care whether they are created or not; you don't have to wait for them or handle failures with them. In your approach, by contrast, it is vital that the state store is updated before you can handle the next command - otherwise you'll get really unexpected results.

the next command will be executed only once we are 100% sure that the aggregate was updated; this is solved at the code level - we execute the callback in the addEvents method only when we have received acks from Redis/Kafka, otherwise we throw a timeout exception and the command fails. If it failed with a timeout, there could be situations where we published the event to Kafka but, because Redis failed, the state update was not applied - that way the saved state would be inconsistent with Kafka. We need to think about how to avoid such situations, agreed, that's a good question. Probably we need to save the state to Redis and only if it was saved successfully send the event to Kafka, but if sending failed, we should undo the changes in Redis...
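
A sketch of that write path, with redis and producer as assumed interfaces; note that the compensation step can itself fail, which is exactly the distributed-transaction trap discussed above:

// Hypothetical write path: Redis first, then Kafka, compensate on failure.
async function commitEvent(redis, producer, aggregateId, newState, event) {
  const previousState = await redis.getAndSetState(aggregateId, newState);
  try {
    await producer.send('events', event); // resolves only on broker ack
  } catch (err) {
    // best-effort undo; if THIS fails too, Redis and Kafka silently diverge
    await redis.setState(aggregateId, previousState);
    throw err;
  }
}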

What if I want an ordered stream of ALL events for read-model purposes? In that case you would need to store those events in the same partition, which results in a non-scalable solution.

why can't we create partitions per aggregateId? Then we read them per aggregateId / per partition / per event denormalizer, for example 10 partitions read by 10 denormalizer instances. Because the ordering of events across different aggregates (and therefore partitions) is not important, denormalizers can read them in parallel.
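
What that routing boils down to is keying by aggregateId, so every event of one aggregate lands in (and stays ordered within) one partition; a library-agnostic sketch:

// Key-based routing: hashing the aggregateId gives every aggregate a stable
// partition, so per-aggregate ordering survives while the partitions are
// consumed in parallel (e.g. 10 partitions read by 10 denormalizer instances).
const crypto = require('crypto');

function partitionFor(aggregateId, numPartitions) {
  const hash = crypto.createHash('md5').update(String(aggregateId)).digest();
  return hash.readUInt32BE(0) % numPartitions; // deterministic assignment
}

// partitionFor('order-42', 10) always yields the same partition for 'order-42'.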

and would have to scale dynamically

yes, how to do repartitioning is a good question.
