node-eventstore

⚠️ IMPORTANT NEWS! 📰

I’ve been dealing with CQRS, event sourcing and DDD long enough now that, unfortunately, I no longer work with it, so at least for now this is my formal farewell!

I want to thank everyone who has contributed in one way or another. Especially...

  • Jan, who introduced me to this topic.
  • Dimitar, one of the last bigger contributors and maintainer.
  • My last employer, who gave me the possibility to use all these CQRS modules in a big Cloud-System.
  • My family and friends, who very often came up short.

Finally, I would like to thank Golo Roden, who was there very early at the beginning of my CQRS/ES/DDD journey and is now here again to take over these modules.

Golo Roden is the founder, CTO and managing director of the native web, a company specializing in native web technologies. Among other things, he also teaches CQRS/ES/DDD etc. and based on his vast knowledge, he brought wolkenkit to life. wolkenkit is a CQRS and event-sourcing framework based on Node.js. It empowers you to build and run scalable distributed web and cloud services that process and store streams of domain events.

With this step, I can focus more on i18next, locize and localistars. I'm happy about that. 😊

So, there is no end, but the start of a new phase for my CQRS modules. 😉

I wish you all good luck on your journey.

Who knows, maybe we'll meet again in a github issue or PR at i18next 😉

Adriano Raiano


Introduction

The project goal is to provide an eventstore implementation for node.js:

  • load and store events via an EventStream object
  • event dispatching to your publisher (optional)
  • supported databases (inmemory, mongodb, redis, tingodb, elasticsearch, azuretable, dynamodb)
  • snapshot support
  • query your events

Installation

npm install eventstore

Usage

Require the module and init the eventstore:

var eventstore = require('eventstore');

var es = eventstore();

By default the eventstore will use an in-memory storage.

Logging

For logging and debugging you can use debug by TJ Holowaychuk.

Simply run your process with:

DEBUG=eventstore* node app.js

Provide implementation for storage

example with mongodb:

var es = require('eventstore')({
  type: 'mongodb',
  host: 'localhost',                             // optional
  port: 27017,                                   // optional
  dbName: 'eventstore',                          // optional
  eventsCollectionName: 'events',                // optional
  snapshotsCollectionName: 'snapshots',          // optional
  transactionsCollectionName: 'transactions',    // optional
  timeout: 10000,                                // optional
  // emitStoreEvents: true                       // optional, by default no store events are emitted
  // maxSnapshotsCount: 3                        // optional, by default all snapshots are kept
  // authSource: 'authenticationDatabase'        // optional
  // username: 'technicalDbUser'                 // optional
  // password: 'secret'                          // optional
  // url: 'mongodb://user:pass@host:port/db?opts' // optional
  // positionsCollectionName: 'positions'        // optional, by default positions are not tracked
});

example with redis:

var es = require('eventstore')({
  type: 'redis',
  host: 'localhost',                          // optional
  port: 6379,                                 // optional
  db: 0,                                      // optional
  prefix: 'eventstore',                       // optional
  eventsCollectionName: 'events',             // optional
  snapshotsCollectionName: 'snapshots',       // optional
  timeout: 10000                              // optional
  // emitStoreEvents: true,                   // optional, by default no store events are emitted
  // maxSnapshotsCount: 3                     // optional, by default all snapshots are kept
  // password: 'secret'                       // optional
});

example with tingodb:

var es = require('eventstore')({
  type: 'tingodb',
  dbPath: '/path/to/my/db/file',              // optional
  eventsCollectionName: 'events',             // optional
  snapshotsCollectionName: 'snapshots',       // optional
  transactionsCollectionName: 'transactions', // optional
  timeout: 10000,                             // optional
  // emitStoreEvents: true,                   // optional, by default no store events are emitted
  // maxSnapshotsCount: 3                     // optional, by default all snapshots are kept
});

example with elasticsearch:

var es = require('eventstore')({
  type: 'elasticsearch',
  host: 'localhost:9200',                     // optional
  indexName: 'eventstore',                    // optional
  eventsTypeName: 'events',                   // optional
  snapshotsTypeName: 'snapshots',             // optional
  log: 'warning',                             // optional
  maxSearchResults: 10000,                    // optional
  // emitStoreEvents: true,                   // optional, by default no store events are emitted
  // maxSnapshotsCount: 3                     // optional, by default all snapshots are kept
});

example with a custom elasticsearch client (e.g. an AWS Elasticsearch client; note the http-aws-es package usage in this example):

var elasticsearch = require('elasticsearch');

var esClient = new elasticsearch.Client({
  hosts: 'SOMETHING.es.amazonaws.com',
  connectionClass: require('http-aws-es'),
  amazonES: {
    region: 'us-east-1',
    accessKey: 'REPLACE_AWS_accessKey',
    secretKey: 'REPLACE_AWS_secretKey'
  }
});

var es = require('eventstore')({
  type: 'elasticsearch',
  client: esClient,
  indexName: 'eventstore',
  eventsTypeName: 'events',
  snapshotsTypeName: 'snapshots',
  log: 'warning',
  maxSearchResults: 10000
});

example with azuretable:

var es = require('eventstore')({
  type: 'azuretable',
  storageAccount: 'nodeeventstore',
  storageAccessKey: 'aXJaod96t980AbNwG9Vh6T3ewPQnvMWAn289Wft9RTv+heXQBxLsY3Z4w66CI7NN12+1HUnHM8S3sUbcI5zctg==',
  storageTableHost: 'https://nodeeventstore.table.core.windows.net/',
  eventsTableName: 'events',               // optional
  snapshotsTableName: 'snapshots',         // optional
  timeout: 10000,                          // optional
  emitStoreEvents: true                    // optional, by default no store events are emitted
});

example with dynamodb:

var es = require('eventstore')({
    type: 'dynamodb',
    eventsTableName: 'events',                   // optional
    snapshotsTableName: 'snapshots',             // optional
    undispatchedEventsTableName: 'undispatched', // optional
    EventsReadCapacityUnits: 1,                  // optional
    EventsWriteCapacityUnits: 3,                 // optional
    SnapshotReadCapacityUnits: 1,                // optional
    SnapshotWriteCapacityUnits: 3,               // optional
    UndispatchedEventsReadCapacityUnits: 1,      // optional
    UndispatchedEventsWriteCapacityUnits: 1,     // optional
    useUndispatchedEventsTable: true,            // optional
    eventsTableStreamEnabled: false,             // optional
    eventsTableStreamViewType: 'NEW_IMAGE',      // optional
    emitStoreEvents: true                        // optional, by default no store events are emitted
});

DynamoDB credentials are obtained by the eventstore either from environment variables or from the credentials file. For setup see the AWS JavaScript SDK.

The DynamoDB provider supports DynamoDB Local for local development via the AWS SDK endpoint option. Just set the $AWS_DYNAMODB_ENDPOINT (or %AWS_DYNAMODB_ENDPOINT% on Windows) environment variable to point to your running instance of DynamoDB Local like this:

$ export AWS_DYNAMODB_ENDPOINT=http://localhost:8000

Or on Windows:

> set AWS_DYNAMODB_ENDPOINT=http://localhost:8000

The useUndispatchedEventsTable option is available for those who prefer to use DynamoDB Streams to pull events from the store instead of the UndispatchedEvents table. The default is true. Setting this option to false will result in the UndispatchedEvents table not being created at all; the getUndispatchedEvents method will always return an empty array, and setEventToDispatched will effectively do nothing.
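As a sketch (all values here are illustrative, not authoritative), a store relying on DynamoDB Streams instead of the UndispatchedEvents table might be configured like this:

```javascript
// Sketch: skip the UndispatchedEvents table and read new events from
// DynamoDB Streams instead (values are illustrative).
var es = require('eventstore')({
  type: 'dynamodb',
  useUndispatchedEventsTable: false,   // table is never created
  eventsTableStreamEnabled: true,
  eventsTableStreamViewType: 'NEW_IMAGE'
});
```

With this configuration getUndispatchedEvents will always return an empty array, as described above.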

Refer to StreamViewType for a description of the eventsTableStreamViewType option.

Built-in event publisher (optional)

If defined, the eventstore will try to publish AND set events to dispatched on its own...

sync interface

es.useEventPublisher(function(evt) {
  // bus.emit('event', evt);
});

async interface

es.useEventPublisher(function(evt, callback) {
  // bus.sendAndWaitForAck('event', evt, callback);
});

catch connect and disconnect events

es.on('connect', function() {
  console.log('storage connected');
});

es.on('disconnect', function() {
  console.log('connection to storage is gone');
});

define event mappings (optional)

Define which values should be mapped/copied to the payload event.

es.defineEventMappings({
  id: 'id',
  commitId: 'commitId',
  commitSequence: 'commitSequence',
  commitStamp: 'commitStamp',
  streamRevision: 'streamRevision'
});

initialize

es.init(function (err) {
  // this callback is called when all is ready...
});

// or

es.init(); // callback is optional

working with the eventstore

get the eventhistory (of an aggregate)

es.getEventStream('streamId', function(err, stream) {
  var history = stream.events; // the original event will be in events[i].payload

  // myAggregate.loadFromHistory(history);
});

or

es.getEventStream({
  aggregateId: 'myAggregateId',
  aggregate: 'person',          // optional
  context: 'hr'                 // optional
}, function(err, stream) {
  var history = stream.events; // the original event will be in events[i].payload

  // myAggregate.loadFromHistory(history);
});

'streamId' and 'aggregateId' are the same. In DDD terms, aggregate and context are just there to be more precise in language. For example, you can have a 'person' aggregate in the context 'human resources' and a 'person' aggregate in the context 'business contracts'. So you can have two completely different aggregate instances of two completely different aggregates (but perhaps with the same name) in two completely different contexts.
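To illustrate the namespacing described above, here is a small stand-alone helper (hypothetical, not part of the eventstore API) that builds a fully qualified stream identity:

```javascript
// Hypothetical helper, not part of eventstore: shows how context and
// aggregate disambiguate two aggregates sharing the same name and id.
function streamIdentity(query) {
  return [query.context, query.aggregate, query.aggregateId].join('/');
}

var hrPerson = streamIdentity({
  context: 'hr', aggregate: 'person', aggregateId: '4711'
});
var contractsPerson = streamIdentity({
  context: 'contracts', aggregate: 'person', aggregateId: '4711'
});

console.log(hrPerson);        // hr/person/4711
console.log(contractsPerson); // contracts/person/4711
// same aggregate name and id, yet two distinct streams
```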

you can also request an eventstream by limiting the query with a 'minimum revision number' and a 'maximum revision number'

var revMin = 5,
    revMax = 8; // if you omit revMax or you define it as -1 it will retrieve until the end

es.getEventStream('streamId' || {/* query */}, revMin, revMax, function(err, stream) {
  var history = stream.events; // the original event will be in events[i].payload

  // myAggregate.loadFromHistory(history);
});

store a new event and commit it to store

es.getEventStream('streamId', function(err, stream) {
  stream.addEvent({ my: 'event' });
  stream.addEvents([{ my: 'event2' }]);

  stream.commit();

  // or

  stream.commit(function(err, stream) {
    console.log(stream.eventsToDispatch); // this is an array containing all added events in this commit.
  });
});

if you defined an event publisher function, the committed events will be dispatched to the provided publisher

if you just want to load the last event as a stream, you can call getLastEventAsStream instead of getEventStream.

working with snapshotting

get snapshot and eventhistory from the snapshot point

es.getFromSnapshot('streamId', function(err, snapshot, stream) {
  var snap = snapshot.data;
  var history = stream.events; // events history from given snapshot

  // myAggregate.loadSnapshot(snap);
  // myAggregate.loadFromHistory(history);
});

or

es.getFromSnapshot({
  aggregateId: 'myAggregateId',
  aggregate: 'person',          // optional
  context: 'hr'                 // optional
}, function(err, snapshot, stream) {
  var snap = snapshot.data;
  var history = stream.events; // events history from given snapshot

  // myAggregate.loadSnapshot(snap);
  // myAggregate.loadFromHistory(history);
});

you can request a snapshot and an eventstream by limiting the query with a 'maximum revision number'

var revMax = 8; // if you omit revMax or you define it as -1 it will retrieve until the end

es.getFromSnapshot('streamId' || {/* query */}, revMax, function(err, snapshot, stream) {
  var snap = snapshot.data;
  var history = stream.events; // events history from given snapshot

  // myAggregate.loadSnapshot(snap);
  // myAggregate.loadFromHistory(history);
});

create a snapshot point

es.getFromSnapshot('streamId', function(err, snapshot, stream) {

  var snap = snapshot.data;
  var history = stream.events; // events history from given snapshot

  // myAggregate.loadSnapshot(snap);
  // myAggregate.loadFromHistory(history);

  // create a new snapshot depending on your rules
  if (history.length > myLimit) {
    es.createSnapshot({
      streamId: 'streamId',
      data: myAggregate.getSnap(),
      revision: stream.lastRevision,
      version: 1 // optional
    }, function(err) {
      // snapshot saved
    });

    // or

    es.createSnapshot({
      aggregateId: 'myAggregateId',
      aggregate: 'person',          // optional
      context: 'hr',                // optional
      data: myAggregate.getSnap(),
      revision: stream.lastRevision,
      version: 1 // optional
    }, function(err) {
      // snapshot saved
    });
  }

  // go on: store new event and commit it
  // stream.addEvents...

});

You can automatically clean older snapshots by configuring the number of snapshots to keep with maxSnapshotsCount in eventstore options.

own event dispatching (no event publisher function defined)

es.getUndispatchedEvents(function(err, evts) {
  // or es.getUndispatchedEvents('streamId', function(err, evts) {
  // or es.getUndispatchedEvents({ // free choice (all, only context, only aggregate, only aggregateId...)
  //                                context: 'hr',
  //                                aggregate: 'person',
  //                                aggregateId: 'uuid'
  //                              }, function(err, evts) {

  // all undispatched events
  console.log(evts);

  // dispatch it and set the event as dispatched

  for (var e in evts) {
    var evt = evts[e];
    es.setEventToDispatched(evt, function(err) {});
    // or
    es.setEventToDispatched(evt.id, function(err) {});
  }

});

query your events

for replaying your events or for rebuilding a viewmodel or just for fun...

skip, limit always optional

var skip = 0,
    limit = 100; // if you omit limit or you define it as -1 it will retrieve until the end

es.getEvents(skip, limit, function(err, evts) {
  // if (evts.length === limit) {
  //   evts.next(function (err, nextEvts) {}); // just call next to retrieve the next page...
  // } else {
  //   // finished...
  // }
});

// or

es.getEvents('streamId', skip, limit, function(err, evts) {
  // if (evts.length === limit) {
  //   evts.next(function (err, nextEvts) {}); // just call next to retrieve the next page...
  // } else {
  //   // finished...
  // }
});

// or

es.getEvents({ // free choice (all, only context, only aggregate, only aggregateId...)
  context: 'hr',
  aggregate: 'person',
  aggregateId: 'uuid'
}, skip, limit, function(err, evts) {
  // if (evts.length === limit) {
  //   evts.next(function (err, nextEvts) {}); // just call next to retrieve the next page...
  // } else {
  //   // finished...
  // }
});
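The paging pattern hinted at in the comments can be sketched against a stand-in event source (a hypothetical mock; in the real store, next() is attached to the returned events array):

```javascript
// Hypothetical stand-in for es.getEvents, paging by skip/limit, to
// illustrate the next()-style loop from the comments above.
function makeStore(allEvents) {
  return function getEvents(skip, limit, callback) {
    callback(null, allEvents.slice(skip, skip + limit));
  };
}

// read every page: a full page means there may be more, a short page ends it
function readAll(getEvents, limit, done) {
  var collected = [];
  (function step(skip) {
    getEvents(skip, limit, function (err, evts) {
      if (err) return done(err);
      collected = collected.concat(evts);
      if (evts.length === limit) return step(skip + limit);
      done(null, collected);
    });
  })(0);
}

var getEvents = makeStore([{ id: 1 }, { id: 2 }, { id: 3 }]);
readAll(getEvents, 2, function (err, evts) {
  console.log(evts.length); // 3
});
```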

by revision

revMin, revMax always optional

var revMin = 5,
    revMax = 8; // if you omit revMax or you define it as -1 it will retrieve until the end

es.getEventsByRevision('streamId', revMin, revMax, function(err, evts) {});

// or

es.getEventsByRevision({
  aggregateId: 'myAggregateId',
  aggregate: 'person',          // optional
  context: 'hr'                 // optional
}, revMin, revMax, function(err, evts) {});

by commitStamp

skip, limit always optional

var skip = 0,
    limit = 100; // if you omit limit or you define it as -1 it will retrieve until the end

es.getEventsSince(new Date(2015, 5, 23), skip, limit, function(err, evts) {
  // if (evts.length === limit) {
  //   evts.next(function (err, nextEvts) {}); // just call next to retrieve the next page...
  // } else {
  //   // finished...
  // }
});

// or

es.getEventsSince(new Date(2015, 5, 23), limit, function(err, evts) {
  // if (evts.length === limit) {
  //   evts.next(function (err, nextEvts) {}); // just call next to retrieve the next page...
  // } else {
  //   // finished...
  // }
});

// or

es.getEventsSince(new Date(2015, 5, 23), function(err, evts) {
  // if (evts.length === limit) {
  //   evts.next(function (err, nextEvts) {}); // just call next to retrieve the next page...
  // } else {
  //   // finished...
  // }
});

streaming your events

Some databases support streaming your events; the API is similar to the query one.

skip, limit always optional

var skip = 0,
    limit = 100; // if you omit limit or you define it as -1 it will retrieve until the end

var stream = es.streamEvents(skip, limit);
// or
var stream = es.streamEvents('streamId', skip, limit);
// or by commitstamp
var stream = es.streamEventsSince(new Date(2015, 5, 23), skip, limit);
// or by revision
var stream = es.streamEventsByRevision({
  aggregateId: 'myAggregateId',
  aggregate: 'person',
  context: 'hr',
});

stream.on('data', function(e) {
  doSomethingWithEvent(e);
});

stream.on('end', function() {
  console.log('no more events');
});

// or even better
stream.pipe(myWritableStream);

currently supported by:

  1. mongodb

get the last event

for example to obtain the last revision number

es.getLastEvent('streamId', function(err, evt) {
});

// or

es.getLastEvent({ // free choice (all, only context, only aggregate, only aggregateId...)
  context: 'hr',
  aggregate: 'person',
  aggregateId: 'uuid'
}, function(err, evt) {
});

obtain a new id

es.getNewId(function(err, newId) {
  if(err) {
    console.log('ohhh :-(');
    return;
  }

  console.log('the new id is: ' + newId);
});

position of event in store

some db implementations support writing the position of the event in the whole store, in addition to the streamRevision.

currently these implementations support it:

  1. inmemory (by setting the trackPosition option)
  2. mongodb (by setting the positionsCollectionName option)
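For example (a minimal sketch based on the options above; values are illustrative):

```javascript
// Sketch: enable position tracking for the two supporting stores.
var esInMemory = require('eventstore')({
  type: 'inmemory',
  trackPosition: true
});

var esMongo = require('eventstore')({
  type: 'mongodb',
  positionsCollectionName: 'positions'
});
```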

special scaling handling with mongodb

Inserting multiple events (documents) in mongodb is not atomic. Because of this, the eventstore tries to repair itself when calling getEventsByRevision. But if you want, you can trigger this from outside:

es.store.getPendingTransactions(function(err, txs) {
  if(err) {
    console.log('ohhh :-(');
    return;
  }

  // txs is an array of objects like:
  // {
  //   _id: '/* the commitId of the committed event stream */',
  //   events: [ /* all events of the committed event stream */ ],
  //   aggregateId: 'aggregateId',
  //   aggregate: 'aggregate', // optional
  //   context: 'context'      // optional
  // }

  es.store.getLastEvent({
    aggregateId: txs[0].aggregateId,
    aggregate: txs[0].aggregate, // optional
    context: txs[0].context      // optional
  }, function (err, lastEvent) {
    if(err) {
      console.log('ohhh :-(');
      return;
    }

    es.store.repairFailedTransaction(lastEvent, function (err) {
      if(err) {
        console.log('ohhh :-(');
        return;
      }

      console.log('everything is fine');
    });
  });
});

Catch before and after eventstore events

Optionally the eventstore can emit before and after events. To enable this feature, set emitStoreEvents to true.

var eventstore = require('eventstore');
var es = eventstore({
  emitStoreEvents: true,
});

es.on('before-clear', function({milliseconds}) {});
es.on('after-clear', function({milliseconds}) {});

es.on('before-get-next-positions', function({milliseconds, arguments: [positions]}) {});
es.on('after-get-next-positions', function({milliseconds, arguments: [positions]}) {});

es.on('before-add-events', function({milliseconds, arguments: [events]}) {});
es.on('after-add-events', function({milliseconds, arguments: [events]}) {});

es.on('before-get-events', function({milliseconds, arguments: [query, skip, limit]}) {});
es.on('after-get-events', function({milliseconds, arguments: [query, skip, limit]}) {});

es.on('before-get-events-since', function({milliseconds, arguments: [date, skip, limit]}) {});
es.on('after-get-events-since', function({milliseconds, arguments: [date, skip, limit]}) {});

es.on('before-get-events-by-revision', function({milliseconds, arguments: [query, revMin, revMax]}) {});
es.on('after-get-events-by-revision', function({milliseconds, arguments: [query, revMin, revMax]}) {});

es.on('before-get-last-event', function({milliseconds, arguments: [query]}) {});
es.on('after-get-last-event', function({milliseconds, arguments: [query]}) {});

es.on('before-get-undispatched-events', function({milliseconds, arguments: [query]}) {});
es.on('after-get-undispatched-events', function({milliseconds, arguments: [query]}) {});

es.on('before-set-event-to-dispatched', function({milliseconds, arguments: [id]}) {});
es.on('after-set-event-to-dispatched', function({milliseconds, arguments: [id]}) {});

es.on('before-add-snapshot', function({milliseconds, arguments: [snap]}) {});
es.on('after-add-snapshot', function({milliseconds, arguments: [snap]}) {});

es.on('before-clean-snapshots', function({milliseconds, arguments: [query]}) {});
es.on('after-clean-snapshots', function({milliseconds, arguments: [query]}) {});

es.on('before-get-snapshot', function({milliseconds, arguments: [query, revMax]}) {});
es.on('after-get-snapshot', function({milliseconds, arguments: [query, revMax]}) {});

es.on('before-remove-transactions', function({milliseconds, arguments: [event]}) {});
es.on('after-remove-transactions', function({milliseconds, arguments: [event]}) {});

es.on('before-get-pending-transactions', function({milliseconds}) {});
es.on('after-get-pending-transactions', function({milliseconds}) {});

es.on('before-repair-failed-transactions', function({milliseconds, arguments: [lastEvt]}) {});
es.on('after-repair-failed-transactions', function({milliseconds, arguments: [lastEvt]}) {});

es.on('before-remove-tables', function({milliseconds}) {});
es.on('after-remove-tables', function({milliseconds}) {});

es.on('before-stream-events', function({milliseconds, arguments: [query, skip, limit]}) {});
es.on('after-stream-events', function({milliseconds, arguments: [query, skip, limit]}) {});

es.on('before-stream-events-since', function({milliseconds, arguments: [date, skip, limit]}) {});
es.on('after-stream-events-since', function({milliseconds, arguments: [date, skip, limit]}) {});

es.on('before-get-event-stream', function({milliseconds, arguments: [query, revMin, revMax]}) {});
es.on('after-get-event-stream', function({milliseconds, arguments: [query, revMin, revMax]}) {});

es.on('before-get-from-snapshot', function({milliseconds, arguments: [query, revMax]}) {});
es.on('after-get-from-snapshot', function({milliseconds, arguments: [query, revMax]}) {});

es.on('before-create-snapshot', function({milliseconds, arguments: [obj]}) {});
es.on('after-create-snapshot', function({milliseconds, arguments: [obj]}) {});

es.on('before-commit', function({milliseconds, arguments: [eventstream]}) {});
es.on('after-commit', function({milliseconds, arguments: [eventstream]}) {});

es.on('before-get-last-event-as-stream', function({milliseconds, arguments: [query]}) {});
es.on('after-get-last-event-as-stream', function({milliseconds, arguments: [query]}) {});

Sample Integration

  • nodeCQRS A CQRS sample integrating eventstore

Database Support

Currently these databases are supported:

  1. inmemory
  2. mongodb (node-mongodb-native)
  3. redis (redis)
  4. tingodb (tingodb)
  5. elasticsearch (elasticsearch)
  6. azuretable (azure-storage)
  7. dynamodb (aws-sdk)

own db implementation

You can use your own db implementation by extending this...

var Store = require('eventstore').Store,
    util = require('util'),
    _ = require('lodash');

function MyDB(options) {
  options = options || {};
  Store.call(this, options);
}

util.inherits(MyDB, Store);

_.extend(MyDB.prototype, {

  // ...

});

module.exports = MyDB;

and you can use it in this way

var es = require('eventstore')({
  type: MyDB
});
// es.init...

License

Copyright (c) 2018 Adriano Raiano, Jan Muehlemann

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

node-eventstore's Issues

InMemory store leaks variable reference to stored data

This issue popped up during testing of my domains with cqrs-domain, where (for performance and environment set up reasons) I use an in-memory store instead of an external DB.

The InMemory database, when retrieving events, returns a direct reference to the events that were stored in memory. By returning this direct reference, anything using the data can mutate the events that are in the memory store. This poses a problem when you have anything handling the event that, for some reason, manipulates the data. And that in turn violates the immutability principle of event data.

Example

One example of this would be a categoryAdded and articleAdded event handler in an aggregate such as this:

let aggregate = {
    categories: {
        cat1: {
            id: 'cat1',
            articles: {
                art1: {
                    id: 'art1',
                    name: 'article'
                }
            }
        }
    }
}

If in the above case we have an event that adds cat1 (categoryAdded), and another that adds art1 to cat1 (articleAdded), the event handler for articleAdded will inadvertently mutate the payload of categoryAdded, since the event handler simply does something like cat1.articles[article.id] = article.

This is of course not a problem for the other database drivers, since they always return a copy of the original data as stored in the database, so mutation of the originally stored data is never an issue.
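The leak can be reproduced without the store itself. Returning a direct reference lets a handler mutate the "stored" data, while returning a copy does not (the JSON round-trip below is a simplification for illustration, not the actual fix):

```javascript
// Plain-Node illustration of the reference leak described above.
var stored = [{ payload: { name: 'original' } }];

// direct reference, as returned by the InMemory store:
var leaked = stored;
leaked[0].payload.name = 'mutated';
console.log(stored[0].payload.name); // 'mutated' -- the stored event changed

// returning a copy keeps the stored events immutable:
var storedAgain = [{ payload: { name: 'original' } }];
var copy = JSON.parse(JSON.stringify(storedAgain));
copy[0].payload.name = 'mutated';
console.log(storedAgain[0].payload.name); // 'original'
```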

Fixes and workarounds

A workaround could be that we should always use deep clones when manipulating the aggregate, but this is cumbersome, prone to dev error, and otherwise impacts perf negatively. It doesn't make sense to do it this way.

I have a fix ready - I'll prep a PR to resolve this for the InMemory database.

Wrong signature calling mongodb update?

I had a completely different problem; however, I encountered a possible fault in the mongodb store L389:
this.events.update({'_id' : id}, updateCommand, callback);

The corresponding function in the mongodb driver L1084 expects another signature:
Collection.prototype.update = function(selector, document, options, callback) {

No clue why everything is still working... the function from mongodb does not handle this correctly, but returns a Promise instead of calling the callback.

[Question] Audit/Explorateur like LOKAD

Hello, i was wondering if any of you built a nodejs web app that can be plugged on the eventstore that would allow for auditing and investigation? Something like the lokad "maintain app" . If not, I believe I would start one as I found it kind of handy.
thanks,
Alex.

low quality documentation

I think people (like me) who only know the basics, but have never used an event store in a project, won't understand how to use this lib, because the documentation quality is too low. It is just a bunch of examples without any indication of why and when we should use each function.

Calling getNewEventStream throws "ReferenceError: self is not defined"

Calling getNewEventStream, on an instance of Store:

var eventstore = require('eventstore');
var es = eventstore.createStore();
es.getNewEventStream("testid");

Throws:

ReferenceError: self is not defined
at Object.Store.getNewEventStream (.../node_modules/eventstore/lib/eventStore.js:320:32)
...

Version:
eventstore 0.7.6

Dispatcher doesn't dispatch sequentially

If for whatever reason a large number of events don't get dispatched, the next time the dispatcher is started it'll get the whole list of those events and call the callback in a loop. If you have some sort of async service using the dispatcher, a whole bunch of asynchronous requests will fire within milliseconds of each other. It'd be nice if the dispatcher would wait for the passed callback to be called, so the event store doesn't overload other services downstream.
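A sequential dispatch loop along these lines could be sketched as follows (a hypothetical helper, independent of the eventstore API):

```javascript
// Hypothetical sequential dispatcher: handles one event at a time and only
// moves on once the async dispatch callback has fired.
function dispatchSequentially(events, dispatch, done) {
  (function step(i) {
    if (i >= events.length) return done(null);
    dispatch(events[i], function (err) {
      if (err) return done(err);
      step(i + 1);
    });
  })(0);
}

var order = [];
dispatchSequentially([1, 2, 3], function (evt, cb) {
  order.push(evt);
  setImmediate(cb); // simulate an async downstream service
}, function (err) {
  console.log(order); // [ 1, 2, 3 ]
});
```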

Redis overall performance improvements

While heavily using this awesome lib with Redis, I have a couple of performance improvements in mind.
I'd like to share them here, to open the discussion on it.
Obviously, this will lead to some PRs.

  • the way events are retrieved by scanning all the keys with a match pattern forces to get all the keys before any mget, because scan does not guarantee the order. maybe using a sorted set to store keys could help scanning them more efficiently, and even stream events (I know, node-eventstore does not support streaming... yet!)

  • snapshots are stored forever, although they can be useful for some time only. It could be useful to be able to clear the oldest, or having a rolling strategy which only keeps the nth latest.

I'll add other improvements in this thread. This is an open discussion.
So don't hesitate to make suggestions or remarks.

Get all events since event with id?

I'm wondering how replay is done best. My intuition was to take "THE_LAST_EVENT_SEEN"-id from the revisionGuard of my event-denormalizer and read all events from the event store that come after that one.

However, I still didn't find this feature and wonder how this should be done instead... What is the desired way to do this (or am I completely wrong with this thinking)?

Updating to ES2015/16/17

It would be nice to update the codebase to a newer version of JavaScript for less verbose and potentially more readable code. If there is an interest in this I'd offer to send a PR, as it would also give me an excuse to thoroughly dig through the codebase.

How to pass 'options' into the custom implementation of 'Store'?

Adriano,

Following your instructions (https://github.com/adrai/node-eventstore#own-db-implementation), I was able to have EventStore use my own implementation of Store.

But after being a happy camper for a few days, now I don't see a way to pass options into my Store's (and also, EventStore's) constructor.

https://github.com/adrai/node-eventstore/blob/master/index.js#L7 :

function getSpecificStore(options) {
  options = options || {};

  if (options.prototype instanceof Base) {
    return options;
  }

https://github.com/adrai/node-eventstore/blob/master/index.js#L48 :

module.exports = function(options) {
  options = options || {};

  var Store;

  try {
    Store = getSpecificStore(options);
  } catch (err) {
    throw err;
  }

  return new Eventstore(options, new Store(options));
};

module.exports.Store = Base;

It appears that when using the custom Store implementation the way you recommend, what gets passed as options into the factory method and the EventStore constructor is the constructor of the custom Store. Then that same Store constructor gets called with itself as the value of options...

Am I missing something here?

Thanks,
-Leo

Cleaning Snapshots

I started a conversation in the issue #94 to talk about overall performance improvements.
I was talking about a feature, where the snapshots are cleaned automatically.
So I started an implementation here https://github.com/rehia/node-eventstore/tree/clean-snapshots, where I implemented the snapshot cleaning feature for in memory and redis stores.
I still need to make some wider tests, but I also wanted to have your opinion on this feature.

I basically added a new option: max snapshots count, which indicates how many snapshots you want to keep at most. So if the value is 5 and you already have 10 snapshots, when you create the next one the 6 oldest will be deleted. By default, of course, none is deleted, as is currently the case.

What do you think about this feature? Is this the way you think snapshot cleaning should be done?

Thanks for your feedback
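The retention rule described above can be sketched in memory (the function and parameter names here are illustrative, not the branch's actual code):

```javascript
// After adding a snapshot, drop the oldest ones so that at most
// `max` remain; with no max set, nothing is ever deleted.
function addSnapshot(snapshots, snapshot, max) {
  snapshots.push(snapshot);
  if (max && snapshots.length > max) {
    snapshots.splice(0, snapshots.length - max); // remove the oldest
  }
  return snapshots;
}
```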

Redis indexation

In #94, I was talking about some ways to improve redis implementation performance, mostly when there are a large number of events (or snapshots).
The problem with the current implementation is the scan, which usually needs to scan every event (or snapshot) of an aggregate and then do the necessary calculations to keep only the needed events.

I have made a quick test with my production data.
I have an aggregate with +46K events, out of a total of +117K.
I tried to scan all the keys (only scanning, no get or mget) using 2 strategies:

  • I made a common scan on all the keys. It took 6181ms.
  • I put all the aggregate keys in a sorted set (member score was the event version), and made a zrange on all the keys. It took 415ms.

I think that sorted sets could be a good option to index aggregate keys, for at least 2 reasons:

  • It seems more efficient to scan all the keys of an aggregate at once
  • It will be more efficient to get the keys between 2 revisions of an aggregate (as you need to do when reloading state from a snapshot and its subsequent events)

This means that the database needs to be indexed at first, and the index maintained. So whenever a client connects, we need to check whether the store has already been indexed; if not, scan all the aggregates, scan their event and snapshot keys, and finally index the keys (events and snapshots separately, obviously). Once done, each time an event is added, its key should be added to the corresponding sorted set.

What do you think about this solution? I'll try to work on an implementation. But if you have any remarks, objections or suggestions, don't hesitate!
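To make the idea concrete, here is a toy in-memory model of the sorted-set index (it only mimics redis ZADD/ZRANGEBYSCORE semantics for illustration; a real implementation would issue the corresponding redis commands):

```javascript
// Event keys are stored with the event version as score, so a
// range query replaces a full SCAN over all keys.
function zadd(index, score, key) {
  index.push({ score: score, key: key });
  index.sort(function (a, b) { return a.score - b.score; });
}

function zrangebyscore(index, min, max) {
  return index
    .filter(function (e) { return e.score >= min && e.score <= max; })
    .map(function (e) { return e.key; });
}
```

Fetching the events between two revisions, e.g. after a snapshot, then becomes a single ordered range lookup instead of a scan plus client-side filtering.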

Lambda, SNS, DynamoDB

I am working on an event store implementation using AWS SNS, DynamoDB, and Lambdas. Understanding that a DynamoDB implementation would be needed, is there any reason why node-eventstore would be a bad fit? SNS vs Atom?

Eventstream uncommittedEvents possibly overwrites events that were not committed yet

Scenario: Retrieve the eventstream of an aggregate and keep a reference to it, adding new events to it and committing occasionally.

Expected Result: All events added to the stream are actually persisted

Actual Result: Some events are not persisted, because the uncommittedEvents array has been altered between the store.addEvents() call and the callback (async IO) and then gets overwritten with an empty array [].

Solution: Create a copy of the eventstream.uncommittedEvents array and reset uncommittedEvents early, before the IO call; then use only the copy inside the callback, and re-append the uncommitted events on error.
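The proposed fix can be sketched as follows (a simplified stand-in, not the library's actual commit code):

```javascript
// Snapshot the uncommitted events and clear the array *before* the
// async store call, so events added during the I/O are not wiped
// out when the callback resets the array.
function commit(stream, store, callback) {
  var eventsToCommit = stream.uncommittedEvents.slice(); // copy
  stream.uncommittedEvents = [];                         // reset early
  store.addEvents(eventsToCommit, function (err) {
    if (err) {
      // re-append on failure so nothing is lost
      stream.uncommittedEvents = eventsToCommit.concat(stream.uncommittedEvents);
      return callback(err);
    }
    callback(null, eventsToCommit);
  });
}
```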

multiple snapshots: how to do `getFromSnapshot` using `version`

Browsing through the docs and this looks great.

One thing that caught my eye, is that I want to create a new snapshot after N events have happened. This results in M snapshots over time that I want to be versioned.

createSnapshot allows me to define an optional version. Normally I'd like to retrieve the latest snapshot (is there a shortcut to get the latest without specifying a version?), but sometimes I want to fetch an arbitrary version. However, getFromSnapshot (at least from the docs) doesn't seem to allow passing a version.

How to do this?

Get and pass all events from event store to node-cqrs-eventdenormalizer

Hello,

Thank you for your library.
The issue concerns how node-eventstore transmits events to node-cqrs-eventdenormalizer.

I want to rebuild a viewmodel by extracting all the events from the store.

For eventdenormalizer I can do:
eventdenormalizer.clear(function(err) {});
eventdenormalizer.replay([/* ordered array of events */], function (err) {
  if (err) { console.log(err); }
});

For node-eventstore I can use to fetch all the events:
es.getEvents()

However, I didn't find how to pass my events from the event store to my eventdenormalizer.
Do I need to use eventstore.setEventToDispatched() for each event, or is there another way?

Thanks.

Josh
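One possible wiring is sketched below. This is a hedged sketch with stubbed APIs, assuming a callback-style getEvents as well as the clear/replay calls shown above; it is not verified against the actual libraries:

```javascript
// Read all events from the store, clear the denormalizer's
// viewmodels, then hand the ordered array to replay().
function rebuildViewmodels(es, denormalizer, callback) {
  es.getEvents(function (err, events) {
    if (err) return callback(err);
    denormalizer.clear(function (err) {
      if (err) return callback(err);
      denormalizer.replay(events, callback);
    });
  });
}
```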

Remove default access keys from azuretable.js

Hey,

I was wondering why I didn't get any error messages using node-eventstore with the azuretable impl without providing access keys. Well the default access keys in azuretable.js do work :) I guess you don't want people go to town on that storage account?

Cheers
Michael

Duplicate revision id

I'm not sure if this is intentional feature or bug...

If I create two events for the same aggregateId at the same time using the mongo store (haven't tried others), they are both committed successfully with the same revisionId. I would expect the store to throw for the second event so that it can retry (revalidate against the updated state and then commit if successful).

To illustrate: given a bank account balance of $5, if two commands come in at the same time to withdraw $4 and withdraw $3, they will both validate against the perceived state ($5) and thus both will dispatch events, leaving the account with a negative balance.

Expected behavior would be for the first event committed to win, and the second to be forced to retry. In the above example, if the $4 was withdrawn first, the second command would be forced to retry against the new state ($1), fail validation, and reject the command.

I imagine this could be worked around by using a synthetic eventId (e.g. aggregateId-expectedRevisionId), but I would think the store should handle this behavior, allowing any arbitrary eventId (e.g. a uuid).

Please correct me if my understanding is incorrect.
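The expected behavior is a standard optimistic-concurrency check, which can be sketched like this (illustrative names, not the library's API):

```javascript
// Reject a commit whose expected revision no longer matches the
// stream's current revision, forcing the caller to reload and retry.
function commitWithCheck(stream, expectedRevision, event) {
  if (stream.revision !== expectedRevision) {
    throw new Error('ConcurrencyError: expected revision ' +
      expectedRevision + ', but stream is at ' + stream.revision);
  }
  stream.revision += 1;
  stream.events.push(event);
}
```

In the bank-account example, the $3 withdrawal would fail this check, reload the stream at $1, and then fail validation instead of overdrawing the account.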

Only Getting Undispatched Items per StreamID

Hi, not an issue, but I couldn't find a better way to ask this question. Is there any way I can get only the undispatched items per streamId? It works in the "add - commit" scenario, but not if I don't want to add an item. The getUndispatched function returns ALL undispatched items for the es; I could loop over them, but I guess this is not the most efficient way.

Thank you very much!

How to Init()?

Hi, I'm trying to POC this for storing events - however, I'm finding that I'm not able to get the init() call to complete.

I've got the initialisation code for the es in a file that is pulled into my index.js by a require statement. I can see the init() line being called, and it waits on the process.nextTick(function() { statement.

How should I be initialising the eventstore? My index.js spins up a REST api in express and listens to a port.

Questioning child process for dispatch

Given the event-processing-via-callback characteristics of node.js, I'm questioning whether forking a child process for event dispatching is necessary. I'm probably missing something, but couldn't the same async behavior be achieved without the complexity of a child process?

Error: The events passed should all have a streamRevision!

I got the following error when I try to write an event into event store which is empty:

Error: The events passed should all have a streamRevision!
    at Object.EventStream ([projectDirectory]/node_modules/eventstore/lib/eventStream.js:52:15)
    at [projectDirectory]/node_modules/eventstore/lib/eventstore.js:284:22
    at [projectDirectory]/node_modules/eventstore/lib/databases/mongodb.js:333:16
    at handleCallback ([projectDirectory]/node_modules/mongodb/lib/utils.js:95:12)

We are still using nodejs 0.12.7. I traced it down to line 52 in eventStream.js. It appears that, when events is [], the code still manages to reach line 49 and crashes.

After I changed lines 47-48 to

   for(var i=0, len=events.length; i < len; i++) {
      var evt = events[i];

The crash no longer occurs.
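The reported fix amounts to a bounded loop that naturally handles an empty events array. A self-contained sketch of the validation (not the file's exact code):

```javascript
// Iterating with a cached length means an empty array simply skips
// the loop, instead of reading past the end and crashing on the
// streamRevision check.
function checkStreamRevisions(events) {
  for (var i = 0, len = events.length; i < len; i++) {
    if (typeof events[i].streamRevision !== 'number') {
      throw new Error('The events passed should all have a streamRevision!');
    }
  }
}
```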

save fired repeatedly in a loop

Hello guys,

I have a basic view builder like below

module.exports = require('cqrs-eventdenormalizer').defineViewBuilder({
  name: 'detailsUpdated',
  id: 'payload.id',
  context:'workflowContext',
  aggregate:'approval'
}, function(data, vm) {
    vm.set(data);
    debug('saving detailsUpdated', JSON.stringify(data, null, '\t'));
});

The debug statement fires repeatedly on the console, with the time (in ms) each pass takes.
It seems to be re-saving the document repeatedly in a loop. The issue is quite intermittent.

I have to kill the process to stop it from looping.

// log output on console
mymodule saving detailsUpdated +390ms { // JSON I removed }
mymodule saving detailsUpdated +827ms { // JSON I removed }
mymodule saving detailsUpdated +252ms { // JSON I removed }

....
...

I need to fix this issue before the product goes live. Please help.

Idea: event aggregation or projection engine

What's currently missing is a way of aggregating events and projecting them in a simple manner.

Look at the following pseudo code:

var ag = eventstore.createAggregator("uniqueaggregatorname");

ag.on('event',function(e,done){
    // do event projection here 

    // needs to be called to ensure event was processed.
    done(); 
});

ag.start();

What this does is deliver each event only once per unique aggregator name, even if the app is restarted. The current cursor should be automatically saved in the underlying event store storage.

Thinking further ahead, this could be extended to a full projection engine, like this pseudo code:

var pr = eventstore.createProjection("uniqueprojectionname");

pr.state = {name:"-"};

pr.on('project',function(event,state,done){
    if(event.name=="namechanged"){
        state.name = event.name;
    }
    done();
});

pr.on('stateChanged',function(state){
    console.log(state);
});

pr.start();

The difference is that here we store not only the current cursor but also the last projected state.

Currently this is only an idea to inspire a discussion.

Cannot use custom store when it is brought in as a dependency (v1.1.3)

Adriano,

The current implementation fails when I have more than one version of the eventstore module in the project's node_modules tree. If the project makes use of both cqrs-domain and my-custom-store, which both require eventstore, then the project's node_modules tree will look like this:

  • cqrs-domain
    • eventstore (1)
  • my-custom-store
    • eventstore (2)

Then this check:

if (options.type && options.type.prototype instanceof Base) {
    return options.type;
}

will always come back negative, as Base is defined in eventstore (1), while options.type is a subclass of Base from eventstore (2).

How do you feel about replacing it with this:

if (options.type && typeof options.type === 'function') {
    return options.type;
}

Best,
-Leo
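A middle ground between the strict instanceof check and Leo's accept-any-function proposal would be duck typing: accept any constructor that exposes the expected store methods. The method names below are illustrative assumptions, not a verified list of the library's Base interface:

```javascript
// Instead of requiring the exact Base class from this copy of the
// module, check for the shape of a store.
function isStoreLike(Type) {
  return typeof Type === 'function' &&
    Type.prototype &&
    typeof Type.prototype.addEvents === 'function' &&
    typeof Type.prototype.getEvents === 'function';
}
```

This stays robust across duplicated eventstore copies in node_modules while still rejecting arbitrary functions.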

Feature request, commit multiple event streams in one go

I have a situation where I have dependencies between aggregates (a parent/child relationship). In this case it would be really useful to be able to save the changes of two (or more) event streams as a single transaction.

Similar to how multiple events are stored using an intermediate transaction document.

If there's a better way to do this, I'm all ears.

[redis] aggregates versioning might be inconsistent

While I was trying to use snapshots with redis, I ran into an issue which causes bugs when reloading my stream.
In some cases, often under heavy load, the redis store can produce multiple events with the same revision. So at some point the number of events (e.g. 15) is higher than the current revision (e.g. 13, with revisions 5 and 6 "duplicated").
The problem starts when loading a snapshot and the corresponding events. The last snapshot is created at revision 13. And when the snapshot is loaded, events with a revision above 13 are loaded with it, so mainly the 2 latest events (which carry revisions 12 and 13), causing an inconsistent state when reloading from snapshot and events.

The problem is caused by the fact that the streamRevision of an event is based on the last loaded event, here https://github.com/adrai/node-eventstore/blob/663fb9fc3d0765ae22196c1bff3ea9aa8e79b305/lib/eventstore.js#L402-L413
Because this operation is not atomic, under heavy load or with lots of events another command on the aggregate can load the aggregate with the same events before the first one has had time to commit. Both generated events then end up with the same revision.

The problem is not specific to the redis store, and I don't know whether other stores face the same issue (using transactions?). There are no transactions in redis, only atomic operations using MULTI (with no dependency on intermediate results) or Lua scripts (when an operation depends on a previous result).

I open this ticket to start the discussion. I was thinking about a solution, but only for redis: instead of calling SET directly, we could create a Lua script (easily deployed with the redis driver, with all the caching stuff) which would INCR a value corresponding to the actual revision of an aggregate, use it as the streamRevision, and then do the SET. Can't be safer, as it's all done atomically. And conveniently, redis natively loads a JSON library for Lua. But the problem is that the stream loaded in memory is no longer in sync, which could cause issues of its own.

Don't hesitate to contribute to this thread. I would love to hear your thoughts. In the meantime, I'll try to make a PR to show my intent.
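The core of the INCR idea can be modeled in plain JS (a toy stand-in: in redis the increment would be the atomic INCR inside the Lua script, here an in-process object stands in for the store):

```javascript
// The revision comes from a single atomic increment per aggregate
// rather than from the last event loaded into memory, so two
// concurrent commits can never pick the same number.
var revisionCounters = {};

function nextRevision(aggregateId) {
  if (!(aggregateId in revisionCounters)) revisionCounters[aggregateId] = -1;
  revisionCounters[aggregateId] += 1; // atomic in redis via INCR
  return revisionCounters[aggregateId];
}
```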

Events not dispatching.

It seems that events never reach the event dispatcher when using mongodb. I've traced the execution to line 101 of eventstore.js:

    self.dispatcher.send({ action: 'addUndispatchedEvents', payload: JSON.stringify(evts) });

then execution goes into child_process.js and gets swallowed; there are no errors, and the publisher isn't called either.

Here's some code to reproduce:

var eventstore = require('eventstore'),
    storage = require('eventstore.mongoDb');

var publisher = {
    publish: function(evt) {
        console.log('info: ' + new Date().toLocaleString() + ': ' + 'event published');
        // never gets here
    }
}

store = storage.createStorage({dbName:'mydb'});
store.connect(function(err) {

    var es = eventstore.createStore({logger: 'console'});

    es.configure(function() {
        this.use(publisher);
        this.use(store);
    }).start();

    es.getEventStream('1', 0, -1, function(err,stream) {
        stream.addEvent({id: '1'});
        stream.commit();
    });


})

Missing callback

When tested nodeCQRS https://github.com/jamuhl/nodeCQRS

I get this error when creating an item http://localhost:3000/ using the redis store

node.js:134
throw e; // process.nextTick error, or 'error' event on first tick
^
ReferenceError: callback is not defined
at /Users/jameskyburz/Documents/src/nodeCQRS/node_modules/eventstore.redis/storage.js:280:13
at try_callback (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:466:9)
at RedisClient.return_reply (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:503:13)
at HiredisReplyParser. (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:259:14)
at HiredisReplyParser.emit (events.js:64:17)
at HiredisReplyParser.execute (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/lib/parser/hiredis.js:43:18)
at RedisClient.on_data (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:422:27)
at Socket. (/Users/jameskyburz/Documents/src/nodeCQRS/node_modules/redis/index.js:66:14)
at Socket.emit (events.js:64:17)
at Socket._onReadable (net.js:678:14)

Added pull request

Stopping the Store

Is there a proper way to clean the store?

If you start the EventStore in the test setup method, what would you do in the tearDown?

Thanks for this work.

MongoDB storage provider still creating the default 'eventstore' db when configured to use a different dbName.

I configured the MongoDB storage provider to use a different dbName, following the readme example. The new db is correctly created and used for event storage, but an "eventstore" db is still created each time I start the server.

Library versions:

eventstore 0.7.6
eventstore.mongodb 0.7.5

Test code:

var eventstore = require('eventstore');
var storage = require('eventstore.mongodb');

var es = eventstore.createStore();
storage.createStorage({
  host: 'localhost',
  port: 27017,
  dbName: 'testEventStore',
  eventsCollectionName: 'events',
  snapshotsCollectionName: 'snapshots'
}, function(err, store) {
  es.configure(function() {
    es.use(store);
  });
  es.start();
});

Geteventstore Integration

Hi,

This question is not any issue with your Eventstore. I use it. It works great.

I was wondering: is it a good idea to use Greg's GetEventStore in Node.js?

There is one driver available here :
https://github.com/kenpratt/nodejs-EventStore

Have you attempted this? Do you see any pros & cons in this?

Any feedback is highly appreciated.

Thanks,
Bhoomi.

Kafka support?

Is there a plan to introduce Kafka as one of the supported event stores as well?

Support for mongodb uri connection strings

We construct mongo connection strings, and I was wondering whether that is supported by this module. I'm mainly asking because we use the different options pretty heavily depending on the environment (local doesn't use a replica set, whereas production does).
