
Microservices library - scalecube-services is a high throughput, low latency reactive microservices library built to scale. it features: API-Gateways, service-discovery, service-load-balancing, the architecture supports plug-and-play service communication modules and features. built to provide performance and low-latency real-time stream-processing

Home Page: http://scalecube.github.io/

License: Apache License 2.0



scalecube-services


MICROSERVICES 2.0

ScaleCube is a library that simplifies the development of reactive and distributed applications by providing an embeddable microservices library. It connects distributed microservices in a way that resembles a fabric when viewed collectively. It greatly simplifies and streamlines asynchronous programming and provides a tool-set for managing a microservices architecture. ScaleCube Services is built on top of ScaleCube Cluster, which provides built-in service discovery. Discovery uses the SWIM protocol with gossip dissemination, which scales well and offers inherent failure detection and a coherent view of cluster state and cluster membership across the swarm of services. ScaleCube Cluster is a membership protocol that maintains membership amongst processes in a distributed system.

An open-source project focused on streamlining reactive programming of microservices and reactive systems that scale, built by developers for developers.

ScaleCube Services provides a low-latency reactive microservices library for peer-to-peer service registry and discovery based on a gossip protocol, with no single point of failure or bottleneck.

ScaleCube gracefully addresses the cross-cutting concerns of a distributed microservices architecture.

ScaleCube Services Features:
  • Provision and interconnect microservices peers in a cluster
  • Fully distributed, with no single point of failure or bottleneck
  • Fast: low latency and high throughput
  • Scalable across cores, JVMs, clusters, and regions
  • Built-in service discovery and service routing
  • Zero configuration: automatic peer-to-peer service discovery using the SWIM cluster membership protocol
  • Simple non-blocking, asynchronous programming model
  • Reactive Streams support:
    • Fire-and-Forget: send a request and do not wait for a reply
    • Request-Response: send a single request and expect a single reply
    • Request-Stream: send a single request and expect a stream of responses
    • Request-Bidirectional: send a stream of requests and expect a stream of responses
  • Built-in failure detection, fault tolerance, and elasticity
  • Routing and balancing strategies for both stateless and stateful services
  • Embeddable into existing applications
  • Natural circuit breaker via scalecube-cluster discovery and failure detector
  • Service instance tagging
  • Service discovery partitioning using a hierarchy of namespaces in multi-cluster deployments
  • Modular, flexible deployment models and topology
  • Pluggable API gateway providers (HTTP / WebSocket / RSocket)
  • Pluggable service transports (TCP / Aeron / RSocket)
  • Pluggable encoders (JSON, SBE, Google Protocol Buffers)
  • Pluggable service security: authentication and authorization providers

User Guide:

Basic Usage:

The example provisions two cluster nodes and performs a remote interaction:

  1. The seed node is a cluster member that provisions no services of its own.
  2. The serviceNode member joins the seed member and provisions a GreetingService instance.
  3. Finally, from the seed node, a proxy is created from the GreetingService API and a greeting request is sent.
// service definition
@Service("io.scalecube.Greetings")
public interface GreetingsService {
  @ServiceMethod("sayHello")
  Mono<Greeting> sayHello(String name);
}

// service implementation
public class GreetingServiceImpl implements GreetingsService {
  @Override
  public Mono<Greeting> sayHello(String name) {
    return Mono.just(new Greeting("Nice to meet you " + name + " and welcome to ScaleCube"));
  }
}

// 1. ScaleCube seed node with no services of its own (container 1)
Microservices seed = Microservices.builder()
    .discovery("seed", ScalecubeServiceDiscovery::new)
    .transport(RSocketServiceTransport::new)
    .startAwait();

// Get the address of the seed member - used to join other members to the cluster.
final Address seedAddress = seed.discovery("seed").address();

// 2. Construct a ScaleCube node which joins the cluster and hosts the Greeting Service (container 2)
Microservices serviceNode = Microservices.builder()
    .discovery("seed", ep -> new ScalecubeServiceDiscovery(ep)
        .membership(cfg -> cfg.seedMembers(seedAddress)))
    .transport(RSocketServiceTransport::new)
    .services(new GreetingServiceImpl())
    .startAwait();

// 3. Create a service proxy (can be created from any node or container in the cluster),
//    execute the service, and subscribe to incoming service events.
seed.call().api(GreetingsService.class)
    .sayHello("joe")
    .subscribe(greeting -> System.out.println(greeting.message()));

// Await all instances to shut down.
Mono.whenDelayError(seed.shutdown(), serviceNode.shutdown()).block();

Basic Service Example:

  • RequestOne: send a single request and expect a single reply
  • RequestStream: send a single request and expect a stream of responses
  • RequestBidirectional: send a stream of requests and expect a stream of responses

A service is nothing but an interface declaring which methods we wish to provision in our cluster.

@Service
public interface ExampleService {

  @ServiceMethod
  Mono<String> sayHello(String request);

  @ServiceMethod
  Flux<MyResponse> helloStream();

  @ServiceMethod
  Flux<MyResponse> helloBidirectional(Flux<MyRequest> requests);
}
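The three method shapes above map directly onto Reactive Streams signatures. As an illustration only (not ScaleCube code), the following sketch mimics request-response and request-stream using the JDK's built-in `java.util.concurrent.Flow` API in place of Reactor's `Mono`/`Flux`:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Illustration only: models the service method shapes with the JDK Flow API.
public class InteractionShapes {

  // Request-Response: one request in, one reply out.
  static String sayHello(String request) {
    return "Hello " + request;
  }

  // Request-Stream: one request in, a stream of replies out.
  static List<String> consumeHelloStream(int count) {
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    List<String> received = new CopyOnWriteArrayList<>();
    CountDownLatch completed = new CountDownLatch(1);

    publisher.subscribe(new Flow.Subscriber<String>() {
      @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
      @Override public void onNext(String item) { received.add(item); }
      @Override public void onError(Throwable t) { completed.countDown(); }
      @Override public void onComplete() { completed.countDown(); }
    });

    // Emit after the subscriber is registered, then close to signal completion.
    for (int i = 0; i < count; i++) {
      publisher.submit("hello-" + i);
    }
    publisher.close();

    try {
      completed.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return received;
  }
}
```

In the real library the transport handles subscription and backpressure across the network; the shapes of the signatures are what carry over.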

API-Gateway:

Available API gateways are RSocket, HTTP, and WebSocket.

Basic API-Gateway example:

    Microservices.builder()
        .discovery(options -> options.seeds(seed.discoveryAddress()))
        .services(...) // OPTIONAL: services (if any) as part of this node.

        // configure list of gateways plugins exposing the apis
        .gateway(options -> new WebsocketGateway(options.id("ws").port(8080)))
        .gateway(options -> new HttpGateway(options.id("http").port(7070)))
        .gateway(options -> new RSocketGateway(options.id("rsws").port(9090)))

        .startAwait();

        // HINT: you can try connecting to these ports using the API sandbox to explore the API.
        // https://scalecube.github.io/api-sandbox/app/index.html

Maven

With scalecube-services you can plug-and-play alternative providers for transport, codecs, and discovery. ScaleCube uses ServiceLoader to load providers from the classpath.

You can think of ScaleCube as slf4j for microservices. Currently supported SPIs:

Transport providers:

  • scalecube-services-transport-rsocket: using rsocket to communicate with remote services.
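Providers are discovered through Java's standard ServiceLoader mechanism: a provider jar registers its implementation class in a `META-INF/services` file named after the SPI interface. For the RSocket transport the registration would look roughly like the following (the exact SPI interface name here is an assumption; check the module's `META-INF/services` directory for the authoritative entry):

```
# META-INF/services/io.scalecube.services.transport.api.ServiceTransport
io.scalecube.services.transport.rsocket.RSocketServiceTransport
```

Swapping providers is then just a matter of changing which jar is on the classpath, with no code changes.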

Message codec providers:

Service discovery providers:

Binaries and dependency information for Maven can be found at http://search.maven.org.

https://mvnrepository.com/artifact/io.scalecube

To add a dependency on ScaleCube Services using Maven, use the following:

Maven Central

 <properties>
   <scalecube.version>2.x.x</scalecube.version>
 </properties>

 <!-- -------------------------------------------
   scalecube core and api:
 ------------------------------------------- -->

 <!-- scalecube apis   -->
 <dependency>
  <groupId>io.scalecube</groupId>
  <artifactId>scalecube-services-api</artifactId>
  <version>${scalecube.version}</version>
 </dependency>

 <!-- scalecube services module   -->
 <dependency>
  <groupId>io.scalecube</groupId>
  <artifactId>scalecube-services</artifactId>
  <version>${scalecube.version}</version>
 </dependency>


 <!--

     Plugins / SPIs: below is a list of providers you may choose from to construct your own configuration.
     You are welcome to build and contribute your own plugins; please consider the existing ones as examples.

  -->

 <!-- scalecube transport providers:  -->
 <dependency>
  <groupId>io.scalecube</groupId>
  <artifactId>scalecube-services-transport-rsocket</artifactId>
  <version>${scalecube.version}</version>
 </dependency>

Sponsored by:

We Hire at exberry.io

https://exberry.io/career/

website

https://scalecube.github.io/

scalecube-services's People

Contributors

aharonha, alexkarezin, alexlikho, artem-v, codacy-badger, dependabot[bot], dmytro-lazebnyi, harry-hao, igorperikov, io-scalecube-ci, jlleitschuh, matyasberry, monergeim, olegdokuka, ronenhamias, scooter72, segabriel, snripa, snyk-bot, stingion, zhou-hao


scalecube-services's Issues

Consider to use only Message instead of TransportMessage and ClusterMessage

Current solution

I changed the cluster API in #37 to make it work with members; as a result, the ClusterMessage class was introduced as the return type of the listen method, with message and member fields instead of the message and endpoint fields on TransportMessage. I also added an API to resolve a member by ID.

Problems

  • We introduced many message classes (Message, TransportMessage, ClusterMessage), which is confusing.
  • Membership information disseminates over the cluster in an eventually consistent way, so it may not be possible to transform a TransportMessage into a ClusterMessage when the member is not yet known on this node, which leads to errors.
  • Creating ClusterMessage and TransportMessage objects for each message produces more garbage pressure; even though these classes are small, messages are frequent.

Proposed solution

For incoming messages we need to know where they were sent from, in order to run sender-specific logic or to reply. This is why we have these extra message classes: the information is not available on the Message class.

So I was thinking it could be done differently. For example, when sending a message, add an additional system header with the key "origin" whose value is the endpoint/member ID. Alternatively, it could be a dedicated field rather than a header, since every message has a sender.

In that case we could use the Message class everywhere. For convenience, the Message class could provide additional methods such as origin().

@myroslavlisniak @artem-v @ronenhamias WDYT?
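To make the proposal concrete, here is a hypothetical sketch of a single Message class carrying the sender under an "origin" system header with a convenience accessor; the class shape and method names are illustrative, not the actual ScaleCube API:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one Message class for all layers; the sender is an
// "origin" system header instead of a separate TransportMessage/ClusterMessage.
public final class Message {
  public static final String HEADER_ORIGIN = "origin"; // sender endpoint/member ID

  private final Map<String, String> headers;
  private final Object data;

  public Message(Map<String, String> headers, Object data) {
    this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
    this.data = data;
  }

  // Convenience accessor proposed in the issue: who sent this message.
  public String origin() {
    return headers.get(HEADER_ORIGIN);
  }

  public Object data() {
    return data;
  }

  // Helper used by the sending side to stamp the origin header.
  public static Message withOrigin(String memberId, Object data) {
    Map<String, String> headers = new HashMap<>();
    headers.put(HEADER_ORIGIN, memberId);
    return new Message(headers, data);
  }
}
```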

TransportChannel not sensitive to connection appearance

Get a TransportChannel using Transport.to() for a server endpoint which doesn't exist, and save the reference. Send messages in a loop using the saved reference and check the settable futures. After some time, start the remote server endpoint. You will see the settable futures continue failing even though the remote endpoint is up.

Like this:

transportChannel = transport.to(...)
for (;;) {
  transportChannel.send(...)
}

The issue is that owning a reference to a TransportChannel is dangerous; clients should always call transport.to(...) and shouldn't keep references around.
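The recommended pattern can be sketched as follows; the Transport and Channel interfaces here are simplified stand-ins, not the real API:

```java
// Illustration only: resolving the channel per send, instead of caching one
// TransportChannel reference, lets a later reconnect be picked up.
public class PerSendResolution {

  @FunctionalInterface
  interface Transport {
    Channel to(String address); // returns a channel for the endpoint's current connection state
  }

  @FunctionalInterface
  interface Channel {
    void send(String message);
  }

  static class Sender {
    private final Transport transport;
    private final String address;

    Sender(Transport transport, String address) {
      this.transport = transport;
      this.address = address;
    }

    // Each send re-resolves the channel, so a restarted endpoint becomes
    // reachable without the caller holding a stale reference.
    void send(String message) {
      transport.to(address).send(message);
    }
  }
}
```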

Transport port binding issues causing "TransportBrokenException: Detected duplicate.."

The transport binds and listens on all network interfaces (0.0.0.0) for a specific port. On the other hand, it determines its address using IpAddressResolver.resolveIpAddress(), which picks the local IP by looping over NetworkInterface.networkInterfaces() and taking the first IPv4 interface (excluding loopback). This is unreliable, because networkInterfaces() does not guarantee the order of the returned collection. This IP address is then used by cluster membership. Seed members may use another network interface to connect to the same endpoint, so from the Cluster's point of view these look like separate addresses even though they lead to the same endpoint, which results in "TransportBrokenException: Detected duplicate...".

Case 1. Several network interfaces (NI)

On the member there are two network interfaces, one private and one external. Our code in TransportFactory listens on both interfaces.

From logs:

I 0727-1325:09,312 c.p.o.c.t.i.ExceptionCaughtChannelHandler Broken transport: NettyTransport{status=CONNECTED, channel=[id: 0x7ba35e97, /10.144.178.10:35789 => /10.144.176.10:4830]}, cause: TBE: Detected duplicate NettyTransport{status=READY, channel=[id: 0xbe66e44b, /172.31.178.10:57668 => /172.31.176.10:4830]} for key=tcp://172.31.178.10:4830 in accepted_map [oapi-transport-io-0@tcp://172.31.176.10:4830]

Caused by the following flow:

  • Connection 10.144.178.10:35789 => /10.144.176.10:4830 is established, but the local address is tcp://172.31.176.10:4830 (as you can see from the thread name); this means the transport listens on both the *.144 and *.31 addresses.
  • Then a duplicate is detected, because NettyTransport{status=READY, channel=[id: 0xbe66e44b, /172.31.178.10:57668 => /172.31.176.10:4830]} already exists under key tcp://172.31.178.10:4830 (i.e. a connection was established earlier via the interface with IP *.31, but the connect attempt is made using the interface with IP *.144).

Case 2. DNS names not supported

Env: 2 nodes with 1 NI each.

Conditions: both nodes restarted.

What happens: from node 1 to node 2 a transport (port 59915) is established; after that a new transport (port 59916) attempts to establish, which results in TBE (duplicates are not allowed: 59915 already exists).

From logs:

I 0728-1135:13,753 i.n.h.l.LoggingHandler [id: 0xe634f142, /172.29.49.109:59915 => app10.scalecube.io/172.29.49.177:4830] ACTIVE [oapi-transport-io-0@tcp://172.29.49.109:4830]
...
I 0728-1135:14,009 i.n.h.l.LoggingHandler [id: 0xebc94f97, /172.29.49.109:59916 => /172.29.49.177:4830] ACTIVE [oapi-transport-io-0@tcp://172.29.49.109:4830]

Note: putting IP addresses instead of DNS names into the seed member addresses on cluster initialization fixes this issue altogether.
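Since NetworkInterface enumeration order is not guaranteed, one way to at least make the pick deterministic is to sort candidates by interface name before choosing. This sketch is illustrative (it does not, by itself, pick the "right" interface among several):

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Comparator;

// Illustration only: "first non-loopback IPv4 address" made deterministic by
// sorting interfaces by name, with loopback as a fallback.
public class DeterministicAddressResolver {

  static InetAddress resolve() {
    try {
      return Collections.list(NetworkInterface.getNetworkInterfaces()).stream()
          .sorted(Comparator.comparing(NetworkInterface::getName)) // stable order across runs
          .flatMap(ni -> Collections.list(ni.getInetAddresses()).stream())
          .filter(addr -> addr instanceof Inet4Address && !addr.isLoopbackAddress())
          .findFirst()
          .orElse(InetAddress.getLoopbackAddress()); // fallback when no candidate exists
    } catch (SocketException e) {
      return InetAddress.getLoopbackAddress();
    }
  }
}
```

A real fix would also let the operator configure the bind/advertise address explicitly rather than guessing.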

ClusterMembership listen method should return observable of cluster update events

Subscribe to change notifications of the cluster membership. The events that track the life-cycle of members are:

ADDED - a new member has joined the cluster and its status has changed to Up.
LEAVING - a member is leaving the cluster and its status has changed to Exiting. Note that the node might already have been shut down when this event is published on another node.
LEFT - the member has been completely removed from the cluster.
UNREACHABLE - a member is considered unreachable, as detected by the failure detector of at least one other node.
REACHABLE - a member is considered reachable again after having been unreachable. All nodes that previously detected it as unreachable have detected it as reachable again.

Improve the ClusterMembership API: listening for updates should return an Observable of events, and each event should include the event type (ADDED, LEFT, UNREACHABLE, REACHABLE) and the ClusterMember.

Observable<ClusterMemberEvent> listenUpdates();

where

class ClusterMemberEvent {
  private EventType type;
  private ClusterMember member;
}
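Without RxJava, the shape of the proposed API can be sketched with a plain listener registry; the names below mirror the proposal but the implementation is purely illustrative:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Illustration only: event type enum plus a minimal registry showing how a
// subscriber would consume ClusterMemberEvent updates.
public class MembershipEvents {

  enum EventType { ADDED, LEAVING, LEFT, UNREACHABLE, REACHABLE }

  static final class ClusterMemberEvent {
    final EventType type;
    final String memberId;

    ClusterMemberEvent(EventType type, String memberId) {
      this.type = type;
      this.memberId = memberId;
    }
  }

  static final class ClusterMembership {
    private final List<Consumer<ClusterMemberEvent>> listeners = new CopyOnWriteArrayList<>();

    // Stand-in for listenUpdates(): register a subscriber for events.
    void listenUpdates(Consumer<ClusterMemberEvent> listener) {
      listeners.add(listener);
    }

    // Called internally when a membership change is detected.
    void emit(ClusterMemberEvent event) {
      listeners.forEach(l -> l.accept(event));
    }
  }
}
```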

Wait for the seed microservice to be up and running

Other microservices crash with the error below instead of waiting for the seed to become alive.

Exception in thread "sc-membership-4802" java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:57)
	at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: rx.exceptions.OnErrorNotImplementedException: Transport is stopped
	at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
	at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
	at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
	at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:153)
	at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
	at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:212)
	at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:139)
	at rx.internal.operators.NotificationLite.accept(NotificationLite.java:135)
	at rx.internal.operators.OperatorOnBackpressureBuffer$BufferSubscriber.accept(OperatorOnBackpressureBuffer.java:156)
	at rx.internal.util.BackpressureDrainManager.drain(BackpressureDrainManager.java:198)
	at rx.internal.operators.OperatorOnBackpressureBuffer$BufferSubscriber.onNext(OperatorOnBackpressureBuffer.java:151)
	at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:304)
	at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:219)
	at rx.subjects.PublishSubject.onNext(PublishSubject.java:72)
	at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
	at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
	at io.scalecube.cluster.membership.MembershipProtocolImpl.updateMembership(MembershipProtocolImpl.java:511)
	at io.scalecube.cluster.membership.MembershipProtocolImpl.onMembershipGossip(MembershipProtocolImpl.java:427)
	at rx.observers.Subscribers$3.onNext(Subscribers.java:147)
	at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
	at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
	at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:224)
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
	... 8 more
Caused by: java.lang.IllegalStateException: Transport is stopped
	at com.google.common.base.Preconditions.checkState(Preconditions.java:444)
	at io.scalecube.transport.TransportImpl.listen(TransportImpl.java:184)
	at io.scalecube.services.ServiceTransport.listen(ServiceTransport.java:37)
	at io.scalecube.services.RemoteServiceInstance.<init>(RemoteServiceInstance.java:54)
	at io.scalecube.services.ServiceRegistryImpl.lambda$loadMemberServices$42(ServiceRegistryImpl.java:100)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Collections.java:1575)
	at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
	at java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Collections.java:1600)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at io.scalecube.services.ServiceRegistryImpl.loadMemberServices(ServiceRegistryImpl.java:87)
	at io.scalecube.services.ServiceRegistryImpl.lambda$listenCluster$39(ServiceRegistryImpl.java:70)
	at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39)
	at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
	... 24 more

Implement graceful shutdown of cluster member

When a member leaves the cluster via Cluster.shutdown, we can notify the other members about the shutdown so they can remove the member from the membership table immediately, without detecting it as failed and waiting for the required timeouts. Consider either doing this via regular membership gossips spreading the DEAD status, or using a dedicated message type for this case.
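The effect on a receiving node can be sketched as follows (illustration only, not the scalecube-cluster implementation): a DEAD gossip from a gracefully leaving member removes it from the membership table immediately, skipping the suspicion phase and failure-detector timeout.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: immediate removal on a leave gossip vs. waiting for the
// failure detector to time out.
public class GracefulLeave {

  enum Status { ALIVE, SUSPECT, DEAD }

  static final class MembershipTable {
    private final Map<String, Status> members = new ConcurrentHashMap<>();

    void onJoin(String memberId) {
      members.put(memberId, Status.ALIVE);
    }

    // A DEAD gossip received from a member that is shutting down gracefully:
    // remove it right away, no suspicion phase, no timeout.
    void onLeaveGossip(String memberId) {
      members.remove(memberId);
    }

    boolean isMember(String memberId) {
      return members.containsKey(memberId);
    }
  }
}
```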

Data type can be lost if message is passed via proxy node (e.g. gossip)

Since the message doesn't carry a dataType field on the wire (it exists only in deserialized form in MessageSchema), when a message passes through a proxy that can't resolve dataType from the class name (e.g. a gossip passing through a node that doesn't have the required data model), the data type is lost on that node and won't be transferred on the next hop.
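One way to avoid this, sketched hypothetically below, is to carry the data type as a plain string alongside the still-serialized payload, so an intermediate hop can forward the message verbatim without needing the data class on its own classpath:

```java
// Illustration only: the wire format keeps dataType as a string, so a proxy
// hop forwards the message untouched even when it cannot resolve the class.
public class TypedWireMessage {

  static final class WireMessage {
    final String dataType; // fully qualified class name, kept as a string on the wire
    final byte[] payload;  // still-serialized data, not deserialized at proxy hops

    WireMessage(String dataType, byte[] payload) {
      this.dataType = dataType;
      this.payload = payload;
    }
  }

  // A proxy hop passes the message through unchanged: dataType survives even
  // when this node lacks the required data model.
  static WireMessage forward(WireMessage incoming) {
    return incoming;
  }
}
```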

Transport tests are not 100% stable

@antonkharenko

#Tests run: 18, Failures: 3, Errors: 0, Skipped: 0, Time elapsed: 155.228 sec <<< FAILURE! - in io.scalecube.transport.TransportTest
testBlockAndUnblockTraffic(io.scalecube.transport.TransportTest) Time elapsed: 4.612 sec <<< FAILURE!
java.lang.AssertionError: expected:<1> but was:<2>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:743)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:555)
at org.junit.Assert.assertEquals(Assert.java:542)
at io.scalecube.transport.TransportTest.testBlockAndUnblockTraffic(TransportTest.java:486)

testNetworkSettings(io.scalecube.transport.TransportTest) Time elapsed: 3.068 sec <<< FAILURE!
java.lang.AssertionError: expectedMax=550, actual size=1000
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at io.scalecube.transport.TransportTest.testNetworkSettings(TransportTest.java:339)

testUnresolvedHostConnection(io.scalecube.transport.TransportTest) Time elapsed: 2.018 sec <<< FAILURE!
java.lang.AssertionError: Expected classes [class java.nio.channels.UnresolvedAddressException], but actual: class io.netty.channel.ConnectTimeoutException
at org.junit.Assert.fail(Assert.java:88)
at io.scalecube.transport.TransportTest.assertAmongExpectedClasses(TransportTest.java:600)
at io.scalecube.transport.TransportTest.testUnresolvedHostConnection(TransportTest.java:178)

Results :

Failed tests:
TransportTest.testBlockAndUnblockTraffic:486 expected:<1> but was:<2>
TransportTest.testNetworkSettings:339 expectedMax=550, actual size=1000
TransportTest.testUnresolvedHostConnection:178->assertAmongExpectedClasses:600 Expected classes [class java.nio.channels.UnresolvedAddressException], but actual: class io.netty.channel.ConnectTimeoutException

Tests run: 20, Failures: 3, Errors: 0, Skipped: 0

java.lang.NullPointerException at io.scalecube.services.ServiceDispatcher$2.call(ServiceDispatcher.java:39)

~ Received 'sayHello' RPC request: Message {headers: {service=io.scalecube.services.examples.AdSelectorService/selectAd, serviceMethod=selectAd, cid=rpc-8369cbf5-0381-43f4-906f-e3977fed2a85}, sender: 169.254.49.148:4801, data: HelloRequest{name='World'}}
Sep 28, 2016 7:42:09 PM com.google.common.util.concurrent.Futures$ImmediateFuture addListener
SEVERE: RuntimeException while executing runnable com.google.common.util.concurrent.Futures$6@3b36a546 with executor MoreExecutors.directExecutor()
java.lang.NullPointerException
at io.scalecube.cluster.Cluster.send(Cluster.java:208)
at io.scalecube.services.ServiceDispatcher$2$1.onSuccess(ServiceDispatcher.java:52)
at com.google.common.util.concurrent.Futures$6.run(Futures.java:1773)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
at com.google.common.util.concurrent.Futures$ImmediateFuture.addListener(Futures.java:153)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1776)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1713)
at io.scalecube.services.ServiceDispatcher$2.call(ServiceDispatcher.java:45)
at io.scalecube.services.ServiceDispatcher$2.call(ServiceDispatcher.java:1)
at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:305)
at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
at io.scalecube.transport.MessageReceiverHandler.channelRead(MessageReceiverHandler.java:36)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:572)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:513)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:427)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:399)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Unknown Source)

~ Received 'sayHello' RPC request: Message {headers: {service=io.scalecube.services.examples.AdSelectorService/selectAd, serviceMethod=selectAd, cid=rpc-62299191-ea0c-43d3-b550-61982dd1a09f}, sender: 169.254.49.148:4801, data: HelloRequest{name='World'}}
Sep 28, 2016 7:42:09 PM com.google.common.util.concurrent.Futures$ImmediateFuture addListener
SEVERE: RuntimeException while executing runnable com.google.common.util.concurrent.Futures$6@6b687f64 with executor MoreExecutors.directExecutor()
java.lang.NullPointerException
at io.scalecube.cluster.Cluster.send(Cluster.java:208)
at io.scalecube.services.ServiceDispatcher$2$1.onSuccess(ServiceDispatcher.java:52)
at com.google.common.util.concurrent.Futures$6.run(Futures.java:1773)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
at com.google.common.util.concurrent.Futures$ImmediateFuture.addListener(Futures.java:153)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1776)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1713)
at io.scalecube.services.ServiceDispatcher$2.call(ServiceDispatcher.java:45)
at io.scalecube.services.ServiceDispatcher$2.call(ServiceDispatcher.java:1)
at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:305)
at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
at io.scalecube.transport.MessageReceiverHandler.channelRead(MessageReceiverHandler.java:36)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:572)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:513)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:427)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:399)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Unknown Source)


Support IPv6

Set up transport-system compatibility with IPv6.
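In practice, IPv6 support mostly means the transport must accept, parse, and bind IPv6 literals alongside IPv4 ones. A minimal, self-contained sketch of what an IPv6-aware endpoint looks like in plain Java (this is illustrative only, not the ScaleCube Transport API):

```java
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;

public class Ipv6Endpoint {
  public static void main(String[] args) throws Exception {
    // IPv6 literals are written in brackets when paired with a port, e.g. "[::1]:4801".
    InetAddress address = InetAddress.getByName("::1"); // IPv6 loopback
    InetSocketAddress endpoint = new InetSocketAddress(address, 4801);
    // A transport would bind its server channel to this endpoint;
    // the address type tells us which IP family was resolved.
    System.out.println(endpoint.getAddress() instanceof Inet6Address);
  }
}
```

The same `InetSocketAddress` works for both families, so the main change is in address parsing and in how endpoints are rendered back into strings.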

Cluster test ClusterTest.testUpdateMetadata fails on Travis build

Build where it failed: https://travis-ci.org/scalecube/scalecube/builds/192176381

Failed tests: 
  ClusterTest.testUpdateMetadata:96 expected:<{key1=value3}> but was:<{key1=value1, key2=value2}>

Test logs:

I 0115-2026:59,341 i.s.t.BaseTest ***** Test started  : ClusterTest.testUpdateMetadata ***** [main]
I 0115-2026:59,343 i.s.t.TransportImpl Bound to: 172.17.0.8:4801 [sc-boss-449-1]
I 0115-2026:59,358 i.s.t.TransportImpl Bound to: 172.17.0.8:4802 [sc-boss-451-1]
I 0115-2026:59,362 i.s.c.m.MembershipProtocol Joined cluster 'default': [{m: [email protected]:4802{key1=value1, key2=value2}, s: ALIVE, inc: 0}, {m: [email protected]:4801, s: ALIVE, inc: 0}] [sc-membership-4802]
I 0115-2026:59,365 i.s.t.TransportImpl Bound to: 172.17.0.8:4803 [sc-boss-453-1]
I 0115-2026:59,368 i.s.c.m.MembershipProtocol Joined cluster 'default': [{m: [email protected]:4802{key1=value1, key2=value2}, s: ALIVE, inc: 0}, {m: [email protected]:4801, s: ALIVE, inc: 0}, {m: [email protected]:4803, s: ALIVE, inc: 0}] [sc-membership-4803]
I 0115-2026:59,371 i.s.t.TransportImpl Bound to: 172.17.0.8:4804 [sc-boss-455-1]
I 0115-2026:59,375 i.s.c.m.MembershipProtocol Joined cluster 'default': [{m: [email protected]:4802{key1=value1, key2=value2}, s: ALIVE, inc: 0}, {m: [email protected]:4804, s: ALIVE, inc: 0}, {m: [email protected]:4801, s: ALIVE, inc: 0}, {m: [email protected]:4803, s: ALIVE, inc: 0}] [sc-membership-4804]
I 0115-2026:59,378 i.s.t.TransportImpl Bound to: 172.17.0.8:4805 [sc-boss-457-1]
I 0115-2026:59,381 i.s.c.m.MembershipProtocol Joined cluster 'default': [{m: [email protected]:4802{key1=value1, key2=value2}, s: ALIVE, inc: 0}, {m: [email protected]:4804, s: ALIVE, inc: 0}, {m: [email protected]:4801, s: ALIVE, inc: 0}, {m: [email protected]:4803, s: ALIVE, inc: 0}, {m: [email protected]:4805, s: ALIVE, inc: 0}] [sc-membership-4805]
I 0115-2026:59,385 i.s.t.TransportImpl Bound to: 172.17.0.8:4806 [sc-boss-459-1]
I 0115-2026:59,388 i.s.c.m.MembershipProtocol Joined cluster 'default': [{m: [email protected]:4806, s: ALIVE, inc: 0}, {m: [email protected]:4802{key1=value1, key2=value2}, s: ALIVE, inc: 0}, {m: [email protected]:4804, s: ALIVE, inc: 0}, {m: [email protected]:4801, s: ALIVE, inc: 0}, {m: [email protected]:4803, s: ALIVE, inc: 0}, {m: [email protected]:4805, s: ALIVE, inc: 0}] [sc-membership-4806]
I 0115-2026:59,399 i.s.t.TransportImpl Bound to: 172.17.0.8:4807 [sc-boss-461-1]
I 0115-2026:59,403 i.s.c.m.MembershipProtocol Joined cluster 'default': [{m: [email protected]:4806, s: ALIVE, inc: 0}, {m: [email protected]:4802{key1=value1, key2=value2}, s: ALIVE, inc: 0}, {m: [email protected]:4804, s: ALIVE, inc: 0}, {m: [email protected]:4807, s: ALIVE, inc: 0}, {m: [email protected]:4801, s: ALIVE, inc: 0}, {m: [email protected]:4803, s: ALIVE, inc: 0}, {m: [email protected]:4805, s: ALIVE, inc: 0}] [sc-membership-4807]
I 0115-2026:59,407 i.s.t.TransportImpl Bound to: 172.17.0.8:4808 [sc-boss-463-1]
I 0115-2026:59,410 i.s.c.m.MembershipProtocol Joined cluster 'default': [{m: [email protected]:4806, s: ALIVE, inc: 0}, {m: [email protected]:4802{key1=value1, key2=value2}, s: ALIVE, inc: 0}, {m: [email protected]:4804, s: ALIVE, inc: 0}, {m: [email protected]:4808, s: ALIVE, inc: 0}, {m: [email protected]:4807, s: ALIVE, inc: 0}, {m: [email protected]:4801, s: ALIVE, inc: 0}, {m: [email protected]:4803, s: ALIVE, inc: 0}, {m: [email protected]:4805, s: ALIVE, inc: 0}] [sc-membership-4808]
I 0115-2026:59,416 i.s.t.TransportImpl Bound to: 172.17.0.8:4809 [sc-boss-465-1]
I 0115-2026:59,419 i.s.c.m.MembershipProtocol Joined cluster 'default': [{m: [email protected]:4806, s: ALIVE, inc: 0}, {m: [email protected]:4802{key1=value1, key2=value2}, s: ALIVE, inc: 0}, {m: [email protected]:4804, s: ALIVE, inc: 0}, {m: [email protected]:4808, s: ALIVE, inc: 0}, {m: [email protected]:4807, s: ALIVE, inc: 0}, {m: [email protected]:4809, s: ALIVE, inc: 0}, {m: [email protected]:4801, s: ALIVE, inc: 0}, {m: [email protected]:4803, s: ALIVE, inc: 0}, {m: [email protected]:4805, s: ALIVE, inc: 0}] [sc-membership-4809]
I 0115-2026:59,424 i.s.t.TransportImpl Bound to: 172.17.0.8:4810 [sc-boss-467-1]
I 0115-2026:59,427 i.s.c.m.MembershipProtocol Joined cluster 'default': [{m: [email protected]:4806, s: ALIVE, inc: 0}, {m: [email protected]:4810, s: ALIVE, inc: 0}, {m: [email protected]:4802{key1=value1, key2=value2}, s: ALIVE, inc: 0}, {m: [email protected]:4804, s: ALIVE, inc: 0}, {m: [email protected]:4808, s: ALIVE, inc: 0}, {m: [email protected]:4807, s: ALIVE, inc: 0}, {m: [email protected]:4809, s: ALIVE, inc: 0}, {m: [email protected]:4801, s: ALIVE, inc: 0}, {m: [email protected]:4803, s: ALIVE, inc: 0}, {m: [email protected]:4805, s: ALIVE, inc: 0}] [sc-membership-4810]
I 0115-2026:59,432 i.s.t.TransportImpl Bound to: 172.17.0.8:4811 [sc-boss-469-1]
I 0115-2026:59,435 i.s.c.m.MembershipProtocol Joined cluster 'default': [{m: [email protected]:4806, s: ALIVE, inc: 0}, {m: [email protected]:4811, s: ALIVE, inc: 0}, {m: [email protected]:4810, s: ALIVE, inc: 0}, {m: [email protected]:4802{key1=value1, key2=value2}, s: ALIVE, inc: 0}, {m: [email protected]:4804, s: ALIVE, inc: 0}, {m: [email protected]:4808, s: ALIVE, inc: 0}, {m: [email protected]:4807, s: ALIVE, inc: 0}, {m: [email protected]:4809, s: ALIVE, inc: 0}, {m: [email protected]:4801, s: ALIVE, inc: 0}, {m: [email protected]:4803, s: ALIVE, inc: 0}, {m: [email protected]:4805, s: ALIVE, inc: 0}] [sc-membership-4811]
I 0115-2026:59,454 i.s.t.TransportImpl Bound to: 172.17.0.8:4812 [sc-boss-471-1]
I 0115-2026:59,459 i.s.c.m.MembershipProtocol Joined cluster 'default': [{m: [email protected]:4806, s: ALIVE, inc: 0}, {m: [email protected]:4811, s: ALIVE, inc: 0}, {m: [email protected]:4810, s: ALIVE, inc: 0}, {m: [email protected]:4802{key1=value1, key2=value2}, s: ALIVE, inc: 0}, {m: [email protected]:4812, s: ALIVE, inc: 0}, {m: [email protected]:4804, s: ALIVE, inc: 0}, {m: [email protected]:4808, s: ALIVE, inc: 0}, {m: [email protected]:4807, s: ALIVE, inc: 0}, {m: [email protected]:4809, s: ALIVE, inc: 0}, {m: [email protected]:4801, s: ALIVE, inc: 0}, {m: [email protected]:4803, s: ALIVE, inc: 0}, {m: [email protected]:4805, s: ALIVE, inc: 0}] [sc-membership-4812]
I 0115-2026:59,747 i.s.t.BaseTest Received membership update event: MembershipEvent{type=UPDATED, [email protected]:4802{key1=value3}, [email protected]:4802{key1=value1, key2=value2}} [sc-membership-4806]
I 0115-2026:59,747 i.s.t.BaseTest Received membership update event: MembershipEvent{type=UPDATED, [email protected]:4802{key1=value3}, [email protected]:4802{key1=value1, key2=value2}} [sc-membership-4805]
I 0115-2026:59,747 i.s.t.BaseTest Received membership update event: MembershipEvent{type=UPDATED, [email protected]:4802{key1=value3}, [email protected]:4802{key1=value1, key2=value2}} [sc-membership-4808]
I 0115-2026:59,765 i.s.t.BaseTest Received membership update event: MembershipEvent{type=UPDATED, [email protected]:4802{key1=value3}, [email protected]:4802{key1=value1, key2=value2}} [sc-membership-4812]
I 0115-2026:59,765 i.s.t.BaseTest Received membership update event: MembershipEvent{type=UPDATED, [email protected]:4802{key1=value3}, [email protected]:4802{key1=value1, key2=value2}} [sc-membership-4811]
I 0115-2027:26,741 i.s.c.ClusterImpl Cluster member [email protected]:4801 is shutting down... [main]
I 0115-2027:26,813 i.s.t.BaseTest Received membership update event: MembershipEvent{type=UPDATED, [email protected]:4802{key1=value3}, [email protected]:4802{key1=value1, key2=value2}} [sc-membership-4807]
I 0115-2027:27,024 i.s.t.BaseTest Received membership update event: MembershipEvent{type=UPDATED, [email protected]:4802{key1=value3}, [email protected]:4802{key1=value1, key2=value2}} [sc-membership-4804]
I 0115-2027:27,040 i.s.t.BaseTest Received membership update event: MembershipEvent{type=UPDATED, [email protected]:4802{key1=value3}, [email protected]:4802{key1=value1, key2=value2}} [sc-membership-4809]
I 0115-2027:27,045 i.s.t.BaseTest Received membership update event: MembershipEvent{type=UPDATED, [email protected]:4802{key1=value3}, [email protected]:4802{key1=value1, key2=value2}} [sc-membership-4803]
I 0115-2027:27,045 i.s.t.BaseTest Received membership update event: MembershipEvent{type=UPDATED, [email protected]:4802{key1=value3}, [email protected]:4802{key1=value1, key2=value2}} [sc-membership-4810]
I 0115-2027:27,114 i.s.c.ClusterImpl Cluster member [email protected]:4802{key1=value3} is shutting down... [main]
I 0115-2027:27,133 i.s.c.ClusterImpl Cluster member [email protected]:4803 is shutting down... [main]
I 0115-2027:27,149 i.s.c.ClusterImpl Cluster member [email protected]:4804 is shutting down... [main]
I 0115-2027:27,157 i.s.c.ClusterImpl Cluster member [email protected]:4805 is shutting down... [main]
I 0115-2027:27,165 i.s.c.ClusterImpl Cluster member [email protected]:4806 is shutting down... [main]
I 0115-2027:27,175 i.s.c.ClusterImpl Cluster member [email protected]:4807 is shutting down... [main]
I 0115-2027:27,184 i.s.c.ClusterImpl Cluster member [email protected]:4808 is shutting down... [main]
I 0115-2027:27,193 i.s.c.ClusterImpl Cluster member [email protected]:4809 is shutting down... [main]
I 0115-2027:27,204 i.s.c.ClusterImpl Cluster member [email protected]:4810 is shutting down... [main]
I 0115-2027:27,211 i.s.c.ClusterImpl Cluster member [email protected]:4811 is shutting down... [main]
I 0115-2027:27,217 i.s.c.ClusterImpl Cluster member [email protected]:4812 is shutting down... [main]
I 0115-2027:27,225 i.s.t.BaseTest ***** Test finished : ClusterTest.testUpdateMetadata ***** [main]
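The failure looks like a propagation race: the assertion runs while some members still hold the old metadata, even though the UPDATED events do arrive shortly afterwards (visible in the log lines above). A generic sketch of gating an assertion on an event with a timeout, using hypothetical names rather than the actual test code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitMetadataUpdate {
  public static void main(String[] args) throws InterruptedException {
    CountDownLatch updated = new CountDownLatch(1);
    // Hypothetical wiring: in the real test this callback would be registered
    // on the cluster's membership listener for UPDATED events.
    Runnable onMembershipUpdated = updated::countDown;
    new Thread(onMembershipUpdated).start(); // simulate the event arriving later
    // Wait (bounded) for the update to propagate before asserting on metadata.
    boolean sawUpdate = updated.await(5, TimeUnit.SECONDS);
    System.out.println(sawUpdate ? "metadata updated" : "timed out");
  }
}
```

Waiting on the event rather than asserting immediately removes the timing dependency that makes this test flaky on slow CI machines.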

Transport can connect to itself

For some reason the transport ends up connecting to itself. This is possibly related to the DNS-name handling in Transport/TransportEndpoint.

Logs available:

I 0727-1844:31,656 i.n.h.l.LoggingHandler [id: 0x16a4f615, /172.29.49.109:53848 => /172.29.49.109:4830] WRITE: Message [qualifier=pt.openapi.core.cluster/fdetector/ping, correlationId=6, contextId=null, subscriptionId=null, dataFormat=null] [oapi-transport-io-0@tcp://172.29.49.109:4830]

I 0727-1844:31,657 i.n.h.l.LoggingHandler [id: 0x16a4f615, /172.29.49.109:53848 => /172.29.49.109:4830] FLUSH [oapi-transport-io-0@tcp://172.29.49.109:4830]

I 0727-1844:31,657 i.n.h.l.LoggingHandler [id: 0xde7d6517, /172.29.49.109:53848 => /172.29.49.109:4830] RECEIVED: Message [qualifier=pt.openapi.core.cluster/fdetector/ping, correlationId=6, contextId=null, subscriptionId=null, dataFormat=PROTOSTUFF] [oapi-transport-io-0@tcp://172.29.49.109:4830]

I 0727-1844:19,968 i.n.h.l.LoggingHandler [id: 0xde7d6517, /172.29.49.109:53848 => /172.29.49.109:4830] ACTIVE [oapi-transport-io-0@tcp://172.29.49.109:4830]

D 0727-1844:19,968 c.p.o.c.t.i.AcceptorRegistratorChannelHandler Registered acceptor: NettyTransport{status=CONNECTED, channel=[id: 0xde7d6517, /172.29.49.109:53848 => /172.29.49.109:4830]} [oapi-transport-io-0@tcp://172.29.49.109:4830]
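One plausible guard (hypothetical, not the actual Transport code): refuse an outbound connect whose resolved remote endpoint equals the local bind endpoint, so a misresolved DNS name cannot loop the transport back onto itself:

```java
import java.net.InetSocketAddress;

public class SelfConnectGuard {
  // Returns true when an outbound connect would target our own bind address.
  static boolean isSelfConnection(InetSocketAddress local, InetSocketAddress remote) {
    return local.getAddress().equals(remote.getAddress())
        && local.getPort() == remote.getPort();
  }

  public static void main(String[] args) {
    // Mirrors the endpoints seen in the logs above: same host, same port.
    InetSocketAddress local = new InetSocketAddress("172.29.49.109", 4830);
    InetSocketAddress remote = new InetSocketAddress("172.29.49.109", 4830);
    System.out.println(isSelfConnection(local, remote));
  }
}
```

The comparison must happen after resolution; comparing unresolved host names would miss the case where two different names resolve to the same address.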

could not emit value due to lack of requests

Caused by: rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests
at rx.observers.Subscribers$2.onError(Subscribers.java:98)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onError(OnSubscribeFilter.java:90)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
... 8 more
Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests
at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:308)
at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
at io.scalecube.cluster.gossip.GossipProtocol.onGossipReq(GossipProtocol.java:208)
at rx.observers.Subscribers$2.onNext(Subscribers.java:103)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:227)
... 9 more

D 0921-2012:52,361 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4806, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4833, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4824, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4808, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4810, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4832, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4829, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4822, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4811, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4819, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4805, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4826, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4812, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4807, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4810]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4815, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4802, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4809, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4823, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4825, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4816, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4835, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4818, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,366 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4828, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4811]
D 0921-2012:52,362 i.s.l.RaftLeaderElection Received RAFT_HEARTBEAT[sc/raft/heartbeat] from 192.168.56.1:4801 [sc-leaderelection-192.168.56.1:4810]
D 0921-2012:52,362 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4814, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4810]
D 0921-2012:52,368 i.s.l.RaftLeaderElection FOLLOWER Node: 192.168.56.1:4810 received heartbeat from 192.168.56.1:4801 [sc-leaderelection-192.168.56.1:4810]
D 0921-2012:52,368 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4820, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4810]
D 0921-2012:52,368 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4821, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4810]
D 0921-2012:52,368 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4834, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4810]
D 0921-2012:52,368 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4831, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4810]
D 0921-2012:52,368 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4804, s: ALIVE, inc: 0} [sc-membership-192.168.56.1:4810]
D 0921-2012:52,368 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]

Define formatting settings

We need to agree on formatter settings and take the next steps:

  • define default formatting settings
  • add the formatter configuration to the repository
  • add the Checkstyle plugin to the build process

Set up project infrastructure for integration tests

All tests so far are unit tests, even though they take several minutes to run. In this ticket:

  • separate unit tests from integration tests
  • prevent integration tests from running during the release phase
  • activate integration tests only via mvn integration-test

java.lang.NullPointerException at io.scalecube.services.ServiceDispatcher$2.call(ServiceDispatcher.java:39)

~ Received 'sayHello' RPC request: Message {headers: {service=io.scalecube.services.examples.AdSelectorService/selectAd, serviceMethod=selectAd, cid=rpc-8369cbf5-0381-43f4-906f-e3977fed2a85}, sender: 169.254.49.148:4801, data: HelloRequest{name='World'}}
Sep 28, 2016 7:42:09 PM com.google.common.util.concurrent.Futures$ImmediateFuture addListener
SEVERE: RuntimeException while executing runnable com.google.common.util.concurrent.Futures$6@3b36a546 with executor MoreExecutors.directExecutor()
java.lang.NullPointerException
at io.scalecube.cluster.Cluster.send(Cluster.java:208)
at io.scalecube.services.ServiceDispatcher$2$1.onSuccess(ServiceDispatcher.java:52)
at com.google.common.util.concurrent.Futures$6.run(Futures.java:1773)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
at com.google.common.util.concurrent.Futures$ImmediateFuture.addListener(Futures.java:153)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1776)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1713)
at io.scalecube.services.ServiceDispatcher$2.call(ServiceDispatcher.java:45)
at io.scalecube.services.ServiceDispatcher$2.call(ServiceDispatcher.java:1)
at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:305)
at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
at io.scalecube.transport.MessageReceiverHandler.channelRead(MessageReceiverHandler.java:36)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:572)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:513)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:427)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:399)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Unknown Source)

~ Received 'sayHello' RPC request: Message {headers: {service=io.scalecube.services.examples.AdSelectorService/selectAd, serviceMethod=selectAd, cid=rpc-62299191-ea0c-43d3-b550-61982dd1a09f}, sender: 169.254.49.148:4801, data: HelloRequest{name='World'}}
Sep 28, 2016 7:42:09 PM com.google.common.util.concurrent.Futures$ImmediateFuture addListener
SEVERE: RuntimeException while executing runnable com.google.common.util.concurrent.Futures$6@6b687f64 with executor MoreExecutors.directExecutor()
java.lang.NullPointerException
at io.scalecube.cluster.Cluster.send(Cluster.java:208)
at io.scalecube.services.ServiceDispatcher$2$1.onSuccess(ServiceDispatcher.java:52)
at com.google.common.util.concurrent.Futures$6.run(Futures.java:1773)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
at com.google.common.util.concurrent.Futures$ImmediateFuture.addListener(Futures.java:153)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1776)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1713)
at io.scalecube.services.ServiceDispatcher$2.call(ServiceDispatcher.java:45)
at io.scalecube.services.ServiceDispatcher$2.call(ServiceDispatcher.java:1)
at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:305)
at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
at io.scalecube.transport.MessageReceiverHandler.channelRead(MessageReceiverHandler.java:36)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:572)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:513)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:427)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:399)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Unknown Source)

java.lang.NullPointerException
at io.scalecube.services.ServiceDispatcher$2.call(ServiceDispatcher.java:39)
at io.scalecube.services.ServiceDispatcher$2.call(ServiceDispatcher.java:1)
at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:305)
at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
at io.scalecube.transport.MessageReceiverHandler.channelRead(MessageReceiverHandler.java:36)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:572)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:513)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:427)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:399)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Unknown Source)

Scalecube + Docker

I might be going about this wrong, but I'm having some issues running ScaleCube within Docker.

So let's say I have two Docker hosts: api1 and api2. I have a seed service which just initializes the Cluster; I'm setting the port and using syncGroup, and passing in the list of seeds (pretty standard stuff). I bake the service into a Docker container and run it on api1 and api2, passing the seeds api1:20000,api2:20000.

Each service starts up ok, and then I get a whole bunch of warnings like:

Failed to connect from 172.17.0.5:20000 to 172.17.0.8:20000

So I wired up some endpoints that spit out some info around the Cluster (members).

The first instance of the service will only have itself as a member; the second instance will have both instances as members of the cluster (but it still warns every second about failing to connect).

I noticed that the IPs are all 172.17.x.x, which makes sense: that's what localhost resolves to inside each Docker container. I figured that was the problem (they aren't on the same network), and I saw I could set which address/interface gossip should bind to. I tried several combinations without luck (using the hostname or the local IP of the Docker host throws an exception, failing to bind inside the container because the address is not one of its network interfaces). Without bridging the Docker networks between hosts, is there a way to set this up to work between Docker hosts, or from a Docker host to a non-Docker host on the same network?

Capturing service-level metrics

The following metrics would be useful for monitoring the status of a service:

  • request meter per service/function
  • gauge for pending function calls per service
  • timer for function-call execution latency
  • exception rate per function
  • circuit-breaker status, if applicable
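A minimal sketch of how such metrics could be tracked, using only JDK primitives. A real implementation would more likely delegate to a metrics library such as Dropwizard Metrics; all class and method names below are illustrative, not an existing ScaleCube API:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical metrics holder: tracks the counters listed in the ticket.
final class ServiceMetrics {
  private final AtomicLong requests = new AtomicLong();          // request meter
  private final AtomicLong pending = new AtomicLong();           // gauge: in-flight calls
  private final AtomicLong errors = new AtomicLong();            // exception counter
  private final AtomicLong totalLatencyNanos = new AtomicLong(); // feeds a latency timer

  // Wraps a service call and records all four metrics around it.
  <T> T record(Callable<T> call) throws Exception {
    requests.incrementAndGet();
    pending.incrementAndGet();
    long start = System.nanoTime();
    try {
      return call.call();
    } catch (Exception e) {
      errors.incrementAndGet();
      throw e;
    } finally {
      totalLatencyNanos.addAndGet(System.nanoTime() - start);
      pending.decrementAndGet();
    }
  }

  long requestCount() { return requests.get(); }
  long pendingCount() { return pending.get(); }
  long errorCount() { return errors.get(); }
}

public class MetricsDemo {
  public static void main(String[] args) throws Exception {
    ServiceMetrics metrics = new ServiceMetrics();
    metrics.record(() -> "ok");                       // successful call
    try {
      metrics.record(() -> { throw new IllegalStateException("boom"); });
    } catch (IllegalStateException ignored) {
      // expected: the failure is counted, then rethrown
    }
    System.out.println(metrics.requestCount() + " " + metrics.errorCount() + " " + metrics.pendingCount());
  }
}
```

Wrapping each dispatched call this way keeps the instrumentation in one place; the exception rate and pending gauge fall out of the same try/finally.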

Starting many nodes at once throws an "address already in use" exception

Code:

    for (int i = 0 ; i < 50; i++) {
      exec.execute(new Runnable() {
        @Override
        public void run() {
          ICluster member = Cluster.joinAwait(seed.address());
        }
      });
    }

Output:

I 0921-1835:43,635 i.s.t.Transport Connected from 10.150.4.21:4816 to 10.150.4.21:4807: [id: 0xa5f9fce7, L:/10.150.4.21:51671 - R:/10.150.4.21:4807] [sc-io-52-2]
D 0921-1835:43,667 i.s.c.m.MembershipProtocol Received membership gossip: {m: [email protected]:4803, s: SUSPECT, inc: 0} [sc-membership-10.150.4.21:4801]
Exception in thread "pool-1-thread-8" java.lang.RuntimeException: java.net.BindException: Address already in use: bind
    at com.google.common.base.Throwables.propagate(Throwables.java:160)
    at io.scalecube.cluster.Cluster.joinAwait(Cluster.java:94)
    at io.scalecube.leaderelection.LeaderElectionIT$1.run(LeaderElectionIT.java:97)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.BindException: Address already in use: bind
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Unknown Source)
    at sun.nio.ch.Net.bind(Unknown Source)
    at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
    at sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source)
    at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:125)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:501)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1218)
    at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:505)
    at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:490)
    at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:965)
    at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:210)
    at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:353)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:402)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more

Improve transport to write in batches to increase throughput

Currently the transport writes and flushes on each message:

private void send(Channel channel, Message message, CompletableFuture<Void> promise) {
    if (promise == COMPLETED_PROMISE) {
      channel.writeAndFlush(message, channel.voidPromise());
    } else {
      composeFutures(channel.writeAndFlush(message), promise);
    }
  }

writeAndFlush issues a syscall per message, which reduces throughput. An alternative approach is to write in batches, so that flushing happens periodically and strikes the right balance between latency and throughput.
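The batching idea can be sketched independently of Netty: buffer writes in memory and pay the flush cost once per batch instead of once per message. In an actual Netty transport this would map to channel.write(...) per message plus a threshold- or timer-driven channel.flush() (Netty 4.1 also ships a FlushConsolidationHandler for this purpose). The BatchingSender class below is illustrative only:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative batching writer: messages accumulate in memory and a "flush"
// (standing in for the per-flush syscall) happens once per batch.
final class BatchingSender {
  private final List<String> buffer = new ArrayList<>();
  private final int batchSize;
  private int flushCount = 0;  // how many flushes (syscalls) were issued

  BatchingSender(int batchSize) { this.batchSize = batchSize; }

  void send(String message) {
    buffer.add(message);             // cheap in-memory write
    if (buffer.size() >= batchSize) {
      flush();                       // one flush amortized over the whole batch
    }
  }

  void flush() {
    if (!buffer.isEmpty()) {
      buffer.clear();                // stands in for pushing bytes to the socket
      flushCount++;
    }
  }

  int flushCount() { return flushCount; }
}

public class BatchingDemo {
  public static void main(String[] args) {
    BatchingSender sender = new BatchingSender(10);
    for (int i = 0; i < 95; i++) {
      sender.send("msg-" + i);
    }
    sender.flush(); // in a real transport a periodic timer would flush the tail
    System.out.println(sender.flushCount()); // 95 messages cost 10 flushes instead of 95
  }
}
```

The batch size (and a companion flush interval) is the knob that trades latency for throughput.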

Add new transport settings: listen_address, listen_interface

  • The config must specify either listen_address or listen_interface; setting both is not allowed.
  • If listen_interface is specified, the preferIPv6 system property/config setting is taken into account (false by default).
  • The wildcard address (i.e. 0.0.0.0) is not allowed.
  • Approach for the transport bind address: if no specific address is configured, fall back to InetAddress.getLocalHost().

NOTE: for a reference look into org.apache.cassandra.config.DatabaseDescriptor.applyAddressConfig()
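The validation rules above could be implemented roughly as follows. This is a hedged sketch using plain JDK networking APIs; the TransportAddressConfig name and its fields are hypothetical, not part of the existing TransportConfig:

```java
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Collections;

// Hypothetical config holder implementing the validation rules above.
final class TransportAddressConfig {
  final String listenAddress;   // e.g. "10.0.0.5", or null
  final String listenInterface; // e.g. "eth0", or null

  TransportAddressConfig(String listenAddress, String listenInterface) {
    this.listenAddress = listenAddress;
    this.listenInterface = listenInterface;
  }

  InetAddress resolveBindAddress(boolean preferIPv6) throws Exception {
    if (listenAddress != null && listenInterface != null) {
      throw new IllegalArgumentException(
          "Set either listen_address or listen_interface, not both");
    }
    if (listenAddress != null) {
      InetAddress addr = InetAddress.getByName(listenAddress);
      if (addr.isAnyLocalAddress()) {
        throw new IllegalArgumentException("Wildcard address 0.0.0.0 is not allowed");
      }
      return addr;
    }
    if (listenInterface != null) {
      NetworkInterface nic = NetworkInterface.getByName(listenInterface);
      if (nic == null) {
        throw new IllegalArgumentException("Unknown interface: " + listenInterface);
      }
      // honor the preferIPv6 setting when picking an address off the interface
      for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
        if ((addr instanceof Inet6Address) == preferIPv6) {
          return addr;
        }
      }
      throw new IllegalArgumentException("No suitable address on " + listenInterface);
    }
    return InetAddress.getLocalHost(); // default when nothing is configured
  }
}

public class AddressConfigDemo {
  public static void main(String[] args) throws Exception {
    try {
      new TransportAddressConfig("10.0.0.5", "eth0").resolveBindAddress(false);
    } catch (IllegalArgumentException e) {
      System.out.println("both-rejected");
    }
    try {
      new TransportAddressConfig("0.0.0.0", null).resolveBindAddress(false);
    } catch (IllegalArgumentException e) {
      System.out.println("wildcard-rejected");
    }
  }
}
```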

Consider using a static void promise or Optional instead of passing 'null' to async methods

This way downstream code would not need null checks, since a non-null value is guaranteed at the API level (or the call fails fast).

The following API methods are affected:

public interface ITransport {
  ...
  void stop(@Nullable SettableFuture<Void> promise);
  void disconnect(@CheckForNull TransportEndpoint endpoint, @Nullable SettableFuture<Void> promise);
  void send(@CheckForNull TransportEndpoint endpoint, @CheckForNull Message message,
      @Nullable SettableFuture<Void> promise);
  ...
}

And also corresponding ICluster delegating methods.
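The idea can be sketched with a shared pre-completed promise, shown here with CompletableFuture instead of Guava's SettableFuture; the Promises and FakeTransport names are illustrative. Callers that don't care about completion pass the sentinel, so the transport never receives null and needs no null checks internally:

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

// Shared sentinel promise: already completed, so "completing" it again is a no-op.
final class Promises {
  static final CompletableFuture<Void> NO_OP = CompletableFuture.completedFuture(null);

  private Promises() {}
}

// Illustrative transport: the promise parameter is never null by contract.
final class FakeTransport {
  void send(String message, CompletableFuture<Void> promise) {
    Objects.requireNonNull(promise, "promise"); // fail fast at the API boundary
    // ... write the message to the wire ...
    promise.complete(null); // no null checks needed below the boundary
  }

  void send(String message) {
    send(message, Promises.NO_OP); // fire-and-forget convenience overload
  }
}

public class PromiseDemo {
  public static void main(String[] args) {
    FakeTransport transport = new FakeTransport();
    transport.send("fire-and-forget"); // caller doesn't care about completion
    CompletableFuture<Void> promise = new CompletableFuture<>();
    transport.send("tracked", promise);
    System.out.println(promise.isDone());
  }
}
```

The null-object sentinel keeps the single @Nullable parameter out of the API while still letting interested callers observe completion.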

Make the network emulator available through the cluster API

Currently you can enable the network emulator via ClusterConfig.transportConfig.isUseNetworkEmulator, but it is not actually accessible from the cluster API. Define some way to access the network emulator through the cluster API in order to test components that are built on top of the cluster.

Change slf4j implementation to log4j2

Logback is not good at basic formatting. For example, in the following line we want to log the message plus the exception, without the stack trace:

logger.error("Something failed, cause: {}", new Object[] {throwable})

With Logback we get the log message and the full stack trace.

Improve Cluster membership messages API

The examples of sending messages between nodes show that it is required to go through the cluster instance:

// Send greetings message to other members
Greetings greetings = new Greetings("Greetings from ClusterMember B");
Message greetingsMessage = new Message(greetings);
List<ClusterMember> members = clusterB.membership().members();
for (ClusterMember member : members) {
    if (!clusterB.membership().isLocalMember(member)) {
        clusterB.send(member, greetingsMessage);
    }
}

Since a cluster member is always part of a cluster (otherwise it is some lonely, irrelevant node), I suggest changing the API as follows:

// Send greetings message to other members
Greetings greetings = new Greetings("Greetings from ClusterMember B");
Message greetingsMessage = new Message(greetings);
List<ClusterMember> members = clusterB.membership().members();
for (ClusterMember member : members) {
    if (!member.isLocal()) {
        member.send(greetingsMessage);
    }
}

In this case the member holds a reference to the cluster and the transport.

MissingBackpressureException and several other exceptions on gossips and membership

Under some conditions the following is possible:

GossipState{gossip=Gossip{gossipId=897CF2FD84CE291D96EE-5, message=Message {headers: {q=sc/membership/gossip}, sender: null, data: {m: 6AE7216A4A

    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:57)
    at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests
    at rx.observers.Subscribers$2.onError(Subscribers.java:98)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
    at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onError(OnSubscribeFilter.java:90)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    ... 8 more
Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests
    at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:308)
    at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
    at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
    at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
    at io.scalecube.cluster.gossip.GossipProtocol.onGossipReq(GossipProtocol.java:208)
    at rx.observers.Subscribers$2.onNext(Subscribers.java:103)
    at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
    at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:227)
    ... 9 more

The log also contains:

Exception in thread "sc-membership-192.168.56.1:4811" java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:57)
    at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)

See attached log:

ScaleCube_ERROR.txt
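
Conceptually, the gossip inbox overflows a PublishSubject that has no backpressure strategy. One possible remedy, sketched here with only the JDK (under the assumption that dropping overflow gossips is acceptable, since gossips are re-disseminated), is a bounded inbox that drops and counts on overflow instead of throwing:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Conceptual stand-in for a backpressure-aware gossip inbox.
public class BoundedInboxSketch {

  private final BlockingQueue<Object> inbox;
  private final AtomicLong dropped = new AtomicLong();

  BoundedInboxSketch(int capacity) {
    this.inbox = new ArrayBlockingQueue<>(capacity);
  }

  // Non-blocking offer: overflow is dropped and counted instead of
  // propagating a MissingBackpressureException-style failure.
  boolean onGossip(Object gossip) {
    boolean accepted = inbox.offer(gossip);
    if (!accepted) {
      dropped.incrementAndGet(); // surface this as a metric or warning log
    }
    return accepted;
  }

  long droppedCount() {
    return dropped.get();
  }

  public static void main(String[] args) {
    BoundedInboxSketch inbox = new BoundedInboxSketch(1);
    inbox.onGossip("g1");
    inbox.onGossip("g2"); // overflows, dropped
    System.out.println("dropped: " + inbox.droppedCount());
  }
}
```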

[Transport] Infinite loop when binding on auto-incremented address

The transport gets into an infinite loop when binding in auto-increment mode. To reproduce the issue you need to be unlucky enough to bind the transport while the operating system has a socket in CLOSE_WAIT or TIME_WAIT state. In that case Addressing.isAvailablePort() returns true, but once the flow proceeds to bind0() it falls into an infinite loop.

The following program produces a half-closed socket:

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

  public static void main(String[] args) throws Exception {
    ServerSocket serverSocket = new ServerSocket(6001);
    new Thread(() -> {
      while (true) {
        try {
          Socket accept = serverSocket.accept();
          InputStream inputStream = accept.getInputStream();
          OutputStream outputStream = accept.getOutputStream();
          int read = inputStream.read();
          accept.close();
          System.out.println(accept);
        } catch (Exception e) {
          e.printStackTrace(System.err);
        }
      }
    }).start();
    Socket socket = new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort());
    InputStream inputStream = socket.getInputStream();
    OutputStream outputStream = socket.getOutputStream();
    outputStream.write(1);
    inputStream.read();
  }

Don't terminate this program; take the port from the console and put it into the following test:

  @Test
  public void testBindExceptionWithPortAutoIncrement() throws Exception {
    TransportConfig config = TransportConfig.builder()
            .port(PORT_OF_THE_HALF_CLOSED_SOCKET)
            .portAutoIncrement(true)
            .portCount(100)
            .build();
    Transport transport1 = null;
    Transport transport2 = null;
    try {
      transport1 = Transport.bindAwait(config);
      transport2 = Transport.bindAwait(config);
      fail("Didn't get expected bind exception");
    } catch (Throwable throwable) {
      // Check that get address already in use exception
      assertTrue("" + throwable, throwable instanceof BindException || throwable.getMessage().contains("Address already in use"));
    } finally {
      destroyTransport(transport1);
      destroyTransport(transport2);
    }
  }

Run it and make sure the console shows a lot of logs like this:

W 0111-1821:59,809 i.s.t.TransportImpl Can't bind to address LOCAL_HOST_IP:PORT_OF_THE_HALF_CLOSED_SOCKET, try again on different port [cause=io.netty.channel.unix.Errors$NativeIoException: bind() failed: Address already in use] [sc-boss-1-2]

Support cluster events

Subscribe to change notifications of the cluster membership. The events tracking the life-cycle of members are:

  • MemberUp - a new member has joined the cluster and its status has changed to Up.
  • MemberExited - a member is leaving the cluster and its status has changed to Exiting. Note that the node might already have been shut down when this event is published on another node.
  • MemberRemoved - the member has been completely removed from the cluster.
  • UnreachableMember - a member is considered unreachable, as detected by the failure detector of at least one other node.
  • ReachableMember - a member is considered reachable again after having been unreachable. All nodes that previously detected it as unreachable have detected it as reachable again.
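
These life-cycle notifications could be modeled roughly as follows (a sketch with illustrative names, not the actual scalecube API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of a membership-event bus with the event types listed above.
public class MembershipEventsSketch {

  enum Type { MEMBER_UP, MEMBER_EXITED, MEMBER_REMOVED, UNREACHABLE_MEMBER, REACHABLE_MEMBER }

  static final class MemberEvent {
    final Type type;
    final String memberId;

    MemberEvent(Type type, String memberId) {
      this.type = type;
      this.memberId = memberId;
    }
  }

  private final List<Consumer<MemberEvent>> listeners = new ArrayList<>();

  // Subscribe to membership change notifications.
  void listenMembership(Consumer<MemberEvent> listener) {
    listeners.add(listener);
  }

  // Deliver an event to every subscriber.
  void publish(MemberEvent event) {
    for (Consumer<MemberEvent> listener : listeners) {
      listener.accept(event);
    }
  }

  public static void main(String[] args) {
    MembershipEventsSketch events = new MembershipEventsSketch();
    events.listenMembership(e -> System.out.println(e.type + " " + e.memberId));
    events.publish(new MemberEvent(Type.MEMBER_UP, "member-1"));
  }
}
```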

Binding transport to IPv6 address causes IllegalFormatConversionException

When starting ScaleCube and one of the network interfaces only has an IPv6 address (for various reasons; in my case DHCP didn't complete, but we could genuinely be using an IPv6-only network), Addressing uses NetworkInterface.getInetAddresses() to get an Inet6Address, which looks something like this: /fe80:0:0:0:8cf6:f5c8:c946:2c30%eno1.

Then, when the MembershipProtocol constructor uses the address to create a "name format" for ThreadFactoryBuilder, the % in the address causes the thread factory builder to crash with a java.util.IllegalFormatConversionException when it tries to create thread names using String.format().

The full stack trace looks like this:

java.util.IllegalFormatConversionException: e != java.lang.Integer
	at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4302)
	at java.util.Formatter$FormatSpecifier.printFloat(Formatter.java:2806)
	at java.util.Formatter$FormatSpecifier.print(Formatter.java:2753)
	at java.util.Formatter.format(Formatter.java:2520)
	at java.util.Formatter.format(Formatter.java:2455)
	at java.lang.String.format(String.java:2981)
	at com.google.common.util.concurrent.ThreadFactoryBuilder.format(ThreadFactoryBuilder.java:181)
	at com.google.common.util.concurrent.ThreadFactoryBuilder.setNameFormat(ThreadFactoryBuilder.java:70)
	at io.scalecube.cluster.membership.MembershipProtocol.<init>(MembershipProtocol.java:110)
	at io.scalecube.cluster.ClusterImpl.lambda$join0$25(ClusterImpl.java:81)
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at io.scalecube.transport.TransportImpl.lambda$bind0$0(TransportImpl.java:104)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:514)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:488)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:427)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:111)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:897)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:570)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1258)
	at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:511)
	at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:496)
	at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:980)
	at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:250)
	at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:363)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:745)

Using scalecube-cluster-1.0.3
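
The root cause and one possible fix can be sketched as follows: the scope-id separator % in the IPv6 literal is interpreted by String.format() as a conversion specifier (%e, matching the "e != java.lang.Integer" error above), so any address embedded in a name format needs % escaped as %%. safeNameFormat below is an illustrative helper, not the actual fix:

```java
// Sketch: escape '%' in an address before using it in a thread-name format,
// so String.format() does not treat "%eno1" as a float conversion.
public class ThreadNameSketch {

  static String safeNameFormat(String address) {
    return "sc-membership-" + address.replace("%", "%%") + "-%d";
  }

  public static void main(String[] args) {
    String addr = "fe80:0:0:0:8cf6:f5c8:c946:2c30%eno1";
    // Without the escaping, String.format would throw
    // IllegalFormatConversionException here.
    System.out.println(String.format(safeNameFormat(addr), 1));
  }
}
```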

Router utility class

Create a class with static methods that return routers for multiple use cases:

  • Random selection of service instances
  • Select service instance by specific tag(s) with specific value(s)
  • Select service instance by specific tag(s) with matching specific regular expression(s)
  • Select service by weighted random & round-robin algorithms
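
Two of the requested strategies, random and round-robin, could be sketched like this (illustrative class and method names, not the actual scalecube routers):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public final class Routers {

  private Routers() {}

  // Random selection of a service instance.
  public static <T> T random(List<T> instances) {
    return instances.get(ThreadLocalRandom.current().nextInt(instances.size()));
  }

  // Round-robin selection; the counter would be kept per service reference.
  public static <T> T roundRobin(List<T> instances, AtomicInteger counter) {
    int index = Math.floorMod(counter.getAndIncrement(), instances.size());
    return instances.get(index);
  }

  public static void main(String[] args) {
    List<String> instances = Arrays.asList("service-1", "service-2", "service-3");
    System.out.println("random pick: " + random(instances));
  }
}
```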

Attach a logical clock to each message to maintain some notion of message ordering

In order not to depend on time synchronization between server nodes, we can use Lamport clocks (see http://en.wikipedia.org/wiki/Lamport_timestamps) to synchronize logical time between servers. A Lamport timestamp should be attached to each message within the cluster management component, and logical time synchronized based on these messages. Membership tables should store these logical timestamps and use them to resolve the ordering of events in the system.

A basic implementation of logical clocks: https://github.com/antonkharenko/logical-clocks
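
A minimal Lamport clock along the lines described above might look like this (a sketch: tick() on local/send events, merge() when a message with a remote timestamp arrives):

```java
import java.util.concurrent.atomic.AtomicLong;

// Thread-safe Lamport clock sketch.
public final class LamportClock {

  private final AtomicLong time = new AtomicLong();

  // Local event or message send: advance the clock and stamp the message.
  public long tick() {
    return time.incrementAndGet();
  }

  // Message receive: jump past the sender's timestamp, then advance by one.
  public long merge(long remoteTime) {
    long current;
    do {
      current = time.get();
    } while (!time.compareAndSet(current, Math.max(current, remoteTime) + 1));
    return time.get();
  }

  public long time() {
    return time.get();
  }

  public static void main(String[] args) {
    LamportClock clock = new LamportClock();
    clock.tick();                    // local event
    long stamped = clock.merge(10);  // received message stamped 10
    System.out.println("clock after merge: " + stamped);
  }
}
```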

Improve Cluster join API. Join should be async

The cluster join API should be asynchronous.
Instead of:

ICluster join();

It should be something like this:

ListenableFuture<ICluster> join();

or, if we don't need to return the cluster, it can be a future of Void:

ListenableFuture<Void> join();
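
The proposal uses Guava's ListenableFuture; on Java 8+ the same shape can also be expressed with the JDK's CompletableFuture. A sketch (ICluster and the handshake stand-in are illustrative):

```java
import java.util.concurrent.CompletableFuture;

public class AsyncJoinSketch {

  // Minimal stand-in for the cluster handle.
  interface ICluster {
    String address();
  }

  // Non-blocking join: the caller composes follow-up work instead of blocking.
  static CompletableFuture<ICluster> join(String seedAddress) {
    // The supplier stands in for the real join handshake.
    return CompletableFuture.supplyAsync(() -> (ICluster) () -> seedAddress);
  }

  public static void main(String[] args) {
    join("localhost:4801")
        .thenAccept(cluster -> System.out.println("joined via " + cluster.address()))
        .join(); // block only here, at the edge of the demo
  }
}
```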

Starting member with dynamically allocated port

Motivation: streamline testing, development, and deployments with zero configuration.

The enhancement is to enable creating an instance without specifying a specific port.

Current:
....
ICluster clusterA = Cluster.newInstance(3000).join();
ICluster clusterB = Cluster.newInstance(3001).join();
....

After the enhancement it will be possible to run several cluster instances in the same VM:
...
ICluster clusterA = Cluster.newInstance().join(); // starts on default port
ICluster clusterB = Cluster.newInstance().join(); // starts on default port + 1
...
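
One common way to let the OS allocate a port dynamically is to bind to port 0 and read back the ephemeral port (a stdlib sketch, not the actual scalecube code):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class DynamicPortSketch {

  // Bind to port 0 so the OS picks a free ephemeral port, then report it.
  static int allocatePort() {
    try (ServerSocket socket = new ServerSocket(0)) {
      return socket.getLocalPort();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("Allocated port: " + allocatePort());
  }
}
```

Note that probing a port and then binding the real transport to it leaves a small race window (the same CLOSE_WAIT/TIME_WAIT issue described earlier); the robust variant is to let the transport itself bind to port 0 and expose the resulting port.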

Starting a cluster node when the seed node is down throws an error

It would be good not to force a particular cluster initialization order.

If nodeA is the seed node and nodeB uses nodeA as its seed node, it should be possible to start nodeB and then nodeA, or nodeA and then nodeB.

Currently an exception is thrown:

Exception in thread "main" java.lang.NullPointerException
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:210)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:200)
at com.google.common.util.concurrent.SettableFuture.setException(SettableFuture.java:68)
at io.servicefabric.transport.TransportChannel.send(TransportChannel.java:101)
at servicefabric.cluster.examples.ClusterNodeB.main(ClusterNodeB.java:26)

After a fresh clone, transport tests fail with mvn clean install

@antonkharenko @artem-v
Clone the repository and run mvn clean install:
I 1103-1731:54,574 i.s.t.TransportTest ####### testConnectorSendOrder1Thread : iteration = 8 [main]
I 1103-1731:55,597 i.s.t.Transport Bound to: 10.159.4.229:4802 [sc-boss-91-1]
I 1103-1731:55,605 i.s.t.Transport Connected from 10.159.4.229:4802 to 10.159.4.229:4801: [id: 0x4fcf6bc0, L:/10.159.4.229:53019 - R:/10.159.4.229:4801] [sc-io-92-1]
I 1103-1731:56,194 i.s.t.TransportTest ####### testConnectorSendOrder1Thread : iteration = 9 [main]
I 1103-1731:57,208 i.s.t.Transport Bound to: 10.159.4.229:4802 [sc-boss-93-1]
I 1103-1731:57,214 i.s.t.Transport Connected from 10.159.4.229:4802 to 10.159.4.229:4801: [id: 0xdb1ea334, L:/10.159.4.229:53042 - R:/10.159.4.229:4801] [sc-io-94-1]
I 1103-1731:58,086 i.s.t.BaseTest ********** Test finished: TransportTest.testConnectorSendOrder1Thread ********** [main]
I 1103-1731:58,087 i.s.t.BaseTest ********** Test started: TransportTest.testPingPongClientTFListenAndServerTFListen ********** [main]
I 1103-1731:59,104 i.s.t.Transport Bound to: 10.159.4.229:4801 [sc-boss-95-1]
I 1103-1732:00,121 i.s.t.Transport Bound to: 10.159.4.229:4802 [sc-boss-97-1]
I 1103-1732:00,125 i.s.t.Transport Connected from 10.159.4.229:4801 to 10.159.4.229:4802: [id: 0xf454f69e, L:/10.159.4.229:53086 - R:/10.159.4.229:4802] [sc-io-96-2]
I 1103-1732:00,130 i.s.t.Transport Connected from 10.159.4.229:4802 to 10.159.4.229:4801: [id: 0xb27168bd, L:/10.159.4.229:53087 - R:/10.159.4.229:4801] [sc-io-98-2]
I 1103-1732:00,138 i.s.t.BaseTest ********** Test finished: TransportTest.testPingPongClientTFListenAndServerTFListen ********** [main]
I 1103-1732:00,139 i.s.t.BaseTest ********** Test started: TransportTest.testInvalidListenAddress ********** [main]
I 1103-1732:00,154 i.s.t.BaseTest ********** Test finished: TransportTest.testInvalidListenAddress ********** [main]
I 1103-1732:00,155 i.s.t.BaseTest ********** Test started: TransportTest.testInvalidListenConfig ********** [main]
I 1103-1732:00,164 i.s.t.BaseTest ********** Test finished: TransportTest.testInvalidListenConfig ********** [main]
I 1103-1732:00,170 i.s.t.BaseTest ********** Test started: TransportTest.testUnresolvedHostConnection ********** [main]
I 1103-1732:01,182 i.s.t.Transport Bound to: 10.159.4.229:4801 [sc-boss-103-1]
W 1103-1732:04,046 i.s.t.Transport Failed to connect from 10.159.4.229:4801 to wronghost:49255 [sc-io-104-1]
I 1103-1732:04,051 i.s.t.BaseTest ********** Test finished: TransportTest.testUnresolvedHostConnection ********** [main]
Tests run: 15, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: 88.908 sec <<< FAILURE! - in io.scalecube.transport.TransportTest
testBlockAndUnblockTraffic(io.scalecube.transport.TransportTest) Time elapsed: 4.627 sec <<< FAILURE!
java.lang.AssertionError: expected:<1> but was:<2>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:743)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:555)
at org.junit.Assert.assertEquals(Assert.java:542)
at io.scalecube.transport.TransportTest.testBlockAndUnblockTraffic(TransportTest.java:423)

testNetworkSettings(io.scalecube.transport.TransportTest) Time elapsed: 3.098 sec <<< FAILURE!
java.lang.AssertionError: expectedMax=550, actual size=1000
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at io.scalecube.transport.TransportTest.testNetworkSettings(TransportTest.java:276)

Define Transport and Cluster thread model

Q1: the following code in Transport.start():

incomingMessagesSubject.subscribeOn(Schedulers.from(eventExecutor)); // define that we making smart subscribe

Why? A nice reference: http://www.grahamlea.com/2014/07/rxjava-threading-examples

Q2: in TransportPipelineFactory.resetDueHandshak() we have:

pipeline.addBefore(transportSpi.getEventExecutor(), "exceptionHandler", "messageReceiver", messageHandler);

Why? Can't we allow executing business logic right in the Netty thread?

Q3: we expose ITransportSpi.getEventExecutor(). Its primary clients are:

clusterMembership =
        new ClusterMembership(localTransportEndpoint, Schedulers.from(transport.getEventExecutor()));
failureDetector =
        new FailureDetector(localTransportEndpoint, Schedulers.from(transport.getEventExecutor()));

Is it safe to expose the entire eventExecutor? Can't we expose just the concrete thing we need (which is a Scheduler)?

Q4: gossip causes the creation of redundant thread pools.
In ClusterMembershipBuilder:

gossipProtocol = new GossipProtocol(transportEndpoint, Executors.newSingleThreadScheduledExecutor());

Weird constructor:

  public GossipProtocol(TransportEndpoint localEndpoint) {
    this.localEndpoint = localEndpoint;
    this.executor = createDedicatedScheduledExecutor();
  }

Let's not allow gossip (and other components) to create their own executors; instead, make them use the transport's thread pool (which is in fact the Netty thread pool).
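
The suggested direction could be sketched like this (illustrative stand-in classes, not the actual GossipProtocol/FailureDetector constructors): every protocol component receives the shared executor by injection, and the transport alone owns its life-cycle.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class SharedExecutorSketch {

  // Stand-in for GossipProtocol taking an injected executor.
  static final class Gossip {
    final ScheduledExecutorService executor;

    Gossip(ScheduledExecutorService executor) {
      this.executor = executor;
    }
  }

  // Stand-in for FailureDetector taking an injected executor.
  static final class FailureDetector {
    final ScheduledExecutorService executor;

    FailureDetector(ScheduledExecutorService executor) {
      this.executor = executor;
    }
  }

  public static void main(String[] args) {
    // One pool, shared by every protocol component; owned and
    // shut down by the transport, not by each component.
    ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor();
    Gossip gossip = new Gossip(shared);
    FailureDetector failureDetector = new FailureDetector(shared);
    System.out.println(gossip.executor == failureDetector.executor); // same pool instance
    shared.shutdown();
  }
}
```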
