
jeromq's Introduction

JeroMQ

Pure Java implementation of libzmq (http://zeromq.org).


Features

  • Based on libzmq 4.1.7.

  • ZMTP/3.0 (http://rfc.zeromq.org/spec:23).

  • The tcp:// and inproc:// protocols are compatible with zeromq.

  • ipc:// protocol works only between jeromq (uses tcp://127.0.0.1:port internally).

  • Securities

  • Performance that's not too bad, compared to native libzmq.

  • Exactly the same developer experience as zeromq and jzmq.

  • TCP KeepAlive Count, Idle and Interval are known to only work with JVM 13 and later.

Unsupported

  • ipc:// protocol with zeromq. Java doesn't support UNIX domain socket.

  • pgm:// protocol. Cannot find a pgm Java implementation.

  • norm:// protocol. Cannot find a Java implementation.

  • tipc:// protocol. Cannot find a Java implementation.

  • GSSAPI mechanism is not yet implemented.

  • Interrupting threads is still unsupported: library is NOT Thread.interrupt safe.

Contributing

Contributions welcome! See CONTRIBUTING.md for details about the contribution process and useful development tasks.

Usage

Maven

Add it to your Maven project's pom.xml:

    <dependency>
      <groupId>org.zeromq</groupId>
      <artifactId>jeromq</artifactId>
      <version>0.6.0</version>
    </dependency>

    <!-- for the latest SNAPSHOT -->
    <dependency>
      <groupId>org.zeromq</groupId>
      <artifactId>jeromq</artifactId>
      <version>0.6.0</version>
    </dependency>

    <!-- If you can't find the latest snapshot -->
    <repositories>
      <repository>
        <id>sonatype-nexus-snapshots</id>
        <url>https://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>

Ant

To generate an ant build file from pom.xml, issue the following command:

mvn ant:ant

Getting started

Simple example

Here is how you might implement a server that prints the messages it receives and responds to them with "Hello, world!":

import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class hwserver
{
    public static void main(String[] args) throws Exception
    {
        try (ZContext context = new ZContext()) {
            // Socket to talk to clients
            ZMQ.Socket socket = context.createSocket(SocketType.REP);
            socket.bind("tcp://*:5555");

            while (!Thread.currentThread().isInterrupted()) {
                // Block until a message is received
                byte[] reply = socket.recv(0);

                // Print the message
                System.out.println(
                    "Received: [" + new String(reply, ZMQ.CHARSET) + "]"
                );

                // Send a response
                String response = "Hello, world!";
                socket.send(response.getBytes(ZMQ.CHARSET), 0);
            }
        }
    }
}

More examples

The JeroMQ translations of the zguide examples are a good reference for recommended usage.

Documentation

For API-level documentation, see the Javadocs.

This repo also has a doc folder, which contains assorted "how to do X" guides and other useful information about various topics related to using JeroMQ.

License

All source files are copyright © 2007-2024 contributors as noted in the AUTHORS file.

Free use of this software is granted under the terms of the Mozilla Public License 2.0. For details see the file LICENSE included with the JeroMQ distribution.

jeromq's People

Contributors

bvella, c-rack, capacman, crocket, daveyarwood, davipt, dayjun, dermesser, ealgell, fbacchella, fredoboulo, garydgregory, hachikuji, hintjens, imkcy9, ipechorin, isahkemat, jcoeltjen, jkwatson, knuesel, kylemallory, markif, miniway, parente, refactormachine, sappo, sbanacho, sjohnr, somdoron, trevorbernard


jeromq's Issues

Pause / Slowdown in Sending

Hi - I am seeing strange pauses in the message processing of JeroMQ. Everything runs fine, and then the system seems to hiccup and the message flow is interrupted. I am using PUSH / PULL sockets, and what is really strange is that the issue seems to happen when a number of messages get pushed into the queue at the same time. What I am looking for is the lowest-latency dispatch of messages. The TPS I am currently executing is around 2400.

Observations
Running jhiccup from Azul shows max pause time from GC as 20ms
Timestamps placed on messages show several 100ms delay (sometimes several thousand) in the zeromq code
This seems to be synchronized with 'batches' / sending bursts into the JeroMQ API

As an additional aside - I see the same issue on zeromq with native bindings.

0.3.0 release?

Hi,

Are there any plans to make the 0.3.0 release? I have a dependent library and I'd rather target the new packages.

Cheers

asyncsrv is not working

It doesn't print anything out at all and doesn't appear functional.

I have a working copy on the "new" APIs in my own forked version. In order to get it working for me, I couldn't use the "inproc" socket type, as it ended up blocking and hanging when it hit the HWM. I switched to "ipc" and that fixed the issue for my code.

Switching to an ipc socket for the proxy did NOT help for the version here, however.

There are probably multiple issues at work here. I haven't investigated why the version here is non-functional.

ZContext not closing all open files.

If I create a new ZContext, then call .createSocket with any type -- ex. ZMQ/REQ, lsof shows that 26 new files are opened. If I call .close on the context, I see that 22 of those files are closed, leaving behind 4 open files that aren't cleaned up -- two sockets and two pipes.

What is causing this?

In clojure code...

;;; Leaves 4 files open -- two sockets and two pipes.
(let [context (org.zeromq.ZContext.) 
      socket (.createSocket context ZMQ/REQ)] 
  (.close context))

mvn clean install fails on error in testcase

Cannot compile without deactivating the test cases. The error is as follows:

[ERROR] .../jeromq/src/test/java/zmq/Helper.java:[25,18] error: DummySocketChannel is not abstract and does not override abstract method getRemoteAddress() in SocketChannel

error : NegativeArraySizeException

java.lang.NegativeArraySizeException
at zmq.Msg.size(Msg.java:140)
at zmq.Msg.<init>(Msg.java:62)
at zmq.Decoder.eight_byte_size_ready(Decoder.java:151)
at zmq.Decoder.next(Decoder.java:74)
at zmq.DecoderBase.process_buffer(DecoderBase.java:124)
at zmq.StreamEngine.in_event(StreamEngine.java:306)
at zmq.IOObject.in_event(IOObject.java:101)
at zmq.Poller.run(Poller.java:232)
at java.lang.Thread.run(Thread.java:662)

private final void size (int size_)
    {
        size = size_;
        if (type == type_lmsg) {
            flags = 0;

            buf = ByteBuffer.allocate(size_);
            data = null;
        }
        else {
            flags = 0;
            data = new byte[size_];
            buf = null;
        }
    }

When size_ < 0, this error is thrown.
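
The trace above passes through an eight-byte size step in the decoder before the allocation fails, so one way to avoid the exception is to validate the length before it is narrowed to an int. The following is a minimal sketch under that assumption. `FrameSize`, `checkedSize`, and `maxSize` are hypothetical names for illustration and are not part of JeroMQ.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of JeroMQ: validates an 8-byte frame length
// before it is used to size an allocation.
final class FrameSize {
    private FrameSize() {
    }

    // Reads a big-endian 64-bit length at the buffer's current position and
    // returns it as an int. Throws if the value is negative or above maxSize.
    static int checkedSize(ByteBuffer header, int maxSize) {
        long size = header.getLong();
        if (size < 0 || size > maxSize) {
            throw new IllegalArgumentException("invalid frame size: " + size);
        }
        return (int) size;
    }

    public static void main(String[] args) {
        ByteBuffer good = ByteBuffer.allocate(8).putLong(0, 3L);
        System.out.println(checkedSize(good, 1024));

        ByteBuffer bad = ByteBuffer.allocate(8).putLong(0, -1L);
        try {
            checkedSize(bad, 1024);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Rejecting the value with a descriptive exception lets the caller drop the connection instead of failing inside the allocation.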

Context doesn't expose all its options.

I'm curious as to why org.zeromq.ZMQ.Context doesn't expose all the options that zmq.Ctx has. For example if I want to raise the ZMQ_MAX_SOCKETS from 1024 to some other value I have to work with zmq.Ctx directly instead of working with the easy to use wrapper.

jeromq spins reading HUP on epoll_wait

I have caught jeromq spinning using 100% CPU, trying to read from an epoll descriptor that is returning HUP (i.e. it has been disconnected). It was coming through java.nio.channels.Selector.select from zmq.Poller.run (trace is below). In the trace, the epollWait call at the top of the stack isn't blocking -- it is returning quickly and spinning back around. With strace, I can see the thread repeatedly calling epoll_wait and getting a HUP response.

Note that I'm not 100% sure of the diagnosis above. I have many threads in my daemon that are using epoll, so it might be one of the others. However, this is a new issue for me, and jeromq is the thing that I've added recently, so I strongly suspect that it is jeromq. Also, it seems to have gone away with a switch to jzmq.

This looks to be a known nio / JDK bug. Netty has workaround code for a very similar sounding issue. See EPOLL_BUG_WORKAROUND in io.netty.channel.socket.nio.NioEventLoop in https://github.com/netty/netty.

I haven't investigated this to completion -- I have found that jzmq is better for my needs, so I am switching and I am not looking at this problem any further. I wanted to report it though in case someone else wants to fix the problem. I recommend porting the workaround code from Netty and seeing if that solves the problem.

Ubuntu 12.04.
OpenJDK 7. openjdk-7-jdk 7u9-2.3.3-0ubuntu1~12.04.1
jeromq 0.2.0 from Maven / oss.sonatype.org

"iothread-2" prio=10 tid=0x00007f943c622000 nid=0x34d0 runnable [0x00007f94314f6000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:228)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:81)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
- locked <0x00000005857a32f8> (a sun.nio.ch.Util$2)
- locked <0x00000005857a32e8> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000005857a3058> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
at zmq.Poller.run(Poller.java:202)
at java.lang.Thread.run(Thread.java:722)
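
One common shape for the kind of workaround mentioned above is to count select() calls that return early with nothing selected, and to rebuild the selector once that count passes a threshold. The sketch below shows only the counting part. `SpinDetector`, `record`, and the threshold are hypothetical names and values, not code from JeroMQ or Netty, and a real loop would also need to discount deliberate wakeup() calls.

```java
// Hypothetical helper, not JeroMQ or Netty code: counts select() calls that
// return early with nothing selected and signals when to rebuild the selector.
final class SpinDetector {
    private final int threshold;
    private int prematureReturns;

    SpinDetector(int threshold) {
        this.threshold = threshold;
    }

    // Call after every select(timeout). Returns true when the caller should
    // replace its Selector with a fresh one and re-register its channels.
    boolean record(int selectedKeys, long elapsedNanos, long timeoutNanos) {
        if (selectedKeys > 0 || elapsedNanos >= timeoutNanos) {
            prematureReturns = 0; // a normal return resets the count
            return false;
        }
        prematureReturns++;
        if (prematureReturns >= threshold) {
            prematureReturns = 0;
            return true;
        }
        return false;
    }
}
```

A polling loop would measure the time spent in each select() call with System.nanoTime() and pass it to record() together with the number of selected keys.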

PULL socket not receiving after reconnecting to ROUTER

I've reproduced this issue in the code below. The server is a simple ROUTER that sends an enveloped message every 1s. The client side is a PULL socket using identity "puller1", it pulls a few messages from the server, disconnects itself, then reconnects to the server and starts pulling again; the PULL socket, however, is not able to receive additional messages after reconnecting. I tried replacing PULL with DEALER, but saw no change in behavior.

I'm building an instant messaging app with zeromq, and I need this functionality to send chat messages to users, asynchronously. I'm not using REQ/REP sockets due to their blocking behavior.

Java 1.6 and Jero 0.3.0-SNAPSHOT.

public class RouterPullTest {

public static void main(String[] args) {
    RouterServer server = new RouterServer();
    server.start();     
    Puller p = new Puller();
    p.start();
}

private static class Puller extends Thread {

    public void run() {         
        Context context;
        Socket puller; 

        //connect, receive a few msgs, disconnect, do that 3 times
        for( int i = 0; i < 3; i++){                
            log("(puller) connecting...");
            context = ZMQ.context(1);
            puller = context.socket(ZMQ.PULL); //tried DEALER too           
            puller.setIdentity("puller1".getBytes());
            puller.connect("tcp://localhost:5671");

            int msgsReceived = 0;               
            while( msgsReceived++ < 2){
                log("(puller) receiving...");
                String emptyFrame = puller.recvStr();
                if( "".equals(emptyFrame))
                    log( "(puller), got msg: " + puller.recvStr());
            }
            log("(puller) disconnecting...");
            //close after 2 messages received
            puller.close();
            context.term();
        }           
        log("(puller) shutting down");
    }
}

private static class RouterServer extends Thread {
    private int msgsSent;

    public void run(){
        Context context = ZMQ.context(1);
        Socket broker = context.socket(ZMQ.ROUTER);
        broker.bind("tcp://*:5671");
        try{
            while( msgsSent++ < 15){
                //create envelope manually
                broker.sendMore("puller1");
                broker.sendMore("");
                broker.send("hello from server!");

                log("(server) sent message to puller1");
                Thread.sleep(1000);
            }
        }catch(Exception e){            
            e.printStackTrace();
        }           
        broker.close();
        context.term();
    }
}

private static void log(String s){
    System.out.println(s);
}

}

Trouble downloading 0.3.0-SNAPSHOT

I can download 0.2.0 no problem, but I can't download the latest snapshot...

    <groupId>org.jeromq</groupId>
    <artifactId>jeromq</artifactId>
    <version>0.3.0-SNAPSHOT</version>

Latest dependency

Is it just me, or does the latest available dependency (0.3.0-SNAPSHOT) not work with Maven?

Weird NPE on pub socket

This was my pseudocode in the beginning (in a singleton service object):

pub = ctx.createSocket(ZMQ.PUB);
pub.bind("tcp://*:5556");
while (!stop) {
    MyObject o = aConcurrentQueue.poll();
    if (o != null) {
        pub.sendMore(o.getTopic());
        pub.send(o.getBody());
    } else {
        Thread.sleep(10);
    }
}

However, it produced an NPE like this:

java.lang.NullPointerException
at zmq.Mtrie.match(Mtrie.java:353)
at zmq.XPub.xsend(XPub.java:179)
at zmq.SocketBase.send(SocketBase.java:598)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1002)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:988)

(By the way, I'm using jeromq 0.3.0.)
I can't figure out why this happened; any help would be appreciated!

Issue with code in new org.zeromq package

The code that has been moved to org.zeromq is not identical to the code in org.jeromq. I'm not sure what's going on, but if you run the "identity" guide example, it passes with the org.jeromq import, but if you switch it to org.zeromq, it blows up with an NPE.

Can't cleanly exit when using a Proxy

I'm using a proxy to forward requests arriving on a router socket to several worker sockets (dealer).

The problem is that I can't exit cleanly, because the Proxy code hangs on the zmq_poll with a timeout of -1 (infinite wait). https://github.com/zeromq/jeromq/blob/master/src/main/java/zmq/Proxy.java#L56

I'm setting linger = 0 on all sockets, then closing all sockets. But context.term() will still hang, because the proxy is locked on the zmq_poll.

Is there a work around for this?

socket.send() hangs forever when trying to send data to unbound socket with inproc protocol

Here is minimal testcase:

package jmqtest;

import java.util.Arrays;
import org.jeromq.ZMQ;

public class JmqTest {

    public static String endpoint = "inproc://service1";
    //public static String endpoint = "tcp://127.0.0.1:10050";
    private static ZMQ.Context ctx = ZMQ.context(1);
    private static byte[] msg = new byte[] {1, 2, 3};

    private static final Runnable r = new Runnable() {

        @Override
        public void run() {
            ZMQ.Socket server = ctx.socket(ZMQ.REP);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                System.out.println(ex);
            }
            server.bind(endpoint);
            byte[] recMsg = server.recv(0);
            boolean r = Arrays.equals(msg, recMsg);
            System.out.println(r ? "OK" : "Wrong message received");
            server.close();
        }

    };

    public static void main(String[] args) throws Exception {
        Thread t = new Thread(r);
        t.start();
        //Thread.sleep(1000);
        ZMQ.Socket client = ctx.socket(ZMQ.REQ);
        client.connect(endpoint);
        client.send(msg, 0);
        System.out.println("Waiting for thread to receive message...");
        t.join();
        client.close();
        ctx.term();
    }
}

When using jzmq instead of jeromq (change import in line 4), following exception is thrown:

Exception in thread "main" org.zeromq.ZMQException: Connection refused(0x3d)
at org.zeromq.ZMQ$Socket.connect(Native Method)
at jmqtest.JmqTest.main(JmqTest.java:37)

If bind() occurs before connect() (comment out lines 18-22 and uncomment line 35) everything works fine.

When using tcp transport instead of inproc (lines 8-9), late binding occurs and everything works fine too.

From the convenience point of view, late binding on inproc sockets would be ideal. From the interoperability point of view, exception on connect() should be thrown. In current state it's hard to diagnose, for example, misspelling of socket names in bind() and connect().

do not force log4j framework

Change the dependency scope to test for log4j and slf4j-log4j so that applications can choose their own backend to slf4j. Log4j is only used by the test classes so this shouldn't be an issue.

diff --git a/pom.xml b/pom.xml
index 4a67f3d..091aafc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,6 +31,7 @@
         <groupId>log4j</groupId>
         <artifactId>log4j</artifactId>
         <version>1.2.16</version>
+        <scope>test</scope>
         <exclusions>
           <exclusion>
             <groupId>com.sun.jdmk</groupId>
@@ -52,6 +53,7 @@
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
         <version>1.6.6</version>
+        <scope>test</scope>
       </dependency>
     <dependency>
       <groupId>junit</groupId>

ThreadLocal memory leak of type zmq.ZError on Tomcat 7

I'm running a simple pub/sub app in Tomcat 7.0.40, Windows 7 x64, JRE 1.6.0_41, and JeroMQ 0.3.0-SNAPSHOT. My code does not use ThreadLocal or any synchronization. The app starts up, sends a single message, then I reload the web container with new code/war. Upon reload, I use a shutdown hook to call context.close(), which closes all sockets successfully. Afterwards, however, I see the following error in the Tomcat log. It multiplies on each reload, and eventually the JVM crashes with an out-of-memory error.

SEVERE: The web application [/mcwebapp] created a ThreadLocal with key of type [zmq.ZError$1](value [zmq.ZError$1@1d48c]) and a value of type [java.lang.Integer](value [0]) but failed to remove it when the web application was stopped. Threads are going to be renewed over time to try and avoid a probable memory leak.

I've increased JVM heap and perm space settings, but this only delays the inevitable. Can I safely clean up the ThreadLocal zmq.ZError$1 that's being created so it doesn't leak?
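
The mechanism behind the Tomcat warning can be shown with plain JDK classes: a ThreadLocal value set on a pooled thread stays attached to that thread until remove() is called on the same thread. The sketch below is only an illustration of that mechanism. `ThreadLocalCleanup` and `ERRNO` are hypothetical names, and whether JeroMQ offers a hook to clear its own ThreadLocal is not stated in this report.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalCleanup {
    // Hypothetical stand-in for a per-thread error slot.
    static final ThreadLocal<Integer> ERRNO = new ThreadLocal<>();

    private ThreadLocalCleanup() {
    }

    // Runs four tasks on one pooled thread and reports what a later task
    // sees before and after remove() has been called on that thread.
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable set = () -> ERRNO.set(42);
        Runnable clear = () -> ERRNO.remove();
        Callable<Integer> read = () -> ERRNO.get();
        try {
            pool.submit(set).get();
            Integer leftover = pool.submit(read).get();    // value survives between tasks
            pool.submit(clear).get();
            Integer afterRemove = pool.submit(read).get(); // cleared on the owning thread
            return leftover + "," + afterRemove;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "42,null"
    }
}
```

The point of the sketch is that the cleanup has to run on the thread that owns the value; calling remove() from a shutdown hook on another thread does not clear it.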

sync with zeromq 3.2.1 RC

Now that the zeromq 3.2.1 RC is ready, jeromq should also be compatible with it so that the two can communicate with each other.

Once socket enters exceptional state via SNDHWM or RCVHWM and nonblocking is used

version 0.3.1

The socket will never be able to come out of the exceptional state as the recv and send methods continue to return exceptional state e.g. null.

I have some new code I am about to opensource that uses the NIO selectablechannel and I found that the default HWM of 1000 under large load can cause the socket to get into the exceptional state and never come out. If I get around to it I will add a unit test to reproduce.

Valid to use PULL/PUSH with ROUTER socket?

Is PUSH/PULL valid to use with ROUTER? There's no mention of such a socket combo in the guide, I found no examples either.

I'm building a chat/im program, the server uses ROUTER/ROUTER, the clients use PUSH/PULL for sending and receiving messages from server. I also use client identities so the server can route messages to specific clients (for private chat, etc). It appears to work fine in my testing. I just want to know if this is a valid approach.

zmq.Poller.fd_table memleak

There seems to be a memory leak in zmq.Poller.fd_table.

The fd_table grows indefinitely. This seems to be fixed by modifying lines 182-191 in zmq/Poller.java to the following:

if (pollset.key == null) {
    try {
        pollset.key = ch.register(selector, pollset.ops, pollset.handler);
    } catch (ClosedChannelException e) {
    }
} 


if (pollset.cancelled || !ch.isOpen()) {
    if(pollset.key != null) {
        pollset.key.cancel();
    }
    it.remove ();
}

The important line is *** if (pollset.cancelled || !ch.isOpen()) ***, which also removes the entry if the channel is not open.

I'm not too familiar with the code base but maybe there is someone who might know what is causing the leak. I may possibly just be using the lib incorrectly.
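
The pruning idea in the snippet above can be tried in isolation with JDK classes only. This is a minimal sketch, assuming the table maps each channel to a bookkeeping entry. `FdTablePrune`, `PollSet`, and `prune` are hypothetical names and do not reproduce the actual zmq.Poller code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class FdTablePrune {
    // Hypothetical stand-in for the per-channel bookkeeping entry.
    static final class PollSet {
        boolean cancelled;
    }

    private FdTablePrune() {
    }

    // Drops entries that were cancelled or whose channel is no longer open.
    // Returns the number of entries removed.
    static int prune(Map<SelectableChannel, PollSet> table) {
        int removed = 0;
        Iterator<Map.Entry<SelectableChannel, PollSet>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<SelectableChannel, PollSet> entry = it.next();
            if (entry.getValue().cancelled || !entry.getKey().isOpen()) {
                it.remove(); // safe removal while iterating
                removed++;
            }
        }
        return removed;
    }

    // Registers both ends of a pipe, closes one end, and prunes the table.
    static int demo() {
        try {
            Pipe pipe = Pipe.open();
            Map<SelectableChannel, PollSet> table = new HashMap<>();
            table.put(pipe.source(), new PollSet());
            table.put(pipe.sink(), new PollSet());
            pipe.source().close();
            int removed = prune(table);
            pipe.sink().close();
            return removed;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1
    }
}
```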

Remove Thread.interrupt() restriction.

The wiki says:

"Context.term() must be called before interrupting a thread by Thread.interrupt. Otherwise internal pipe which is used to signal ZMQ commands will be shutdown unexpectedly."

However, in some cases this means that there is no clean way for an application to shut down when it is blocked on recv().

Consider a thread (thread B) that is blocked on a socket's recv().

Suppose the main thread (thread A) of the application wants to shut down, it needs to cleanly terminate thread B. This typically involves sending an interrupt to thread B to unblock, but wait! we need to call Context.term() first. However, the wiki says:

"Close all the sockets properly otherwise Context.term() will wait forever"

OK, we need to close the socket first, but wait! The ZMQ guide says:

"Isolate data privately within its thread and never share data in multiple threads. The only exception to this are ØMQ contexts, which are threadsafe."

Hmm, so the only safe way to close the socket is on the thread that is currently blocked (thread B), too bad we can't do that.

So now the only way for thread B to unblock is to wait for a new message or a timeout (which may have been set to no timeout). If there are no more messages and there is no timeout, the thread will be blocked indefinitely. But regardless, there should be a means to preemptively unblock the thread.

Even if we were to ignore the shutdown problem, this restriction also leads to more awkward code because now the context needs to be exposed to other threads where it might not have been necessary without the restriction.

In fact, if you look at the Java example in the ZMQ guide and even the examples in this repository, it breaks the interrupt rule.

This is a pretty common pattern you see throughout the code:

while (!Thread.currentThread ().isInterrupted ()) {
    ...
}
receiver.close ();
context.term ();

Notice how it expects an interrupt before closing the socket and context.
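
One alternative to relying on interrupts, given the timeout option mentioned above, is for the blocked thread to wait with a finite timeout and check a stop flag between waits, so that it can close its own socket. The sketch below illustrates the loop shape with JDK classes only: a BlockingQueue stands in for a socket with a receive timeout, and `TimedReceiveLoop` and `requestStop` are hypothetical names, not JeroMQ API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class TimedReceiveLoop implements Runnable {
    // Stand-in for a socket with a receive timeout (assumption for this sketch).
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    private final AtomicBoolean stop = new AtomicBoolean(false);

    // Safe to call from any thread; the worker notices within one timeout period.
    void requestStop() {
        stop.set(true);
    }

    @Override
    public void run() {
        try {
            while (!stop.get()) {
                String msg = inbox.poll(50, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    System.out.println("received: " + msg);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // The worker would close its own socket here, on its own thread.
    }

    // Starts a worker, asks it to stop, and reports whether it exited.
    static boolean demo() {
        TimedReceiveLoop loop = new TimedReceiveLoop();
        Thread worker = new Thread(loop);
        worker.start();
        loop.requestStop();
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off is shutdown latency of up to one timeout period in exchange for keeping the socket on a single thread.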

Add ByteBuffer API for send and recv

I added a bytebuffer API to JZMQ Sockets with the following signature:

int sendByteBuffer(ByteBuffer buffer, int flags)
int recvByteBuffer(ByteBuffer buffer, int flags)

Ability to detect connect / disconnect with push / pull sockets

I am not sure if this is a defect or enhancement but I do not see a way to get a list of currently connected sockets in a push / pull pattern. For example, if I bind a server to a port as push connections and connect a set of clients to pull messages how can I detect if clients connect / disconnect. I tried to use the monitor approach but it does not appear to be compatible (at least the example) with the org.zeromq interfaces and when I did get a version working it did not notify on connect / disconnect of clients. Any thoughts on this enhancement?

PUSH socket block send() calls when disconnecting one peer

The following class exhibits different behaviour for jeromq compared to jzmq.

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class SimpleZMQTest {

    public static void main(String[] args) throws InterruptedException {
        Context context = ZMQ.context(1);

        Socket push = context.socket(ZMQ.PUSH);
        push.connect("tcp://localhost:1234");

        send(push, "msg1");
        send(push, "msg2");
        send(push, "msg3");

        push.connect("tcp://localhost:2468");

        send(push, "msg4");
        send(push, "msg5");
        send(push, "msg6");

        push.disconnect("tcp://localhost:1234");
        Thread.sleep(10);

        send(push, "msg7"); // this call blocks
        send(push, "msg8");
        send(push, "msg9");

    }

    private static void send(Socket push, String msg) {
        System.out.println("Sending " + msg);
        push.send(msg);
    }

}

When the socket is disconnected from one of its two connected peers, the next send() call blocks. I would expect the send call to return (and queue the message asynchronously onto the 2468 channel), as it does in jzmq.

TestShutdownStress hangs Windows 7

While running the test suite, the system completely freezes after several seconds of executing TestShutdownStress. Only a hard reboot helps.

Environment:

  • JeroMQ 0.3.2-SNAPSHOT
  • Windows 7 Enterprise SP1 64 bit
  • Java 1.7.0_17 (64 bit, Oracle)

I didn't test it (yet) on other environments.

socket bind fails with NPE rather than returning false when address in use 0.3.1

java.lang.NullPointerException
at zmq.ZMQ$Event.write(ZMQ.java:193)
at zmq.SocketBase.monitor_event(SocketBase.java:1181)
at zmq.SocketBase.event_closed(SocketBase.java:1156)
at zmq.TcpListener.close(TcpListener.java:123)
at zmq.TcpListener.set_address(TcpListener.java:146)
at zmq.SocketBase.bind(SocketBase.java:354)

Reaper Thread cause NullPointerException

Here is the Exception.

Exception in thread "reaper-1" java.lang.NullPointerException
at zmq.Mtrie.rm_helper(Mtrie.java:160)
at zmq.Mtrie.rm_helper(Mtrie.java:160)
at zmq.Mtrie.rm_helper(Mtrie.java:160)
at zmq.Mtrie.rm_helper(Mtrie.java:160)
at zmq.Mtrie.rm_helper(Mtrie.java:160)
at zmq.Mtrie.rm_helper(Mtrie.java:160)
at zmq.Mtrie.rm_helper(Mtrie.java:160)
at zmq.Mtrie.rm_helper(Mtrie.java:160)
at zmq.Mtrie.rm_helper(Mtrie.java:160)
at zmq.Mtrie.rm_helper(Mtrie.java:160)
at zmq.Mtrie.rm_helper(Mtrie.java:182)
at zmq.Mtrie.rm(Mtrie.java:134)
at zmq.XPub.xterminated(XPub.java:166)
at zmq.SocketBase.terminated(SocketBase.java:980)
at zmq.Pipe.process_pipe_term_ack(Pipe.java:377)
at zmq.ZObject.process_command(ZObject.java:98)
at zmq.SocketBase.process_commands(SocketBase.java:809)
at zmq.SocketBase.in_event(SocketBase.java:908)
at zmq.Poller.run(Poller.java:231)
at java.lang.Thread.run(Thread.java:722)

Sorry, there is no code to reproduce this, but I have gotten this exception many times.

Cannot create IPv6 connections

IPv6 support appears to be broken.

Scenario: try to connect a dealer to a router (although I doubt the socket type matters) using an IPv6 address. Both have the IPv4Only option set to false.

Result: the router socket never receives any messages.

The same code works fine with IPv4 addresses.

NullPointerException

With the head revision (77cc1d4), when I try to compile the project it gives:
Running zmq.TestMonitor
Exception in thread "iothread-2" java.lang.NullPointerException
at zmq.ZMQ$Event.write(ZMQ.java:186)
at zmq.SocketBase.monitor_event(SocketBase.java:1141)
at zmq.SocketBase.event_accepted(SocketBase.java:1100)
at zmq.TcpListener.accept_event(TcpListener.java:112)
at zmq.IOObject.accept_event(IOObject.java:116)
at zmq.Poller.run(Poller.java:235)
at java.lang.Thread.run(Thread.java:724)

and hangs there

How to work around invalid socket combinations

I want to integrate message flow between incompatible socket types; is there anything technically wrong with doing the following? I've got it working, all seems fine, I just want to make sure doing so will not expose me to api issues down the road.

Socket puller = ctx.createSocket(ZMQ.PULL);
Socket publisher = ctx.createSocket(ZMQ.PUB);
...
publisher.send( puller.recv());

or

Socket router = ctx.createSocket(ZMQ.ROUTER);
router.send(puller.recv());

ConcurrentModificationException of org.jeromq.ZMQ.Poller

I got a ConcurrentModificationException when

java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextEntry(HashMap.java:793)
at java.util.HashMap$KeyIterator.next(HashMap.java:828)
at java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1008)
at zmq.ZMQ.zmq_poll(ZMQ.java:653)
at org.jeromq.ZMQ$Poller.poll(ZMQ.java:1488)

I have checked the code in org.jeromq.ZMQ.

The selector is held in a ThreadLocal variable.

If I create a Poller in the main thread and hand it to a worker thread, then create another Poller in the main thread and hand it to another worker thread, both Pollers will use the same Selector.

This throws a ConcurrentModificationException, and one thread ends up blocked on a register method in zmq.ZMQ.java.

After I changed my code to create each Poller in its own worker thread, the exception was never thrown again.

I wonder whether this ThreadLocal holder is necessary. There is also no documentation that mentions this. Or is it just a JDK bug?

OS: Ubuntu, kernel 2.6.38, x64
Java: oracle jdk 1.6.0_34
jeromq: 0.2.0
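
The sharing described in this report can be reproduced with a plain ThreadLocal. The sketch below assumes, as the report suggests, that an object picks up the creating thread's resource when it is constructed. `SharedThreadLocalDemo`, `RESOURCE`, and the nested `Poller` are hypothetical stand-ins, and a plain Object takes the place of the Selector.

```java
final class SharedThreadLocalDemo {
    // Stand-in for a per-thread Selector; a plain Object keeps the sketch JDK-only.
    static final ThreadLocal<Object> RESOURCE = ThreadLocal.withInitial(Object::new);

    // Hypothetical poller that captures the creating thread's resource.
    static final class Poller {
        final Object resource = RESOURCE.get();
    }

    private SharedThreadLocalDemo() {
    }

    // Two pollers built on the same thread end up with the same resource.
    static boolean sameThreadShares() {
        Poller first = new Poller();
        Poller second = new Poller();
        return first.resource == second.resource;
    }

    // Two pollers built on their own threads get distinct resources.
    static boolean differentThreadsShare() {
        Poller[] created = new Poller[2];
        Thread a = new Thread(() -> created[0] = new Poller());
        Thread b = new Thread(() -> created[1] = new Poller());
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return created[0].resource == created[1].resource;
    }

    public static void main(String[] args) {
        System.out.println(sameThreadShares());      // prints true
        System.out.println(differentThreadsShare()); // prints false
    }
}
```

This matches the observation that creating each Poller inside its own worker thread made the exception disappear.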

Issue with ZContext in the mtrelay guide example

I ported the mtrelay guide example over to use the ZContext class. When I run it, I get:

Step 1 ready, signaling step 2
Exception in thread "reaper-1" java.lang.NullPointerException
at zmq.Ctx.destroy_socket(Ctx.java:328)
at zmq.ZObject.destroy_socket(ZObject.java:144)
at zmq.SocketBase.check_destroy(SocketBase.java:942)
Test successful!
at zmq.SocketBase.start_reaping(SocketBase.java:758)
at zmq.Reaper.process_reap(Reaper.java:133)
at zmq.ZObject.process_command(ZObject.java:114)
at zmq.Reaper.in_event(Reaper.java:90)
at zmq.Poller.run(Poller.java:231)
at java.lang.Thread.run(Thread.java:722)

I'm using the ZContext.destroy() method to clean everything up.

testcase code:

package guide;

import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * Multithreaded relay
 */
public class mtrelay_bugtestcase {

    private static class Step1 extends Thread
    {
        private ZContext context;

        private Step1 (ZContext context)
        {
            this.context = context;
        }

        @Override
        public void run(){
            //  Signal downstream to step 2
            Socket xmitter = context.createSocket(ZMQ.PAIR);
            xmitter.connect("inproc://step2");
            System.out.println ("Step 1 ready, signaling step 2");
            xmitter.send("READY", 0);
            xmitter.close ();
        }

    }
    private static class Step2 extends Thread
    {
        private ZContext context;

        private Step2 (ZContext context)
        {
            this.context = context;
        }

        @Override
        public void run(){
            //  Bind to inproc: endpoint, then start upstream thread
            Socket receiver = context.createSocket(ZMQ.PAIR);
            receiver.bind("inproc://step2");
            Thread step1 = new Step1 (context);
            step1.start();

            //  Wait for signal
            receiver.recv(0);
            receiver.close ();

            //  Connect to step3 and tell it we're ready
            Socket xmitter = context.createSocket(ZMQ.PAIR);
            xmitter.connect("inproc://step3");
            xmitter.send("READY", 0);

            xmitter.close ();
        }

    }
    public static void main (String[] args) {

        ZContext context = new ZContext();

        //  Bind to inproc: endpoint, then start upstream thread
        Socket receiver = context.createSocket(ZMQ.PAIR);
        receiver.bind("inproc://step3");

        //  Step 2 relays the signal to step 3
        Thread step2 = new Step2 (context);
        step2.start();

        //  Wait for signal
        receiver.recv(0);
        receiver.close ();

        System.out.println ("Test successful!");
        context.destroy();
    }
}
