Comments (4)

alexeyzimarev commented on September 23, 2024

Something is really wrong there. I tried replacing the attribute with an implicit subscription to the all-stream, and the sample crashes with a null reference exception.

   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Orleankka.ActorRef.<<Tell>b__7_0>d.MoveNext() in /Users/alexey/GitHub/Orleankka/Source/Orleankka/ActorRef.cs:line 44
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at Orleankka.DefaultActorRefMiddleware.<Send>d__1`1.MoveNext() in /Users/alexey/GitHub/Orleankka/Source/Orleankka/ActorRefMiddleware.cs:line 28
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Example.Program.<Run>d__1.MoveNext() in /Users/alexey/GitHub/Orleankka/Samples/CSharp/EventSourcing/Idiomatic/Program.cs:line 43
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Example.Program.<Main>d__0.MoveNext() in /Users/alexey/GitHub/Orleankka/Samples/CSharp/EventSourcing/Idiomatic/Program.cs:line 29
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Example.Program.<Main>()

from orleankka.

alexeyzimarev commented on September 23, 2024

OK, there are actually a few issues.

  • For each new stream that matches the implicit subscription filter, Orleans creates a new grain, so a single grain cannot subscribe to multiple streams this way.
  • The Regex match doesn't work for some reason, although the expression is correct. I worked around it by creating another attribute that matches with StartsWith.
  • The subscription must be created explicitly when the actor is activated. Events aren't delivered to the actor the way other messages are.

My changes are:

    [StartsWithImplicitStreamSubscription("InventoryItem")]
    public class Inventory : DispatchActorGrain, IInventory, IGrainWithGuidCompoundKey
    {
        async Task On(Activate _)
        {
            var streamProvider = GetStreamProvider("sms");

            var guid = this.GetPrimaryKey(out var extension);
            var stream = streamProvider.GetStream<EventEnvelope>(guid, extension);

            await stream.SubscribeAsync((envelope, token) => Receive(envelope.Event));
        }
        
        void On(InventoryItemCreated e)     => items[Id] = new InventoryItemDetails(e.Name, 0, true);
        void On(InventoryItemCheckedIn e)   => items[Id].Total += e.Quantity;
        void On(InventoryItemCheckedOut e)  => items[Id].Total -= e.Quantity;
        void On(InventoryItemDeactivated e) => items[Id].Active = false;
        void On(InventoryItemRenamed e)     => items[Id].Name   = e.NewName;
        
        static readonly Dictionary<string, InventoryItemDetails> items =
            new Dictionary<string, InventoryItemDetails>();

        InventoryItemDetails[] Answer(GetInventoryItems _) => items.Values.ToArray();
        int Answer(GetInventoryItemsTotal _) => items.Values.Sum(x => x.Total);
    }

    public class StartsWithPredicate : IStreamNamespacePredicate
    {
        readonly string startsWith;

        public StartsWithPredicate(string startsWith) => this.startsWith = startsWith;

        public bool IsMatch(string streamNamespace) => streamNamespace.StartsWith(startsWith);
    }

    [AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
    public class StartsWithImplicitStreamSubscriptionAttribute : ImplicitStreamSubscriptionAttribute
    {
        public StartsWithImplicitStreamSubscriptionAttribute(string startsWith)
            : base(new StartsWithPredicate(startsWith))
        { }
    }

I had to remove the typed envelope since the subscription must be created for a specific event type.

And, of course, since the aim here is to collect data across multiple streams, I made the dictionary static, which is not an option for high-frequency processing, but it works for the sake of example.

yevhen commented on September 23, 2024

Yes, I'm aware of this problem. Unfortunately, the original declarative stream subscriptions in Orleankka were a crutch to support, since they required too much digging into Orleans internals. Moreover, they created an illusion of consistency (between stream and projection) that could be harmful to uninitiated developers, blinding them to the need to make projections reliable.

In a real-world application, a one-to-many projection should subscribe to the global log instead of individual aggregate streams; otherwise, it would be hard to achieve idempotency (it would need to store a checkpoint for each stream). In this example, the projection is not idempotent given at-least-once event delivery semantics.
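To illustrate the checkpoint idea, here is a minimal, framework-free sketch of a projection that reads a single global log and skips events it has already seen. All names here (`LoggedEvent`, `InventoryTotalsProjection`, `Position`, `QuantityDelta`) are hypothetical and are not part of Orleankka or Orleans. It assumes that every event carries a monotonically increasing position in the global log and that events arrive in order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape: Position is the event's offset in the global log.
public class LoggedEvent
{
    public LoggedEvent(long position, string itemId, int quantityDelta)
    {
        Position = position;
        ItemId = itemId;
        QuantityDelta = quantityDelta;
    }

    public long Position { get; }
    public string ItemId { get; }
    public int QuantityDelta { get; }
}

// Hypothetical projection that keeps per-item totals across all items.
public class InventoryTotalsProjection
{
    readonly Dictionary<string, int> totals = new Dictionary<string, int>();

    // One checkpoint for the whole projection, because it reads one log.
    long checkpoint = -1;

    public long Checkpoint => checkpoint;
    public int Total => totals.Values.Sum();

    // Returns false when the event was already applied (a redelivery).
    public bool Apply(LoggedEvent e)
    {
        if (e.Position <= checkpoint)
            return false; // duplicate under at-least-once delivery

        totals.TryGetValue(e.ItemId, out var current);
        totals[e.ItemId] = current + e.QuantityDelta;
        checkpoint = e.Position;
        return true;
    }
}
```

In this sketch the state and the checkpoint live only in memory. A durable projection would presumably need to persist both together, so that a restart cannot apply an event twice or lose one.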

I think it's better to remove this example completely or to find another way to express the logic, such as pushing directly from the aggregate actor to the projection actor instead of relying on Orleans' implicit stream subscriptions, which don't work for one-to-many relationships. This still won't be idempotent, but at least it won't require a static dictionary (which doesn't work in a distributed setting).
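The direct-push alternative could look roughly like the sketch below. It is written without any actor framework: `IInventoryProjection`, `InventoryProjection`, and `InventoryItemAggregate` are hypothetical stand-ins for the projection actor reference and the aggregate actor, not Orleankka types. The only point it makes is that a single projection instance receives changes from many aggregates, so its state can be an ordinary instance field.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a reference to the single projection actor.
public interface IInventoryProjection
{
    Task Receive(string itemId, int quantityDelta);
}

// One instance serves all items, so no static dictionary is needed.
public class InventoryProjection : IInventoryProjection
{
    readonly Dictionary<string, int> totals = new Dictionary<string, int>();

    public int Total => totals.Values.Sum();

    public Task Receive(string itemId, int quantityDelta)
    {
        totals.TryGetValue(itemId, out var current);
        totals[itemId] = current + quantityDelta;
        return Task.CompletedTask;
    }
}

// Hypothetical aggregate that pushes each change to the projection itself.
public class InventoryItemAggregate
{
    readonly string id;
    readonly IInventoryProjection projection;

    public InventoryItemAggregate(string id, IInventoryProjection projection)
    {
        this.id = id;
        this.projection = projection;
    }

    public Task CheckIn(int quantity)  => projection.Receive(id, quantity);
    public Task CheckOut(int quantity) => projection.Receive(id, -quantity);
}
```

As noted above, this is still not idempotent: if a push is retried, the projection applies the change twice.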

alexeyzimarev commented on September 23, 2024

I agree that it should be one actor per projection, not per aggregate actor, but the idea of pushing to an aggregate-type stream doesn't appeal to me. Ideally, developers should be able to choose between the aggregate-type stream, for making aggregate state snapshots (a marginal use case, since the aggregate actor already keeps its state), and the all-stream, which I see as the default stream for projections.
