NsqSharp

A .NET client library for NSQ, a realtime distributed messaging platform.

Check out this slide deck for a quick intro to NSQ.

Watch Spray Some NSQ On It by co-author Matt Reiferson for an under 30-minute intro to NSQ as a messaging platform.

Project Status

  • Used in Production.
  • Maintained. Issues and Pull Requests will be responded to.

Quick Install

NsqSharp is a client library that talks to nsqd (the message queue) and nsqlookupd (the topic discovery service). See the slide deck above for more information about their roles.

Download nsqd.exe and nsqlookupd.exe and run them from the command line:

nsqlookupd

nsqd -lookupd-tcp-address=127.0.0.1:4160

Or, to install as Windows Services:

mkdir c:\nsq\data

copy /y nsqd.exe c:\nsq
copy /y nsqlookupd.exe c:\nsq

sc create nsqlookupd binpath= "c:\nsq\nsqlookupd.exe" start= auto DisplayName= "nsqlookupd"
sc description nsqlookupd "nsqlookupd 0.3.2"
sc start nsqlookupd

sc create nsqd binpath= "c:\nsq\nsqd.exe -mem-queue-size=0 -lookupd-tcp-address=127.0.0.1:4160 -data-path=c:\nsq\data" start= auto DisplayName= "nsqd"
sc description nsqd "nsqd 0.3.2"
sc start nsqd

You can also build these files from source: https://github.com/nsqio/nsq (official), or https://github.com/judwhite/nsq (fork) to add the ability to run as a Windows Service.

C# Examples

PM> Install-Package NsqSharp

More examples are in the Examples folder.

Simple Producer

using System;
using NsqSharp;

class Program
{
    static void Main()
    {
        var producer = new Producer("127.0.0.1:4150");
        producer.Publish("test-topic-name", "Hello!");

        Console.WriteLine("Enter your message (blank line to quit):");
        string line = Console.ReadLine();
        while (!string.IsNullOrEmpty(line))
        {
            producer.Publish("test-topic-name", line);
            line = Console.ReadLine();
        }

        producer.Stop();
    }
}

Simple Consumer

using System;
using System.Text;
using NsqSharp;

class Program
{
    static void Main()
    {
        // Create a new Consumer for each topic/channel
        var consumer = new Consumer("test-topic-name", "channel-name");
        consumer.AddHandler(new MessageHandler());
        consumer.ConnectToNsqLookupd("127.0.0.1:4161");

        Console.WriteLine("Listening for messages. If this is the first execution, it " +
                          "could take up to 60s for topic producers to be discovered.");
        Console.WriteLine("Press enter to stop...");
        Console.ReadLine();

        consumer.Stop();
    }
}

public class MessageHandler : IHandler
{
    /// <summary>Handles a message.</summary>
    public void HandleMessage(IMessage message)
    {
        string msg = Encoding.UTF8.GetString(message.Body);
        Console.WriteLine(msg);
    }

    /// <summary>
    /// Called when a message has exceeded the specified <see cref="Config.MaxAttempts"/>.
    /// </summary>
    /// <param name="message">The failed message.</param>
    public void LogFailedMessage(IMessage message)
    {
        // Log failed messages
    }
}

NsqSharp.Bus

The classes in the NsqSharp.Bus namespace provide conveniences for large scale applications:

  • Interoperating with dependency injection containers.
  • Separation of concerns with regard to message routing, serialization, and error handling.
  • Abstracting the details of Producer and Consumer from message sending and handling.

The PingPong and PointOfSale examples highlight using:

Applications started with BusService.Start can be installed as a Windows Service using sc create. In console mode, the application shuts down gracefully on Ctrl+C. When running as a Windows Service, stopping the service or rebooting or shutting down the machine triggers a graceful shutdown.

NsqSharp has no external dependencies. StructureMap, Autofac, and Newtonsoft.Json are supported through convenience classes which use reflection for the initial wire-up. Other containers and serializers can be used by implementing IObjectBuilder and IMessageSerializer wrappers in your code.

NsqSharp Project Goals

  • Structurally similar to the official go-nsq client.
  • Up to date with the latest stable release of go-nsq.
  • Provide similar behavior and semantics as the official package.
  • Unobtrusive. No external dependencies. Publishing message contracts does not require a reference to NsqSharp.

Pull Requests

Pull requests and issues are very welcome and appreciated.

When submitting a pull request please keep in mind we're trying to stay as close to go-nsq as possible. This sometimes means writing C# which looks more like Go and follows their file layout. Code in the NsqSharp.Bus namespace should follow C# conventions and more or less look like other code in this namespace.

License

This project is open source and released under the MIT license.

nsqsharp's People

Contributors

judwhite

nsqsharp's Issues

Unhandled ArgumentOutOfRangeException

Check internal uses of NsqSharp.Utils.GoFunc, possibly timers

Exception Info: System.ArgumentOutOfRangeException
Stack:
at NsqSharp.Utils.GoFunc+<>c__DisplayClass1.b__0()
at System.Threading.ThreadHelper.ThreadStart_Context(System.Object)
at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
at System.Threading.ThreadHelper.ThreadStart()

Incoming message mutator

Similar to how a message mutator can be used to modify payloads before publishing to nsqd, we may want to expose the messaging pipeline to allow incoming messages to be mutated, for example to perform a common lookup on a field.

Today subscribing to a topic/channel is implicit through implementing IHandleMessages<>. A global incoming message mutator would be the easiest thing to do, but we may not want to pay that penalty for each message received, execute the type-checking code, or even have such logic in a central place.

When using the more primitive Consumer class it would be possible to implement this as part of the pipeline with an IHandler implementation.

Need to think about whether a pipeline hook or a global incoming mutator is more appropriate.

/cc @crazybert

Refactor MessageTopicRouter GetTopics

Configuration code often has to be duplicated between IMessageTopicRouter.GetTopics and IMessageTopicRouter.GetMessageTopic. Consumers could instead be spun up dynamically by the bus, which would poll nsqlookupd for topics matching a pattern based on the original topic from IMessageTypeToTopicProvider.

This would reduce unnecessary Consumers for topics which don't exist yet and let applications remove duplicated configuration code.

https://github.com/judwhite/NsqSharp/blob/master/NsqSharp/Bus/Configuration/IMessageTopicRouter.cs

https://github.com/judwhite/NsqSharp/blob/master/NsqSharp/Bus/Configuration/Providers/IMessageTypeToTopicProvider.cs

Add new hook for message bit manipulation (namely, encryption)

  • IMessageMutator: works on a message object being sent out, so it is not appropriate.
  • IMessageSerializer: technically this interface works, though the name isn't intuitive.

If IMessageSerializer could be set at a per-topic level it could be re-used, and ties into #5 and #6.

Need to determine if another bit-manipulator is needed post serialization or if allowing more fine grained serialization settings (which we want for XML/JSON anyway) would be appropriate.

ConsoleLogger helper utility needs to be thread safe

Additional information: Probable I/O race condition detected while copying memory. The I/O package is not thread safe by default. In multithreaded applications, a stream must be accessed in a thread-safe way, such as a thread-safe wrapper returned by TextReader's or TextWriter's Synchronized methods. This also applies to classes like StreamWriter and StreamReader.
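
A minimal sketch of one possible fix, assuming all log output is funneled through a single writer: wrap the underlying TextWriter with TextWriter.Synchronized, as the exception message suggests. The class name SynchronizedLogger and its Output method are hypothetical and are not NsqSharp's actual ConsoleLogger.

```csharp
using System;
using System.IO;

// Hypothetical helper for illustration; not NsqSharp's ConsoleLogger.
public sealed class SynchronizedLogger
{
    private readonly TextWriter _writer;

    public SynchronizedLogger(TextWriter writer)
    {
        // TextWriter.Synchronized returns a thread-safe wrapper around the writer.
        _writer = TextWriter.Synchronized(writer);
    }

    public void Output(string level, string message)
    {
        // One WriteLine call per entry keeps each log line intact.
        _writer.WriteLine("[{0:yyyy-MM-dd HH:mm:ss.ffff}] [{1}] {2}",
            DateTime.Now, level, message);
    }
}
```

Usage would look like `new SynchronizedLogger(Console.Out).Output("Error", "...")`. The same wrapper also works for a StreamWriter over a log file.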

Add metrics/debug HTTP endpoint

The goal is to expose state about the process in a human and machine readable format. The user doesn't necessarily need to use Prometheus, but I'm using it as an example for how to expose metrics.
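
As a rough sketch of what such an endpoint could look like, the following renders gauges in the Prometheus text exposition format and serves them with the .NET HttpListener. The class name MetricsEndpoint, the metric names, and the URL prefix are assumptions for illustration; nothing here is part of NsqSharp.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Net;
using System.Text;

// Hypothetical sketch; names and the URL prefix are illustrative only.
public static class MetricsEndpoint
{
    // Renders gauges in the Prometheus text exposition format.
    public static string Render(IDictionary<string, double> gauges)
    {
        var sb = new StringBuilder();
        foreach (var gauge in gauges)
        {
            sb.Append("# TYPE ").Append(gauge.Key).Append(" gauge\n");
            sb.Append(gauge.Key).Append(' ')
              .Append(gauge.Value.ToString(CultureInfo.InvariantCulture))
              .Append('\n');
        }
        return sb.ToString();
    }

    // Serves one request at a time and blocks the calling thread.
    public static void Serve(string prefix, Func<IDictionary<string, double>> snapshot)
    {
        var listener = new HttpListener();
        listener.Prefixes.Add(prefix); // e.g. "http://localhost:9090/metrics/" (assumed)
        listener.Start();
        while (listener.IsListening)
        {
            HttpListenerContext context = listener.GetContext();
            byte[] body = Encoding.UTF8.GetBytes(Render(snapshot()));
            context.Response.ContentType = "text/plain; version=0.0.4";
            context.Response.ContentLength64 = body.Length;
            context.Response.OutputStream.Write(body, 0, body.Length);
            context.Response.Close();
        }
    }
}
```

The snapshot callback keeps the endpoint independent of where the numbers come from, so the same output stays readable by a person using a browser or curl.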

Prometheus compatible /metrics example from etcd:

.NET HttpListener:

Prometheus tutorial:

General references for API documentation:

Expose an IBus's Consumers

This will allow us to access:

  • GetStats
  • ChangeMaxInFlight (update RDY count)

Throttling can be built on top of exposing ChangeMaxInFlight. Relates to #5 if throttling abstraction is added.

Add IBus PublishRaw convenience method

void PublishRaw(string topic, byte[] message) convenience method; the bus already has the configuration needed to do a raw publish, otherwise a Producer or NsqdHttpClient needs to be created to publish raw byte messages.

Add in-memory bus mode

Allow for a single "active" thread step-through mode (useful for debugging).

Coordination happens through the bus. When a message is published it should be handled on a new thread, and the current thread waits until the new thread finishes.

A new thread is probably preferred because of ThreadStatic fields.

This is separate from an in-memory nsqd / nsqlookupd, and will require skipping INsqdPublisher.
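
The hand-off described above could be sketched as follows, assuming the bus has already resolved the handler to invoke. InMemoryDispatch and HandleAndWait are hypothetical names, not existing NsqSharp members.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of the in-memory hand-off; not part of NsqSharp.
public static class InMemoryDispatch
{
    // Runs the handler on a fresh thread and blocks the caller until it
    // finishes, so only one thread is actively doing work at a time.
    public static void HandleAndWait(Action handler)
    {
        Exception error = null;
        var thread = new Thread(() =>
        {
            try { handler(); }
            catch (Exception ex) { error = ex; } // surface failures to the caller
        });
        thread.Start();
        thread.Join();
        if (error != null)
            throw new InvalidOperationException("handler failed", error);
    }
}
```

Because each message gets its own thread, ThreadStatic fields start from their default values in every handler, which matches how handlers would see them on separate consumer threads.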

Discovery nsqd

Hi, I'm using NsqSharp with Docker and I'm trying to scale nsqd, which I'm able to do. I'm using nsqlookupd to get all nsqd instances, but it returns strange IP addresses and ports that I can't use in my clients.

Any idea?

Structured logging

Write structured logs, such as logfmt or JSON, that are both human readable and machine parseable.
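
To illustrate the logfmt option, here is a minimal formatter sketch. The LogFmt class is hypothetical, and the quoting rules are simplified (values containing a space, a double quote, or an equals sign are quoted; newlines are not handled).

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical, simplified logfmt formatter; not part of NsqSharp.
public static class LogFmt
{
    public static string Format(IEnumerable<KeyValuePair<string, string>> fields)
    {
        var sb = new StringBuilder();
        foreach (var field in fields)
        {
            if (sb.Length > 0) sb.Append(' ');
            sb.Append(field.Key).Append('=').Append(Quote(field.Value ?? ""));
        }
        return sb.ToString();
    }

    private static string Quote(string value)
    {
        // Quote empty values and values that would break key=value parsing.
        bool needsQuotes = value.Length == 0
            || value.IndexOfAny(new[] { ' ', '"', '=' }) >= 0;
        if (!needsQuotes) return value;
        return "\"" + value.Replace("\\", "\\\\").Replace("\"", "\\\"") + "\"";
    }
}
```

Formatting the fields level=Error and msg=message too big yields `level=Error msg="message too big"`, which stays readable in a console while remaining easy to parse.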

Consumer: over configured max-inflight when using RDYRedistributeOnIdle

When using RDYRedistributeOnIdle, if a message comes in on a previously idle connection, it's possible to receive the error:

NsqSharp.Core.ErrOverMaxInFlight: over configured max-inflight

This error is temporary and doesn't impact processing, but it should be handled more gracefully.

Documentation on CurrentThreadMessage.Finish

The comment says "indicating the message completed successfully". This may not always be the case; you may wish to Finish a non-recoverable message and then throw to log the error.

Log more info on PUB protocol error

Example:

[2016-10-07 17:13:23.8432] [Error] P1 (127.0.0.1:4150) protocol error - E_BAD_MESSAGE PUB message too big 1061489 > 1024768

It may be a duplicate of this item later in the log:

[2016-10-07 17:13:23.8602] [Error] messageAuditor.OnReceived - NsqSharp.Core.ErrProtocol: E_BAD_MESSAGE PUB message too big 1061489 > 1024768

We should be able to get the Topic and add it to the log output.

ObjectDisposedException during network error

[2016-05-14 09:20:00.7733] [Error] C7 IO error - System.ArgumentException: length of response is too small
Parameter name: response
at NsqSharp.Core.Protocol.UnpackResponse(Byte[] response, FrameType& frameType, Byte[]& body) in d:\Projects_github\NsqSharp\NsqSharp\Core\Protocol.cs:line 104
at NsqSharp.Core.Conn.readLoop() in d:\Projects_github\NsqSharp\NsqSharp\Core\Conn.cs:line 554

[2016-05-14 09:20:09.9268] [Error] C7 IO error - System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Net.Sockets.NetworkStream'.
at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
at NsqSharp.Utils.TcpConn.Write(Byte[] b, Int32 offset, Int32 length) in d:\Projects_github\NsqSharp\NsqSharp\Utils\TcpConn.cs:line 127
at NsqSharp.Core.Command.WriteTo(IWriter w, Byte[] buf) in d:\Projects_github\NsqSharp\NsqSharp\Core\Command.cs:line 164
at NsqSharp.Core.Conn.WriteCommand(Command cmd) in d:\Projects_github\NsqSharp\NsqSharp\Core\Conn.cs:line 344

[2016-05-14 09:20:09.9268] [Error] C7 error sending CLS - System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Net.Sockets.NetworkStream'.
at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
at NsqSharp.Utils.TcpConn.Write(Byte[] b, Int32 offset, Int32 length) in d:\Projects_github\NsqSharp\NsqSharp\Utils\TcpConn.cs:line 127
at NsqSharp.Core.Command.WriteTo(IWriter w, Byte[] buf) in d:\Projects_github\NsqSharp\NsqSharp\Core\Command.cs:line 164
at NsqSharp.Core.Conn.WriteCommand(Command cmd) in d:\Projects_github\NsqSharp\NsqSharp\Core\Conn.cs:line 351
at NsqSharp.Consumer.StopAsync() in d:\Projects_github\NsqSharp\NsqSharp\Consumer.cs:line 1528

[2016-08-12 16:12:38.6065] [Error] C12 error sending command REQ 0a8fa0dcac598001 90000 - System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Net.Sockets.NetworkStream'.
at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
at NsqSharp.Utils.TcpConn.Write(Byte[] b, Int32 offset, Int32 length) in d:\Projects_github\NsqSharp\NsqSharp\Utils\TcpConn.cs:line 127
at NsqSharp.Core.Command.WriteTo(IWriter w, Byte[] buf) in d:\Projects_github\NsqSharp\NsqSharp\Core\Command.cs:line 164
at NsqSharp.Core.Conn.WriteCommand(Command cmd) in d:\Projects_github\NsqSharp\NsqSharp\Core\Conn.cs:line 351
at NsqSharp.Core.Conn.<writeLoop>b__2(msgResponse resp) in d:\Projects_github\NsqSharp\NsqSharp\Core\Conn.cs:line 705

Round Robin Publisher

Add a publisher which round-robins across a list of nsqd instances.

Use case: an application which doesn't have nsqd installed locally, and setting up RR through DNS is undesirable.

nsqd selection strategy could be pluggable.
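
A pluggable selection strategy might look like the sketch below. INsqdSelector and RoundRobinSelector are hypothetical names introduced for illustration; they do not exist in NsqSharp.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical strategy interface; implementations decide which nsqd is next.
public interface INsqdSelector
{
    string Next();
}

public sealed class RoundRobinSelector : INsqdSelector
{
    private readonly string[] _addresses;
    private int _counter = -1;

    public RoundRobinSelector(IEnumerable<string> addresses)
    {
        _addresses = new List<string>(addresses).ToArray();
        if (_addresses.Length == 0)
            throw new ArgumentException("at least one nsqd address is required", "addresses");
    }

    public string Next()
    {
        // Interlocked.Increment keeps selection safe across threads; the
        // uint cast keeps the index non-negative after the counter wraps.
        uint n = unchecked((uint)Interlocked.Increment(ref _counter));
        return _addresses[n % (uint)_addresses.Length];
    }
}
```

A publisher built on this could hold one Producer per address (as in `new Producer("127.0.0.1:4150")` from the Simple Producer example) and call Publish on whichever one Next() selects.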

Retry connect to nsqlookupd during bus start

Make the connection to nsqlookupd async so that it does not block initialization; log errors.

service failed to start due to the following error: 
The service did not respond to the start or control request in a timely fashion.
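
One way to keep startup from blocking is to retry the connection on a background task, sketched below. StartupRetry and RunInBackground are hypothetical names; the connect action stands in for a call such as `consumer.ConnectToNsqLookupd("127.0.0.1:4161")` from the Simple Consumer example, and the fixed delay is an assumption (a real implementation might back off).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch; not part of NsqSharp.
public static class StartupRetry
{
    // Retries connect on a background task until it succeeds or the token
    // is cancelled, logging each failure instead of failing startup.
    public static Task RunInBackground(
        Action connect,
        Action<Exception> logError,
        TimeSpan delay,
        CancellationToken token)
    {
        return Task.Run(async () =>
        {
            while (!token.IsCancellationRequested)
            {
                try
                {
                    connect();
                    return; // connected; stop retrying
                }
                catch (Exception ex)
                {
                    logError(ex);
                }
                await Task.Delay(delay, token);
            }
        }, token);
    }
}
```

The caller returns immediately, so a Windows Service can report that it has started while the connection attempt continues in the background.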
