Comments (12)
I'm currently looking into preparing a PR, but I'm not sure what the nicest solution would be from the API perspective. I wouldn't really want to break backwards compatibility for this small change, so I thought about some options, but none of them really looks good to me...
Option 1: Introduce a "special case" case class that can be returned by the processEvent function, such that:

    case class EventWithAggregateIds(e: Any, customDestinationAggregateIds: Set[String])

    override final def onEvent = {
      case payload if processEvent.isDefinedAt(payload) =>
        val currentProcessedEvents = processEvent(payload)
        if (lastSequenceNr > processingProgress)
          processedEvents = processedEvents :+ currentProcessedEvents.map {
            case EventWithAggregateIds(e, customDestinationAggregateIds) =>
              createEvent(e, customDestinationAggregateIds)
            case e =>
              createEvent(e, lastHandledEvent.customDestinationAggregateIds)
          }
    }

While it probably works nicely I find the API design a bit "weird" and not very obvious.
Option 2: Introduce an additional method

    case class EventWithAggregateIds(e: Any, customDestinationAggregateIds: Set[String])
    def processEventWithAggregateIds: PartialFunction[Any, Seq[EventWithAggregateIds]] = // no-op default implementation

This would have to be handled as an additional way to process events, and onEvent would have to handle both. This solution feels overly complex for the use case.
Option 3: Limit an EventsourcedProcessor to a fixed set of custom destination aggregate IDs for all emitted events by adding the following def to be implemented by subclasses:

    def customDestinationAggregateIds: Option[Set[String]] = None // return Some(...) to use feature

Option 4: Provide an additional method to determine custom destination aggregate IDs for each event:

    def customDestinationAggregateIds: PartialFunction[Any, Set[String]] = // no-op default implementation

While I feel that option 4 is the cleanest (whatever this means) it still feels weird to "ask" the user implementation twice for the information regarding one event. First: "process the event", then second: "tell me where you want it to go".
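To make the "asked twice" shape of Option 4 concrete, here is a minimal sketch. It assumes a simplified stand-in `Event` type and a hypothetical `handle` driver instead of the real `DurableEvent` and `onEvent`; only `processEvent` and `customDestinationAggregateIds` are names from the proposal.

```scala
object Option4Sketch {
  // Simplified stand-in for DurableEvent (assumption: only the fields needed here).
  final case class Event(payload: Any, customDestinationAggregateIds: Set[String] = Set.empty)

  trait Processor {
    // Existing hook: maps an incoming payload to the payloads to emit.
    def processEvent: PartialFunction[Any, Seq[Any]]

    // Proposed hook (Option 4): destinations per emitted payload.
    // The default is defined nowhere, so existing processors keep their behavior.
    def customDestinationAggregateIds: PartialFunction[Any, Set[String]] =
      PartialFunction.empty

    // Hypothetical driver: first "process the event", then ask where each
    // result should go, falling back to the destinations of the handled event.
    final def handle(in: Event): Seq[Event] =
      if (processEvent.isDefinedAt(in.payload))
        processEvent(in.payload).map { out =>
          val ids = customDestinationAggregateIds
            .applyOrElse(out, (_: Any) => in.customDestinationAggregateIds)
          Event(out, ids)
        }
      else Seq.empty
  }

  // Example processor: only Int results get custom destinations.
  object Example extends Processor {
    override def processEvent: PartialFunction[Any, Seq[Any]] = {
      case s: String => Seq(s.toUpperCase, s.length)
    }
    override def customDestinationAggregateIds: PartialFunction[Any, Set[String]] = {
      case _: Int => Set("stats")
    }
  }
}
```

In this sketch the string result inherits the destinations of the handled event, while the length is routed to the hypothetical "stats" aggregate.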
So... what's your opinion on this? Or do you maybe have other idea(s)?
from eventuate.
I see Option 5 (not backwards compatible!):
- Redefine Process to return custom destinations for each event:
    type Process = PartialFunction[Any, Seq[(Any, Set[String])]]
  (alternatively the tuple could be a dedicated type)
- Provide a helper function that maps an old-style processEvent to a new-style processEvent:
    def keepCustomDestinations(process: PartialFunction[Any, Seq[Any]]): Process = process.andThen(_.map((_, lastHandledEvent.customDestinationAggregateIds)))
Existing implementations would have to be changed to use this helper.
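A self-contained sketch of Option 5 could look like the following. `lastHandledDestinations` is a hypothetical stand-in for `lastHandledEvent.customDestinationAggregateIds`, and the helper is written with an explicit guard rather than `andThen`; the intended effect is the same.

```scala
object Option5Sketch {
  // New-style result: each emitted event is paired with its own destinations.
  type Process = PartialFunction[Any, Seq[(Any, Set[String])]]

  // Stand-in (assumption) for lastHandledEvent.customDestinationAggregateIds;
  // a real processor would read this from the event currently being handled.
  var lastHandledDestinations: Set[String] = Set.empty

  // Adapter for old-style handlers: every emitted event inherits the
  // destinations of the event that triggered it.
  def keepCustomDestinations(process: PartialFunction[Any, Seq[Any]]): Process = {
    case payload if process.isDefinedAt(payload) =>
      process(payload).map(e => (e, lastHandledDestinations))
  }

  // An existing old-style handler, migrated by wrapping it in the helper.
  val oldStyle: PartialFunction[Any, Seq[Any]] = {
    case s: String => Seq(s, s.reverse)
  }
  val adapted: Process = keepCustomDestinations(oldStyle)

  // A new-style handler can choose destinations per emitted event.
  val newStyle: Process = {
    case n: Int => Seq((n, Set("a")), (n + 1, Set("b")))
  }
}
```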
from eventuate.
I wouldn't really want to break backwards compatibility
I think we should actually break it to get a nice solution. processEvent should have type PartialFunction[Any, Seq[DurableEvent]], which gives application logic the possibility to change other event metadata as well. Metadata that must not be changed by application logic (like the vector timestamp, ...) should be overwritten by the processor after the processed DurableEvents have been generated by processEvent.
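Roughly, the idea could be sketched like this. The `DurableEvent` below is a heavily simplified stand-in (the vector timestamp is just a `Map` here), and `handle` is a hypothetical driver, not the actual `onEvent` implementation.

```scala
object DurableEventProcessSketch {
  // Simplified stand-in for DurableEvent (assumption: the real class has
  // more fields and a dedicated vector time type instead of a Map).
  final case class DurableEvent(
    payload: Any,
    vectorTimestamp: Map[String, Long] = Map.empty,
    customDestinationAggregateIds: Set[String] = Set.empty)

  trait Processor {
    // Proposed signature: application logic returns full DurableEvents.
    def processEvent: PartialFunction[Any, Seq[DurableEvent]]

    // Hypothetical driver: framework-owned fields are overwritten with values
    // from the handled event, whatever processEvent put there.
    final def handle(in: DurableEvent): Seq[DurableEvent] =
      processEvent.lift(in.payload).getOrElse(Seq.empty)
        .map(_.copy(vectorTimestamp = in.vectorTimestamp))
  }

  object Example extends Processor {
    override def processEvent: PartialFunction[Any, Seq[DurableEvent]] = {
      case s: String => Seq(
        // The bogus timestamp set here is discarded by handle.
        DurableEvent(s, vectorTimestamp = Map("bogus" -> 99L),
          customDestinationAggregateIds = Set("a")),
        DurableEvent(s.length, customDestinationAggregateIds = Set("b")))
    }
  }
}
```

Each emitted event keeps its own destinations, while the timestamp chosen by application logic never survives.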
from eventuate.
We could also provide a new subclass of the new EventsourcedProcessor that restores old behavior by providing a processEvent: PartialFunction[Any, Seq[DurableEvent]] method. In this case, the migration path is just using this new subclass instead of EventsourcedProcessor.
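A possible shape for such a compatibility subclass, as a sketch only: `LegacyProcessor`, `processPayload` and `handle` are hypothetical names, and `DurableEvent` is again a simplified stand-in. The old-style handler needs its own name here because a Scala subclass cannot override `processEvent` with an incompatible result type.

```scala
object LegacyProcessorSketch {
  // Simplified stand-in for DurableEvent (assumption).
  final case class DurableEvent(payload: Any, customDestinationAggregateIds: Set[String] = Set.empty)

  // New-style processor: handlers return full DurableEvents.
  trait Processor {
    protected var lastHandledEvent: DurableEvent = DurableEvent(None)

    def processEvent: PartialFunction[Any, Seq[DurableEvent]]

    // Hypothetical driver that records the handled event before processing.
    final def handle(in: DurableEvent): Seq[DurableEvent] = {
      lastHandledEvent = in
      processEvent.lift(in.payload).getOrElse(Seq.empty)
    }
  }

  // Hypothetical compatibility subclass: implementors keep writing handlers
  // that return plain payloads; destinations are inherited as before.
  trait LegacyProcessor extends Processor {
    def processPayload: PartialFunction[Any, Seq[Any]]

    final override def processEvent: PartialFunction[Any, Seq[DurableEvent]] = {
      case p if processPayload.isDefinedAt(p) =>
        processPayload(p).map(DurableEvent(_, lastHandledEvent.customDestinationAggregateIds))
    }
  }

  object Example extends LegacyProcessor {
    override def processPayload: PartialFunction[Any, Seq[Any]] = {
      case s: String => Seq(s.toUpperCase)
    }
  }
}
```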
from eventuate.
When comparing it to the stream-based processing solution, another alternative would be to keep processEvent as is and have an additional preProcessDurableEvent: DurableEvent => DurableEvent method that defaults to identity but can be overridden by implementation classes if needed. In the stream-based solution this would be comparable to a processing stage before the DurableEventProcessor. This would make the streaming and the actor-based solutions equivalent, which we should try to achieve anyway.
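As a sketch of that alternative, with a simplified stand-in `DurableEvent` and a hypothetical `handle` driver (only `processEvent` and `preProcessDurableEvent` come from the proposal):

```scala
object PreProcessSketch {
  // Simplified stand-in for DurableEvent (assumption).
  final case class DurableEvent(payload: Any, customDestinationAggregateIds: Set[String] = Set.empty)

  trait Processor {
    // Unchanged existing hook.
    def processEvent: PartialFunction[Any, Seq[Any]]

    // Proposed hook; the default leaves the incoming event untouched.
    def preProcessDurableEvent: DurableEvent => DurableEvent = identity

    // Hypothetical driver: the hook runs before processEvent, and emitted
    // events inherit the destinations of the pre-processed event.
    final def handle(in: DurableEvent): Seq[DurableEvent] = {
      val pre = preProcessDurableEvent(in)
      processEvent.lift(pre.payload).getOrElse(Seq.empty)
        .map(DurableEvent(_, pre.customDestinationAggregateIds))
    }
  }

  object Example extends Processor {
    override def processEvent: PartialFunction[Any, Seq[Any]] = {
      case s: String => Seq(s, s.length)
    }
    override def preProcessDurableEvent: DurableEvent => DurableEvent =
      _.copy(customDestinationAggregateIds = Set("audit"))
  }
}
```

Note that in this sketch all events emitted for one input end up with the same destinations.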
from eventuate.
This comes with the disadvantage that each event created by a single processEvent call must have the same destinations, right? So it makes it equivalent to streaming but less flexible than an ordinary persist (or persistOnEvent) of an EventsourcedActor!?
from eventuate.
@volkerstampa that's right.
from eventuate.
I think we should actually break it for having a nice solution.
Agreed.
Those metadata that must not be changed by application logic (like vector timestamp, ...) should be overwritten by the processor after processed DurableEvents have been generated by processEvent.
I agree that there are certain fields, like the vector timestamp, that should not be changed by application logic. However, I'd find it quite confusing to have an API that first allows an implementor to change this metadata, only to have it overwritten afterwards. Maybe it would be an option to provide an "intermediate" format / case class that contains just those fields that application logic is allowed to modify, and have the framework convert it to a full-blown DurableEvent.
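Something along these lines, as a minimal sketch: `EmittedEvent` and `toDurableEvent` are hypothetical names, and `DurableEvent` is a simplified stand-in with the vector timestamp modelled as a `Map`.

```scala
object IntermediateFormatSketch {
  // Simplified stand-in for DurableEvent (assumption).
  final case class DurableEvent(
    payload: Any,
    vectorTimestamp: Map[String, Long],
    customDestinationAggregateIds: Set[String])

  // Hypothetical intermediate type: only what application logic may set.
  // There is no vector timestamp field, so there is nothing to overwrite later.
  final case class EmittedEvent(payload: Any, customDestinationAggregateIds: Set[String] = Set.empty)

  // Framework-side conversion: protected fields are supplied by the framework only.
  def toDurableEvent(e: EmittedEvent, vectorTimestamp: Map[String, Long]): DurableEvent =
    DurableEvent(e.payload, vectorTimestamp, e.customDestinationAggregateIds)
}
```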
from eventuate.
We already planned that for later versions of Eventuate that we are currently prototyping. There should be something like EventMetadata for that purpose. But this is a mid-term goal. For a short-term solution, the proposed one is a good compromise IMO.
from eventuate.
the proposed one is a good compromise IMO
Which one do you refer to? processEvent: PartialFunction[Any, Seq[DurableEvent]] or preProcessDurableEvent: DurableEvent => DurableEvent?
For the preProcessDurableEvent solution I assume the method would be executed after processEvent and its input would be the event produced by processEvent, right?
from eventuate.
I thought about having preProcessDurableEvent run before calling processEvent, but I like your proposal more. It is actually a postProcessDurableEvent and would even solve the issue raised by @volkerstampa. However, it diverges from what is possible with the stream-based solution.
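A sketch of the post-processing variant, under the same assumptions as before (simplified stand-in `DurableEvent`, hypothetical `handle` driver). Because the hook sees each emitted event individually, destinations can differ per event:

```scala
object PostProcessSketch {
  // Simplified stand-in for DurableEvent (assumption).
  final case class DurableEvent(payload: Any, customDestinationAggregateIds: Set[String] = Set.empty)

  trait Processor {
    // Unchanged existing hook.
    def processEvent: PartialFunction[Any, Seq[Any]]

    // Hook applied to every event produced by processEvent; identity by default.
    def postProcessDurableEvent: DurableEvent => DurableEvent = identity

    // Hypothetical driver: wrap results with inherited destinations first,
    // then let the hook adjust each emitted event.
    final def handle(in: DurableEvent): Seq[DurableEvent] =
      processEvent.lift(in.payload).getOrElse(Seq.empty)
        .map(DurableEvent(_, in.customDestinationAggregateIds))
        .map(postProcessDurableEvent)
  }

  object Example extends Processor {
    override def processEvent: PartialFunction[Any, Seq[Any]] = {
      case s: String => Seq(s, s.length)
    }
    // Only Int payloads are re-routed; everything else keeps inherited destinations.
    override def postProcessDurableEvent: DurableEvent => DurableEvent = {
      case e @ DurableEvent(_: Int, _) => e.copy(customDestinationAggregateIds = Set("stats"))
      case e => e
    }
  }
}
```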
from eventuate.
OTOH the existing processEvent and preProcessDurableEvent can be implemented in terms of processEvent: PartialFunction[Any, Seq[DurableEvent]], but that's something we can also do/refactor later.
from eventuate.