
node-eventstore's Issues

low quality documentation

I think people (like me) who only know the basics but have never used an event store in a project won't understand how to use this lib, because the documentation is too low quality. It is just a bunch of examples, without any indication of why and when we should use each function.

Duplicate revision id

I'm not sure if this is an intentional feature or a bug...

If I create two events for the same aggregateId at the same time using the mongo store (I haven't tried others), they are both committed successfully with the same revisionId. I would expect the store to throw for the second event so that it can be retried (revalidated against the updated state and then committed if successful).

To illustrate this, consider a bank account with a balance of $5. If two commands arrive at the same time to withdraw $4 and $3, they will both validate against the perceived state ($5), and thus both will dispatch events, leaving the account with a negative balance.

Expected behavior would be for the first event committed to win, and the second to be forced to retry. In the above example, if the $4 was withdrawn first, the second command would be forced to retry against the new state ($1), fail validation, and be rejected.

I imagine this could be worked around by using a synthetic eventId (e.g. aggregateId-expectedRevisionId), but I would think the store should handle this behavior itself, allowing any arbitrary eventId (e.g. a uuid).
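The synthetic-eventId workaround could be sketched like this (a minimal sketch; the key format and the reliance on a unique index in the store are assumptions from the description above, not what the store does today):

```javascript
// Derive the event id from the aggregate id and the revision the writer
// *expects* to create, so a unique index on the id makes the second
// concurrent commit fail instead of silently duplicating the revision.
function syntheticEventId(aggregateId, expectedRevision) {
  return aggregateId + '-' + expectedRevision;
}

// Two writers that both loaded revision 4 produce the same id ...
const a = syntheticEventId('account-1', 5);
const b = syntheticEventId('account-1', 5);
// ... so the store's unique-key constraint rejects the later insert,
// forcing that command to reload state and re-validate.
```

With this in place, the losing command in the bank-account example would hit the duplicate-key error, reload the $1 balance, and reject the withdrawal.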

Please correct me if my understanding is incorrect.

Cleaning Snapshots

I started a conversation in issue #94 about overall performance improvements.
There I mentioned a feature where snapshots are cleaned automatically.
So I started an implementation here: https://github.com/rehia/node-eventstore/tree/clean-snapshots, where I implemented snapshot cleaning for the in-memory and redis stores.
I still need to run some broader tests, but I also wanted your opinion on this feature.

I basically added a new option, max snapshots count, which indicates how many snapshots you want to keep at most. So if the value is 5 and you already have 10 snapshots, when you create the next one, the 6 oldest will be deleted. By default, of course, none are deleted, which matches the current behavior.
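A minimal sketch of the proposed behavior (the name maxSnapshotsCount and the oldest-first ordering of the array are assumptions taken from the description above):

```javascript
// Keep only the newest maxSnapshotsCount snapshots after a new one is
// added. `snapshots` is assumed to be ordered oldest-first; a falsy
// limit means "keep everything" (the current default behavior).
function trimSnapshots(snapshots, maxSnapshotsCount) {
  if (!maxSnapshotsCount) return snapshots;           // default: keep all
  const excess = snapshots.length - maxSnapshotsCount;
  return excess > 0 ? snapshots.slice(excess) : snapshots;
}

// 11 snapshots with a limit of 5: the 6 oldest are dropped.
const kept = trimSnapshots([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 5);
```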

What do you think about this feature? Is this the way you think snapshot cleaning should be done?

Thanks for your feedback

Wrong signature calling mongodb update?

I had a completely different problem; however, I encountered a possible fault in the mongodb store at L389:
this.events.update({'_id' : id}, updateCommand, callback);

The corresponding function in the mongodb driver (L1084) expects a different signature:
Collection.prototype.update = function(selector, document, options, callback) {

No clue why everything still works... the mongodb function does not handle this correctly; it returns a Promise instead of calling the callback.
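To illustrate the mismatch, here is a mocked model of the reported driver behavior (not the real mongodb driver): with only three arguments the callback lands in the options slot and a Promise comes back; passing an explicit options object restores the callback path.

```javascript
// Mock of update(selector, document, options, callback) as described in
// the issue: if no function arrives in the callback position, the call
// returns a Promise and the "callback" (sitting in options) never fires.
function update(selector, document, options, callback) {
  if (typeof callback !== 'function') {
    return Promise.resolve('ok'); // the misplaced callback is ignored
  }
  callback(null, 'ok');
}

let result = null;
// call shape from mongodb.js L389: the callback lands in the options slot
const maybePromise = update({ _id: 1 }, { $set: {} }, function () {
  result = 'from 3-arg call'; // never reached in this model
});
// fixed call shape: pass an explicit (empty) options object
update({ _id: 1 }, { $set: {} }, {}, function (err, r) { result = r; });
```

So the likely fix in the store would be to pass an explicit options argument before the callback.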

Calling getNewEventStream throws "ReferenceError: self is not defined"

Calling getNewEventStream on an instance of Store:

var eventstore = require('eventstore');
var es = eventstore.createStore();
es.getNewEventStream("testid");

Throws:

ReferenceError: self is not defined
at Object.Store.getNewEventStream (.../node_modules/eventstore/lib/eventStore.js:320:32)
...

Version:
eventstore 0.7.6

MongoDB storage provider still creating the default 'eventstore' db when configured to use a different dbName.

I configured the MongoDB storage provider to use a different dbName, following the readme example. The new db is correctly created and used for event storage, but an "eventstore" db is still created each time I start the server.

Library versions:

eventstore 0.7.6
eventstore.mongodb 0.7.5

Test code:

var eventstore = require('eventstore');
var storage = require('eventstore.mongodb');

var es = eventstore.createStore();
storage.createStorage({
  host: 'localhost',
  port: 27017,
  dbName: 'testEventStore',
  eventsCollectionName: 'events',
  snapshotsCollectionName: 'snapshots'
}, function(err, store) {
  es.configure(function() {
    es.use(store);
  });
  es.start();
});

Events not dispatching.

It seems that events never reach the eventdispatcher when using mongodb. I've traced the execution to line 101 of eventstore.js:

    self.dispatcher.send({ action: 'addUndispatchedEvents', payload: JSON.stringify(evts) });

then execution goes into child_process.js and gets swallowed; there are no errors, and the publisher isn't called either.

Here's some code to reproduce:

var eventstore = require('eventstore'),
    storage = require('eventstore.mongoDb');

var publisher = {
    publish: function(evt) {
        console.log('info: ' + new Date().toLocaleString() + ': ' + 'event published');
        // never gets here.
    }
}

var store = storage.createStorage({dbName:'mydb'});
store.connect(function(err) {

    var es = eventstore.createStore({logger: 'console'});

    es.configure(function() {
        this.use(publisher);
        this.use(store);
    }).start();

    es.getEventStream('1', 0, -1, function(err,stream) {
        stream.addEvent({id: '1'});
        stream.commit();
    });


})

Remove default access keys from azuretable.js

Hey,

I was wondering why I didn't get any error messages using node-eventstore with the azuretable implementation without providing access keys. Well, the default access keys in azuretable.js do work :) I guess you don't want people going to town on that storage account?

Cheers
Michael

Kafka support?

Is there a plan to introduce Kafka as one of the event stores as well?

Stopping the Store

Is there a proper way to clean the store?

If you start the EventStore in the test setup method, what would you do in the tearDown?

Thanks for this work.

Redis indexation

In #94, I was talking about some ways to improve the redis implementation's performance, mostly when there is a large number of events (or snapshots).
The problem with the current implementation is the scan, which usually needs to scan every event (or snapshot) of an aggregate and then do the necessary calculations to keep only the needed events.

I have made a quick test with my production data.
I have an aggregate with 46K+ events out of a total of 117K+.
I tried to scan all the keys (only scanning, no get or mget) using 2 strategies:

  • I made a common scan on all the keys. It took 6181ms.
  • I put all the aggregate keys in a sorted set (each member's score being the event version) and did a zrange over all the keys. It took 415ms.

I think sorted sets could be a good option for indexing aggregate keys, for at least 2 reasons:

  • It seems more efficient to scan all of an aggregate's keys at once
  • It is more efficient to get the keys between 2 revisions of an aggregate (as you need to do when reloading state from a snapshot plus events)

This means that the database needs to be indexed first, and the index maintained. So whenever a client connects, we need to check whether the store has been indexed; if not, scan all the aggregates, scan their event and snapshot keys, and finally index the keys (events and snapshots separately, obviously). Once done, each time an event is added, its key should be added to the corresponding sorted set.
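The indexing idea can be modeled in plain JS (an in-memory stand-in for illustration; with redis this would be a ZADD on commit and a ZRANGEBYSCORE on load):

```javascript
// One "sorted set" per aggregate, scored by event revision, so a range
// lookup replaces a full SCAN over all keys.
function addToIndex(index, aggregateId, revision, key) {
  (index[aggregateId] = index[aggregateId] || []).push({ score: revision, key });
  index[aggregateId].sort((x, y) => x.score - y.score);
}

// Like ZRANGEBYSCORE: all keys between two revisions, inclusive.
function keysBetween(index, aggregateId, fromRev, toRev) {
  return (index[aggregateId] || [])
    .filter(e => e.score >= fromRev && e.score <= toRev)
    .map(e => e.key);
}

const index = {};
addToIndex(index, 'agg1', 0, 'agg1:0');
addToIndex(index, 'agg1', 1, 'agg1:1');
addToIndex(index, 'agg1', 2, 'agg1:2');
// e.g. loading everything after a snapshot taken at revision 1:
const keys = keysBetween(index, 'agg1', 1, 2);
```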

What do you think about this solution? I'll try to work on an implementation. But if you have any remarks, objections or suggestions, don't hesitate!

Cannot use custom store when it is brought in as a dependency (v1.1.3)

Adriano,

The current implementation fails when I have more than one version of the eventstore module in the project's node_modules tree. If the project uses both cqrs-domain and my-custom-store, which both require eventstore, then the project's node_modules tree will look like this:

  • cqrs-domain
    • eventstore (1)
  • my-custom-store
    • eventstore (2)

Then this check:

if (options.type && options.type.prototype instanceof Base) {
    return options.type;
}

will always come back negative, as Base is defined in eventstore (1), while options.type is a subclass of Base from eventstore (2).
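A minimal reproduction of why the check fails (two independently constructed copies of the module stand in for the two installed versions):

```javascript
// Each require'd copy of eventstore defines its own Base constructor,
// so prototypes differ even though both stores are structurally alike.
function makeModule() {
  function Base() {}
  function CustomStore() {}
  CustomStore.prototype = Object.create(Base.prototype);
  return { Base: Base, CustomStore: CustomStore };
}

const copy1 = makeModule(); // eventstore under cqrs-domain
const copy2 = makeModule(); // eventstore under my-custom-store

// the check from index.js, across the two copies: always false
const crossCheck = copy2.CustomStore.prototype instanceof copy1.Base;
// same check within a single copy: true
const sameCheck = copy1.CustomStore.prototype instanceof copy1.Base;
```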

How do you feel about replacing it with this:

if (options.type && typeof options.type === 'function') {
    return options.type;
}

Best,
-Leo

InMemory store leaks variable reference to stored data

This issue popped up during testing of my domains with cqrs-domain, where (for performance and environment set up reasons) I use an in-memory store instead of an external DB.

The InMemory database, when retrieving events, returns a direct reference to the events stored in memory. Through this direct reference, anything using the data can mutate the events in the memory store. This is a problem whenever something handling the event manipulates the data for some reason, which in turn violates the immutability principle of event data.

Example

One example of this would be a categoryAdded and an articleAdded event handler in an aggregate such as this:

let aggregate = {
    categories: {
        cat1: {
            id: 'cat1',
            articles: {
                art1: {
                    id: 'art1',
                    name: 'article'
                }
            }
        }
    }
}

If in the above case we have an event that adds cat1 (categoryAdded) and another that adds art1 to cat1 (articleAdded), the event handler for articleAdded will inadvertently mutate the payload of categoryAdded, since the event handler simply does something like cat1.articles[article.id] = article.

This is of course not a problem for the other database drivers, since they always return a copy of the original data as stored in the database, so mutation of the originally stored data is never an issue.

Fixes and workarounds

A workaround would be to always use deep clones when manipulating the aggregate, but this is cumbersome, prone to developer error, and otherwise impacts performance negatively. It doesn't make sense to do it this way.

I have a fix ready - I'll prep a PR to resolve this for the InMemory database.
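For reference, a minimal sketch of a clone-on-read approach (one possible shape of the fix, not necessarily what the pending PR does):

```javascript
// Return a deep copy so consumers can mutate what they get without
// touching the store's own data. A JSON round-trip is enough for plain
// event payloads (caveat: it drops functions and turns Dates into strings).
function deepClone(value) {
  return JSON.parse(JSON.stringify(value));
}

const stored = [{ aggregateId: 'a1', payload: { id: 'cat1', articles: {} } }];

function getEvents() {
  return stored.map(deepClone);
}

const evts = getEvents();
evts[0].payload.articles.art1 = { id: 'art1' }; // consumer mutates its copy
// the store's original payload stays untouched:
const untouched = Object.keys(stored[0].payload.articles).length === 0;
```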

How to Init()?

Hi, I'm trying to build a POC for storing events; however, I'm finding that I'm not able to get the init() call to complete.

I've put the initialisation code for the es in a file that is pulled into my index.js by a require statement. I can see the init() line being called, and it waits on the process.nextTick(function() { statement.

How should I be initialising the eventstore? My index.js spins up a REST API in Express and listens on a port.

Only Getting Undispatched Items per StreamID

Hi, this is not an issue, but I couldn't find a better way to ask the question. Is there any way to get only the undispatched items per streamId? It works in the "add - commit" scenario, but not if I don't want to add an item. The getUndispatched function returns ALL undispatched items for the es; I could loop over this, but I guess that is not the most efficient way.

Thank you very much!

save fired repeatedly in a loop

Hello guys,

I have a basic view builder like the one below:

module.exports = require('cqrs-eventdenormalizer').defineViewBuilder({
  name: 'detailsUpdated',
  id: 'payload.id',
  context:'workflowContext',
  aggregate:'approval'
}, function(data, vm) {
    vm.set(data);
    debug('saving detailsUpdated', JSON.stringify(data, null, '\t'));
});

The debug statement fires repeatedly on the console, along with the time (in ms) each run takes.
It seems to re-save the document repeatedly in a loop. The issue is quite intermittent.

I have to kill the process to stop it from looping.

// log output on console
mymodule saving detailsUpdated +390ms { // JSON I removed }
mymodule saving detailsUpdated +827ms { // JSON I removed }
mymodule saving detailsUpdated +252ms { // JSON I removed }

....
...

I need to fix this issue before the product goes live. Please help.

[redis] aggregates versioning might be inconsistent

While I was trying to use snapshots with redis, I ran into an issue which causes bugs when reloading my stream.
In some cases, and often under heavy load, the redis store can produce multiple events with the same revision. So at some point, the number of events (e.g. 15) is higher than the current revision (e.g. 13; events with revisions 5 and 6 are "duplicated").
The problem starts when loading a snapshot and the corresponding events. The last snapshot was created at revision 13. And when the snapshot is loaded, the events above revision 13 are loaded with it, mainly the 2 latest events (revisions 12 and 13), causing an inconsistent state when reloading from the snapshot and events.

The problem is caused by the fact that the streamRevision of an event is based on the last loaded event; see https://github.com/adrai/node-eventstore/blob/663fb9fc3d0765ae22196c1bff3ea9aa8e79b305/lib/eventstore.js#L402-L413
Because this operation is not atomic, under heavy load or with lots of events, another command on the aggregate can cause the aggregate to be loaded with the same events before the first one has had enough time to commit. Both generated events then end up with the same revision.

The problem is not in the redis store itself, and I don't know whether other stores face the same issue (using transactions?). There are no transactions in redis, only atomic operations using MULTI (with no dependency on intermediate results) or Lua scripts (when an operation needs to depend on a previous result).

I open this ticket to start the discussion. I was thinking about a solution, but only for redis: instead of calling SET directly, we could create a Lua script (easily deployed with the redis driver, with all the caching stuff) which would INCR a value corresponding to the actual revision of the aggregate, use it as the streamRevision, and then do the SET. It can't be safer; everything is done atomically. And conveniently, redis natively loads a JSON library for Lua. But the problem is that the stream loaded in memory would no longer be in sync, which could cause issues.
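The race, and the effect of an atomic counter, can be modeled in plain JS (the names here are illustrative; in redis the increment would be an INCR inside the Lua script):

```javascript
// Non-atomic behavior as described above: two writers read the same
// last revision during the gap before either commits ...
let lastRevision = 13;
const writerA = lastRevision + 1;
const writerB = lastRevision + 1; // same value: duplicate revision 14

// Atomic alternative: the increment itself hands out the revision,
// so there is no read-then-write gap to race through.
let counter = 13;
function nextRevision() { return ++counter; } // stands in for redis INCR
const atomicA = nextRevision(); // first committer gets 14
const atomicB = nextRevision(); // second gets 15, no duplicate
```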

Don't hesitate to contribute to this thread. I would love to hear your thoughts. In the mean time, I'll try to make a PR to show my intent.

Idea: event aggregation or projection engine

What's currently missing is a way to aggregate events and project them in a simple manner.

Look at the following pseudo code:

var ag = eventstore.createAggregator("uniqueaggregatorname");

ag.on('event',function(e,done){
    // do event projection here 

    // needs to be called to ensure event was processed.
    done(); 
});

ag.start();

What this does is call each event only once per unique aggregator name, even if the app is restarted. The current cursor should be saved automatically in the underlying event store storage.

Thinking further ahead, this could be extended to a full projection engine, like this pseudo code:

var pr = eventstore.createProjection("uniqueprojectionname");

pr.state = {name:"-"};

pr.on('project',function(event,state,done){
    if(event.name=="namechanged"){
        state.name = event.payload.name; // assuming the new name rides in the payload
    }
    done();
});

pr.on('stateChanged',function(state){
    console.log(state);
});

pr.start();

The difference is that here we store not only the current cursor but also the last projected state.

Currently this is only an idea to inspire a discussion.

Eventstream uncommittedEvents possibly overwrites events that were not committed yet

Scenario: Retrieve the eventstream of an aggregate and keep a reference to it, adding new events to it and committing occasionally.

Expected Result: All events added to the stream are actually persisted

Actual Result: Some events are not persisted, because the uncommittedEvents array has been altered between the store.addEvents() call and the callback (async IO) and then gets overwritten with an empty array [].

Solution: Create a copy of the eventstream.uncommittedEvents array and reset uncommittedEvents early, before the IO call; then use only the copy inside the callback and re-append the uncommitted events on error.
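The proposed solution could be sketched like this (function names are illustrative; addEvents stands in for the async store call):

```javascript
// Snapshot the uncommitted events and clear the live array *before* the
// async IO, so events added mid-flight are not wiped by the callback.
function commit(stream, addEvents, callback) {
  const toCommit = stream.uncommittedEvents.slice(); // copy for the IO call
  stream.uncommittedEvents = [];                     // reset early
  addEvents(toCommit, function (err) {
    if (err) {
      // put them back so a retry can commit them, ahead of newer events
      stream.uncommittedEvents = toCommit.concat(stream.uncommittedEvents);
    }
    callback(err, toCommit);
  });
}

// During the (mock) async IO another event is added to the stream:
const stream = { uncommittedEvents: ['e1', 'e2'] };
let persisted;
commit(stream, function (evts, cb) {
  stream.uncommittedEvents.push('e3'); // arrives mid-flight
  cb(null);
}, function (err, committed) { persisted = committed; });
```

Here 'e1' and 'e2' are persisted while 'e3' survives in uncommittedEvents for the next commit, instead of being overwritten by the empty array.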

Redis overall performance improvements

While heavily using this awesome lib with Redis, I have a couple of performance improvements in mind.
I'd like to share them here, to open the discussion on it.
Obviously, this will lead to some PRs.

  • the way events are retrieved, by scanning all the keys with a match pattern, forces getting all the keys before any mget, because scan does not guarantee order. Maybe using a sorted set to store the keys could help scan them more efficiently, and even stream events (I know, node-eventstore does not support streaming... yet!)

  • snapshots are stored forever, although they may only be useful for some time. It would be useful to be able to clear the oldest, or to have a rolling strategy which keeps only the n latest.

I'll add other improvements in this thread. This is an open discussion.
So don't hesitate to make suggestions or remarks.

Geteventstore Integration

Hi,

This question is not any issue with your Eventstore. I use it. It works great.

I was thinking: is it a good idea to use Greg's GetEventStore in Node.js?

There is one driver available here :
https://github.com/kenpratt/nodejs-EventStore

Have you attempted this? Do you see any pros & cons in this?

Any feedback is highly appreciated.

Thanks,
Bhoomi.

Error: The events passed should all have a streamRevision!

I got the following error when I try to write an event into event store which is empty:

Error: The events passed should all have a streamRevision!
    at Object.EventStream ([projectDirectory]/node_modules/eventstore/lib/eventStream.js:52:15)
    at [projectDirectory]/node_modules/eventstore/lib/eventstore.js:284:22
    at [projectDirectory]/node_modules/eventstore/lib/databases/mongodb.js:333:16
    at handleCallback ([projectDirectory]/node_modules/mongodb/lib/utils.js:95:12)

We are still using Node.js 0.12.7. I traced it down to line 52 in eventStream.js. It appears that, when events is [], the code still manages to reach line 49 and crash.

After I changed lines 47-48 to

   for(var i=0, len=events.length; i < len; i++) {
      var evt = events[i];

The crash no longer occurs.

[Question] Audit/Explorateur like LOKAD

Hello, I was wondering if any of you has built a Node.js web app that can be plugged onto the eventstore to allow auditing and investigation? Something like the Lokad "maintain app". If not, I believe I would start one, as I find that kind of handy.
thanks,
Alex.

Support for mongodb uri connection strings

We construct mongo connection strings and I was wondering if that is supported in this module. I'm mainly asking because we use the different options pretty heavily depending on the environment (local doesn't use a replica set whereas production does).

How to pass 'options' into the custom implementation of 'Store'?

Adriano,

Following your instructions (https://github.com/adrai/node-eventstore#own-db-implementation), I was able to have EventStore use my own implementation of Store.

But after being a happy camper for a few days, now I don't see a way to pass options into my Store's (and also, EventStore's) constructor.

https://github.com/adrai/node-eventstore/blob/master/index.js#L7 :

function getSpecificStore(options) {
  options = options || {};

  if (options.prototype instanceof Base) {
    return options;
  }

https://github.com/adrai/node-eventstore/blob/master/index.js#L48 :

module.exports = function(options) {
  options = options || {};

  var Store;

  try {
    Store = getSpecificStore(options);
  } catch (err) {
    throw err;
  }

  return new Eventstore(options, new Store(options));
};

module.exports.Store = Base;

It appears that when using the custom Store implementation the way you recommend, what gets passed as options into the factory method and the EventStore constructor is the constructor of the custom Store. Then the same Store constructor gets called with itself as the value of options...

Am I missing something here?

Thanks,
-Leo

Dispatcher doesn't dispatch sequentially

If for whatever reason a large number of events doesn't get dispatched, the next time the dispatcher is started it will get the whole list of those events and call the callback in a loop. If you have some sort of async service using the dispatcher, a whole bunch of asynchronous requests will fire within milliseconds of each other. It would be nice if the dispatcher waited for the passed callback to be called, so the event store doesn't overload other services downstream.
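A sequential dispatch loop could be sketched like this (a minimal illustration, not the library's actual dispatcher):

```javascript
// Wait for each event's callback before dispatching the next one,
// instead of firing the whole backlog in a tight loop.
function dispatchSequentially(events, dispatchOne, done) {
  let i = 0;
  (function next(err) {
    if (err || i >= events.length) return done(err || null);
    dispatchOne(events[i++], next);
  })();
}

const order = [];
dispatchSequentially(['e1', 'e2', 'e3'],
  function (evt, cb) { order.push(evt); cb(); },   // downstream service
  function () { order.push('done'); });
```

For very large backlogs with synchronous callbacks, the recursive step could be wrapped in setImmediate to avoid stack growth.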

Get all events since event with id?

I'm wondering how replay is best done. My intuition was to take the "THE_LAST_EVENT_SEEN" id from the revisionGuard of my event denormalizer and read all the events from the event store that come after that one.

However, I still haven't found this feature and wonder how it should be done instead... What is the intended way to do this (or am I completely wrong in this thinking)?

Lambda, SNS, DynamoDB

I am working on an event store implementation using AWS SNS, DynamoDB, and Lambdas. Understanding that a DynamoDB implementation would be needed, is there any reason why node-eventstore would be a bad fit? SNS vs Atom?

Feature request, commit multiple event streams in one go

I have a situation with dependencies between aggregates (a parent/child relationship). In this case, it would be really useful to be able to save the changes of two (or more) event streams as a single transaction.

Similar to how multiple events are stored using an intermediate transaction document.

If there's a better way to do this, I'm all ears.

multiple snapshots: how to do `getFromSnapshot` using `version`

Browsing through the docs and this looks great.

One thing that caught my eye: I want to create a new snapshot after N events have happened. This results in M snapshots over time that I want versioned.

createSnapshot allows me to define an optional version. Normally I'd like to retrieve the latest snapshot (is there a shortcut to get the latest without specifying a version?), but sometimes I want to fetch an arbitrary version. However, getFromSnapshot (at least from the docs) doesn't seem to allow passing a version.

How to do this?

Get and pass all events from event store to node-cqrs-eventdenormalizer

Hello,

Thank you for your library.
The issue concerns how node-eventstore transmits events to node-cqrs-eventdenormalizer.

I want to rebuild a viewmodel by extracting all the events from the store.

For eventdenormalizer I can do:
eventdenormalizer.clear(function(err) {});
eventdenormalizer.replay([/* ordered array of events */], function (err) {
  if (err) { console.log(err); }
});

For node-eventstore I can use to fetch all the events:
es.getEvents()

However, I didn't find the solution to pass my events from the event store to my eventdenormalizer.
Do I need to use eventstore.setEventToDispatched() for each event, or is there another way?
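One possible wiring, sketched with mocked stand-ins (the getEvents/clear/replay shapes are taken from the snippets above; the real signatures may differ, e.g. node-eventstore's getEvents can take skip/limit arguments):

```javascript
// Pull everything from the store, clear the viewmodels, then hand the
// ordered array to replay in one call. No setEventToDispatched is
// needed for a rebuild in this flow.
function rebuildViewmodels(es, eventdenormalizer, done) {
  es.getEvents(function (err, evts) {
    if (err) return done(err);
    eventdenormalizer.clear(function (clearErr) {
      if (clearErr) return done(clearErr);
      eventdenormalizer.replay(evts, done);
    });
  });
}

// Mocked stand-ins to illustrate the call order:
const es = { getEvents: function (cb) { cb(null, ['evt1', 'evt2']); } };
const calls = [];
const denorm = {
  clear: function (cb) { calls.push('clear'); cb(null); },
  replay: function (evts, cb) { calls.push('replay:' + evts.length); cb(null); }
};
let finished = false;
rebuildViewmodels(es, denorm, function (err) { finished = !err; });
```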

Thanks.

Josh

Updating to ES2015/16/17

It would be nice to update the codebase to a newer version of JavaScript for less verbose and potentially more readable code. If there is interest in this, I'd offer to send a PR, as it would also give me an excuse to dig thoroughly through the codebase.

Missing callback

When testing nodeCQRS (https://github.com/jamuhl/nodeCQRS),

I get this error when creating an item at http://localhost:3000/ using the redis store:

node.js:134
throw e; // process.nextTick error, or 'error' event on first tick
^
ReferenceError: callback is not defined
at /Users/jameskyburz/Documents/src/nodeCQRS/node_modules/eventstore.redis/storage.js:280:13
at try_callback (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:466:9)
at RedisClient.return_reply (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:503:13)
at HiredisReplyParser. (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:259:14)
at HiredisReplyParser.emit (events.js:64:17)
at HiredisReplyParser.execute (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/lib/parser/hiredis.js:43:18)
at RedisClient.on_data (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:422:27)
at Socket. (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:66:14)
at Socket.emit (events.js:64:17)
at Socket._onReadable (net.js:678:14)

I've added a pull request.

Questioning child process for dispatch

Given the callback-driven event processing characteristics of node.js, I'm questioning whether forking a child process for event dispatching is necessary. I'm probably missing something, but couldn't the same async behavior be achieved without the complexity of a child process?
