
nsq-j's Introduction


nsq-j

Java client for the NSQ realtime distributed messaging platform

Install

Add a dependency using Maven

<dependency>
  <groupId>com.sproutsocial</groupId>
  <artifactId>nsq-j</artifactId>
  <version>1.4.2</version>
</dependency>

or Gradle:

dependencies {
  implementation 'com.sproutsocial:nsq-j:1.4.2'
}

Publish

Publisher publisher = new Publisher("nsqd-host");

byte[] data = "Hello nsq".getBytes();
publisher.publishBuffered("example_topic", data);

publishBuffered buffers messages to improve performance. By default it flushes when the buffer reaches 16k or after 300 milliseconds.

publisher.publish("example_topic", data) publishes synchronously and returns after nsqd responds OK.

You can batch messages manually and publish them all at once with publish(String topic, List<byte[]> messages).
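The size-or-time flush rule described above can be modeled in a few lines. This is a hypothetical sketch, not nsq-j's Publisher code: the class FlushPolicy, the caller-supplied timestamps, and reading "16k" as 16 * 1024 bytes are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a "flush at N bytes or after T ms" buffer; not nsq-j's implementation. */
final class FlushPolicy {
    private final int maxBytes;
    private final long maxDelayMillis;
    private final List<byte[]> pending = new ArrayList<>();
    private int pendingBytes = 0;
    private long firstMessageAt = -1; // time the oldest buffered message arrived, -1 when empty

    FlushPolicy(int maxBytes, long maxDelayMillis) {
        this.maxBytes = maxBytes;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Buffers a message; returns the batch to send if the size limit is reached, else null. */
    List<byte[]> add(byte[] data, long nowMillis) {
        if (pending.isEmpty()) {
            firstMessageAt = nowMillis;
        }
        pending.add(data);
        pendingBytes += data.length;
        return pendingBytes >= maxBytes ? drain() : null;
    }

    /** Called periodically; returns the batch to send if the oldest message waited long enough. */
    List<byte[]> poll(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - firstMessageAt >= maxDelayMillis) {
            return drain();
        }
        return null;
    }

    /** Hands back everything buffered so far and resets the buffer. */
    private List<byte[]> drain() {
        List<byte[]> batch = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        firstMessageAt = -1;
        return batch;
    }
}
```

Passing timestamps in, rather than reading the clock inside the class, keeps the rule deterministic; a real publisher would presumably also need a timer thread that calls poll.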

Single nsqd host publishing

When a single nsqd host is specified (failoverNsqd is null or not specified when constructing a Publisher), a failed publish is retried after 10 seconds by default. The retry happens inline with synchronous publishes, or with buffered publishes once the batch size is reached.

If this second attempt fails, the call to publish throws an NSQException.
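The single-host retry behavior amounts to the control flow below. It is a minimal sketch under stated assumptions: Sender, Sleeper, RetryOncePublisher and PublishFailedException are hypothetical names (nsq-j itself throws NSQException), and the delay is injected so the example runs without actually waiting 10 seconds.

```java
import java.io.IOException;

/** Hypothetical stand-in for a single network publish attempt. */
interface Sender {
    void send(String topic, byte[] data) throws IOException;
}

/** Hypothetical stand-in for Thread.sleep, injected so the example does not really wait. */
interface Sleeper {
    void sleep(long millis) throws InterruptedException;
}

/** Hypothetical local exception; nsq-j itself throws NSQException. */
final class PublishFailedException extends RuntimeException {
    PublishFailedException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Models "try, wait, try once more, then fail" for a single nsqd host. */
final class RetryOncePublisher {
    private final Sender sender;
    private final Sleeper sleeper;
    private final long retryDelayMillis;

    RetryOncePublisher(Sender sender, Sleeper sleeper, long retryDelayMillis) {
        this.sender = sender;
        this.sleeper = sleeper;
        this.retryDelayMillis = retryDelayMillis;
    }

    void publish(String topic, byte[] data) {
        try {
            sender.send(topic, data);
        } catch (IOException first) {
            try {
                sleeper.sleep(retryDelayMillis); // wait before the single retry
                sender.send(topic, data);        // second and last attempt
            } catch (IOException second) {
                second.addSuppressed(first);     // keep the first failure for diagnosis
                throw new PublishFailedException("publish failed after one retry", second);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new PublishFailedException("interrupted while waiting to retry", e);
            }
        }
    }
}
```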

Failover publishing

nsq-j supports failover publishing. It is enabled when you specify a non-null failoverNsqd parameter, or when you manually construct a failover balance strategy with ListBasedBalanceStrategy#getFailoverStrategyBuilder.

In failover mode, nsq-j prefers publishing to the first nsqd in the provided list. It fails over to the next nsqd if a publish fails. After the failover duration, the next publish attempts to reconnect to the first nsqd. The failover duration defaults to 5 minutes.

If all nsqd hosts are in a failed state (have all failed within the failover duration), publish throws an NSQException.
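The failover rule can be pictured as "pick the most preferred host that has not failed recently." The class below is a hypothetical model of that rule, not ListBasedBalanceStrategy itself; the names FailoverSelector, select and markFailed, and the use of caller-supplied timestamps, are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of failover host selection; not nsq-j's balance strategy. */
final class FailoverSelector {
    private final List<String> hosts;          // in order of preference
    private final long failoverDurationMillis; // how long a failed host is skipped
    private final Map<String, Long> failedAt = new HashMap<>();

    FailoverSelector(List<String> hosts, long failoverDurationMillis) {
        this.hosts = new ArrayList<>(hosts);
        this.failoverDurationMillis = failoverDurationMillis;
    }

    /** Returns the most preferred host that has not failed within the failover duration. */
    String select(long nowMillis) {
        for (String host : hosts) {
            Long failed = failedAt.get(host);
            if (failed == null || nowMillis - failed >= failoverDurationMillis) {
                return host;
            }
        }
        throw new IllegalStateException("all nsqd hosts failed within the failover duration");
    }

    /** Records a failed publish to the given host. */
    void markFailed(String host, long nowMillis) {
        failedAt.put(host, nowMillis);
    }
}
```

Because select always scans from the front of the list, the first host is retried automatically on the first publish after its failover duration expires.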

Round-robin publishing

To use round-robin publishing, construct a balance strategy with ListBasedBalanceStrategy#getRoundRobinStrategyBuilder, providing a list of nsqd hosts to use.

All hosts in the list are added to a rotation, and each publish is sent to the next host in the rotation. If a publish fails, the host is marked "dead" for the failover duration (5 minutes by default) before it is added back to the rotation. If all hosts are marked dead, publish throws an NSQException.
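A rotation that skips "dead" hosts can be modeled as below. This is a hypothetical sketch of the described behavior, not nsq-j's round-robin strategy; RoundRobinSelector, select and markDead are illustrative names, and time is passed in explicitly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of round-robin selection with temporarily dead hosts. */
final class RoundRobinSelector {
    private final List<String> hosts;
    private final long deadDurationMillis;
    private final Map<String, Long> deadSince = new HashMap<>();
    private int next = 0; // index of the host to try first on the next publish

    RoundRobinSelector(List<String> hosts, long deadDurationMillis) {
        this.hosts = new ArrayList<>(hosts);
        this.deadDurationMillis = deadDurationMillis;
    }

    /** Returns the next live host in the rotation and advances past it. */
    String select(long nowMillis) {
        for (int i = 0; i < hosts.size(); i++) {
            int index = (next + i) % hosts.size();
            String host = hosts.get(index);
            Long since = deadSince.get(host);
            if (since == null || nowMillis - since >= deadDurationMillis) {
                next = (index + 1) % hosts.size();
                return host;
            }
        }
        throw new IllegalStateException("all nsqd hosts are marked dead");
    }

    /** Takes a host out of the rotation for the dead duration. */
    void markDead(String host, long nowMillis) {
        deadSince.put(host, nowMillis);
    }
}
```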

Subscribe

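import com.sproutsocial.nsq.Subscriber;
import java.nio.charset.StandardCharsets;

public class SubExample {

    // Called with each message body; returning normally finishes (FIN) the message
    public static void handleData(byte[] data) {
        System.out.println("Received: " + new String(data, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Discovers nsqd instances for the topic through the lookup hosts
        Subscriber subscriber = new Subscriber("lookup-host-1", "lookup-host-2");
        subscriber.subscribe("example_topic", "test_channel", SubExample::handleData);
    }
}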

The subscriber uses the lookup hosts to discover any nsqd servers publishing example_topic.

If handleData returns normally, FIN is sent to nsqd to finish the message. If the handler throws any exception, the message is requeued and processing is delayed using exponential backoff. Delay times and max attempts are configurable.
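The README does not spell out the backoff formula. The sketch below assumes a common form, delay = base * 2^(attempt - 1) capped at a maximum; the class ExponentialBackoff and its parameters are hypothetical, and nsq-j's actual formula and defaults may differ.

```java
/** Hypothetical exponential backoff calculator; names and formula are illustrative. */
final class ExponentialBackoff {
    private final long baseDelayMillis;
    private final long maxDelayMillis;

    ExponentialBackoff(long baseDelayMillis, long maxDelayMillis) {
        if (baseDelayMillis <= 0 || maxDelayMillis < baseDelayMillis) {
            throw new IllegalArgumentException("need 0 < base <= max");
        }
        this.baseDelayMillis = baseDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Delay before redelivery after the given failed attempt (1-based), capped at the maximum. */
    long delayFor(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        int shift = Math.min(attempt - 1, 30);     // bound the exponent
        long factor = 1L << shift;                 // 2^(attempt - 1)
        if (baseDelayMillis > maxDelayMillis / factor) {
            return maxDelayMillis;                 // base * factor would exceed the cap
        }
        return baseDelayMillis * factor;
    }
}
```

Comparing against maxDelayMillis / factor before multiplying keeps the calculation from overflowing for large attempt counts.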

For complete control, handle the Message directly:

    public static void handleMessage(Message msg) {
        try {
            byte[] data = msg.getData();
            // ... complicated work ...
            msg.finish();   // tell nsqd the message was processed
        }
        catch (Exception e) {
            msg.requeue();  // hand the message back to nsqd for redelivery
        }
    }

Publishers and Subscribers are thread safe and should be reused. Your handler methods should also be thread safe; make them synchronized if you are unsure.

Client.getDefaultClient().stop() waits for in-flight messages, closes all connections and allows all threads to exit.


Development

You must have at least JDK 8 installed. A local Docker installation is also required to run the full test suite. The test suite boots a small, local nsq cluster to exercise the full publish/subscribe flow, including failure modes.

You can run the full test suite and build a working jar with:

make clean all

This will do the necessary setup (pulling docker images), run the test suite, and build the final jar artifact.

If you just want to execute the test suite, you can use:

make clean test

nsq-j's People

Contributors

blakesmith, cswingler, erik-helleren, ezy023, jensrantil, jferretti-life, jsadn, mrtrumbe, patrickgombert, prestonhansen, robseed, ulaskeles

nsq-j's Issues

Add Project Jigsaw support

I have a modular project, and the nsq-j dependency broke my build.

Could you please add Jigsaw (Java module system) support?

For this feature, the project's JDK would need to be upgraded to JDK 9 or higher.

Message timeout error from nsqd causes socket connection to nsqd to be closed, even when it is not a connection error

Hello. While testing, I noticed some exceptions in my consumer service related to the E_FIN_FAILED error from nsqd.

The nsqd --msg-timeout argument is set to 1 minute, and some of my messages are timing out due to a slow consumer. On the consumer side, nsq-j gets the following error when invoking finish on one of these timed-out messages.

 2020-01-05 03:59:09.851 [ERROR] [nsq-read-0] c.s.n.Connection : read thread exception. con:SubCon:nsqd-internal circleObserved.geoshort-write#ephemeral
 com.sproutsocial.nsq.NSQException: error from nsqd:E_FIN_FAILED FIN 0c9644943b7e6029 failed ID not in flight
     at com.sproutsocial.nsq.Connection.readResponse(Connection.java:239) ~[nsq-j-0.9.4.jar!/:?]
     at com.sproutsocial.nsq.Connection.read(Connection.java:255) [nsq-j-0.9.4.jar!/:?]
     at com.sproutsocial.nsq.Connection.access$400(Connection.java:25) [nsq-j-0.9.4.jar!/:?]
     at com.sproutsocial.nsq.Connection$2.run(Connection.java:94) [nsq-j-0.9.4.jar!/:?]

The problem is that nsq-j checks whether the error equals E_FIN_FAILED, but the error actually received is E_FIN_FAILED FIN 0c9644943b7e6029 failed ID not in flight. Because the strings do not match, the following code throws an NSQException.

From com.sproutsocial.nsq.Connection:

private static final Set<String> nonFatalErrors = Collections.unmodifiableSet(new HashSet<String>(
        Arrays.asList("E_FIN_FAILED", "E_REQ_FAILED", "E_TOUCH_FAILED")));
...

String error = readAscii(size - 4);
if (nonFatalErrors.contains(error)) {
    logger.warn("non fatal nsqd error:{} probably due to message timeout", error);
}
else {
    throw new NSQException("error from nsqd:" + error);
}

Since an NSQException is thrown, the connection to nsqd is closed. All of the in-flight messages are associated with this connection, and they now get the following error when they complete.

2020-01-05 03:59:36.811 [ERROR]    53 --- [       nsq-co-0] c.s.n.SubConnection                      : finish error. SubCon:nsqd-internal circleObserved.geoshort-write#ephemeral lastFlush:27.0 lastHeartbeat:54.9 unflushedCount:8        inFlight:16 maxInFlight:25 fin:24 req:0
 java.net.SocketException: Socket closed
     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) ~[?:?]
     at java.net.SocketOutputStream.write(SocketOutputStream.java:150) ~[?:?]
     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81) ~[?:?]
     at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142) ~[?:?]
     at java.io.DataOutputStream.flush(DataOutputStream.java:123) ~[?:?]
     at com.sproutsocial.nsq.Connection.flush(Connection.java:220) ~[nsq-j-0.9.4.jar!/:?]
     at com.sproutsocial.nsq.SubConnection.checkFlush(SubConnection.java:104) ~[nsq-j-0.9.4.jar!/:?]
     at com.sproutsocial.nsq.SubConnection.messageDone(SubConnection.java:74) ~[nsq-j-0.9.4.jar!/:?]
     at com.sproutsocial.nsq.SubConnection.finish(SubConnection.java:47) ~[nsq-j-0.9.4.jar!/:?]
     at com.sproutsocial.nsq.NSQMessage.finish(NSQMessage.java:48) ~[nsq-j-0.9.4.jar!/:?]
     at com.services.ui.nsq.CircleObservedConsumer$MessageResult$1.onComplete(CircleObservedConsumer.java:64) ~[classes!/:0.0.1-SNAPSHOT]
     at com.services.ui.nsq.CircleObservedConsumer.handleMessage(CircleObservedConsumer.java:200) ~[classes!/:0.0.1-SNAPSHOT]
     at com.sproutsocial.nsq.SubConnection$3.run(SubConnection.java:178) [nsq-j-0.9.4.jar!/:?]
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
     at java.lang.Thread.run(Thread.java:834) [?:?]

What I expect to happen:
I expect the E_FIN_FAILED FIN 0c9644943b7e6029 failed ID not in flight error to be considered non-fatal, and only logged at the warning level.

What actually happens:
The E_FIN_FAILED FIN 0c9644943b7e6029 failed ID not in flight causes a fatal exception, causing the socket connection to nsqd to be closed.
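As the log shows, nsqd sends the error code followed by a space and a description. A possible fix is to compare only the leading code against the non-fatal set. The helper below is a hypothetical sketch of that idea, not a patch to Connection; the names NsqErrors, errorCode and isNonFatal are assumptions.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: classify an nsqd error by its code, ignoring the description. */
final class NsqErrors {
    private static final Set<String> NON_FATAL_ERRORS = Collections.unmodifiableSet(new HashSet<>(
            Arrays.asList("E_FIN_FAILED", "E_REQ_FAILED", "E_TOUCH_FAILED")));

    private NsqErrors() {
    }

    /** Returns the text before the first space, e.g. "E_FIN_FAILED FIN ..." gives "E_FIN_FAILED". */
    static String errorCode(String error) {
        int space = error.indexOf(' ');
        return space < 0 ? error : error.substring(0, space);
    }

    /** True when the error is one of the codes that should only be logged. */
    static boolean isNonFatal(String error) {
        return NON_FATAL_ERRORS.contains(errorCode(error));
    }
}
```

In the read loop quoted above, the check would then become nonFatal(error) via NsqErrors.isNonFatal(error) instead of nonFatalErrors.contains(error).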

Repeatable, 100%. To reproduce:

  1. run nsqd (nsqio/nsq:v1.2.0) with the default timeout (1m)
  2. create a consumer that sleeps for 5s, and then invokes message.finish()
  3. run that consumer with one thread, 25 in-flight
  4. post 100 messages to nsqd for that topic
  5. wait for NSQException to occur after a minute
  6. as each in-flight is consumed, wait for SocketException to be thrown during the message.finish() call

[Feature] Use a Selector (NIO), rather than a thread per connection

I would like the Connection to use a Selector and a single thread, rather than a dedicated reader thread per connection/socket. This would make the library scale much better as the number of topics/channels in use grows.

I am willing to make/submit this change as a PR.
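For reference, the general pattern being proposed looks like the sketch below: one thread, one java.nio.channels.Selector, and many non-blocking channels. This is a hypothetical illustration of the technique, not nsq-j code; SelectorReadLoop, pollOnce and the (connection id, bytes) callback are assumptions, and frame parsing and writes are omitted.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.function.BiConsumer;

/** Hypothetical single-threaded read loop over many channels using a Selector. */
final class SelectorReadLoop {
    private final Selector selector;
    private final BiConsumer<Object, ByteBuffer> onData; // (connection id, bytes read)

    SelectorReadLoop(BiConsumer<Object, ByteBuffer> onData) throws IOException {
        this.selector = Selector.open();
        this.onData = onData;
    }

    /** Registers a channel for reads; the id is attached to its key. */
    void register(SelectableChannel channel, Object connectionId) throws IOException {
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ, connectionId);
    }

    /** Waits up to timeoutMillis (0 waits indefinitely) and dispatches data from ready channels. */
    int pollOnce(long timeoutMillis) throws IOException {
        int handled = 0;
        selector.select(timeoutMillis);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // the selected-key set is not cleared automatically
            if (!key.isValid() || !key.isReadable()) {
                continue;
            }
            ByteBuffer buf = ByteBuffer.allocate(4096);
            int n = ((ReadableByteChannel) key.channel()).read(buf);
            if (n < 0) {              // peer closed the connection
                key.cancel();
                key.channel().close();
            } else if (n > 0) {
                buf.flip();
                onData.accept(key.attachment(), buf);
                handled++;
            }
        }
        return handled;
    }

    void close() throws IOException {
        selector.close();
    }
}
```

A real client would register SocketChannels to nsqd and parse NSQ frames from the buffers; the example uses the same loop regardless of how many connections are registered.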

Nsq-j client blocks until maxWaitForShutdown on shutdown due to bad nsqd connections

Hello. I am fairly new to the nsq-j client, and I ran into an issue when shutting down my service. The problem occurs when nsqlookupd contains an address for an nsqd service that is not network-accessible to my service (I get java.net.UnknownHostException). In that case, my service (the nsq-j client) hangs until the shutdown timeout is reached.

My service subscribes to a single topic, on a single channel. The nsqlookupd service has two nsqd services registered for the topic, and one of the services is accessible (GOOD_NSQD) to my service, and the other one is not (BAD_NSQD).

Upon looking at the code for Subscription, I see the following:

From Subscription:

public synchronized void checkConnections(Set<HostAndPort> activeHosts) {
...
  for (HostAndPort activeHost : activeHosts) {
      if (!connectionMap.containsKey(activeHost)) {
          try {
              logger.info("adding new connection:{} topic:{}", activeHost, topic);
              SubConnection con = new SubConnection(client, activeHost, this);
              con.connect(subscriber.getConfig());
              connectionMap.put(activeHost, con);
          }
          catch (Exception e) {
              logger.error("error connecting to:{}", activeHost, e);
          }
      }
  }
...
}

The connectionMap does not contain my BAD_NSQD connection, so it creates a new SubConnection. SubConnection.connect() fails with an IOException, so the connection is never added to the connectionMap. This seemed correct until I noticed the first line of SubConnection.connect():

From SubConnection:

public synchronized void connect(Config config) throws IOException {
    client.addSubConnection(this);
    super.connect(config);        // throws IOException
    ...
}

The first line of connect() adds the SubConnection to the client's subConnections set. The second line throws the IOException. Nothing removes the bad connection from the Client.

Over time, the client's connection set becomes filled with BAD_NSQD SubConnections.

On shutdown, the Client.stopSubscribers() method blocks, waiting until Client.subConnections is empty, which it never becomes due to the bad connections. The Subscription.stop() method closes only the connections in its connectionMap, which do not include the failed connections.
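The leak and one possible fix can be reproduced in a small model. ConnectionRegistry stands in for Client and ModelConnection for SubConnection; both are hypothetical, simplified classes, not nsq-j code. The fix shown undoes the registration when the connect fails; registering only after a successful connect would be another option.

```java
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for Client and its set of subscriber connections. */
final class ConnectionRegistry {
    final Set<Object> active = Collections.synchronizedSet(new HashSet<>());
}

/** Hypothetical stand-in for SubConnection; reachable == false plays the role of BAD_NSQD. */
final class ModelConnection {
    private final ConnectionRegistry registry;
    private final boolean reachable;

    ModelConnection(ConnectionRegistry registry, boolean reachable) {
        this.registry = registry;
        this.reachable = reachable;
    }

    /** Same order as the reported code: register, then connect. Leaks on failure. */
    void connectLeaky() throws IOException {
        registry.active.add(this);
        openSocket();
    }

    /** Possible fix: remove the registration if connecting fails, then rethrow. */
    void connectSafely() throws IOException {
        registry.active.add(this);
        try {
            openSocket();
        } catch (IOException | RuntimeException e) {
            registry.active.remove(this);
            throw e;
        }
    }

    private void openSocket() throws IOException {
        if (!reachable) {
            throw new UnknownHostException("BAD_NSQD");
        }
    }
}
```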

What actually happens:
Client.stop() blocks until maxWaitForShutdown on shutdown due to bad nsqd connections

What I expect to happen:
I expect the nsq-j client to gracefully terminate after all in-flight messages are emptied out.

100% reproducible.

  1. run a nsqlookupd that is network accessible to your service
  2. run a nsqd that is network accessible to your service
  3. run a nsqd that is accessible to nsqlookupd, but not your service
  4. Run your nsq-j service that consumes a single topic. In my case, I have 1 consumer thread and 25 in-flight, with maxWaitForShutdown set to 5 minutes. The nsq-j service invokes Client.stop() when the service is shut down.
  5. From your terminal, kill your nsq-j service (e.g. kill -INT )
  6. Notice the following in the logs:
2020-01-06 20:08:24.092 [INFO ]    23 --- [extShutdownHook] c.s.n.Client                             : stopping nsq client
2020-01-06 20:08:24.093 [INFO ]    23 --- [extShutdownHook] c.s.n.Subscriber                         : subscriber stopped
2020-01-06 20:08:24.093 [INFO ]    23 --- [extShutdownHook] c.s.n.Client                             : waiting for subscribers to finish in-flight messages

Sample usage for pubsub

Hi, I'm totally new to NSQ. While trying out this library, I can't get a simple example working in which I publish something on a topic and receive it back on the subscriber.
Am I missing something?
I'm literally just using the code from the README.

Thanks

dead lock occur between Subscription and Subscriber

Subscription.distributeMaxInFlight() holds the Subscription's lock (it is synchronized) and then calls Subscriber.getLookupIntervalSecs(), which needs the Subscriber's lock.

Meanwhile, another thread in Subscriber.lookup() holds the Subscriber's lock and then calls Subscription.checkConnections(), which needs the Subscription's lock.

The thread dump is as follows:

"nsq-sub-0":
at com.sproutsocial.nsq.Subscriber.getLookupIntervalSecs(Subscriber.java:213)
- waiting to lock <0x00000003efa47af8> (a com.sproutsocial.nsq.Subscriber)
at com.sproutsocial.nsq.Subscription.distributeMaxInFlight(Subscription.java:76)
at com.sproutsocial.nsq.Subscription.setMaxInFlight(Subscription.java:38)
- locked <0x00000003efa47ff8> (a com.sproutsocial.nsq.Subscription)
at com.sproutsocial.nsq.BackoffHandler.successDuringBackoff(BackoffHandler.java:103)
- locked <0x00000003efa480a0> (a com.sproutsocial.nsq.BackoffHandler)
at com.sproutsocial.nsq.BackoffHandler.accept(BackoffHandler.java:45)
at com.sproutsocial.nsq.SubConnection$3.run(SubConnection.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"nsq-sched-1":
at com.sproutsocial.nsq.Subscription.checkConnections(Subscription.java:42)
- waiting to lock <0x00000003efa47ff8> (a com.sproutsocial.nsq.Subscription)
at com.sproutsocial.nsq.Subscriber.lookup(Subscriber.java:112)
- locked <0x00000003efa47af8> (a com.sproutsocial.nsq.Subscriber)
at com.sproutsocial.nsq.Subscriber.access$000(Subscriber.java:27)
at com.sproutsocial.nsq.Subscriber$1.run(Subscriber.java:51)
at com.sproutsocial.nsq.Client$1.run(Client.java:162)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Perhaps Subscriber.getLookupIntervalSecs() should not be synchronized?
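The cycle exists because the two threads take the Subscriber and Subscription monitors in opposite orders. One way to break it, assuming the getter only reads a simple field, is to make that field volatile so the getter no longer needs the Subscriber lock. The classes below are hypothetical models whose method names only mirror the thread dump; they are not nsq-j code.

```java
/** Hypothetical model of Subscriber: the interval is volatile, so reading it needs no lock. */
final class ModelSubscriber {
    private volatile int lookupIntervalSecs = 60;

    int getLookupIntervalSecs() { // not synchronized: removes one edge of the lock cycle
        return lookupIntervalSecs;
    }

    void setLookupIntervalSecs(int secs) {
        lookupIntervalSecs = secs;
    }

    /** Like Subscriber.lookup(): holds the Subscriber lock, then enters the Subscription. */
    synchronized void lookup(ModelSubscription subscription) {
        subscription.checkConnections();
    }
}

/** Hypothetical model of Subscription. */
final class ModelSubscription {
    private final ModelSubscriber subscriber;
    int lastInterval;

    ModelSubscription(ModelSubscriber subscriber) {
        this.subscriber = subscriber;
    }

    synchronized void checkConnections() {
        // connecting and disconnecting would happen here
    }

    /** Like distributeMaxInFlight(): holds the Subscription lock, then reads from the Subscriber. */
    synchronized void distributeMaxInFlight() {
        lastInterval = subscriber.getLookupIntervalSecs();
    }
}
```

Whether the real getter can safely drop its lock depends on nsq-j's internals; a consistent lock order across both classes would be the alternative fix.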

how to use auth in nsq-j?

I saw in the official NSQ documentation that this client supports authentication, but I can't get it to work. How do I use auth in nsq-j?

Can't set MaxRdyCount!

Hi,
I've been using your library for a while, but now I need to set the maxRdyCount property of an NSQ subscription, and setting maxInFlight doesn't seem to change the fixed maxRdyCount value of 2500.

How can I do this?

[bug?] Application could not terminate correctly

When I use the sample code from README.md to publish a message, the application does not terminate correctly. Here is my code:

    public void putMessage(String message) {
        Publisher publisher = new Publisher("localhost");
        byte[] data = "Hello nsq".getBytes();
        publisher.publish("example_topic", data);
        publisher.stop();
        //publisher.getClient().stop();
    }

I found this is because the client is not stopped when the publisher is stopped. When I call publisher.getClient().stop() explicitly, the application terminates correctly.

It does not make sense that I have to stop a client I never used directly (I only used the publisher). I think this should be fixed.
