
mbassador's People

Contributors

bdavisx, bennidi, bigbear3001, chriskingnet, cyberoblivion, dependabot[bot], durron597, kenfromnn, kolybelkin, leif81, lennartj, manish364824, merjadok, rossi1337, toddcostella, yaronyam


mbassador's Issues

How to deal with event handler priorities which are not known until runtime?

In my specific use case I have a set of objects in which each of them can have one or more annotated handler methods. All handler methods of a particular object must have the same priority but this priority is not known until runtime. This problem boils down to the handlers being called in the same order as the objects were registered to the event bus (FIFO-order). Is it possible to guarantee this behavior, and, if so, how?

Improve message publication performance

I wonder if SubscriptionManager.getSubscriptionsByMessageType(Class messageType) can be improved by using a cache. If you're willing to add Guava as a dependency, you could create a Cache<Class, Collection<Subscription>> to keep track of it, so the work only needs to be done once instead of every time. The uncached lookup could be especially slow for messages that have many interfaces and subclasses.

To keep the cache accurate, you could call invalidateAll whenever subscribe or unsubscribe is called, something like invalidateAll(ReflectionUtils.getSuperClasses(listener)).

This is complicated enough that I haven't tried implementing it yet [so I don't know what the performance improvement would be], but I might if you think it's a good idea.
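
The caching idea can be sketched without Guava by using a ConcurrentHashMap as the cache. This is a simplified, hypothetical model and not MBassador code: SubscriptionCache, subscribe, and lookup are illustrative names, subscriptions are modeled as plain strings, and invalidation clears the whole cache where the proposal above would evict only the affected types.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not MBassador code: subscriptions are plain strings here.
class SubscriptionCache {
    // Subscriptions registered directly for a handled type.
    private final Map<Class<?>, List<String>> byHandledType = new ConcurrentHashMap<>();
    // Cache: message type -> all subscriptions reachable through its supertypes.
    private final Map<Class<?>, List<String>> cache = new ConcurrentHashMap<>();
    // Counts how often the expensive hierarchy walk actually runs.
    final AtomicInteger hierarchyWalks = new AtomicInteger();

    void subscribe(Class<?> handledType, String subscription) {
        byHandledType.computeIfAbsent(handledType, k -> new CopyOnWriteArrayList<>()).add(subscription);
        cache.clear(); // coarse invalidation; a finer scheme would evict only affected types
    }

    List<String> lookup(Class<?> messageType) {
        return cache.computeIfAbsent(messageType, this::collect);
    }

    // The slow path: walk superclasses and interfaces, gathering subscriptions.
    private List<String> collect(Class<?> messageType) {
        hierarchyWalks.incrementAndGet();
        Set<Class<?>> types = new LinkedHashSet<>();
        addSuperTypes(messageType, types);
        List<String> result = new ArrayList<>();
        for (Class<?> type : types) {
            result.addAll(byHandledType.getOrDefault(type, List.of()));
        }
        return result;
    }

    private static void addSuperTypes(Class<?> type, Set<Class<?>> out) {
        if (type == null || !out.add(type)) return;
        addSuperTypes(type.getSuperclass(), out);
        for (Class<?> iface : type.getInterfaces()) addSuperTypes(iface, out);
    }
}
```

Repeated lookups for the same message type hit the cache, so the hierarchy walk runs once per type between subscription changes. A production version would also have to handle the race between invalidation and a concurrent lookup, which this sketch ignores.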

How To Use MBassador?

I have written some classes to test MBassador, but I found that the main thread hangs unless I call System.exit(0). Did I miss something?

testthread.JPG


public class PersonNewEvent {

private String name="cool name";
private int age = 20;
/**
 * @return the name
 */
public String getName() {
    return name;
}
/**
 * @return the age
 */
public int getAge() {
    return age;
}

/* (non-Javadoc)
 * @see java.lang.Object#toString()
 */
@Override
public String toString() {
    StringBuilder builder = new StringBuilder();
    builder.append("PersonNewEvent [name=").append(name).append(", age=")
            .append(age).append("]");
    return builder.toString();
}

}

public class PersonEventHandler {

@Listener
public void handlePersonNewEvent(PersonNewEvent event) {
    System.out.println("handle PersonNewEvent" + event.toString());
}

@Listener
public void handlePersonNewEvent2(PersonNewEvent event) {
    System.out.println("handle PersonNewEvent 2" + event.toString());
}

}
--------------------The Test Class-------------------
public class TestEvent {
public static void main(String[] args) {
MBassador bus = new MBassador(new BusConfiguration());
PersonEventHandler listener = new PersonEventHandler();
//the listener will be registered using a weak-reference
bus.subscribe(listener);
// objects without handlers will be ignored
//bus.subscribe(new ClassWithoutAnyDefinedHandlers());
PersonNewEvent event = new PersonNewEvent();
bus.publish(event);
bus.unsubscribe(listener);
System.out.println("........");
//System.exit(0);
}
}

mbassador in the GWT environment

In the GWT or SmartGWT I found two issues:

  1. The library cannot be used because there is no GWT module.
  2. If I try to include the source to be compiled in the GWT environment it won’t compile due to the reference to threading API.

My request is that one of the two solutions below be provided:

A. A version of the library that 1) also has a GWT module for inheritance and 2) has no reference to threading API

B. A source version which includes no reference to threading API that can be included and compiled along with the application code.
A is preferred, but I understand that B is less work. Either would be usable.

Just as a motivator, I believe that Web applications are trending toward the AJAX paradigm and that GWT and/or SmartGWT is very popular with Java developers. As you've pointed out, event-driven UI applications are a great loosely coupled paradigm and help support the MVC pattern. This is a great library that could be used with a GWT application and reduce a lot of hard-wired code.

GWT's built-in EventBus is pretty messy, complicated, and hard to read.

Thanks.

Change Request

Thanks for the great software.

Is there a way to report to the publisher (the entity that calls publish()) the error that was thrown from the handler? Currently, I can register an IPublicationErrorHandler, but this approach has a couple of limitations:

  1. I can add an error handler but cannot remove one.
  2. There is no way to register a per-thread error handler. So, if my publisher is multithreaded, e.g. a singleton, then I cannot register multiple error handlers to handle errors for each invoking thread separately.

My use case is quite straightforward. I have a singleton which publishes synchronous events via an MBassador object, and I want to take some action based on whether an error has occurred. Since each publication occurs in its own thread, I need to handle each thread's errors in isolation.

Simply adding a version of publish that takes an IPublicationErrorHandler as an extra parameter would meet my requirement. The MBassador implementation would then call handleError on all registered error handlers (as before) as well as on my ad hoc error handler.

Thanks
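
The requested overload can be sketched roughly as follows. This is only an illustration of the idea under stated assumptions: SketchBus, its nested ErrorHandler interface, and the two-argument publish are hypothetical names and do not exist in MBassador.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch of the requested API; none of these names are MBassador's.
class SketchBus<T> {
    interface ErrorHandler {
        void handleError(Throwable error, Object message);
    }

    private final List<Consumer<T>> handlers = new CopyOnWriteArrayList<>();
    private final List<ErrorHandler> globalErrorHandlers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<T> handler) { handlers.add(handler); }

    void addErrorHandler(ErrorHandler handler) { globalErrorHandlers.add(handler); }

    // Synchronous publish that reports handler failures to the globally
    // registered error handlers and to the caller-supplied one (may be null).
    void publish(T message, ErrorHandler adHoc) {
        for (Consumer<T> handler : handlers) {
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                for (ErrorHandler h : globalErrorHandlers) h.handleError(e, message);
                if (adHoc != null) adHoc.handleError(e, message);
            }
        }
    }
}
```

Because the ad hoc handler is passed per call, each publishing thread can collect its own errors without touching shared state, which is the per-thread isolation the request asks for.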

Filter on listeners

I have a tree-like hierarchical data structure composed of Nodes. I would like to send events from a node to all the other Nodes except the one that sent it.

I was thinking I could do something like this:
Node.java:

@Handler(filters={@Filter(IgnoreSelfFilter.class)})
public void nodeStarted(NodeMessage message) {

}
...
public void start() {
   eventBus.publish(new NodeMessage(this, "blabla"));
}

IgnoreSelfFilter.java:

public boolean accepts(Object message, MessageHandlerMetadata meta) {
    NodeMessage nm = (NodeMessage)message;
    return nm.getNode() != meta.getListener();
}

Unfortunately, getListener doesn't exist, and upon further investigation the filter is run only once, not per listener.

I have another related use case that I hope MBassador can solve, but perhaps you have an alternate solution? =)
Since the nodes are in a tree-like data structure, most messages should be sent ONLY to the node's parents. Again, if filters were applied per listener, I could easily check whether the current listener is part of nm.getNode().getParents(). An ugly workaround would be to implement this check in the message handler, but it's terribly inefficient since the message is dispatched to ALL nodes even if only one parent actually does something meaningful.

Perhaps a sophisticated (but very elegant) solution like MBassador isn't what's best for my use cases. Do you know of any other libraries that work well with a tree-like hierarchy? Thanks.
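
To make the per-listener filtering idea concrete, here is a minimal sketch of a bus whose filter receives both the message and the candidate listener. Everything in it (TreeBus, Node, NodeMessage, the two predicates) is a hypothetical stand-in written for illustration; it is not how MBassador evaluates filters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical sketch: a tiny bus whose filter sees the message AND the listener.
class TreeBus {
    static class Node {
        final String name;
        final Node parent;
        final List<String> received = new ArrayList<>();

        Node(String name, Node parent) { this.name = name; this.parent = parent; }

        // True if candidate is somewhere on this node's parent chain.
        boolean hasAncestor(Node candidate) {
            for (Node n = parent; n != null; n = n.parent) {
                if (n == candidate) return true;
            }
            return false;
        }
    }

    static class NodeMessage {
        final Node sender;
        final String text;

        NodeMessage(Node sender, String text) { this.sender = sender; this.text = text; }
    }

    // Deliver to every listener except the sender.
    static final BiPredicate<NodeMessage, Node> IGNORE_SELF = (m, l) -> m.sender != l;
    // Deliver only to ancestors of the sender.
    static final BiPredicate<NodeMessage, Node> ANCESTORS_ONLY = (m, l) -> m.sender.hasAncestor(l);

    private final List<Node> listeners = new ArrayList<>();

    void subscribe(Node node) { listeners.add(node); }

    // The filter is evaluated once per listener, not once per message.
    void publish(NodeMessage message, BiPredicate<NodeMessage, Node> filter) {
        for (Node listener : listeners) {
            if (filter.test(message, listener)) listener.received.add(message.text);
        }
    }
}
```

Note that this still visits every subscribed node to evaluate the filter. For the parents-only case, walking the sender's parent chain directly and delivering to each ancestor avoids the bus lookup altogether.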

Can I use MBassador in our project?

Our project is based on Spring, and we have layers like facade/application/domain as described in DDD.

Is it possible to use MBassador as an infrastructure component, for example as a mechanism to publish domain events?

For example, when a book is purchased:

book.purchase(){
//how to inject the MBassador eventBus?
EventBus.raiseEvent(bookPurchasedEvent);
}

But I can't figure out how to inject MBassador into our domain objects.

Missing events

Hi,
I am benchmarking MBassador and other event bus libraries. There is a bug in MBassador: events go missing. Here is a reproducible test.
Change the nbListener value; there are always a few dozen events missing.
FYI: the same test with the EventBus library works like a charm with 60k listeners.

public class MbassadorTest {
private AtomicInteger nbreceived = new AtomicInteger(0);
int nbListener = 2000;
@Before
public void sendEvent() {
MyEventBus.getInstance().subscribe(this);
for (int i = 0; i < nbListener; i++) {
MyEventBus.getInstance().subscribe(new MyListener());
}
MyEventBus.getInstance().publish(new MyEvent());
}

@Test
public void check() {
    System.out.println("received : " + nbreceived.get());
    assertEquals(nbListener, nbreceived.get());
}

@Listener
public void endEvent(MyEndEvent event) {
    nbreceived.incrementAndGet();
    System.out.println("nb received : " + nbreceived.get());
}

}

public class MyListener {
public MyListener() {
}
@Listener
public void WorkerChangeEvent(MyEvent e) {
MyEventBus.getInstance().publish(new MyEndEvent());
}
}
public class MyEvent {
public MyEvent() {
}
}
public class MyEndEvent {
public MyEndEvent() {
}
}

Getting exception propagate from the handler to the sending code

Hi,

I'm using MBassador with great joy, currently only using its basic features.

I would like to decouple a few aspects of my application, e.g. authorization (permission checks inside my BL). For that, synchronous messages can be used. But I would like authorization exceptions to propagate out to the code that sends the message, e.g.:

try
{
bus.publish(new PermissionCheck(object, action, currentUser));   // assume handler throws when needed
}
catch (PermissionException e)
{
...
}

As handlers can be prioritized, I think such an approach can make sense.
According to the usage documentation, it seems this use case is not currently supported.
Do you think it can be supported? Or do you see some other way to support my use case?
Thanks
Gilad

Issue with message delivery when using method overloading and inheritance

I've recently switched from EventBus to MBassador for performance reasons. It makes a world of difference! However, I did run into an issue which results in messages not being delivered. It's easily reproducible and is best explained by an example.

In the example below, I'd expect the two message handlers that listen for event "TestEventA" to be invoked. But instead only one is invoked. However, if the event handler for event "TestEventB" is removed, both message handlers are invoked as expected. I've narrowed down the issue to it having something to do with method overloading and inheritance (also see the comments in the code for useful hints).

Also, though not really relevant for this issue, Google makes it really hard to find your library due to MBassador being "corrected" to ambassador. It's a shame that such a great library is so hard to find.

Issue.java:

import net.engio.mbassy.listener.*;

public class Issue extends IssueBase
{  
  public static void main(String[] args)
  {
    new Issue();
    mBassadorA.publishAsync( new TestEventA() );

    try
    {
      Thread.sleep(1000);
    }
    catch (InterruptedException e)
    {
      System.out.println("Interrupted");
    }

    System.out.println("Bye!");
  }

  /**
   * (!) Remove this method and BOTH message handlers in class IssueBase will be called.
   */
  @Handler
  public void handleEvent(TestEventB event)
  {
    System.out.println("Received event B");
  }
}

IssueBase.java:

import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.listener.References;

@Listener(references = References.Strong)
public class IssueBase
{
  static MBassador<BaseEventClassA> mBassadorA = new MBassador<BaseEventClassA>( BusConfiguration.Default() );

  public IssueBase()
  {
    mBassadorA.subscribe(this);
  }

  /**
   * (!) If this method is removed, NO event handler will be called.
   */
  @Handler
  public void handleEventWithNonOverloadedMethodName(TestEventA event)
  {
    System.out.println("Received event A (non-overloaded method)");
  }

  @Handler
  public void handleEvent(TestEventA event)
  {
    System.out.println("Received event A (overloaded method)");
  }

  // ---------------------------------------------------------------------------

  public static class TestEventA extends BaseEventClassA {}
  public static class TestEventB extends BaseEventClassA {}
  public static class BaseEventClassA {}
}

Potential for integer overflow in SubscriptionByPriorityDesc comparator which can result in handlers being called in the wrong order

There's a bug in the SubscriptionByPriorityDesc comparator:

public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
    @Override
    public int compare(Subscription o1, Subscription o2) {
        int result = o1.getPriority() - o2.getPriority();
        return result == 0 ? o1.id.compareTo(o2.id) : result;
    }
};

The following line has potential for integer overflow:

int result = o1.getPriority() - o2.getPriority();

When o1.getPriority() returns -2 and o2.getPriority() returns Integer.MAX_VALUE, the subtraction overflows and result wraps around to Integer.MAX_VALUE, a positive value where a negative one is expected, so the handlers get called in the wrong order.
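
The overflow can be reproduced in isolation. The sketch below is a standalone demonstration with the priorities from this report; PriorityOverflowDemo and its two helper methods are illustrative names. The mathematical difference of -2 and Integer.MAX_VALUE lies below Integer.MIN_VALUE, so int arithmetic wraps and the sign comes out positive, while Integer.compare never subtracts and keeps the correct sign.

```java
// Standalone demonstration; the priorities mirror the values from the report.
class PriorityOverflowDemo {
    // Comparison by subtraction: wraps around when the difference exceeds the int range.
    static int bySubtraction(int p1, int p2) {
        return p1 - p2;
    }

    // Overflow-safe comparison: only the sign of the result is meaningful.
    static int byCompare(int p1, int p2) {
        return Integer.compare(p1, p2);
    }

    public static void main(String[] args) {
        int low = -2;
        int high = Integer.MAX_VALUE;
        System.out.println("subtraction:     " + bySubtraction(low, high));
        System.out.println("Integer.compare: " + byCompare(low, high));
    }
}
```

Replacing the subtraction in the comparator with Integer.compare(o1.getPriority(), o2.getPriority()) is one way to remove the overflow without changing the intended ordering.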

I verified this behavior by setting a breakpoint in the comparator and using the testcode shown below.

ListeningBean.java:

public class ListeningBean {

    @Handler(priority = -2)
    public void handleTestMessageHighestPriority(TestMessage message) {
        System.out.println("handleTestMessageHighestPriority");
    }

    @Handler(priority = Integer.MAX_VALUE)
    public void handleTestMessageLowestPriority(TestMessage message) {
        System.out.println("handleTestMessageLowestPriority");
    }

}

App.java:

public class App {

    public static void main(String[] args ) {
        MBassador<TestMessage> bus = new MBassador<TestMessage>(BusConfiguration.Default());
        ListeningBean listener = new ListeningBean();
        bus.subscribe(listener);
        TestMessage message = new TestMessage();
        bus.publish(message);
    }

}

TestMessage.java:

public class TestMessage {}

Output:

handleTestMessageLowestPriority
handleTestMessageHighestPriority

Unit Tests failing

Just downloaded mbassador to try out in code I'm writing. The following tests are failing for me:

  1. net.engio.mbassy.MBassadorTest.testAsynchronousMessagePublication()
  2. net.engio.mbassy.SyncBusTest.testSynchronousMessagePublication()
  3. net.engio.mbassy.DeadMessageTest.testDeadMessage()

They all seem to fail in the same way: the expected count and the actual count of iterations run are not equal. Each time I run, I get different values for the actual iterations, so my expectation is that something isn't waiting around for the concurrent tests to complete. I've glanced briefly at the code and it looks as though it ought to work, but it still fails.

In addition, if I run the tests using the debugger some of the failing tests succeed (although it's not the same set every time), but running outside of the debugger all three fail every time.

My configuration is as follows:

Win 7, 32GB RAM, IBM RSA 8.5 (running Java 1.6)

Handlers priority

The priority of the handlers doesn't work correctly.

Subscription.java

public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
        @Override
        public int compare(Subscription o1, Subscription o2) {
            int byPriority = ((Integer)o1.getPriority()).compareTo(o2.getPriority());
            return byPriority == 0 ? o1.id.compareTo(o2.id) : byPriority;
        }
    };

The comparator is named SubscriptionByPriorityDesc, but it sorts in ascending order.
It seems to me that the correct code is:

int byPriority = ((Integer)o2.getPriority()).compareTo(o1.getPriority());
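
A small self-contained check of the swapped-operand version follows. Sub is a hypothetical stand-in for Subscription that carries only the two fields the comparator reads, and the String id is an assumption; the real class has more state and its id type is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class DescendingPriorityDemo {
    // Stand-in for Subscription with just the two fields the comparator uses.
    static final class Sub {
        private final int priority;
        private final String id;

        Sub(int priority, String id) { this.priority = priority; this.id = id; }

        int getPriority() { return priority; }
        String getId() { return id; }
    }

    // Highest priority first; ties are broken by id so the order is stable.
    static final Comparator<Sub> BY_PRIORITY_DESC = (a, b) -> {
        int byPriority = Integer.compare(b.getPriority(), a.getPriority()); // operands swapped
        return byPriority != 0 ? byPriority : a.getId().compareTo(b.getId());
    };

    // Returns the ids in the order the handlers would be invoked.
    static List<String> sortedIds(Sub... subs) {
        List<Sub> sorted = new ArrayList<>(Arrays.asList(subs));
        sorted.sort(BY_PRIORITY_DESC);
        List<String> ids = new ArrayList<>();
        for (Sub s : sorted) ids.add(s.getId());
        return ids;
    }
}
```

Sorting subscriptions with priorities 1, 10, 10, and -5 through this comparator puts both priority-10 entries first (ordered by id), then priority 1, then -5.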

What is the difference between post and publish?

I am familiar with Guava's EventBus library. But it seems mbassador's publish is the same as EventBus's post. However, it seems to me, from briefly looking at the source code, that the post seems to return a 'packaged' message that hasn't been published yet.

I wanted some clarification on this. Perhaps this should be added to the wiki (e.g. in a terminology section)?

Probably filtered events should be treated as dead events

Hello Benjamin,

I just thought that it would be nice to have the possibility to process filtered events as dead events.

Consider the following example.
I need to have several listeners which should filter event objects depending on different criteria. For example Listener1 should process events with their member variable equal to 'A', Listener2 - "B", Listener3 - "C" and so on.

Also I need to have ListenerX which should process all other events which are not covered by Listener1, Listener2 ... ListenerX-1.

Currently I can achieve this by creating a specific filter for ListenerX which filters out all events accepted by Listener1, Listener2 ... ListenerX-1.

Such an approach is not very comfortable because I need to have too many conditions in the ListenerX filter.

It would be much easier for me to handle all events which have been filtered by Listener1, Listener2 ... ListenerX-1 as dead events.

What do you think?

'pendingMessages' LinkedBlockingQueue capacity

Hello,

Are there any plans to allow configuring the capacity of the 'pendingMessages' LinkedBlockingQueue?
I think it would be nice to have this option, to prevent excessive queue growth when the producer thread puts elements faster than the consumer consumes them.
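
For reference, java.util.concurrent.LinkedBlockingQueue already accepts a capacity in its constructor, so a configurable bound mostly means passing a number through. The sketch below shows the resulting behavior in isolation; BoundedQueueDemo and enqueue are illustrative names, and no consumer is running, so only as many messages as the capacity allows are accepted.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {
    // Tries to enqueue `attempts` messages into a queue of the given capacity,
    // waiting up to waitMillis for free space each time.
    // Returns how many messages were accepted.
    static int enqueue(int capacity, int attempts, long waitMillis) {
        LinkedBlockingQueue<String> pendingMessages = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                if (pendingMessages.offer("message-" + i, waitMillis, TimeUnit.MILLISECONDS)) {
                    accepted++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + enqueue(3, 5, 10));
    }
}
```

The timed offer gives the producer a bounded wait for space, which slows a fast producer down instead of letting the queue grow without limit.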

Sample application never stops

I created the following sample application in order to test how Mbassador works and found that it never stops, even after all application's threads exit.

The application's source code is below.

Event class:

package org.mbassy.tests;

public class Event {

    private int eventId;
    private String eventName;

    public Event(final int eventId, final String eventName) {
        this.eventId = eventId;
        this.eventName = eventName;
    }

    public void setEventId(final int eventId) {
        this.eventId = eventId;
    }

    public int getEventId() {
        return eventId;
    }

    public void setEventName(final String eventName) {
        this.eventName = eventName;
    }

    public String getEventName() {
        return eventName;
    }

    @Override
    public String toString() {
        return "Event [ID = " + eventId + ", Name = " + eventName + "]";
    }

}

Another event class:

package org.mbassy.tests;

public class MoreSpecificEvent extends Event {

    public MoreSpecificEvent(final int eventId, final String eventName) {
        super(eventId, eventName);
    }

    @Override
    public String toString() {
        return "MoreSpecificEvent [" + super.toString() + "]";
    }

}

Event handler:

package org.mbassy.tests;

import java.util.concurrent.atomic.AtomicInteger;

import org.mbassy.listener.Listener;
import org.mbassy.listener.Mode;

public class EventHandler {

    /**
     * I am going to count this class methods calls.
     */
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Event handlers need to implement equals and hashCode since they are stored in
     * a WeakHashMap inside the org.mbassy.common.ConcurrentSet<T> class.
     * So I need to add at least one member variable.
     */
    private final int uid;

    public EventHandler(final int uid) {
        this.uid = uid;
    }

    @Listener(dispatch = Mode.Asynchronous)
    public void handleEvent(final Event event) {
        System.out.println(counter.incrementAndGet() + " handlings. Current event: " + event.toString());
    }

    @Listener(dispatch = Mode.Asynchronous)
    public void handleEvent(final MoreSpecificEvent event) {
        System.out.println(counter.incrementAndGet() + " handlings. Current event: " + event.toString());
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + uid;
        return result;
    }

    @Override
    public boolean equals(final Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final EventHandler other = (EventHandler) obj;
        if (uid != other.uid) {
            return false;
        }
        return true;
    }

}

Main class:

package org.mbassy.tests;

import org.mbassy.BusConfiguration;
import org.mbassy.MBassador;

public class Launcher {

    public static void main(final String[] args) throws InterruptedException {
        final MBassador<Event> eventBus = new MBassador<Event>(new BusConfiguration());

        // registering
        eventBus.subscribe(new EventHandler(1));

        // publishing
        for (int i = 0; i < 10; i++) {
            Event myEvent;
            if (i % 2 == 0) {
                myEvent = new Event(i, "Event ID: " + i);
            }
            else {
                myEvent = new MoreSpecificEvent(i, "Event ID: " + i);
            }
            eventBus.publishAsync(myEvent);
        }

        System.out.println("Launcher finished its job.");
    }

}

All works just fine except the fact that this application never stops.

I put a breakpoint at System.out.println("Launcher finished its job.") line and inspected running threads. They are:

EventBus Launcher [Java Application]    
    org.mbassy.tests.Launcher at localhost:4227 
        Thread [main] (Suspended (breakpoint at line 26 in Launcher))   
        Daemon Thread [Thread-0] (Running)  
        Daemon Thread [Thread-1] (Running)  
        Thread [pool-1-thread-1] (Running)  
        Thread [pool-1-thread-2] (Running)  
        Thread [pool-1-thread-3] (Running)  
        Thread [pool-1-thread-4] (Running)  
        Thread [pool-1-thread-5] (Running)  
    C:\DEV\Programs\Java\jdk1.6.0_20\bin\javaw.exe (12.12.2012 16:18:32)    

Looks like we have:

  • the main thread, suspended at the breakpoint;
  • two daemon threads, which correspond to the message dispatchers;
  • and five threads belonging to the ThreadPoolExecutor object which was created in the org.mbassy.BusConfiguration class constructor.

After inspecting all this information I just resumed the main thread and found that it finished its job and disappeared from the list of running threads:

EventBus Launcher [Java Application]    
    org.mbassy.tests.Launcher at localhost:4227 
        Daemon Thread [Thread-0] (Running)  
        Daemon Thread [Thread-1] (Running)  
        Thread [pool-1-thread-1] (Running)  
        Thread [pool-1-thread-2] (Running)  
        Thread [pool-1-thread-3] (Running)  
        Thread [pool-1-thread-4] (Running)  
        Thread [pool-1-thread-5] (Running)  
        Thread [DestroyJavaVM] (Running)    
    C:\DEV\Programs\Java\jdk1.6.0_20\bin\javaw.exe (12.12.2012 16:18:32)    

We can also see a new thread called 'DestroyJavaVM' in this list.
The application remains in this state forever, and the only way to stop it is to terminate the VM.

I found only one way to avoid it: get the ExecutorService object by calling the org.mbassy.MBassador.getExecutor() method and call the shutdown() on it:

Main class:

public class Launcher {

    public static void main(final String[] args) throws InterruptedException {

        ... create MBassador instance, register, publish ...

        Thread.sleep(1000); // just to make sure that all events are processed
        ((ExecutorService) eventBus.getExecutor()).shutdown(); // force shutdown
        System.out.println("Launcher finished its job.");
    }

}
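
The behavior described in this report matches how the JVM treats non-daemon threads: it does not exit while any of them is alive, and the pool threads in the listing above are not marked as daemon. The sketch below shows two general ways around that using only java.util.concurrent; ExecutorExitDemo and its members are illustrative names, and nothing here is MBassador API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorExitDemo {
    // Option 1: create worker threads as daemons so they never block JVM exit.
    static final ThreadFactory DAEMON_FACTORY = runnable -> {
        Thread thread = Executors.defaultThreadFactory().newThread(runnable);
        thread.setDaemon(true);
        return thread;
    };

    // Option 2: stop accepting work, then wait for queued tasks to finish.
    // Returns true if the pool terminated within the timeout.
    static boolean shutdownAndWait(ExecutorService executor, long timeoutMillis) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger handled = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            pool.execute(handled::incrementAndGet);
        }
        boolean terminated = shutdownAndWait(pool, 1000);
        System.out.println("terminated=" + terminated + ", handled=" + handled.get());
    }
}
```

Compared with the Thread.sleep(1000) workaround, shutdown() followed by awaitTermination lets already submitted tasks complete and returns as soon as they have, without guessing a delay. With daemon threads the JVM can exit while work is still queued, so that option fits only when losing pending work at exit is acceptable.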

Unsubscribe

Hi,

For some reason I still get messages published to my listeners after I unsubscribed them. I looked at the code and found the following:

The message is published to all listeners which are in following collection: subscriptionsPerMessage. This collection is filled when calling subscribe(), but it is not cleared when calling unsubscribe().

The unsubscribe() only looks at the subscriptionsPerListener collection.

Can someone help me with this? Am I understanding/using it wrong?

Thanks!!
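
The symptom described above is what one would expect from any registry that keeps two indexes and updates only one of them on removal. The sketch below models that situation and shows an unsubscribe that cleans both. It borrows the two collection names from this report, but TwoIndexRegistry and its methods are hypothetical and much simpler than the real SubscriptionManager.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical two-index registry; listeners are modeled as Consumer<Object>.
class TwoIndexRegistry {
    // Used by publish(): message type -> listeners.
    private final Map<Class<?>, List<Consumer<Object>>> subscriptionsPerMessage = new ConcurrentHashMap<>();
    // Used by unsubscribe(): listener -> message types it is registered for.
    private final Map<Consumer<Object>, List<Class<?>>> subscriptionsPerListener = new ConcurrentHashMap<>();

    void subscribe(Class<?> messageType, Consumer<Object> listener) {
        subscriptionsPerMessage.computeIfAbsent(messageType, k -> new CopyOnWriteArrayList<>()).add(listener);
        subscriptionsPerListener.computeIfAbsent(listener, k -> new CopyOnWriteArrayList<>()).add(messageType);
    }

    // Removes the listener from BOTH indexes; skipping the first one would
    // leave publish() delivering to an unsubscribed listener.
    void unsubscribe(Consumer<Object> listener) {
        List<Class<?>> types = subscriptionsPerListener.remove(listener);
        if (types == null) return;
        for (Class<?> type : types) {
            List<Consumer<Object>> listeners = subscriptionsPerMessage.get(type);
            if (listeners != null) listeners.remove(listener);
        }
    }

    void publish(Object message) {
        for (Consumer<Object> listener : subscriptionsPerMessage.getOrDefault(message.getClass(), List.of())) {
            listener.accept(message);
        }
    }
}
```

The per-listener index tells unsubscribe which per-message lists to clean, so removal does not have to scan every message type.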

Some suggestions about BusConfiguration and @Handler

#1:

net.engio.mbassy.bus.BusConfiguration has only one default constructor:

    public BusConfiguration() {
        super();
        this.numberOfMessageDispatchers = 2;
        this.maximumNumberOfPendingMessages = Integer.MAX_VALUE;
        this.executor = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), DaemonThreadFactory);
    }

So when I construct it and want to set its executor to a different one, I first have to get the default executor and shut it down. The code is tedious:

        BusConfiguration eventBusConfig = new BusConfiguration();
        // used in synchronous environment only!
        eventBusConfig.setMaximumNumberOfPendingMessages(1);
        eventBusConfig.setNumberOfMessageDispatchers(0);

        ExecutorService defaultExecutor = eventBusConfig.getExecutor();
        defaultExecutor.shutdown();
        try {
            defaultExecutor.awaitTermination(1, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            logger.error("Failed to close default executor service!", e);
            // ignored
        }
        eventBusConfig.setExecutor(new ThreadPoolExecutor(1, 1,
                                                          0L, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<Runnable>(), new ThreadFactory()
        {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = Executors.defaultThreadFactory().newThread(r);
                thread.setDaemon(true);
                return thread;
            }
        }));
        eventBusConfig.setMetadataReader(new SubscribeConfigChangedMedataReader());
        this.eventBus = new MBassador<>(eventBusConfig);

Maybe it needs a more suitable design, for example moving the code in the constructor to the Default() method:

 public static BusConfiguration Default() {
      BusConfiguration defaultConfig =  new BusConfiguration();
      // initialize default value
     return defaultConfig;
 } 

#2:

The test case src\test\java\org\mbassy\ConcurrentSetTest.java has a method called testIteratorCleanup():

    @Test
    public void testIteratorCleanup(){
        final HashSet<Object> persistingCandidates = new HashSet<Object>();
        final ConcurrentSet<Object> testSet = new ConcurrentSet<Object>();
        Random rand = new Random();

        for (int i = 0; i < numberOfElements; i++) {
            Object candidate = new Object();

            if (rand.nextInt() % 3 == 0) {
                persistingCandidates.add(candidate);
            }
            testSet.add(candidate);
        }

        // this will remove all objects that have not been inserted into the set of persisting candidates
        runGC();


        ConcurrentExecutor.runConcurrent(new Runnable() {
            @Override
            public void run() {
                for (Object testObject : testSet) {
                    // do nothing
                    // just iterate to trigger automatic clean up
                    System.currentTimeMillis();
                }
            }
        }, numberOfThreads);

        assertEquals(persistingCandidates.size(), testSet.size());
        for (Object test : testSet) {
            assertTrue(persistingCandidates.contains(test));
        }
    }

On my machine (Windows 8 x64 + JDK 1.7_10), this test case fails because the GC threads run slowly. I have to add a pause to get around it:

    @Test
    public void testIteratorCleanup() throws Exception{
        final HashSet<Object> persistingCandidates = new HashSet<Object>();
        final ConcurrentSet<Object> testSet = new ConcurrentSet<Object>();
        Random rand = new Random();

        for (int i = 0; i < numberOfElements; i++) {
            Object candidate = new Object();

            if (rand.nextInt() % 3 == 0) {
                persistingCandidates.add(candidate);
            }
            testSet.add(candidate);
        }

        // this will remove all objects that have not been inserted into the set of persisting candidates
        runGC();
        // added to cover cases where the GC threads are too slow
        Thread.sleep(2000);

        ConcurrentExecutor.runConcurrent(new Runnable() {
            @Override
            public void run() {
                for (Object testObject : testSet) {
                    // do nothing
                    // just iterate to trigger automatic clean up
                    System.currentTimeMillis();
                }
            }
        }, numberOfThreads);

        assertEquals(persistingCandidates.size(), testSet.size());
        for (Object test : testSet) {
            assertTrue(persistingCandidates.contains(test));
        }
    }
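
A fixed Thread.sleep(2000) makes every run slower and can still be too short on a loaded machine. A common alternative for timing-dependent tests is to poll for the expected state with an upper time limit. The helper below is a generic sketch of that technique; Await.until is a hypothetical name and not part of the mbassador test utilities.

```java
import java.util.function.BooleanSupplier;

class Await {
    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition became true in time, false otherwise.
    static boolean until(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

In the test above, the pause could then be replaced by polling until iterating the set has cleaned it up, for example by requesting GC and comparing testSet.size() with persistingCandidates.size() inside the condition, with a timeout of a few seconds. The test finishes as soon as the cleanup has happened and fails only if it never does.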

#3

I think it may be a good idea to allow custom annotations that subscribe to events. For example, change @Handler's definition so that it can also be applied to another custom annotation:

@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Target(value = {ElementType.METHOD,ElementType.ANNOTATION_TYPE})
public @interface Handler {}

Then I can define my own custom event handler annotation:

@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Target(value = {ElementType.METHOD})
@Handler
public @interface ConfigChangedEventHandle {}
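
How a metadata reader might detect such a meta-annotated handler can be sketched with plain reflection. The annotations below are local re-declarations made only so the example compiles on its own, and isHandler is a hypothetical helper; this is not the library's actual metadata reader, and it looks only one level deep.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class MetaAnnotationDemo {
    // Local stand-in for the proposed @Handler, usable on methods and on annotations.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @interface Handler {}

    // Custom annotation that is itself marked as a handler annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @Handler
    @interface ConfigChangedEventHandle {}

    static class Listener {
        @Handler public void direct(String message) {}
        @ConfigChangedEventHandle public void viaCustom(String message) {}
        public void plain(String message) {}
    }

    // True if the method carries @Handler directly, or carries an annotation
    // whose own type is annotated with @Handler (one level of indirection).
    static boolean isHandler(Method method) {
        if (method.isAnnotationPresent(Handler.class)) return true;
        for (Annotation annotation : method.getAnnotations()) {
            if (annotation.annotationType().isAnnotationPresent(Handler.class)) return true;
        }
        return false;
    }

    // Convenience lookup by name so callers need no checked exceptions.
    static boolean isHandlerMethod(Class<?> type, String methodName) {
        for (Method method : type.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) return isHandler(method);
        }
        return false;
    }
}
```

Both direct and viaCustom are recognized as handlers while plain is not. Handler attributes such as priority would additionally need to be read from the meta-annotation, which this sketch leaves out.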

Messages pooling

Hi!
Is it possible to pool created messages? I mean, that I can simply obtain new messages/events from my own pool, but I can't find a proper moment to free those objects (put them back into the pool).

Setting the Dispatcher thread count via the AbstractMessageBus constructor

Hi Benni,

I found the following in the code yesterday when my thread settings didn't seem to have the expected effect.

The code starting at line 82 in org.mbassy.AbstractMessageBus is currently:

public AbstractMessageBus(int dispatcherThreadCount) {
    this(2, new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()));
}

but I'm guessing it should be:

public AbstractMessageBus(int dispatcherThreadCount) {
    this(dispatcherThreadCount, new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()));
}

Hope I've got this right.

Regards,
David E.

No way to shutdown an MBassador instance

First of all let me say how impressed I am at the ease of use of MBassador. Thanks for all the great work!

Now onto my issue.

I am integrating the message bus into a Tomcat application. I am storing the bus as a singleton in the ServletContext and accessing it from there.

I have a ServletContextListener that creates an MBassador instance and starts it.

This all works rather well.

Now, when I shutdown my application the Threads that MBassador uses as dispatchers are not shut down.

After a little investigation I found that these threads are not accessible in any way, so I cannot kill them. I found that MBassador has a "finalize()" method that calls a private "shutdown()" method, which stops these threads.

The only solution I could come up with for stopping these threads when the application is stopped was to subclass MBassador and create a method that calls "finalize()":

private static final class StoppableMBassador<T> extends MBassador<T> {

        public StoppableMBassador(BusConfiguration configuration) {
            super(configuration);
        }

        public void stop() {
            try {
                finalize();
            } catch (Throwable ex) {
                throw new RuntimeException(ex);
            }
            try {
                ((ExecutorService) getExecutor()).awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

This seems a little hacky to me. Could you make a public "shutdown()" method that can be called in this situation?
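As a point of comparison, the executor half of such a public shutdown method could follow the two-phase pattern from the ExecutorService Javadoc. The sketch below is plain java.util.concurrent code; ShutdownSketch and shutdownGracefully are hypothetical names, and the sketch does not cover the bus's own dispatcher threads, which are the part that is not reachable from outside.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {

    // Stops accepting new tasks, waits for running ones, then cancels whatever is left.
    // Returns true if the executor terminated within the allowed time.
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // Tasks are still running: interrupt them and wait once more.
            executor.shutdownNow();
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException ex) {
            // The caller was interrupted while waiting: cancel and preserve the interrupt flag.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> System.out.println("handling a message"));
        System.out.println("terminated: " + shutdownGracefully(executor, 1, TimeUnit.MINUTES));
    }
}
```

In a servlet container this kind of method would typically be called from the listener's contextDestroyed callback.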

AbstractMessageBus.addAsynchronousDeliveryRequest(SubscriptionDeliveryRequest<T> request) method improvement

I just noticed that introduction of the MaximumNumberOfPendingMessages option was a little bit incomplete.

The AbstractMessageBus.addAsynchronousDeliveryRequest(SubscriptionDeliveryRequest<T> request) method internally calls the LinkedBlockingQueue.offer(E e) method.

According to the Javadoc, the offer(E e) method does not make the running thread wait for space to become available. It simply returns false when the queue's capacity is exceeded.

So, several published events may be lost when queue size is small and there is no way to detect it.

I suggest the following improvement:

  1. The MBassador.publishAsync(T message) should return true or false, just like the LinkedBlockingQueue.offer(E e) method. In this case the method's caller will be able to determine whether the message was published or not.
  2. Add new method boolean MBassador.publishAsync(T message, long timeout, TimeUnit unit) throws InterruptedException which should internally call the LinkedBlockingQueue.offer(E e, long timeout, TimeUnit unit) method (see Javadoc).
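The two suggestions can be sketched with a bounded LinkedBlockingQueue alone. BoundedOfferSketch below is a hypothetical stand-in for the bus, not MBassador code; it only shows how the boolean result of offer could be passed back to the publisher.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedOfferSketch {

    // Stand-in for the bus's bounded queue of pending messages.
    private final LinkedBlockingQueue<Object> pending;

    BoundedOfferSketch(int maximumNumberOfPendingMessages) {
        this.pending = new LinkedBlockingQueue<>(maximumNumberOfPendingMessages);
    }

    // Suggestion 1: report whether the message was accepted instead of dropping it silently.
    boolean publishAsync(Object message) {
        return pending.offer(message);
    }

    // Suggestion 2: wait up to the given timeout for space to become available.
    boolean publishAsync(Object message, long timeout, TimeUnit unit) throws InterruptedException {
        return pending.offer(message, timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedOfferSketch bus = new BoundedOfferSketch(1);
        System.out.println(bus.publishAsync("first"));   // the queue has room
        System.out.println(bus.publishAsync("second"));  // the queue is full, nothing consumes it
        System.out.println(bus.publishAsync("third", 100, TimeUnit.MILLISECONDS));
    }
}
```

With no consumer draining the queue, only the first call is accepted; the caller can then retry, log, or apply back-pressure.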

filter usage

Hi everybody.

I would like to know whether I'm using filters correctly.
Here is the class:
public class OutcomingMessage extends ZTCFrame implements net.engio.mbassy.listener.IMessageFilter

The purpose is to receive only instances of classes that extend the OutcomingMessage class (I need to receive only the messages that are to be written out to a stream).
//
@Handler(filters = {@Filter(OutcomingMessage.class)})
public void outcomingMessageHandler (ZTCFrame ztcFrame){

    //need for treating the outcoming message as a ZTCFrame 
}

//
Is this written properly?

Also, what should I do with the IMessageFilter method "accepts"?

//
public class OutcomingMessage implements IMessageFilter{

@Override
public boolean accepts(Object arg0, MessageHandlerMetadata arg1) {
    // TODO Auto-generated method stub
    return false;
}

}
//

Thank you for helping.

Possible bug in the org.mbassy.listener.MetadataReader.isHandler(Method m) method

This method has the following condition: annotation.equals(Listener.class), i.e. java.lang.annotation.Annotation.equals(Class). The objects being compared are therefore never instances of the same class at runtime, so the comparison cannot succeed.

Normally, such conditions look like annotation.annotationType().equals(Listener.class).
Could you please check that everything is OK with this method?
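The difference between the two conditions can be reproduced without the library. In the sketch below the nested Listener annotation is a stand-in for the library's annotation, and it is placed on a class rather than a method purely to keep the example short; all other names are hypothetical.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

class AnnotationEqualsSketch {

    // Stand-in for the library's annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @interface Listener {}

    @Listener
    static class Sample {}

    // Compares an annotation instance with a Class object: always false.
    static boolean brokenCheck(Annotation annotation) {
        return annotation.equals(Listener.class);
    }

    // Compares the annotation's type with the Class object.
    static boolean correctCheck(Annotation annotation) {
        return annotation.annotationType().equals(Listener.class);
    }

    // Returns the @Listener instance found on the Sample class.
    static Annotation sampleAnnotation() {
        return Sample.class.getAnnotation(Listener.class);
    }

    public static void main(String[] args) {
        Annotation annotation = sampleAnnotation();
        System.out.println("equals(Class):           " + brokenCheck(annotation));
        System.out.println("annotationType().equals: " + correctCheck(annotation));
    }
}
```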

Fire a single Listener with multiple/different Events

Hi Ben,

I'm facing the situation where several different events should/could fire the same Listener (handler).

So far, and if I understand correctly, MBassador allows one Event to fire one or more Listener(s), but a Listener may only be fired by one Event.

Is there a smart way to make it so a given Listener can be fired by multiple Events ?

Feel free to edit my post as needed. Thanks for the library, it's easy and fun to use so far !

Vetoing feature

Hi!

Is it possible to have a vetoing capability in a future release? By vetoing I mean there would be a special Handler annotation, such as VetoingHandler, whose methods would be invoked before the normal Handlers. Every VetoingHandler-annotated method would return a boolean indicating whether or not it vetoes the delivery of the event to the normal Handlers. If any VetoingHandler returns false, then the handling of the current event stops and no Handler is invoked.

Thank you!

Use of interfaces for messages

Perhaps I am just dense or missing the documentation, but it seems like MBassador does not support interface message types. i.e.

@Handler
public void handleTestMessageStrong(TestMessage message) {
   // do something
}

In this example, if TestMessage is an interface the handler will not be invoked. Is that correct? (It seems to be so in my testing.) If so, it might be worthwhile to clarify that in the documentation. I think class hierarchy sub-types are generally considered to include interfaces in Java.
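Whether this is what happens inside MBassador is not confirmed here, but the symptom is consistent with a type lookup that walks only the superclass chain. The sketch below contrasts that lookup with Class.isAssignableFrom, which also covers interfaces. All names (InterfaceMatchSketch, BaseMessage, ConcreteMessage and the two match methods) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class InterfaceMatchSketch {

    interface TestMessage {}

    static class BaseMessage {}

    static class ConcreteMessage extends BaseMessage implements TestMessage {}

    // Collects the class and its superclasses; interfaces are never visited.
    static List<Class<?>> superclassChain(Class<?> type) {
        List<Class<?>> chain = new ArrayList<>();
        for (Class<?> current = type; current != null; current = current.getSuperclass()) {
            chain.add(current);
        }
        return chain;
    }

    // Matches a handler's parameter type against superclasses only.
    static boolean matchesBySuperclassOnly(Class<?> handlerParameterType, Object message) {
        return superclassChain(message.getClass()).contains(handlerParameterType);
    }

    // Matches a handler's parameter type against superclasses and interfaces.
    static boolean matchesByAssignability(Class<?> handlerParameterType, Object message) {
        return handlerParameterType.isAssignableFrom(message.getClass());
    }

    public static void main(String[] args) {
        Object message = new ConcreteMessage();
        System.out.println(matchesBySuperclassOnly(TestMessage.class, message));
        System.out.println(matchesByAssignability(TestMessage.class, message));
    }
}
```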

Subscription Comparator is limiting the number of classes that are notified of events

Hi Benni,

I spent half of yesterday debugging an issue I was seeing when I would subscribe multiple classes to the same event but only see the first 2 registered listeners being notified so I thought I'd share my findings with you.

I've forked your repo and updated it with the debugging log entries I created and a bit of hack code to get around the cause of the problem I was seeing.

I've also added a new unit test class (org.mbassy.MBassadorExtendedTest) which exposes the issue for you to see.

In the Subscription class I've included a boolean switch in the SubscriptionByPriorityDesc comparator so you can turn the original code on and off to see its effect on the unit test results.

Hope this is useful.

Regards,
David E.
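The report does not spell out the root cause, so the following is only one common way a comparator can drop subscribers: if subscriptions are kept in a sorted set and the comparator returns 0 for two distinct subscriptions with the same priority, the set treats the second as a duplicate. The sketch uses a plain TreeSet and hypothetical names (Subscription, BY_PRIORITY_ONLY, WITH_TIE_BREAK); it is not the library's code and does not reproduce the exact "first two listeners" symptom.

```java
import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;

class PriorityComparatorSketch {

    // Stand-in for a subscription: a priority plus a registration sequence number.
    static final class Subscription {
        final int priority;
        final int sequence;

        Subscription(int priority, int sequence) {
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    // Highest priority first; distinct subscriptions with equal priority compare as 0.
    static final Comparator<Subscription> BY_PRIORITY_ONLY =
            (a, b) -> Integer.compare(b.priority, a.priority);

    // Same ordering, but ties fall back to registration order, so distinct entries never compare as 0.
    static final Comparator<Subscription> WITH_TIE_BREAK =
            BY_PRIORITY_ONLY.thenComparingInt(s -> s.sequence);

    // Registers three subscriptions with the same priority and reports how many the set kept.
    static int keptAfterThreeRegistrations(Comparator<Subscription> order) {
        Set<Subscription> subscriptions = new TreeSet<>(order);
        for (int i = 0; i < 3; i++) {
            subscriptions.add(new Subscription(0, i));
        }
        return subscriptions.size();
    }

    public static void main(String[] args) {
        System.out.println("priority only:  " + keptAfterThreeRegistrations(BY_PRIORITY_ONLY));
        System.out.println("with tie-break: " + keptAfterThreeRegistrations(WITH_TIE_BREAK));
    }
}
```

Breaking ties on a registration sequence number also gives equal-priority subscriptions a stable first-registered, first-notified order.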

Inherit @Listener annotation

To me it would make sense if the @Listener annotation were automatically inherited. I therefore propose adding the @Inherited annotation to the Listener.java class:

@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = {ElementType.TYPE})
@Inherited
public @interface Listener {
[...]
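The effect of @Inherited on a class-level annotation can be checked in isolation. In the sketch below, PlainListener and InheritedListener are hypothetical stand-ins that differ only in the @Inherited meta-annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class InheritedSketch {

    // Without @Inherited: visible only on the class that declares it.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface PlainListener {}

    // With @Inherited: also visible on subclasses of the annotated class.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Inherited
    @interface InheritedListener {}

    @PlainListener
    @InheritedListener
    static class Base {}

    static class Child extends Base {}

    public static void main(String[] args) {
        System.out.println("plain on child:     " + Child.class.isAnnotationPresent(PlainListener.class));
        System.out.println("inherited on child: " + Child.class.isAnnotationPresent(InheritedListener.class));
    }
}
```

Note that @Inherited only applies to annotations on classes; it has no effect on annotations placed on methods or on interfaces.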

child event bus

Hello,
I love your work on mbassador and particularly its flexibility but there isn't any hint to create a child event bus, maybe this is something missing and will lead to a new feature :)
Given an A event bus, I'd like to create a B event bus receiving all A's events but keeping its own event B for itself.
Maybe I'm not being clear enough here, as I'm not a native English speaker.

Delayed message dispatch?

I just found MBassador and am really enjoying it. Thanks, bennidi, for the excellent work on it.

I have a use case in which I post messages to the bus in response to user input in a text editor. However, I'd like to delay dispatch of the message (by, say, 250 milliseconds) to ensure that user typing is complete. The message posted to the bus will kick off a (potentially) long-running task that I'd like to be able to interrupt if additional user input is received before the task is complete.

Do you think it would be possible to add a method such as

MessagePublication later(long delay, long timeout, TimeUnit units)

to IMessageBus.IPostCommand? Furthermore, would it be possible to put a method interrupt() on MessagePublication that interrupts the Thread (kills the task) that is asynchronously delivering the message?

Or, is there, perhaps, a better way to do this? I suppose I could dispatch the message asynchronously and then simply wait 250 milliseconds at the top of the Handler method. But I think it might be more elegant to have MBassador handle the delay in a consistent manner.

I'm wondering what would be considered best practice here.

Thanks again for the excellent code.
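Until something like this exists in the bus itself, the delay can be kept outside it with a small debouncer built on ScheduledExecutorService. This is only a sketch under that assumption: DebouncerSketch and its methods are hypothetical names, and the Runnable passed to submit is where the actual post to the bus would go.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DebouncerSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> pending;

    // Schedules the task after the delay and cancels the previously submitted one.
    // cancel(true) also interrupts the previous task if it has already started running.
    synchronized void submit(Runnable task, long delay, TimeUnit unit) {
        if (pending != null) {
            pending.cancel(true);
        }
        pending = scheduler.schedule(task, delay, unit);
    }

    // Stops accepting work and waits for the last scheduled task to finish.
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        scheduler.shutdown();
        try {
            return scheduler.awaitTermination(timeout, unit);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger dispatched = new AtomicInteger();
        DebouncerSketch debouncer = new DebouncerSketch();
        // Three keystrokes in quick succession: only the last one should be dispatched.
        for (int i = 0; i < 3; i++) {
            debouncer.submit(dispatched::incrementAndGet, 250, TimeUnit.MILLISECONDS);
        }
        debouncer.shutdownAndWait(5, TimeUnit.SECONDS);
        System.out.println("dispatched: " + dispatched.get());
    }
}
```

A long-running task only stops on cancel(true) if it checks its interrupt status or blocks in an interruptible call.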

subscription objects and subscribing

I have a spring based application and the proxies are jamming up subscription. I can get the target class of the proxy using spring techniques but I need to be able to add a subscription to an event bus that is not just passing the actual object itself that has the @subscribe annotation, but add a subscription to a bus by creating a "subscription" object directly then registering.

Does the event bus support this type of direct registration?

sync/async publish/handling

A question about sync/async publishing and handling.
My need is to ALWAYS publish in a 'fire and forget' way with a FIFO policy, so I guess I should always use the .publishAsync() method. If I understand correctly, mbassy will manage these messages with a pool of internal threads.
What I don't think I fully understand (after reading the Javadoc) is the @Handler policy, in particular the delivery mode and the @synchronization option, and how they affect the behaviour of both the mbassy threads and the listener threads. I hope I've made my question clear.

Setting custom Executor Service doesn't actually work

I found this in a more realistic example, but here's my program that demonstrates that your ExecutorService (specified in the configuration) is never used. Each method is wrapped with a System.out.println statement, none of the Executor methods are called, ever. Further, you call new Thread in AbstractSyncAsyncMessageBus.java#L40, and I see no code in that file that actually calls service.submit.

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;

public class MultithreadedTest {
    public static void main(String... args) {
        BusConfiguration config = BusConfiguration.Default();
        ExecutorService defaultService = config.getExecutor();
        config.setExecutor(new WrappedExecutorService(defaultService));

        MBassador<Object> bus = new MBassador<>(config);
        bus.publishAsync("Nothing gets printed to the console, ever!");
    }

    public static class WrappedExecutorService implements ExecutorService {
        private final ExecutorService wrappedService;

        public WrappedExecutorService(ExecutorService wrappedService) {
            this.wrappedService = wrappedService;
        }

        public void execute(Runnable command) {
            System.out.println("Calling execute: " + command);
            wrappedService.execute(command);
        }

        public void shutdown() {
            System.out.println("Calling shutdown.");
            wrappedService.shutdown();
        }

        public List<Runnable> shutdownNow() {
            System.out.println("Calling shutdownNow.");
            return wrappedService.shutdownNow();
        }

        public boolean isShutdown() {
            System.out.println("Calling isShutdown.");
            return wrappedService.isShutdown();
        }

        public boolean isTerminated() {
            System.out.println("Calling isTerminated.");
            return wrappedService.isTerminated();
        }

        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
            System.out.println("Calling awaitTermination: " + timeout + " " + unit);
            return wrappedService.awaitTermination(timeout, unit);
        }

        public <T> Future<T> submit(Callable<T> task) {
            System.out.println("Calling submit: " + task);
            return wrappedService.submit(task);
        }

        public <T> Future<T> submit(Runnable task, T result) {
            System.out.println("Calling submit: " + task + " " + result);
            return wrappedService.submit(task, result);
        }

        public Future<?> submit(Runnable task) {
            System.out.println("Calling submit: " + task);
            return wrappedService.submit(task);
        }

        public <T> List<Future<T>> invokeAll(
                Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
            System.out.println("Calling invokeAll: " + tasks);
            return wrappedService.invokeAll(tasks);
        }

        public <T> List<Future<T>> invokeAll(
                Collection<? extends Callable<T>> tasks, long timeout,
                TimeUnit unit) throws InterruptedException {
            System.out.println("Calling invokeAll: " + tasks + " " + timeout + " " + unit);
            return wrappedService.invokeAll(tasks, timeout, unit);
        }

        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException {
            System.out.println("Calling invokeAny: " + tasks);
            return wrappedService.invokeAny(tasks);
        }

        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit) throws InterruptedException,
                ExecutionException, TimeoutException {
            System.out.println("Calling invokeAny: " + tasks + " " + timeout + " " + unit);
            return wrappedService.invokeAny(tasks, timeout, unit);
        }       
    }
}

ConcurrentSetTest yields an error when run within the suite

Running mvn clean test -Dtest=ConcurrentSetTest results in the test passing OK.

However, running mvn clean test yields a build failure:
Failed tests: testIteratorCleanup(net.engio.mbassy.ConcurrentSetTest): expected:<33278> but was:<53438>

The surefire report provides little else in terms of information:


Test set: net.engio.mbassy.ConcurrentSetTest

Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 7.159 sec <<< FAILURE!
testIteratorCleanup(net.engio.mbassy.ConcurrentSetTest) Time elapsed: 1.256 sec <<< FAILURE!
java.lang.AssertionError: expected:<33278> but was:<53438>
    at org.junit.Assert.fail(Assert.java:93)
    at org.junit.Assert.failNotEquals(Assert.java:647)
    at org.junit.Assert.assertEquals(Assert.java:128)
    at org.junit.Assert.assertEquals(Assert.java:147)
    at net.engio.mbassy.common.UnitTest.assertEquals(UnitTest.java:70)
    at net.engio.mbassy.ConcurrentSetTest.testIteratorCleanup(ConcurrentSetTest.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
    at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
    at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)

Remote delivery ?

A remote delivery feature would be very cool, as it would make distributed system design easy. As of now, the only library I know of that does this is Akka.

Dead message handler doesn't work correctly with unsubscribe

The dead message handler doesn't work correctly after the last regular listener unsubscribes from the bus. Here is a test case to demonstrate. I am using 1.1.7.

import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.listener.Handler;

import static org.junit.Assert.*;
import org.junit.Test;

public class BusTest {
    @Test
    public void testBus() {
        // Some strings to play with
        String first = "Hello World!";
        String second = "Another String!";

        // Make a bus
        MBassador<Object> bus = new MBassador<>(BusConfiguration.Default());

        // Make some stuff to handle messages
        MsgHandler handler = new MsgHandler();
        DeadHandler deadHandler = new DeadHandler();

        // Only add the dead message handler
        bus.subscribe(deadHandler);
        bus.post(first).now();

        // The message should be caught, as it's the only listener
        assertNotNull(deadHandler.deadMessage);

        // Clear deadmessage for future tests
        deadHandler.deadMessage = null;

        // Add the real handler, and publish the message again
        bus.subscribe(handler);
        bus.post(first).now();

        // Verify we got it
        assertEquals(first, handler.theString);
        // And that the dead handler didn't get it.
        assertNull(deadHandler.deadMessage); 

        // Unsubscribe, publish a different string
        bus.unsubscribe(handler);
        bus.post(second).now();

        // We should still be the first message, because we unsubscribed
        assertEquals(first, handler.theString);

        // One message while the handler was subscribed, two while it wasn't.
        assertEquals(1, handler.counter);
        assertEquals(2, deadHandler.counter); // This fails!

        // This should be caught again, as there are no listeners.
        assertNotNull(deadHandler.deadMessage); // This fails!
    }

    public static class MsgHandler {
        public String theString;

        public int counter = 0;

        @Handler
        public void handleMessage(String theMessage) {
            counter++;
            theString = theMessage;
        }
    }

    public static class DeadHandler {
        public DeadMessage deadMessage;
        public int counter = 0;

        @Handler
        public void handleDead(DeadMessage message) {
            counter++;
            deadMessage = message;
        }
    }
}
