stan.net's Introduction

NATS .NET C# Streaming Client

WARNING: Product reached end of life ⚠️

NATS Streaming reached its end of life.

It is no longer supported and has been replaced by JetStream.

JetStream is built into the NATS Server and supported by all major clients. Check examples here

stan.net's People

Contributors

colinsullivan1, dalebingham, danielwertheim, dependabot[bot], dorny, dungpa, gcolliso, itn3000, jarema, jeppevammenkristensen, scottf, semax, sixlettervariables, sujitn, watfordgnf

stan.net's Issues

How do you connect to a remote streaming server?

Hi,

using (IStanConnection c = cf.CreateConnection("test-cluster", ClientId))

When creating a connection to the server, how do you do so when the client and the server are on two different machines? CreateConnection() only appears to accept a ClusterId as a means of connection. How do you specify a remote IP address?
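
Other reports on this page set the server address through StanOptions.NatsURL and pass the options to CreateConnection. A minimal sketch along those lines follows; the address 192.0.2.10, the client ID and the subject are placeholders chosen for illustration, and the code needs the STAN.Client package and a reachable server to run.

```csharp
using System.Text;
using STAN.Client;

class RemoteConnectExample
{
    static void Main()
    {
        // Hypothetical address of the machine running nats-streaming-server.
        const string remoteUrl = "nats://192.0.2.10:4222";

        var cf = new StanConnectionFactory();
        StanOptions opts = StanOptions.GetDefaultOptions();
        opts.NatsURL = remoteUrl;

        // The cluster ID must match the ID the server was started with.
        using (IStanConnection c = cf.CreateConnection("test-cluster", "remote-client", opts))
        {
            c.Publish("foo", Encoding.UTF8.GetBytes("hello"));
        }
    }
}
```

The cluster ID identifies the streaming cluster, while the URL in the options decides which host the client dials.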

Can't connect to nats streaming server in Docker cluster

It seems I get a connection timeout every time I try to connect to a cluster running in a Docker container. I have no problem connecting with the NATS client. The streaming client works until I turn on clustering.

Here's how I fire up the cluster.

docker network create --driver bridge nats-cluster
docker run --network=nats-cluster -it --name a -p 0.0.0.0:4222:4222 -p 0.0.0.0:4248:4248 -v C:/data:/data nats-streaming -cid mycluster -st FILE -dir /data/a -cluster_log_path /data/mycluster --clustered --cluster_node_id a --cluster_peers b --cluster nats://0.0.0.0:4248 -routes nats-route://b:5248 -SDV

Here's how I try to connect

var factory = new StanConnectionFactory();
var opts = StanOptions.GetDefaultOptions();
opts.NatsURL = "nats://localhost:4222";
var connection = factory.CreateConnection("mycluster", "mytestapp", opts);

Here's the exception

STAN.Client.StanConnectRequestTimeoutException: 'Connection Request Timed out.'

The non streaming client works fine. No errors doing this:

var fac = new ConnectionFactory();
var opts = ConnectionFactory.GetDefaultOptions();
opts.Url = "nats://localhost:4222";
fac.CreateConnection(opts);

Navigating to localhost:8222 still brings up the monitor; I just can't make a connection using the NATS Streaming client.

Am I doing something wrong?

Messages get redelivered even if subscriber handler takes less than AckWait

The issue can be reproduced in the subscriber example if a small delay (like 100ms) is introduced:

            EventHandler<StanMsgHandlerArgs> msgHandler = (sender, args) =>
            {
                if (received == 0)
                    sw.Start();

                received++;

                if (verbose)
                {
                    Console.WriteLine("Received seq # {0}: {1}",
                        args.Message.Sequence,
                        System.Text.Encoding.UTF8.GetString(args.Message.Data));
                }

                Thread.Sleep(100);

                if (received >= count)
                {
                    sw.Stop();
                    ev.Set();
                }
            };

Even if the handler processing is much shorter than the default subscription AckWait time of 30s, a large part of the messages (2000-3000 out of 10000) get redelivered, some of them up to 8 times.

If the AckWait time is reduced to 1s, redelivered messages are much more frequent and the program terminates with NATSSlowConsumerException. The issue does not appear if the thread sleep is not present in the message handler.

For reference, the server we use is nats-streaming:0.12.2.

Connection Request Timed out.

Exception

{STAN.Client.StanConnectRequestTimeoutException: Connection Request Timed out.
at STAN.Client.Connection..ctor(String stanClusterID, String clientID, StanOptions options)
at STAN.Client.StanConnectionFactory.CreateConnection(String clusterID, String clientID, StanOptions options)

I have a very simple scenario:


string url = "nats://127.0.0.1:4222";
string clientID = "test-client";
string clusterID = "test-cluster";

var opt = StanOptions.GetDefaultOptions();
opt.NatsURL = url;
var cf = new StanConnectionFactory();
using (var c = cf.CreateConnection(clusterID, clientID, opt))
{
    c.Publish(Subject, ObjectToByteArray(command));
}

and I am starting server as "nats-streaming-server.exe"

[19876] 2019/04/12 22:34:19.468122 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.2
[19876] 2019/04/12 22:34:19.470122 [INF] STREAM: ServerID: A7cAW2Grx60dK2ASZ2Plj6
[19876] 2019/04/12 22:34:19.470122 [INF] STREAM: Go version: go1.11.6
[19876] 2019/04/12 22:34:19.471123 [INF] STREAM: Git commit: [not set]
[19876] 2019/04/12 22:34:19.472123 [INF] Starting nats-server version 1.4.1
[19876] 2019/04/12 22:34:19.472123 [INF] Git commit [not set]
[19876] 2019/04/12 22:34:19.474123 [INF] Listening for client connections on 0.0.0.0:4222
[19876] 2019/04/12 22:34:19.474123 [INF] Server is ready
[19876] 2019/04/12 22:34:19.532121 [INF] STREAM: Recovering the state...
[19876] 2019/04/12 22:34:19.533123 [INF] STREAM: No recovered state
[19876] 2019/04/12 22:34:19.784123 [INF] STREAM: Message store is MEMORY
[19876] 2019/04/12 22:34:19.784123 [INF] STREAM: ---------- Store Limits ----------
[19876] 2019/04/12 22:34:19.786121 [INF] STREAM: Channels: 100 *
[19876] 2019/04/12 22:34:19.787121 [INF] STREAM: --------- Channels Limits --------
[19876] 2019/04/12 22:34:19.788121 [INF] STREAM: Subscriptions: 1000 *
[19876] 2019/04/12 22:34:19.789122 [INF] STREAM: Messages : 1000000 *
[19876] 2019/04/12 22:34:19.789122 [INF] STREAM: Bytes : 976.56 MB *
[19876] 2019/04/12 22:34:19.790122 [INF] STREAM: Age : unlimited *
[19876] 2019/04/12 22:34:19.791121 [INF] STREAM: Inactivity : unlimited *
[19876] 2019/04/12 22:34:19.792121 [INF] STREAM: ----------------------------------

What could be wrong? It is a very simple scenario and should work.

Disposing NATSConnection immediately after unsubscribing throws StanBadSubscriptionException

The following code throws StanBadSubscriptionException. I have 50 subscriptions in the _subscriptions array.

foreach (var subscription in _subscriptions) {
    subscription.Unsubscribe();
}
_stanConnection.NATSConnection.Dispose();
_stanConnection.Dispose();

The error is as below

Failed to stop service. STAN.Client.StanBadSubscriptionException: Invalid subscription.
   at STAN.Client.AsyncSubscription.unsubscribe(Boolean close)
   at STAN.Client.AsyncSubscription.Unsubscribe()

But if I put a delay, everything is OK.

foreach (var subscription in _subscriptions) {
    subscription.Close();
}
Task.Delay(2000).ContinueWith((t) => {
    _stanConnection.NATSConnection.Dispose();
    _stanConnection.Dispose();
});

I think the Close() method has not finished closing by the time it returns.

Publish Acks always timeout

Hi guys, I'm facing the exact same issue described earlier at:

#13

I've already tried raising my timeout limit to 30 secs, and max pubs in flight to around 30K. That resulted in the timeouts happening later, exactly at 30K messages sent.

I know I am publishing too fast; I'm actually publishing 1.5 million messages in a loop as fast as I can. But this is a streaming server that should handle the load, right?

  • Also, this behavior doesn't change even if I Thread.Sleep(...) for 50 ms between publishing messages. It seems that no Acks are returned to the publisher at all; as soon as it reaches the "InFlight" limit it breaks down. Not a single Ack is returned.

This is the code that initializes my connection:

var opts = StanOptions.GetDefaultOptions();
opts.PubAckWait = 30 * 1000;
opts.MaxPubAcksInFlight = 32 * 1000;

return _connectionFactory.CreateConnection("test-cluster", clientID ?? Guid.NewGuid().ToString(), opts);

And then, my publisher uses that connection like so:

this.Connection.Publish(base.Topic, ObjectToByteArray(message));

Could it be that I'm doing something wrong ?

=============== Edit ============================

After some more searching & poking around, I used the publish overload that also accepts an Ack event handler, to get a better idea of what is going on exactly.

Now acks return, but the code blocks until an Ack is received or while the un-acked limit has been reached (I'm not sure which right now).

In any case, messages now seem to get through, but I sometimes get 1 failure and 2 success callbacks for a single message. It seems that when a publish times out, I get one failure and two success Acks immediately after that, and that seems to be "the" behavior.

Is all this normal? Is this the throughput I should be expecting overall? Right now posting 500 small string messages seems to take something like half an hour to complete!
(I'm just posting "Test message N" right now, mind you.)

Thanks a lot,
Angelo

Message redelivered even if already acknowledged

My subscriber can do long-running work. Sometimes it takes longer than the default ack wait time (30 sec).
So I try to ack my messages manually, like this:

StanSubscriptionOptions sOpts = StanSubscriptionOptions.GetDefaultOptions();
sOpts.ManualAcks = true;
EventHandler<StanMsgHandlerArgs> msgHandler = (sender, args) =>
{
    args.Message.Ack();
    Thread.Sleep(40000);
};
sOpts.DurableName = "my-durable";
var s = c.Subscribe(subject, qGroup, sOpts, msgHandler);

In this code, the subscriber works for 40 sec.
If I send 2 messages, the second is always redelivered.
What is my mistake?

Document NATS connection lifetime

Add documentation describing the lifetime of the underlying NATS connection versus the lifetime of a connection passed in through options. Include a discussion of disposability and whether the app or STAN is responsible for disposing the connection.

Multiple connections in multithreaded environment fail

I tried to use multiple NATS clients in a multithreaded environment, but connections fail with "Invalid connection." or "Connection Request Timed out." when the number of items scheduled in the thread pool is above 6 (the number varies between 6 and 13).

This is my test code:

class Program
{
  static void Main(string[] args)
  {
    for (int i = 1; i < 16; i++)
    {
      QueueAndExecute(i);
    }

    Console.ReadLine();
  }

  private static void QueueAndExecute(int instances)
  {
    var finished = new CountdownEvent(1);

    for (var i = 0; i < instances; i++)
    {
      var instance = i + 1;

      finished.AddCount();
      ThreadPool.QueueUserWorkItem(state =>
      {
        var clientId = Guid.NewGuid().ToString("N");

        try
        {
          using (var c = new StanConnectionFactory().CreateConnection("test-cluster", clientId))
          {
            var bytes = Encoding.UTF8.GetBytes("this is test");

            c.Publish("test-channel", bytes);
            Console.WriteLine($"{instance}/{instances}: published");
          }
        }
        catch (Exception e)
        {
          Console.WriteLine($"{instance}/{instances}: {e.Message}");
        }

        finished.Signal();
      });
    }

    Console.WriteLine($"queued {instances}");
    finished.Signal();
    finished.Wait();
  }
}

Which produces following output:

queued 1
1/1: published
queued 2
2/2: published
1/2: published
queued 3
3/3: published
2/3: published
1/3: published
queued 4
1/4: published
3/4: published
2/4: published
4/4: published
queued 5
2/5: published
3/5: published
1/5: published
4/5: published
5/5: published
queued 6
3/6: published
1/6: published
2/6: published
6/6: published
5/6: published
4/6: published
queued 7
7/7: Invalid connection.
4/7: Invalid connection.
5/7: Invalid connection.
6/7: Connection Request Timed out.
2/7: Connection Request Timed out.
1/7: Connection Request Timed out.
3/7: published
queued 8
1/8: Invalid connection.
2/8: Invalid connection.
7/8: Invalid connection.
4/8: Invalid connection.
5/8: Invalid connection.
3/8: Invalid connection.
6/8: Invalid connection.
8/8: Invalid connection.
queued 9
1/9: Invalid connection.
2/9: Invalid connection.
3/9: Invalid connection.
4/9: Invalid connection.
5/9: Invalid connection.
6/9: Invalid connection.
7/9: Invalid connection.
8/9: Invalid connection.
9/9: Invalid connection.
queued 10
1/10: Invalid connection.
2/10: Invalid connection.
3/10: Invalid connection.
4/10: Invalid connection.
5/10: Invalid connection.
6/10: Invalid connection.
7/10: Invalid connection.
8/10: Invalid connection.
9/10: Invalid connection.
10/10: Invalid connection.
queued 11
1/11: Invalid connection.
2/11: Invalid connection.
3/11: Invalid connection.
4/11: Invalid connection.
5/11: Invalid connection.
6/11: Invalid connection.
7/11: Invalid connection.
8/11: Invalid connection.
9/11: Invalid connection.
10/11: Invalid connection.
11/11: Invalid connection.
queued 12
1/12: Invalid connection.
2/12: Invalid connection.
3/12: Invalid connection.
4/12: Invalid connection.
5/12: Invalid connection.
6/12: Invalid connection.
7/12: Invalid connection.
8/12: Invalid connection.
9/12: Invalid connection.
10/12: Invalid connection.
11/12: Invalid connection.
12/12: published
queued 13
1/13: Invalid connection.
2/13: Invalid connection.
3/13: Invalid connection.
4/13: Invalid connection.
5/13: Invalid connection.
6/13: Invalid connection.
7/13: Invalid connection.
8/13: Invalid connection.
9/13: Invalid connection.
10/13: Invalid connection.
11/13: Invalid connection.
12/13: Invalid connection.
13/13: Invalid connection.
queued 14
1/14: Invalid connection.
2/14: Invalid connection.
3/14: Invalid connection.
4/14: Invalid connection.
5/14: Invalid connection.
6/14: Invalid connection.
7/14: Invalid connection.
8/14: Invalid connection.
9/14: Invalid connection.
10/14: Invalid connection.
11/14: Invalid connection.
12/14: Invalid connection.
13/14: Invalid connection.
14/14: Invalid connection.
queued 15
1/15: Invalid connection.
2/15: Invalid connection.
3/15: Invalid connection.
4/15: Invalid connection.
5/15: Invalid connection.
6/15: Invalid connection.
7/15: Invalid connection.
8/15: Invalid connection.
9/15: Invalid connection.
10/15: Invalid connection.
11/15: Invalid connection.
12/15: Invalid connection.
13/15: Invalid connection.
14/15: Invalid connection.
15/15: published

I have nats-streaming-server version 0.10.2 running on my local machine (Windows 10, 64-bit). Thread safety of creating connections does not seem to be mentioned anywhere in the documentation, so I assume this should be allowed. When I lock around CreateConnection everything works fine.
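
The locking workaround mentioned above can be sketched without the client library. SerializedFactory below is a hypothetical helper, not part of STAN.Client: it wraps any creation delegate so that only one call runs at a time, and the demo method measures how many delegate calls overlap.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: serializes calls to a creation delegate behind one lock.
public sealed class SerializedFactory<T>
{
    private readonly object _gate = new object();
    private readonly Func<string, T> _create;

    public SerializedFactory(Func<string, T> create)
    {
        _create = create ?? throw new ArgumentNullException(nameof(create));
    }

    public T Create(string clientId)
    {
        lock (_gate)
        {
            return _create(clientId);
        }
    }
}

public static class SerializedFactoryDemo
{
    // Runs `workers` parallel Create calls and returns the highest number of
    // delegate invocations that were in progress at the same time.
    public static int MaxConcurrentCreates(int workers)
    {
        int inFlight = 0;
        int maxSeen = 0;
        var factory = new SerializedFactory<string>(id =>
        {
            int now = Interlocked.Increment(ref inFlight);
            maxSeen = Math.Max(maxSeen, now); // safe: runs under the factory lock
            Thread.Sleep(5);                  // stand-in for connection setup time
            Interlocked.Decrement(ref inFlight);
            return id;
        });

        Parallel.For(0, workers, i => factory.Create(Guid.NewGuid().ToString("N")));
        return maxSeen;
    }

    public static void Main()
    {
        Console.WriteLine(MaxConcurrentCreates(8));
    }
}
```

In the scenario from this report, the delegate would presumably be something like id => new StanConnectionFactory().CreateConnection("test-cluster", id). Whether serializing creation addresses the root cause or only hides it is not established here.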

Originally submitted by @mabzd as nats-io/nats.net#224

Ack seems not to be working

Hi,

I've faced a weird issue concerning the subscriber's ack mechanism.
It looks as if acks work neither with auto nor with manual acking.

The publisher receives an exception after publishing 16384 messages, which is the default MaxPubAcksInFlight value.
The weird thing is that my subscriber obviously receives and (manually) acks messages.

Regards

Is this still viable?

Hello,
I am currently considering moving off RabbitMQ due to the lack of good client libraries (most assume that they will always talk to another C# client, which is not viable in our ecosystem).
But the lack of activity on this repo concerns me: it does not look like bugs get fixed, etc. I was burned by the lack of development on the RawRabbit client, so I would like to know what your plans are.

Thanks!

One thread per subscription seems suboptimal

I have a backend system which is based on NATS Streaming and microservices. Every service listens to a few subscriptions and processes messages. During integration tests we run 3-5 such services in a single process. I noticed that the total number of threads became very high: it tops 200 threads, and 80% of them are NATS async subscriptions waiting for data (stack trace below).

NATS.Client.dll!NATS.Client.Channel<NATS.Client.Msg>.get(int timeout, NATS.Client.Msg[] buffer)
NATS.Client.dll!NATS.Client.Connection.deliverMsgs(NATS.Client.Channel<NATS.Client.Msg> ch)
NATS.Client.dll!NATS.Client.AsyncSubscription.enableAsyncProcessing.AnonymousMethod__9_0()
[External Code] (Unknown Source:0)

I imagine such a huge number of threads comes from the multiple subscriptions created by our services. But 200 threads is a bit too high. I wonder if there is a way to limit the number of threads while still listening to all required subscriptions?

Messages get redelivered after a client disconnects

Hi!

I have three clients getting messages from a three-node cluster. All clients are connected to the same node.

They subscribe to a channel, are set to be durable, and share the same queue group. I publish 200 messages to the channel and the three clients start receiving all messages. It works fine.

If I limit a client to receiving just 10 messages and then closing, the remainder of the 200 messages that this client would have received are not delivered until they are redelivered. If I reconnect the client it sits idle until new messages arrive.

MaxInflight is set to just 1 message, so I would expect the other clients to "immediately" get the remaining messages in the queue if a client disconnects (by using Close), not wait for redelivery.

What am I missing?

Erratic behavior when durable name is set

When I use a simple console app with a simple example everything seems OK, but in my actual code with the same configuration the behavior is erratic when a durable name is set. I went through the STAN client code and saw that it doesn't get any callback from the internal NATS connection's subscription:

inboxSub = sc.NATSConnection.SubscribeAsync(inbox, sc.processMsg);

internal void processMsg(object sender, MsgHandlerEventArgs args)
{
    bool isClosed = false;
    AsyncSubscription sub = null;
    Msg raw = null;

    MsgProto mp = new MsgProto();
    ProtocolSerializer.unmarshal(args.Message.Data, mp);

    raw = args.Message;

    lock (mu)
    {
        isClosed = (nc == null);
        subMap.TryGetValue(raw.Subject, out sub);
    }

    if (isClosed || sub == null)
        return;

    sub.processMsg(mp);
}

It seems that some delay or timing-dependent issue is causing this.

NATSTimeoutException when we supply a non existing or wrong cluster id

Hi,
I'm not sure if I'm going to get an answer, as I'm not seeing a lot of activity around the NATS client for .NET.

While trying to use NATS Streaming Server, I started getting a misleading exception (NATSTimeoutException):

at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout)
at NATS.Client.Connection.request(String subject, Byte[] data, Int32 offset, Int32 count, Int32 timeout)

This happened when I provided a wrong cluster ID. Is this intended, or is the wrong exception being thrown?
Thanks

SubscriptionOptions.StartAt(DateTime time) should take in a UTC not a local time

I was using the DateTime overload of subscriptionOptions.StartAt() and I noticed that you have to pass in a local DateTime object to get the correct behavior. Looking at AsyncSubscription.cs line 77, (DateTime.Now - options.startTime): shouldn't this be using DateTime.UtcNow, so that the user passes in a UTC DateTime?

FWIW, the Java and Node STAN clients do StartAtTime with respect to UTC.
Thanks!
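
The arithmetic behind this report can be shown with the base class library alone. DateTime subtraction compares raw tick values and ignores the Kind property, so subtracting a UTC start time from DateTime.Now is off by the local UTC offset. The helper name ElapsedUtc is made up for this sketch and is not part of the client.

```csharp
using System;

public static class StartAtDelta
{
    // Elapsed time since startTime, with both sides expressed in UTC.
    // ToUniversalTime() converts Local values, leaves Utc values unchanged,
    // and treats Unspecified values as local time.
    public static TimeSpan ElapsedUtc(DateTime startTime, DateTime utcNow)
    {
        return utcNow - startTime.ToUniversalTime();
    }

    public static void Main()
    {
        DateTime utcStart = DateTime.UtcNow.AddMinutes(-5);

        // Local clock minus a UTC value: differs from 5 minutes by the
        // machine's UTC offset, because Kind is ignored by subtraction.
        TimeSpan naive = DateTime.Now - utcStart;

        // Both operands in UTC: about 5 minutes in any time zone.
        TimeSpan normalized = ElapsedUtc(utcStart, DateTime.UtcNow);

        Console.WriteLine($"naive: {naive}, normalized: {normalized}");
    }
}
```

Normalizing the input with ToUniversalTime() would let callers pass either a local or a UTC value, which is one possible way to reconcile the behavior with the other clients.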

Add an option for choosing Close() or Unsubscribe() when disposing IStanSubscription

I wrote the following code.

namespace StanTest
{
    using System;
    using STAN.Client;
    using System.Threading.Tasks;
    using System.Threading;
    using System.Text;
    static class Tests
    {
        public static async Task CreateProducerTask(StanConnectionFactory cf, StanOptions opts)
        {
            using (var c = cf.CreateConnection("test-cluster", "stantest-producer", opts))
            {
                for (int i = 0; i < 1000; i++)
                {
                    await c.PublishAsync("foo", Encoding.UTF8.GetBytes($"x{i}")).ConfigureAwait(false);
                    await Task.Delay(10).ConfigureAwait(false);
                }
            }
            Console.WriteLine($"producer done");
        }
        public static async Task CreateConsumerTask(StanConnectionFactory cf, StanOptions opts, CancellationToken ct)
        {
            var subopts = StanSubscriptionOptions.GetDefaultOptions();
            // set DurableName for resuming.
            subopts.DurableName = "standurable";
            while (!ct.IsCancellationRequested)
            {
                using (var c = cf.CreateConnection("test-cluster", "stantest-consumer", opts))
                {
                    using (var sub = c.Subscribe("foo", subopts, (sender, ev) =>
                    {
                        Console.WriteLine($"recv:{Encoding.UTF8.GetString(ev.Message.Data)}");
                    }))
                    {
                        await Task.Delay(1000).ConfigureAwait(false);
                        // sub.Close();
                    }
                }
                // simulate reboot service
                Console.WriteLine($"rebooting subscription");
                await Task.Delay(1000).ConfigureAwait(false);
                Console.WriteLine($"rebooting subscription done");
            }
        }
        public static async Task DoTest(string natsUrl)
        {
            var cf = new StanConnectionFactory();
            var opts = StanOptions.GetDefaultOptions();
            opts.NatsURL = natsUrl;
            using (var cts = new CancellationTokenSource())
            {
                await Task.WhenAll(
                    Task.Run(async () =>
                    {
                        await CreateProducerTask(cf, opts).ConfigureAwait(false);
                        cts.Cancel();
                    })
                    ,
                    CreateConsumerTask(cf, opts, cts.Token)
                ).ConfigureAwait(false);
            }
        }
    }
}

I expected the strings "x[number]" to be output continuously,
but some messages sent while the subscription is being closed are never received by the subscription.
This can be avoided by calling Close() before Dispose() (by wrapping the body in try-finally).
So I suggest an option for choosing between Close() and Unsubscribe() when disposing a subscription.
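
The try-finally workaround described above might look like the following sketch. It reuses only calls that already appear in the report (CreateConnection, Subscribe, Close) and the same cluster, client, durable and subject names; the class and method names are invented for illustration, and it needs the STAN.Client package and a running server.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using STAN.Client;

static class CloseBeforeDispose
{
    public static async Task ConsumeOnceAsync(StanConnectionFactory cf, StanOptions opts)
    {
        var subopts = StanSubscriptionOptions.GetDefaultOptions();
        subopts.DurableName = "standurable";

        using (var c = cf.CreateConnection("test-cluster", "stantest-consumer", opts))
        using (var sub = c.Subscribe("foo", subopts, (sender, ev) =>
            Console.WriteLine($"recv:{Encoding.UTF8.GetString(ev.Message.Data)}")))
        {
            try
            {
                await Task.Delay(1000).ConfigureAwait(false);
            }
            finally
            {
                // Close() leaves the durable subscription registered on the
                // server, whereas Unsubscribe() removes it.
                sub.Close();
            }
        }
    }
}
```

Because the durable stays registered after Close(), the next subscriber with the same durable name can resume from where this one stopped.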

InvalidOperationException in cleanupOnClose

Reported via Slack:

We got another error now thrown from the STAN.Client ...

    at System.Collections.Generic.Dictionary`2.KeyCollection.Enumerator.MoveNext()
    at STAN.Client.Connection.cleanupOnClose(Exception ex)
    at STAN.Client.Connection.closeDueToPing(Exception ex)
    at STAN.Client.Connection.pingServer(Object state)
    at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
 --- End of stack trace from previous location where exception was thrown ---
    at System.Threading.TimerQueueTimer.CallCallback()
    at System.Threading.TimerQueueTimer.Fire()
    at System.Threading.TimerQueue.FireNextTimers()

The code involved runs in a timer and reports the current connection state of the underlying NATS connection:

            if (_stanConnection?.NATSConnection != null)
                return _stanConnection.NATSConnection.State;
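
The stack trace ends in Dictionary KeyCollection enumeration, which typically throws InvalidOperationException when the dictionary changes while it is being iterated. Whether that is the cause inside STAN.Client is an assumption; the sketch below only reproduces the general pattern with the base class library and shows the usual remedy of iterating over a snapshot taken under a lock. All names are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SnapshotEnumeration
{
    private static readonly object Gate = new object();

    // Returns true if adding to the dictionary while iterating its Keys
    // collection raises InvalidOperationException.
    public static bool ThrowsWhenModifiedDuringEnumeration()
    {
        var map = new Dictionary<string, int> { ["a"] = 1, ["b"] = 2 };
        try
        {
            foreach (string key in map.Keys)
            {
                map[key + "-copy"] = 0; // adds a new key mid-enumeration
            }
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // Copies the keys under a lock, then removes entries while iterating the
    // copy, so no enumerator over the live dictionary is ever invalidated.
    public static int RemoveAllViaSnapshot(Dictionary<string, int> map)
    {
        List<string> keys;
        lock (Gate)
        {
            keys = map.Keys.ToList();
        }

        int removed = 0;
        foreach (string key in keys)
        {
            lock (Gate)
            {
                if (map.Remove(key)) removed++;
            }
        }
        return removed;
    }

    public static void Main()
    {
        Console.WriteLine(ThrowsWhenModifiedDuringEnumeration());
        var map = new Dictionary<string, int> { ["a"] = 1, ["b"] = 2 };
        Console.WriteLine(RemoveAllViaSnapshot(map));
    }
}
```

The snapshot only helps if every other writer takes the same lock; otherwise the copy itself can race with a concurrent modification.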

.NET 4.5 support

This may be required for some users; so long as there are no compatibility issues, support should be added.

Connection fails when clusterId is anything but "test-cluster"

This works:
var natsConnection = new StanConnectionFactory().CreateConnection("test-cluster", "something-else");

This does not
var natsConnection = new StanConnectionFactory().CreateConnection("my-cluster", "something-else");

Is this somehow tied to the configuration of the cluster or otherwise intended behaviour?

Connection state is still Connected after losing connection

For some reason Connection.NATSConnection.State is still CONNECTED after I kill the server.

In the NATS client I had the same issue, but I managed to solve it with:

opts.PingInterval = 1000;
opts.MaxPingsOut = 2;

But these options are not available in csharp-nats-streaming.
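
One possible route, sketched under assumptions, is to create the core NATS connection yourself with the ping settings above and hand it to the streaming client. The sketch assumes that StanOptions exposes a NatsConn property for this purpose (the "connection passed in through options" mentioned elsewhere on this page); the URL and client ID are placeholders, and a running server is required.

```csharp
using System;
using NATS.Client;
using STAN.Client;

class PingTunedConnection
{
    static void Main()
    {
        // Core NATS options with the ping settings from the report.
        Options natsOpts = ConnectionFactory.GetDefaultOptions();
        natsOpts.Url = "nats://localhost:4222";
        natsOpts.PingInterval = 1000;
        natsOpts.MaxPingsOut = 2;

        using (IConnection nc = new ConnectionFactory().CreateConnection(natsOpts))
        {
            StanOptions stanOpts = StanOptions.GetDefaultOptions();
            // Assumption: NatsConn lets the streaming client reuse an
            // existing NATS connection instead of creating its own.
            stanOpts.NatsConn = nc;

            using (IStanConnection sc = new StanConnectionFactory()
                .CreateConnection("test-cluster", "ping-demo", stanOpts))
            {
                Console.WriteLine(sc.NATSConnection.State);
            }
        }
    }
}
```

With this arrangement the application created the NATS connection, so the sketch also disposes it itself rather than relying on the streaming connection to do so.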

Have to close IConnection manually when closing ISTANConnection

I saw some rogue connections on my NATS server, and they kept increasing in number.

I noticed that simply closing the ISTANConnection doesn't close the internal IConnection. I have to manually close the IConnection before closing the ISTANConnection to release the connection to my server.

_stanConnection.NATSConnection.Close();
_stanConnection.Close();

Even if I try to dispose the ISTANConnection, the connection to the NATS server remains.

Why StanConnection blocks on PublishAck delete?

I am struggling with a deadlock in my software and I think NATS Streaming is to blame. The ACK timeout handler removes the PublishAck object from the blocking collection right before the ACK finally arrives. This makes ACK processing block forever on the empty collection.

I plan to fix this by making that collection non-blocking on remove. But I wonder if this is a good idea in the first place. I've spent just a few hours learning this codebase and don't want to ruin my cluster with hard-to-detect issues due to these changes. Hopefully somebody with a better understanding of the internals can tell me why 'pubAckMap' in StanConnection blocks on deletes.

Sample piece of my log file showing how the whole thing ends up in a deadlock.
[02:47:26.027785 031 DBG] STAN before publish
[02:47:26.031718 031 DBG] STAN Locked mu
[02:47:26.032388 031 DBG] STAN TryAdd
[02:47:26.032536 031 DBG] STAN TryAdd DONE
[02:47:26.032737 031 DBG] STAN after publish
[02:47:28.039002 032 DBG] STAN Remove ackTimerCb
[02:47:28.039228 032 DBG] STAN Remove DONE

[02:47:28.470116 111 DBG] STAN Remove processAck <<< At this point calls to 'publish' will lock forever on 'mu' object resulting in a deadlock.

Having trouble detecting when server connection is lost

I have been playing around with different methods of detecting when the connection to the server has been lost. None seem that great.

Would you be open to a pull request based on server-to-client heartbeats? I'm thinking of something like an event or callback that is invoked when heartbeats have not been received for X seconds Y times in a row.

After losing connection client is in invalid state

This problem is similar to #73.
After losing the connection to the NATS Streaming server, the client is in an invalid state. It can't close and/or dispose the connection (an instance of STAN.Client.IStanConnection); the application just stops working, waiting for the connection to close. If one doesn't close the connection, the application can't exit, as in the #73 case.
Code to reproduce the error with NATS Streaming server deployed on localhost:

class Program
{
  static void Main(string[] args)
  {
    var opts = StanOptions.GetDefaultOptions();
    opts.NatsURL = "127.0.0.1:4222";
    var cf = new StanConnectionFactory();
    var conn = cf.CreateConnection("test-cluster", "TEST_CLIENT_SENDER", opts);
    conn.Subscribe("test1", (obj, msgArgs) =>
    {
      Console.Write($"[{System.Threading.Thread.CurrentThread.ManagedThreadId}] Message from subscription \"{msgArgs.Message.Subscription}\": ");
      Console.WriteLine($"{System.Text.Encoding.UTF8.GetString(msgArgs.Message.Data)} \"");
    });

    while (true)
    {
      string inp = Console.ReadLine();
      if (inp is null || inp == "q") break;
      try
      {
        // Publish the line that was just read from the console.
        conn.Publish("test1", System.Text.Encoding.UTF8.GetBytes(inp));
      }
      catch
      {
        //If connection is lost, first StanException will occur (because of timeout),
        //then we try to close old connection and release resources. Yet we fail.
        //To simulate loss of connection, one can simply restart NATS Streaming server.
        //Leave one of two lines below
        conn.Close();
        //conn.Dispose();
        conn = cf.CreateConnection("test-cluster", "TEST_CLIENT_SENDER", opts);
        conn.Publish("test1", System.Text.Encoding.UTF8.GetBytes(inp));
      }
    }
  }
}

I found a workaround: instead of closing the connection through STAN.Client.IStanConnection, one can call STAN.Client.IStanConnection.NATSConnection.Close() and the client will no longer be in an invalid state. But there is still a problem: when closing the connection that way and connecting again to NATS Streaming, there is a memory leak of around 1.5 MB.

NATSTimeoutException in combination with PureWebsocket

When I use PureWebSocket (https://github.com/Coinigy/PureWebSockets) and send messages to NATS Streaming from OnMessage, I always get a NATSTimeoutException in this code:


try
{
    cr = nc.Request(discoverSubject,
        ProtocolSerializer.marshal(req),
        opts.ConnectTimeout);
}
catch (NATSTimeoutException)
{
    protoUnsubscribe();
    throw new StanConnectRequestTimeoutException();
}

Can I use nats-streaming when the message comes from a web socket?

StanConnectRequestTimeoutException leaves client in invalid state

After StanConnectRequestTimeoutException is thrown, the client is in an invalid state with resources left hanging. This prevents the application from closing gracefully.

class Program
{
  static void Main(string[] args)
  {
    TestTimeout();
  }

  private static void TestTimeout()
  {
    var options = StanOptions.GetDefaultOptions();
    options.ConnectTimeout = 1;

    try
    {
      new StanConnectionFactory().CreateConnection("test-cluster", "test", options);
    }
    catch (StanConnectRequestTimeoutException)
    {
      Console.WriteLine("Timeout");
    }
  }
}

After executing this code the application stays open. My machine runs Windows 10 64-bit with nats-streaming-server 0.10.2.

Originally filed as nats-io/nats.net#225 by @mabzd

System.IO.FileNotFoundException running on Mono/Linux

I am trying out NATS and the NATS Streaming system with Mono on Linux. I added STAN.Client via NuGet and my IDE (Rider) finds the reference without any problems. The build also works. But when I try to run the project, it aborts with an error:

Unhandled Exception:
System.IO.FileNotFoundException: Could not load file or assembly 'STAN.CLIENT, Version=0.1.4.0, Culture=neutral, PublicKeyToken=null' or one of its dependencies.
File name: 'STAN.CLIENT, Version=0.1.4.0, Culture=neutral, PublicKeyToken=null'
  at DataRetriever.Program.Main (System.String[] args) [0x00001] in <f1b0724d24c64d46b687f47c67d5f0f3>:0 

Although the NuGet package is installed and located where all the others are.

I had the same issue with csharp-nats earlier today and found their issue 176, which deals with case-sensitive operating systems. I presume this is the same problem.

But unpacking the NuGet package, changing the file extensions from .DLL and .XML to lower case, and then repackaging didn't solve it for me. I guess that's not the proper way to do it.

Has anyone else experienced this? Is there an easy solution for it? Or might it just be that same case-sensitive OS issue?

Thanks in advance

Which should I use: csharp-nats or csharp-nats-streaming?

Hi.

I am developing a C# application that sends messages to a NATS server.

I looked for a NATS client library for C# and found two.
One of them is this repository and the other is here.

I'm confused because I don't know what the difference is.
Which library should I use, and could you tell me what the difference is?

NuGet restore for .NET core

Hi,

I get the following error when trying to restore the NuGet package in a .NET Core project (VS2017):

Restoring packages for D:\Projects\stanTest\stanTest\stanTest.csproj...
Package STAN.Client 0.1.3 is not compatible with netcoreapp1.1 (.NETCoreApp,Version=v1.1). Package STAN.Client 0.1.3 supports: net45 (.NETFramework,Version=v4.5)
Package NATS.Client 0.6.1 is not compatible with netcoreapp1.1 (.NETCoreApp,Version=v1.1). Package NATS.Client 0.6.1 supports: net45 (.NETFramework,Version=v4.5)
One or more packages are incompatible with .NETCoreApp,Version=v1.1.
Package restore failed. Rolling back package changes for 'stanTest'.

This should be working according to #37, correct?

Thanks

Connection Lost Handler and error handling strategy.

Hey guys!

I've been playing around a little with the library. I'd love to hear your thoughts on the strategy I should use to gracefully handle errors when creating connections, and other connectivity issues I might face.

Looking at the Go client, you can see a ConnectionLostHandler function that you can pass to the Options to take action when an error happens. That is not present in this client at the moment.

So here are some questions:

  1. Is this by design?
  2. Are you handling this internally to retry the connections?
  3. Should I handle the possible exceptions in my integration logic?
  4. What's the recommended approach/strategy?

Thanks in advance and keep up the good work!
Cheers,
Karel.
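
For question 3, one possible shape for handling connect failures in integration code is a small retry helper with a doubling delay. This is only a sketch under stated assumptions, not the library's own behaviour or a recommendation from the maintainers: the ConnectRetry class, its Run method and all parameter values are hypothetical, and the connect delegate stands in for a call such as new StanConnectionFactory().CreateConnection(...).

```csharp
using System;
using System.Threading;

public static class ConnectRetry
{
    // Calls connect() until it succeeds or maxAttempts is reached,
    // doubling the delay after every failed attempt.
    public static T Run<T>(Func<T> connect, int maxAttempts, TimeSpan initialDelay)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        TimeSpan delay = initialDelay;
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return connect();
            }
            // On the last attempt the filter is false, so the exception propagates.
            catch (Exception ex) when (attempt < maxAttempts)
            {
                Console.WriteLine($"Attempt {attempt} failed: {ex.Message}");
                Thread.Sleep(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
    }

    public static void Main()
    {
        int calls = 0;

        // Hypothetical stand-in for creating a connection: fails twice, then succeeds.
        string result = Run(() =>
        {
            calls++;
            if (calls < 3) throw new TimeoutException("simulated connect timeout");
            return "connected";
        }, maxAttempts: 5, initialDelay: TimeSpan.FromMilliseconds(10));

        Console.WriteLine($"{result} after {calls} calls"); // connected after 3 calls
    }
}
```

In real code the catch clause would probably be narrowed to the connection exceptions the application actually expects, so that programming errors are not retried.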

StanMsgHandlerArgs internal constructor and no interface makes unit-testing impossible

StanMsgHandlerArgs currently has an internal constructor and does not implement an interface.
This makes it impossible to write unit tests for methods that rely in some way on an object of this type.

Would it be possible for it to be opened up, or for there to be some factory that can produce a "default" version?

Edit: This also applies to StanMsg
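
Until the types are opened up, one workaround (a sketch, not something the library prescribes) is to keep the STAN handler as a thin adapter that copies the fields it needs out of the event arguments and forwards them to a method that takes plain values. Only that method is unit-tested. The names ReceivedMessage and OrderProcessor are hypothetical, and the commented adapter assumes that args.Message exposes Subject and Data.

```csharp
using System;
using System.Text;

// Plain type owned by the application; tests can construct it freely.
public sealed class ReceivedMessage
{
    public ReceivedMessage(string subject, byte[] data)
    {
        Subject = subject;
        Data = data;
    }

    public string Subject { get; }
    public byte[] Data { get; }
}

// Business logic depends only on ReceivedMessage, not on StanMsgHandlerArgs.
public sealed class OrderProcessor
{
    public string Process(ReceivedMessage message)
    {
        return message.Subject + ":" + Encoding.UTF8.GetString(message.Data);
    }
}

public static class Program
{
    public static void Main()
    {
        var processor = new OrderProcessor();

        // In production the STAN handler would be a one-line adapter
        // (assumed property names), for example:
        //   (sender, args) => processor.Process(
        //       new ReceivedMessage(args.Message.Subject, args.Message.Data));

        // In a test, no STAN types are needed at all.
        string result = processor.Process(
            new ReceivedMessage("orders", Encoding.UTF8.GetBytes("42")));
        Console.WriteLine(result); // orders:42
    }
}
```

The adapter lambda itself stays untested, which is the trade-off of this approach: it should contain no logic beyond copying fields.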

How to: durable queue subscriber

I'm trying to use a durable queue subscriber. I downloaded the example and modified it.
In the StanSubscriber, I set the "DurableName" in the "StanSubscriptionOptions" and passed the "qGroup" parameter to the "Subscribe" method.
It doesn't work.

After many attempts, I removed the using statement that disposes the "IStanSubscription" and the durable feature worked.

I would like to know whether this is the right way to do it.
And if it is: will I run into trouble if I never dispose an "IStanSubscription"?

Messages not arriving for client....

Hi,

I am just getting started with NATS Streaming and am building a simple framework around what is available directly out of the STAN client. I have a basic publisher:

public abstract class SpYtPublisherBase : ISpYtPublisher
{
    private readonly StanOptions _StanOptions;

    protected SpYtPublisherBase(SpYtPublisherSubscriberBaseOptions aOptions)
    {
        Options = aOptions;
        _StanOptions = StanOptions.GetDefaultOptions();
        _StanOptions.NatsURL = $"{Options.Connection.ServerAddress}:{Options.Connection.ServerPort}";
    }

    public SpYtPublisherSubscriberBaseOptions Options { get; }

    public virtual void Publish(SpYtMessageContainer aMessage)
    {
        StanConnectionFactory cf = new StanConnectionFactory();
        using (IStanConnection c = cf.CreateConnection(Options.Connection.ClusterId, Options.Connection.ClientId, _StanOptions))
        {
            c.Publish(Options.Queue.Subject, SpYtMessageContainerHelper.SerializeToArray(aMessage));
        }
    }
}

and a basic subscriber:

public class SpYtSubscriberBase
{
    private readonly StanOptions _StanOptions;

    public SpYtPublisherSubscriberBaseOptions Options { get; }

    public SpYtSubscriberBase(SpYtPublisherSubscriberBaseOptions aOptions)
    {
        Options = aOptions;
        _StanOptions = StanOptions.GetDefaultOptions();
        _StanOptions.NatsURL = $"{Options.Connection.ServerAddress}:{Options.Connection.ServerPort}";
    }

    public virtual void Subscribe()
    {
        StanSubscriptionOptions subscriptionOptions = StanSubscriptionOptions.GetDefaultOptions();
        subscriptionOptions.DeliverAllAvailable();

        StanConnectionFactory cf = new StanConnectionFactory();
        using (IStanConnection c = cf.CreateConnection(Options.Connection.ClusterId, Options.Connection.ClientId, _StanOptions))
        {
            c.Subscribe(Options.Queue.Subject, subscriptionOptions, MessageHandler);
        }

    }

    private void MessageHandler(object aSender, StanMsgHandlerArgs aStanMessageHandlerArgs)
    {
        //Message handler is never called :-(
        throw new NotImplementedException();
    }
}

A couple of descendants which publish / subscribe to the low subject/queue/channel name:

public class SpYtPublisherLow : SpYtPublisherBase
{
    public SpYtPublisherLow(SpYtPublisherSubscriberBaseOptions aOptions) : base(aOptions)
    {
        Options.Queue.Subject = "low";
    }
}

public class SpYtSubscriberLow : SpYtSubscriberBase
{
    public SpYtSubscriberLow(SpYtPublisherSubscriberBaseOptions aOptions) : base(aOptions)
    {
        Options.Queue.Subject = "low";
    }
}

I then exercise the above using a simple NUnit test case:

[TestFixture]
public class SpYtPublisherLowTests : SpYtTestsBase
{
    private SpYtPublisherLow _PublisherLow;
    private SpYtSubscriberLow _SubscriberLow;

    /// <summary>
    ///     Called at the commencement of all the unit tests.
    /// </summary>
    public override void Init()
    {
        SpYtPublisherSubscriberBaseOptions options = new SpYtPublisherSubscriberBaseOptions();

        options.Connection.ClientId = "NUnit_Test_Publisher";
        _PublisherLow = new SpYtPublisherLow(options);

        options.Connection.ClientId = "NUnit_Test_Subscriber";
        _SubscriberLow = new SpYtSubscriberLow(options);

        base.Init();
    }

    [Test]
    public void Publish()
    {
        //Arrange
        SpYtMessageContainer noopMessage = SpYtMessageBuilder.BuildNoOpMessage("956D26F9-4EB1-46FB-ABA9-8B7FB53182EF");

        //Act
        _SubscriberLow.Subscribe();
        _PublisherLow.Publish(noopMessage);

        //Assert
        
    }
}

The NATS streaming server is running (memory based) and I can see the low channel has been created:

STREAM: Starting nats-streaming-server[splyce-cluster] version 0.12.0
STREAM: ServerID: 8PD60Lare7fPJUIpBapEGE
STREAM: Go version: go1.11.5
Starting nats-server version 1.4.1
Git commit [not set]
Starting http monitor on 0.0.0.0:8222
Listening for client connections on 0.0.0.0:4222
Server is ready
STREAM: Recovering the state...
STREAM: No recovered state
STREAM: Message store is MEMORY
STREAM: ---------- Store Limits ----------
STREAM: Channels:                  100 *
STREAM: --------- Channels Limits --------
STREAM:   Subscriptions:          1000 *
STREAM:   Messages     :       1000000 *
STREAM:   Bytes        :     976.56 MB *
STREAM:   Age          :     unlimited *
STREAM:   Inactivity   :     unlimited *
STREAM: ----------------------------------
STREAM: Channel "low" has been created

According to the debug end points I have SEVEN messages in the server (all in the low channel) and if I break point in SpYtSubscriberBase.Subscribe() I can see I have a client connected:

/streaming/serverz
{
  "cluster_id": "splyce-cluster",
  "server_id": "8PD60Lare7fPJUIpBapEGE",
  "now": "2019-03-19T21:23:53.2810254+11:00",
  "offset": 0,
  "limit": 1024,
  "count": 1,
  "total": 1,
  "clients": [{
    "id": "NUnit_Test_Subscriber",
    "hb_inbox": "_INBOX.1R9CBRNURNZW8W65PLV7XJ"
  }]
}

/streaming/storez
{
  "cluster_id": "splyce-cluster",
  "server_id": "8PD60Lare7fPJUIpBapEGE",
  "now": "2019-03-19T21:26:32.4109619+11:00",
  "type": "MEMORY",
  "limits": {
    "max_channels": 100,
    "max_msgs": 1000000,
    "max_bytes": 1024000000,
    "max_age": 0,
    "max_subscriptions": 1000,
    "MaxInactivity": 0
  },
  "total_msgs": 7,
  "total_bytes": 455
}

/streaming/channelsz
{
  "cluster_id": "splyce-cluster",
  "server_id": "8PD60Lare7fPJUIpBapEGE",
  "now": "2019-03-19T21:27:08.3272515+11:00",
  "offset": 0,
  "limit": 1024,
  "count": 1,
  "total": 1,
  "names": ["low"]
}

However at no point is SpYtSubscriberBase.MessageHandler() called. Can someone point me in the right direction as to why this is the case? All help appreciated.
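
One thing that stands out in the code above, offered as a possible cause rather than a confirmed diagnosis: SpYtSubscriberBase.Subscribe() creates the connection inside a using block, so the connection is disposed as soon as c.Subscribe(...) returns, possibly before the server has delivered anything. The sketch below keeps the connection and the subscription alive until a message arrives or a timeout elapses. It uses the client calls already shown in this issue plus args.Message.Data, which is an assumption about the handler arguments. The client ID, the ten-second timeout and the class name are placeholders, and it assumes a server reachable at the default URL.

```csharp
using System;
using System.Threading;
using STAN.Client;

public static class KeepAliveSubscriber
{
    public static void Main()
    {
        var received = new ManualResetEventSlim(false);

        StanSubscriptionOptions subOpts = StanSubscriptionOptions.GetDefaultOptions();
        subOpts.DeliverAllAvailable();

        var cf = new StanConnectionFactory();
        using (IStanConnection c = cf.CreateConnection("splyce-cluster", "keepalive-demo"))
        using (c.Subscribe("low", subOpts, (sender, args) =>
        {
            // args.Message.Data is assumed to hold the raw payload bytes.
            Console.WriteLine("Received {0} bytes", args.Message.Data.Length);
            received.Set();
        }))
        {
            // Block here so the connection outlives the Subscribe call.
            if (!received.Wait(TimeSpan.FromSeconds(10)))
            {
                Console.WriteLine("No message within 10 seconds.");
            }
        }
    }
}
```

In the test case from this issue, the equivalent change would be to hold the IStanConnection in a field and dispose it in teardown instead of inside Subscribe().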

Durable subscription not receiving more messages

I have a console app that subscribes to a subject using a durable subscription. As part of my testing I was constantly stopping and starting the app, and each time I received all the messages that were published while the app was down, as expected from a durable subscription. One day I stopped the app, and since then I have not received any more messages after starting it again, even though messages are still being published to that subject. I am sure of that because another app, which subscribes to the same subject with a different client, is receiving them.

The following is some data I got from the monitoring endpoint on my cluster:

{
  "client_id": "dc-dev-streaming-to-fluentd_HTOLED-BH64062-buytelco-com",
  "inbox": "_INBOX.93PXC0KQT5A3IWUNNOF818",
  "ack_inbox": "_STAN.ack.bv-cluster.dc.dev.falcon-windows-service.offer.created.Souo0W4QWFVSlTjrT83kdf",
  "queue_name": "Streaming-To-Fluentd:Streaming-To-Fluentd",
  "is_durable": true,
  "is_offline": false,
  "max_inflight": 1024,
  "ack_wait": 30,
  "last_sent": 138578,
  "pending_count": 0,
  "is_stalled": false
}

The fact that the "is_offline" property is set to false indicates that the server is aware of the client's subscription. Hence my question: why is the server not delivering messages to that subscription? Is there anything I can do on my side to avoid or solve this issue?

Should Timestamp of messages be in UTC Format?

Hi, I was looking through the StanMsg.cs file and noticed that the received timestamp of the message is constructed like this:
return new DateTime(1970, 1, 1, 0, 0, 0, 0)
According to the MSDN documentation, this results in an 'unspecified' DateTimeKind. Should it be set to
new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc)?
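
The difference between the two constructor calls can be checked without the client library. The sketch below is self-contained and does not reproduce StanMsg.cs; the names EpochKindDemo, UnspecifiedEpoch and UtcEpoch are made up for illustration. It prints the Kind of each value and shows that only the UTC one is guaranteed to survive ToUniversalTime() unchanged.

```csharp
using System;

public static class EpochKindDemo
{
    // Same constructor call as quoted in the question: no DateTimeKind argument.
    public static readonly DateTime UnspecifiedEpoch =
        new DateTime(1970, 1, 1, 0, 0, 0, 0);

    // The variant proposed in the question.
    public static readonly DateTime UtcEpoch =
        new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);

    public static void Main()
    {
        Console.WriteLine(UnspecifiedEpoch.Kind); // Unspecified
        Console.WriteLine(UtcEpoch.Kind);         // Utc

        // A Utc value is returned unchanged by ToUniversalTime().
        Console.WriteLine(UtcEpoch.ToUniversalTime() == UtcEpoch); // True

        // An Unspecified value is treated as local time, so the result is
        // shifted by the machine's UTC offset (output is machine-dependent).
        Console.WriteLine(UnspecifiedEpoch.ToUniversalTime());
    }
}
```

DateTime equality compares ticks only, so UnspecifiedEpoch == UtcEpoch is still true; the Kind matters once a conversion such as ToUniversalTime() or ToLocalTime() is applied.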
