Comments (12)
Which version you are on now?
from orleankka.
0.23.0
from orleankka.
ye, that's old. Do you use declarative stream subscriptions?
from orleankka.
No, in this case, we are using explicit subscriptions.
from orleankka.
If you're not using declarative stream subscriptions - you can try latest version (I'll push update today) there this was fixed.
from orleankka.
We are using declarative stream subscriptions on some other parts of the application. I can't update until we figure out migration path for declarative subscriptions.
But I can make some reflection magic, to mimic fixes of the latest version.
from orleankka.
yep, should not be hard ))
from orleankka.
https://github.com/OrleansContrib/Orleankka/blob/v1/Source/Orleankka/StreamSubscription.cs#L13 get this one by reflection
from orleankka.
It almost works. =D
static class StreamSubscriptionExtensions
{
private static readonly FieldInfo handle = typeof(StreamSubscription).GetTypeInfo().GetDeclaredField("handle");
private static readonly MethodInfo GetAllSubscriptionHandles = typeof(StreamRef).GetTypeInfo().GetDeclaredMethod("GetAllSubscriptionHandles");
/// <summary>
/// Returns a list of all current stream subscriptions.
/// </summary>
/// <returns> A promise for a list of StreamSubscription </returns>
public static async Task<List<StreamSubscription>> Subscriptions(this StreamRef streamRef)
{
var handles = await (Task<IList<StreamSubscriptionHandle<object>>>)GetAllSubscriptionHandles.Invoke(streamRef, null);
return handles.Select(CreateStreamSubscription).ToList();
}
private static StreamSubscription CreateStreamSubscription(StreamSubscriptionHandle<object> x)
{
return Activator.CreateInstance(typeof(StreamSubscription), BindingFlags.Instance | BindingFlags.NonPublic, null, new object[] { x }, null, null) as StreamSubscription;
}
public static Task Resume(this StreamSubscription streamSubscription, Func<object, Task> callback)
{
Requires.NotNull(callback, nameof(callback));
var observer = new Observer((item, token) => callback(item));
var subscriptionHandle = (StreamSubscriptionHandle<object>)handle.GetValue(streamSubscription);
return subscriptionHandle.ResumeAsync(observer);
}
public static Task Resume<T>(this StreamSubscription streamSubscription, Func<T, Task> callback)
{
Requires.NotNull(callback, nameof(callback));
return streamSubscription.Resume(item => callback((T)item));
}
class Observer : IAsyncObserver<object>
{
readonly Func<object, StreamSequenceToken, Task> callback;
public Observer(Func<object, StreamSequenceToken, Task> callback)
{
this.callback = callback;
}
public Task OnNextAsync(object item, StreamSequenceToken token = null)
=> callback(item, token);
public Task OnCompletedAsync() => Task.CompletedTask;
public Task OnErrorAsync(Exception ex) => Task.CompletedTask;
}
}
After calling StreamSubscription.Resume
, StreamSubscriptionHandle.observer
remains null
. I will just unsubscribe all subscriptions and resubscribe, for now. =D
from orleankka.
I've never understood why Orleans guys had not implemented subscriptions in idempotent way. Duplicate subscriptions from the same client/actor in 100% of cases is a mistake.
P.S. "StreamSubscriptionHandle.observer remains null" might be a normal behavior. Check Orleans code ..
from orleankka.
I totally agree with you and now I have discovered one more surprise...
subscriptionHandle.ResumeAsync(observer)
returns Task<StreamSubscriptionHandle<object>>
- > new valid handle while old handle becomes invalid.
I think that latest Orleankka codebase does not take this behavior into account.
from orleankka.
Nice catch!
from orleankka.
Related Issues (20)
- Exceptions in loading assemblies to ConfigureApplicationParts HOT 10
- Document F# bugger or add a startup check HOT 1
- Create template for dotnet cli tool HOT 4
- Orleans AlwaysInterleave question HOT 3
- Missing ServiceId of ClusterOptions in F# samples and Docs HOT 1
- Storage provider issue HOT 1
- F# + recieve message DU instead of object. HOT 3
- Orleankka + Dashboard \n Dashboard can't see methods information
- What should I do to call orleankka actor from orleans silo? HOT 5
- Is ImplicitStreamSubscription supported? HOT 5
- .Net Core 3 Preview 6 Idiomatic Example HOT 4
- Auto.Interfaces assembly generated in wrong folder HOT 4
- Undefined UseOrleankka HOT 1
- Is this project active? HOT 4
- Feature request: StreamRefMiddleware HOT 3
- Allow IActorGrain and GrainWithStringKey in inherited interface
- Nake.bat uses version of EventStore no longer available
- Missing NuGet for Orleankka.TestKit version 7.0.0 HOT 2
- ActorPath changes between ActorRef and ActorGrain HOT 4
- StreamRef and ImplicitStreamSubscription issues HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from orleankka.