thenativeweb / node-eventstore
EventStore Implementation in node.js
Home Page: http://eventstore.js.org/
License: MIT License
What is the difference between streamId and aggregateId?
Please document it in detail.
I think ppl (like me) who only know the basics, but never used an event storage in a project, won't understand how to use this lib, because the documentation is too low quality. It is just a bunch of examples without any indication why and when we should use the actual function.
I'm not sure if this is intentional feature or bug...
If I create two events for the same aggregateId at the same time using the mongo store (haven't tried others), they are both committed successfully with the same revisionId. I would expect the store to throw for the second event so that it can retry (revalidate on the updated state and then commit if successful).
To illustrate this, given a bank account balance $5. If two commands come at the same time to withdraw $4 and withdraw $3, they will both validate against the perceived state ($5) and thus both will dispatch events, leading to the account being in negative balance.
Expected behavior would be for the first event committed to win, and the second to be forced to retry. In the above example, if the $4 was withdrawn first, the second command will be forced to retry against the new state ($1), fail validation, and be rejected.
I imagine this could be worked around by using a synthetic eventId (e.g. aggregateId-expectedRevisionId), but I would think the store should handle this behavior, allowing any arbitrary eventId (e.g. uuid).
Please correct me if my understanding is incorrect.
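The expected behavior described above is usually called optimistic concurrency control. Here is a minimal in-memory sketch of the idea; it is not node-eventstore's API, and createStream, append and expectedRevision are hypothetical names used only for illustration:

```javascript
// Hypothetical in-memory stream; NOT part of node-eventstore.
function createStream() {
  var events = [];
  return {
    // The revision is simply the number of committed events.
    revision: function () { return events.length; },
    // Append only if the caller validated against the latest revision.
    append: function (expectedRevision, event) {
      if (expectedRevision !== events.length) {
        var err = new Error('expected revision ' + expectedRevision +
          ' but stream is at ' + events.length);
        err.name = 'ConcurrencyError';
        throw err;
      }
      events.push(event);
      return events.length;
    }
  };
}

var account = createStream();
var seen = account.revision();           // both commands read revision 0 (balance $5)
account.append(seen, { withdraw: 4 });   // first writer wins, stream moves to revision 1

var secondRejected = false;
try {
  account.append(seen, { withdraw: 3 }); // stale revision, so the append is refused
} catch (e) {
  secondRejected = (e.name === 'ConcurrencyError');
}
```

On a rejection like this, the caller would reload the stream, re-validate the command against the new state ($1) and only then decide whether to commit again.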
I started a conversation in the issue #94 to talk about overall performance improvements.
I was talking about a feature, where the snapshots are cleaned automatically.
So I started an implementation here https://github.com/rehia/node-eventstore/tree/clean-snapshots, where I implemented the snapshot cleaning feature for in memory and redis stores.
I still need to make some wider tests, but I also wanted to have your opinion on this feature.
I basically added a new option: max snapshots count, which indicates the maximum number of snapshots you want to keep. So if the value is 5 and you already have 10 snapshots, when you create the next one, the 6 oldest will be deleted. By default, of course, none is deleted, as is currently the case.
What do you think about this feature? Is this the way you think snapshot cleaning can be done?
Thanks for your feedback
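To make the proposed rule concrete, here is a small sketch of the pruning step. It is not the code from the linked branch; pruneSnapshots and the numeric revision field are assumptions made for illustration:

```javascript
// Hypothetical helper; snapshots are assumed to carry a numeric `revision`.
function pruneSnapshots(snapshots, maxSnapshotsCount) {
  // No limit configured, or nothing to prune: keep everything (the default).
  if (!maxSnapshotsCount || snapshots.length <= maxSnapshotsCount) {
    return { kept: snapshots.slice(), removed: [] };
  }
  // Sort oldest first, then cut off everything beyond the newest N.
  var sorted = snapshots.slice().sort(function (a, b) {
    return a.revision - b.revision;
  });
  var cut = sorted.length - maxSnapshotsCount;
  return { kept: sorted.slice(cut), removed: sorted.slice(0, cut) };
}

// 10 existing snapshots plus the newly created one, with a limit of 5.
var snapshots = [];
for (var i = 1; i <= 11; i++) {
  snapshots.push({ revision: i * 10 });
}
var result = pruneSnapshots(snapshots, 5);
```

With these inputs, result.removed holds the 6 oldest snapshots and result.kept the 5 newest, which matches the example in the description.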
I had a completely different problem; however, I encountered a possible fault in the mongodb store L389:
this.events.update({'_id' : id}, updateCommand, callback);
The corresponding function in the mongodb driver L1084 expects another signature:
Collection.prototype.update = function(selector, document, options, callback) {
No clue, why everything is still working... the function from mongodb does not handle this correctly, but returns a Promise instead of calling the callback.
Calling getNewEventStream on an instance of Store:
var eventstore = require('eventstore');
var es = eventstore.createStore();
es.getNewEventStream("testid");
Throws:
ReferenceError: self is not defined
at Object.Store.getNewEventStream (.../node_modules/eventstore/lib/eventStore.js:320:32)
...
Version:
eventstore 0.7.6
I configured the MongoDB storage provider to use a different dbName, following the readme example. The new db is correctly created and used for event storage, but an "eventstore" db is still created each time I start the server.
Library versions:
eventstore 0.7.6
eventstore.mongodb 0.7.5
Test code:
var eventstore = require('eventstore');
var storage = require('eventstore.mongodb');
var es = eventstore.createStore();
storage.createStorage({
  host: 'localhost',
  port: 27017,
  dbName: 'testEventStore',
  eventsCollectionName: 'events',
  snapshotsCollectionName: 'snapshots'
}, function(err, store) {
  es.configure(function() {
    es.use(store);
  });
  es.start();
});
It seems that events never reach eventdispatcher when using mongodb. I've traced the execution to line 101 of eventstore.js:
self.dispatcher.send({ action: 'addUndispatchedEvents', payload: JSON.stringify(evts) });
then execution goes into child_process.js and gets swallowed, no errors and the publisher isn't called either.
Here's some code to reproduce:
var eventstore = require('eventstore'),
    storage = require('eventstore.mongoDb');
var publisher = {
  publish: function(evt) {
    console.log('info: ' + new Date().toLocaleString() + ': ' + 'event published');
    // never gets here.
  }
};
// declared with var to avoid an implicit global
var store = storage.createStorage({dbName: 'mydb'});
store.connect(function(err) {
  var es = eventstore.createStore({logger: 'console'});
  es.configure(function() {
    this.use(publisher);
    this.use(store);
  }).start();
  es.getEventStream('1', 0, -1, function(err, stream) {
    stream.addEvent({id: '1'});
    stream.commit();
  });
});
Hey,
I was wondering why I didn't get any error messages using node-eventstore with the azuretable impl without providing access keys. Well the default access keys in azuretable.js do work :) I guess you don't want people go to town on that storage account?
Cheers
Michael
Is there a plan to introduce Kafka as one of the event stores as well?
Could I query the event data through event.payload data such as event.payload.parent?
Could you show me an example?
store.findEvents({payload:{parent:["parentA","parentB"]}} ... ) ?
Is there any proper way to clean the store?
If you start the EventStore in the test setup method, what would you do in the tearDown?
Thanks for this work.
In case of an event add failure (after the transaction is added but before repairFailedTransaction), clients would read from an inconsistent event stream; this can lead to inconsistent application state, which can have unpredictable consequences.
In #94, I was talking about some ways to improve redis implementation performance, mostly when there are a large number of events (or snapshots).
The problem with the current implementation is the scan, which usually needs to scan every event (or snapshot) of an aggregate and then make the necessary calculations to keep only the needed events.
I have made a quick test with my production data.
I have an aggregate with 46K+ events, with a total amount of 117K+.
I tried to scan all the keys (only scanning, no get or mget) using 2 strategies:
scan on all the keys. It took 6181ms.
zrange on all the keys. It took 415ms.
I think that sorted sets could be a good option to index aggregates keys, at least for 2 reasons:
This means that the database needs to be indexed first, and the index maintained. So whenever a client connects, we need to check whether the store has been indexed and, if it has not, scan all the aggregates, scan for their events and snapshots keys, and finally index the keys (events and snapshots separately, obviously). Once done, each time an event is added, its key should be added to the corresponding sorted set.
What do you think about this solution? I'll try to work on an implementation. But if you have any remarks, objections or suggestions, don't hesitate!
Adriano,
The current implementation fails when I have more than one version of the eventstore module in the project's node-modules tree. If the project makes use of both cqrs-domain and my-custom-store, which both require eventstore, then the project's node-modules tree will look like this:
Then this check:
if (options.type && options.type.prototype instanceof Base) {
  return options.type;
}
will always come back negative, as Base is defined in eventstore (1), while options.type is a sub-class of Base from eventstore (2)
How do you feel about replacing it with this:
if (options.type && typeof options.type === 'function') {
  return options.type;
}
Best,
-Leo
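The instanceof failure described above can be reproduced without installing anything. In this sketch, two structurally identical constructors stand in for Base as loaded from two separate copies of the eventstore module; makeBase, BaseCopy1, BaseCopy2 and CustomStore are hypothetical names for illustration only:

```javascript
// Each call returns a fresh constructor, just as each copy of a module
// in node_modules yields its own distinct `Base` function object.
function makeBase() {
  function Base() {}
  Base.prototype.connect = function (callback) { callback(null); };
  return Base;
}
var BaseCopy1 = makeBase(); // stands in for Base from eventstore (1)
var BaseCopy2 = makeBase(); // stands in for Base from eventstore (2)

// A custom store that inherits from the second copy.
function CustomStore() {}
CustomStore.prototype = Object.create(BaseCopy2.prototype);
CustomStore.prototype.constructor = CustomStore;

// The strict check compares against the wrong copy and fails...
var strictCheck = CustomStore.prototype instanceof BaseCopy1;
// ...although the store does inherit from a Base, just not that one.
var sameCopyCheck = CustomStore.prototype instanceof BaseCopy2;
// The proposed looser check only asks for a constructor function.
var looseCheck = typeof CustomStore === 'function';
```

The trade-off is that the typeof check accepts any function, so a store that does not implement the expected methods would only fail later, when those methods are called.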
This issue popped up during testing of my domains with cqrs-domain, where (for performance and environment set up reasons) I use an in-memory store instead of an external DB.
The InMemory database, when retrieving events, returns a direct reference to the events that were stored in memory. By returning this direct reference, anything using the data can mutate the events that are in the memory store. This poses a problem when you have anything handling the event that, for some reason, manipulates the data. And that in turn violates the immutability principle of event data.
One example of this would be categoryAdded and articleAdded event handlers in an aggregate such as this:
let aggregate = {
  categories: {
    cat1: {
      id: 'cat1',
      articles: {
        art1: {
          id: 'art1',
          name: 'article'
        }
      }
    }
  }
}
If in the above case we have an event that adds cat1 (categoryAdded), and another that adds art1 to cat1 (articleAdded), the event handler for articleAdded will inadvertently mutate the payload of categoryAdded, since the event handler simply does something like cat1.articles[article.id] = article.
This is of course not a problem for the other database drivers, since they always return a copy of the original data as stored in the database, so mutation of the originally stored data is never an issue.
A workaround could be that we should always use deep clones when manipulating the aggregate, but this is cumbersome, prone to dev error, and otherwise impacts perf negatively. It doesn't make sense to do it this way.
I have a fix ready - I'll prep a PR to resolve this for the InMemory database.
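One way to picture such a fix is a store that copies events on both write and read. This is only a sketch under assumptions, not the actual PR; createInMemoryStore, addEvent and getEvents are hypothetical names, and the JSON round-trip only works for JSON-serializable payloads:

```javascript
// Hypothetical minimal store; NOT the library's InMemory implementation.
function createInMemoryStore() {
  var stored = [];
  function clone(value) {
    // Good enough for plain JSON payloads (no Dates, functions or cycles).
    return JSON.parse(JSON.stringify(value));
  }
  return {
    addEvent: function (evt) { stored.push(clone(evt)); },
    // Hand out copies so callers can never touch the stored originals.
    getEvents: function () { return stored.map(clone); }
  };
}

var store = createInMemoryStore();
store.addEvent({ name: 'categoryAdded', payload: { id: 'cat1', articles: {} } });

// An event handler mutates what it received, as in the articleAdded example.
var loaded = store.getEvents();
loaded[0].payload.articles.art1 = { id: 'art1', name: 'article' };

// A later read still sees the event exactly as it was stored.
var reloaded = store.getEvents();
var articleCountInStore = Object.keys(reloaded[0].payload.articles).length;
```

Copying inside the store keeps the cost in one place, instead of requiring every aggregate handler to remember to clone.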
Hi, I'm trying to POC this for storing events - however, I'm finding that I'm not able to get the init() call to complete.
I've got the initialisation code for the es in a file that is pulled into my index.js by a require statement. I can see the Init() line being called and it waits on the process.nextTick(function() { statement.
How should I be initialising the eventstore? My index.js spins up a REST api in express and listens to a port.
Hi, not an issue, but I couldn't find a better way to ask this question. Is there any way that I can get only the undispatched items per streamId? It works in the "add - commit" scenario but not if I do not want to add an item. The getUndispatched function returns ALL undispatched items for the es and I could loop over them, but I guess this is not the most efficient way.
Thank you very much!
Hello guys,
I have a basic view builder like below
module.exports = require('cqrs-eventdenormalizer').defineViewBuilder({
  name: 'detailsUpdated',
  id: 'payload.id',
  context: 'workflowContext',
  aggregate: 'approval'
}, function(data, vm) {
  vm.set(data);
  debug('saving detailsUpdated', JSON.stringify(data, null, '\t'));
});
The debug statement is fired repeatedly on the console with the time (in ms) it takes for action.
It seems to be re-saving the document repeatedly in a loop. The issue is quite intermittent.
I have to kill the process to stop it from looping.
// log output on console
mymodule saving detailsUpdated +390ms { // JSON I removed }
mymodule saving detailsUpdated +827ms { // JSON I removed }
mymodule saving detailsUpdated +252ms { // JSON I removed }
....
...
I need to fix the issue before the product go-live. Please help.
While I was trying to use snapshots with redis, I ran into an issue which causes bugs when reloading my stream.
In some cases, and often under heavy load, redis store can produce multiple events with the same revision. So at a moment, the number of events (eg. 15) is higher than the current revision (eg 13, event revision 5 and 6 are "duplicated").
The problem starts when loading a snapshot and the corresponding events. The last snapshot is created with revision 13. And when the snapshot is loaded, events with revision above 13 are loaded with it, so mainly the 2 latest events (revisions 12 and 13), causing an inconsistent state when reloading from snapshot and events.
The problem is caused by the fact that the streamRevision of an event is based on the last loaded event, here https://github.com/adrai/node-eventstore/blob/663fb9fc3d0765ae22196c1bff3ea9aa8e79b305/lib/eventstore.js#L402-L413
Because this operation is not atomic, under heavy load or with lots of events, another command on the aggregate can cause the load of the aggregate with the same events, before the first one had enough time to commit. Both generated events finally get the same revision.
The problem is not directly in redis store. And I don't know if other stores face the same issue (using transactions?). There is no transaction in redis, only atomic operations using MULTI (with no dependency on intermediate results) or Lua scripts (when needing to execute an operation depending on the previous result).
I open this ticket to open the discussion on this. I was thinking about a solution, but only for redis: instead of directly calling SET, we could create a Lua script (easily deployed with redis driver, with all the caching stuff) which could INCR a value corresponding to the current revision of an aggregate, use it as the streamRevision, and then make the SET. Can't be safer, all is done atomically. And fortunately, redis natively loads a JSON library for Lua. But the problem is that the stream loaded in memory is no longer in sync. It could causes
Don't hesitate to contribute to this thread. I would love to hear your thoughts. In the mean time, I'll try to make a PR to show my intent.
Please add an example to README.
The module does not support mongodb authentication through a database, (no authSource on the authentication options)
https://mongodb.github.io/node-mongodb-native/driver-articles/mongoclient.html#auth-options
What's currently missing is a way of aggregating events and projecting them in a simple manner.
Look at the following pseudo code:
var ag = eventstore.createAggregator("uniqueaggregatorname");
ag.on('event', function(e, done) {
  // do event projection here
  // needs to be called to ensure event was processed.
  done();
});
ag.start();
What this does is call each event only once per unique aggregator name, even if the app is restarted. The current cursor should be automatically saved in the underlying event store storage.
If you think future, this can be extended to a full projection engine like this pseudo code:
var pr = eventstore.createProjection("uniqueprojectionname");
pr.state = {name: "-"};
pr.on('project', function(event, state, done) {
  if (event.name == "namechanged") {
    state.name = event.name;
  }
  done();
});
pr.on('stateChanged', function(state) {
  console.log(state);
});
pr.start();
The difference is that here we store not only the current cursor, but rather the last projected state too.
Currently this is only an idea to inspire a discussion.
Scenario: Retrieve the event stream of an aggregate and keep a reference to it, adding new events to it and committing occasionally.
Expected Result: All events added to the stream are actually persisted
Actual Result: Some events are not persisted, because the uncommittedEvents array has been altered between the store.addEvents() call and the callback (async IO) and then gets overwritten with an empty array [].
Solution: Create a copy of the eventstream.uncommittedEvents array and reset uncommittedEvents early, before the IO call; then use only the copy inside the callback and re-append the uncommitted events on error.
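A sketch of that solution, under stated assumptions: createEventStream is a hypothetical stand-in for the library's event stream, store.addEvents is assumed to take (events, callback), and fakeStore defers its callbacks by hand to imitate async IO:

```javascript
// Hypothetical stream object illustrating "copy, reset early, re-append on error".
function createEventStream(store) {
  var stream = {
    uncommittedEvents: [],
    addEvent: function (evt) { stream.uncommittedEvents.push(evt); },
    commit: function (callback) {
      var pending = stream.uncommittedEvents; // take the current batch
      stream.uncommittedEvents = [];          // reset BEFORE the async call
      store.addEvents(pending, function (err) {
        if (err) {
          // Put the failed batch back in front of anything added meanwhile.
          stream.uncommittedEvents = pending.concat(stream.uncommittedEvents);
        }
        callback(err);
      });
    }
  };
  return stream;
}

// Fake store: callbacks are queued so we control when "IO" completes.
var persisted = [];
var pendingCallbacks = [];
var fakeStore = {
  addEvents: function (events, callback) {
    pendingCallbacks.push(function () {
      persisted = persisted.concat(events);
      callback(null);
    });
  }
};

var stream = createEventStream(fakeStore);
stream.addEvent({ id: 'a' });
stream.commit(function () {});
stream.addEvent({ id: 'b' });   // added while the first commit is in flight
pendingCallbacks.shift()();     // first commit completes
stream.commit(function () {});
pendingCallbacks.shift()();     // second commit completes
```

Because the array is swapped out before the IO call, the event added in between survives and is persisted by the next commit.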
I'm considering RethinkDB as the event store because it is simple to set up as a cluster on one node or on multiple nodes.
While heavily using this awesome lib with Redis, I have a couple of performance improvements in mind.
I'd like to share them here, to open the discussion on it.
Obviously, this will lead to some PRs.
The way events are retrieved, by scanning all the keys with a match pattern, forces us to get all the keys before any mget, because scan does not guarantee the order. Maybe using a sorted set to store keys could help scan them more efficiently, and even stream events (I know, node-eventstore does not support streaming... yet!).
Snapshots are stored forever, although they may be useful for some time only. It could be useful to be able to clear the oldest, or to have a rolling strategy which only keeps the n latest.
I'll add other improvements in this thread. This is an open discussion.
So don't hesitate to make suggestions or remarks.
@adrai I assume that you have been using the library for a few projects at least? If so, could you give some insights into the stability/durability of the project? Any kind of feedback would be appreciated.
It would be good if the event store supported Kinesis.
Hi,
This question is not any issue with your Eventstore. I use it. It works great.
I was wondering whether it is a good idea to use Greg's GetEventStore in Node.js.
There is one driver available here :
https://github.com/kenpratt/nodejs-EventStore
Have you attempted this? Do you see any pros & cons in this?
Any feedback is highly appreciated.
Thanks,
Bhoomi.
https://github.com/adrai/node-eventstore/blob/master/lib/event.js#L40
this.context = event.context || eventstream.context;
I got the following error when I tried to write an event into an empty event store:
Error: The events passed should all have a streamRevision!
at Object.EventStream ([projectDirectory]/node_modules/eventstore/lib/eventStream.js:52:15)
at [projectDirectory]/node_modules/eventstore/lib/eventstore.js:284:22
at [projectDirectory]/node_modules/eventstore/lib/databases/mongodb.js:333:16
at handleCallback ([projectDirectory]/node_modules/mongodb/lib/utils.js:95:12)
We are still using nodejs 0.12.7. I traced it down to line 52 in eventStream.js. It appears that, when events is [], the code still manages to reach line 49, which causes the crash.
After I changed lines 47-48 to
for(var i=0, len=events.length; i < len; i++) {
var evt = events[i];
The crash problem stops.
Hello, I was wondering if any of you have built a nodejs web app that can be plugged into the eventstore and that would allow for auditing and investigation? Something like the lokad "maintain app". If not, I believe I would start one, as I found it kind of handy.
thanks,
Alex.
We construct mongo connection strings and I was wondering if that is supported in this module. I'm mainly asking because we use the different options pretty heavily depending on the environment (local doesn't use a replica set whereas production does).
Adriano,
Following your instructions (https://github.com/adrai/node-eventstore#own-db-implementation), I was able to have EventStore use my own implementation of Store.
But after being a happy camper for a few days, now I don't see a way to pass options into my Store's (and also, EventStore's) constructor.
https://github.com/adrai/node-eventstore/blob/master/index.js#L7 :
function getSpecificStore(options) {
options = options || {};
if (options.prototype instanceof Base) {
return options;
}
module.exports = function(options) {
options = options || {};
var Store;
try {
Store = getSpecificStore(options);
} catch (err) {
throw err;
}
return new Eventstore(options, new Store(options));
};
module.exports.Store = Base;
It appears that when using the custom Store implementation the way you recommend, what gets passed as options into the factory method and EventStore constructor, is the constructor of the custom Store. Then the same Store constructor gets called with itself as the value of options...
Am I missing something here?
Thanks,
-Leo
If for whatever reason a large number of events don't get dispatched, the next time the dispatcher is started, it'll get the whole list of those events and call the callback in a loop. If you have some sort of async service using the dispatcher, a whole bunch of asynchronous requests will fire within milliseconds of each other. It'd be nice if the dispatcher would wait for the passed callback to be called so the event store doesn't overload other services downstream.
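The requested behavior amounts to dispatching one event at a time and waiting for each callback. A minimal sketch of that idea, not the library's dispatcher; dispatchSequentially and the publish(evt, done) signature are assumptions for illustration, and a hand-drained queue stands in for real async work:

```javascript
// Hypothetical helper: publishes the next event only after the previous
// publish has called `done`.
function dispatchSequentially(events, publish, finished) {
  var index = 0;
  function next(err) {
    if (err || index >= events.length) {
      return finished(err || null);
    }
    var evt = events[index++];
    publish(evt, next);
  }
  next();
}

// Fake downstream service that records how many requests overlap.
var inFlight = 0;
var maxInFlight = 0;
var published = [];
var queue = [];
function publish(evt, done) {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  queue.push(function () {
    inFlight--;
    published.push(evt.id);
    done();
  });
}

var finishedCalled = false;
dispatchSequentially([{ id: 1 }, { id: 2 }, { id: 3 }], publish, function () {
  finishedCalled = true;
});
// Drain the fake async work one completion at a time.
while (queue.length) {
  queue.shift()();
}
```

With this approach at most one request is in flight at any time. If a publisher calls done synchronously for a very long list, the recursion could grow the stack, so a real implementation might defer next with setImmediate.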
I'm wondering how replay is done best. My intuition was to take "THE_LAST_EVENT_SEEN"-id from the revisionGuard of my event-denormalizer and read all events from the event store that come after that one.
However, I still didn't find this feature and wonder how this should be done instead... What is the desired way to do this (or am I completely wrong with this thinking)?
I am working on an event store implementation using AWS SNS, DynamoDB, and Lambdas. Understanding that a DynamoDB implementation would be needed, is there any reason why node-eventstore would be a bad fit? SNS vs Atom?
A CQRS framework that uses node-eventstore:
https://github.com/leogiese/cqrs
Your eventstore is very good, thanks @adrai
I have a situation where I have dependencies between aggregates (a parent/child relationship). In this case, it would be really useful if I could save the changes of two (or more) event streams as a single transaction.
Similar to how multiple events are stored using an intermediate transaction document.
If there's a better way to do this, I'm all ears.
So we can be a little smarter about re-indexing our read models, it'd be cool if we could retrieve a list of unique aggregate IDs.
Node.js provides an EventEmitter class. Is there a reason why the interface does not match and the fakePublisher does not use it?
@rvin100 or @sbiaudet can you implement this function for azuretable?
f802f69#diff-83b1684b9c75ed90f43e1189240e7d4dR146
Browsing through the docs and this looks great.
One thing that caught my eye, is that I want to create a new snapshot after N events have happened. This results in M snapshots over time that I want to be versioned.
createSnapshot allows me to define an optional version. Normally I'd like to retrieve the latest snapshot (is there a shortcut to get to the latest without defining a version?) but sometimes I want to fetch an arbitrary version. However, getFromSnapshot (at least from the docs) doesn't seem to allow passing a version.
How to do this?
Hello,
Thank you for your library.
The issue concerns how node-eventstore transmits events to node-cqrs-eventdenormalizer.
I want to rebuild a viewmodel by extracting all the events from the store.
For eventdenormalizer I can do:
eventdenormalizer.clear(function(err) {});
eventdenormalizer.replay([/* ordered array of events */], function (err) {
  if (err) { console.log(err); }
});
For node-eventstore, I can fetch all the events with:
es.getEvents()
However, I didn't find the solution to pass my events from the event store to my eventdenormalizer.
Do I need to use eventstore.setEventToDispatched() for each event or is there another way?
Thanks.
Josh
Could you please check: http://stackoverflow.com/questions/23181661/nodeeventstore-publish-twice.
Thanks,
Ron
It would be nice to update the codebase to a newer version of JavaScript for less verbose and potentially more readable code. If there is an interest in this I'd offer to send a PR, as it would also give me an excuse to thoroughly dig through the codebase.
When testing nodeCQRS (https://github.com/jamuhl/nodeCQRS), I get this error when creating an item at http://localhost:3000/ using the redis store:
node.js:134
throw e; // process.nextTick error, or 'error' event on first tick
^
ReferenceError: callback is not defined
at /Users/jameskyburz/Documents/src/nodeCQRS/node_modules/eventstore.redis/storage.js:280:13
at try_callback (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:466:9)
at RedisClient.return_reply (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:503:13)
at HiredisReplyParser.<anonymous> (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:259:14)
at HiredisReplyParser.emit (events.js:64:17)
at HiredisReplyParser.execute (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/lib/parser/hiredis.js:43:18)
at RedisClient.on_data (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:422:27)
at Socket.<anonymous> (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:66:14)
at Socket.emit (events.js:64:17)
at Socket._onReadable (net.js:678:14)
Added pull request
Given the event processing via callback characteristics of node.js I'm questioning if forking a child process for event dispatching is necessary. I'm probably missing something but couldn't the same async behavior be achieved without the complexity of child process?
Thanks for maintaining this project!
Since version 1.10, arrays convert to objects when stored with a redis backend. Version 1.9 works fine.
This is caused by the new Lua scripts in this commit:
401c537
See below link for more information:
http://openmymind.net/Lua-JSON-turns-empty-arrays-into-empty-hashes/