Comments (4)
Something is really wrong there. I tried replacing the attribute with the all-stream implicit subscription, and the sample crashes with a null reference exception.
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Orleankka.ActorRef.<<Tell>b__7_0>d.MoveNext() in /Users/alexey/GitHub/Orleankka/Source/Orleankka/ActorRef.cs:line 44
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
at Orleankka.DefaultActorRefMiddleware.<Send>d__1`1.MoveNext() in /Users/alexey/GitHub/Orleankka/Source/Orleankka/ActorRefMiddleware.cs:line 28
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Example.Program.<Run>d__1.MoveNext() in /Users/alexey/GitHub/Orleankka/Samples/CSharp/EventSourcing/Idiomatic/Program.cs:line 43
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Example.Program.<Main>d__0.MoveNext() in /Users/alexey/GitHub/Orleankka/Samples/CSharp/EventSourcing/Idiomatic/Program.cs:line 29
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Example.Program.<Main>()
from orleankka.
OK, there are actually a few issues.
- Subscriptions work such that Orleans creates a new grain for each new stream that matches the implicit subscription filter. Subscribing to multiple streams from one grain is impossible.
- The Regex match doesn't work for some reason, although the expression is correct. I solved it by creating another attribute that works with StartsWith.
- The subscription must be made explicitly when the actor is activated. Events aren't sent to the actor the way other messages are.
My changes are:
[StartsWithImplicitStreamSubscription("InventoryItem")]
public class Inventory : DispatchActorGrain, IInventory, IGrainWithGuidCompoundKey
{
    async Task On(Activate _)
    {
        var streamProvider = GetStreamProvider("sms");
        var guid = this.GetPrimaryKey(out var extension);
        var stream = streamProvider.GetStream<EventEnvelope>(guid, extension);
        await stream.SubscribeAsync((envelope, token) => Receive(envelope.Event));
    }

    void On(InventoryItemCreated e) => items[Id] = new InventoryItemDetails(e.Name, 0, true);
    void On(InventoryItemCheckedIn e) => items[Id].Total += e.Quantity;
    void On(InventoryItemCheckedOut e) => items[Id].Total -= e.Quantity;
    void On(InventoryItemDeactivated e) => items[Id].Active = false;
    void On(InventoryItemRenamed e) => items[Id].Name = e.NewName;

    static readonly Dictionary<string, InventoryItemDetails> items =
        new Dictionary<string, InventoryItemDetails>();

    InventoryItemDetails[] Answer(GetInventoryItems _) => items.Values.ToArray();
    int Answer(GetInventoryItemsTotal _) => items.Values.Sum(x => x.Total);
}

public class StartsWithPredicate : IStreamNamespacePredicate
{
    readonly string startsWith;
    public StartsWithPredicate(string startsWith) => this.startsWith = startsWith;
    public bool IsMatch(string streamNamespace) => streamNamespace.StartsWith(startsWith);
}

[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public class StartsWithImplicitStreamSubscriptionAttribute : ImplicitStreamSubscriptionAttribute
{
    public StartsWithImplicitStreamSubscriptionAttribute(string startsWith)
        : base(new StartsWithPredicate(startsWith))
    { }
}
I had to remove the typed envelope since the subscription must be created for a specific event type.
And, of course, since the aim here is to collect data across multiple streams, I made the dictionary static. That is not an option for high-frequency processing, but it works for the sake of the example.
from orleankka.
Yes, I'm aware of this problem. Unfortunately, the original declarative stream subscriptions in Orleankka were a burden to support, since they required digging too deep into Orleans internals. Moreover, they created an illusion of consistency (between stream and projection), which could be harmful to uninitiated developers, blinding them to the need to make projections reliable.
In a real-world application, a one-to-many projection should subscribe to the global log instead of individual aggregate streams. Otherwise, it would be hard to achieve idempotency, because the projection would need to store a checkpoint for each stream. In this example, the projection is not idempotent, given at-least-once event delivery semantics.
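To illustrate the checkpoint point, here is a minimal sketch in plain C# (C# 9 or later), not tied to Orleans or Orleankka. The names LoggedEvent, InventoryTotalsProjection and Demo are hypothetical, and the sketch assumes the global log assigns each event a monotonically increasing position and delivers events in order. With a single global position, one checkpoint is enough to skip redelivered events:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape: Position is the event's offset in the global log.
public record LoggedEvent(long Position, string ItemId, int QuantityDelta);

public class InventoryTotalsProjection
{
    readonly Dictionary<string, int> totals = new Dictionary<string, int>();

    // Position of the last applied event; -1 means nothing applied yet.
    long checkpoint = -1;

    // Applies the event unless it was already seen.
    // Returns false for a redelivery (position at or below the checkpoint).
    public bool Apply(LoggedEvent e)
    {
        if (e.Position <= checkpoint)
            return false;

        totals.TryGetValue(e.ItemId, out var current);
        totals[e.ItemId] = current + e.QuantityDelta;
        checkpoint = e.Position;
        return true;
    }

    public int Total => totals.Values.Sum();
}

public static class Demo
{
    public static int Run()
    {
        var projection = new InventoryTotalsProjection();
        var deliveries = new[]
        {
            new LoggedEvent(0, "item-1", 5),
            new LoggedEvent(1, "item-2", 3),
            new LoggedEvent(1, "item-2", 3),  // duplicate delivery, skipped
            new LoggedEvent(2, "item-1", -2),
        };

        foreach (var e in deliveries)
            projection.Apply(e);

        return projection.Total;
    }
}
```

In a real system the checkpoint would presumably have to be persisted together with the projection state, so that a restart resumes from the right position. With per-aggregate streams, the same approach would need one such checkpoint per stream.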
I think it's better to remove this example completely or to find another way to express the logic, such as pushing directly from the aggregate actor to the projection actor instead of relying on Orleans' implicit stream subscriptions, which don't work for one-to-many relationships. This still won't be idempotent, but at least it won't require a static dictionary (which doesn't work in a distributed setting).
from orleankka.
I agree that it should be one actor per projection, not per aggregate actor, but the idea of pushing to an aggregate-type stream doesn't sound appealing to me. Ideally, developers should be able to choose between the aggregate-type stream, for making aggregate state snapshots (a marginal use case, since the aggregate actor already keeps its state), and the all-stream, which I see as the default stream for projections.
from orleankka.