smallrye-mutiny's Introduction

⚡️ Mutiny, an Intuitive Event-Driven Reactive Programming Library for Java

Mutiny is a modern reactive programming library for Java.

Mutiny provides a simple but powerful asynchronous development model to build reactive applications.

This project is sponsored by Red Hat.

🚀 Overview

Mutiny can be used in any Java application exhibiting asynchrony.

From reactive microservices, data streaming, event processing to API gateways and network utilities, Mutiny is a great fit.

Event-Driven

Mutiny places events at the core of its design. With Mutiny, you observe events, react to them, and create elegant and readable processing pipelines.

💡 A PhD in functional programming is not required.

Navigable

Even with smart code completion, classes with hundreds of methods are confusing.

Mutiny provides a navigable and explicit API driving you towards the operator you need.

Non-Blocking I/O

Mutiny is the perfect companion to tame the asynchronous nature of applications with non-blocking I/O.

Declaratively compose operations, transform data, enforce progress, recover from failures and more.

Quarkus and Vert.x native

Mutiny is integrated in Quarkus where every reactive API uses Mutiny, and Eclipse Vert.x clients are made available using Mutiny bindings.

Mutiny is however an independent library that can ultimately be used in any Java application.

Reactive Converters Built-In

Mutiny is based on the Reactive Streams protocol, and so it can be integrated with any other reactive programming library.

In addition, Mutiny offers converters to interact with other popular libraries and Kotlin.

👓 Mutiny workshop examples

You can learn about Mutiny from the documentation and website.

This repository also contains the Mutiny workshop examples that cover the common concerns through self-contained executable JBang scripts.

📦 Build instructions

Mutiny is built with Apache Maven, so all you need is:

./mvnw install

If you want to run a quick build without running tests or generating API docs, then run:

./mvnw -Dquickly

Git branch   Versions                         Baseline                             Compliance
main         2.x.y (in development)           Java 11, java.util.concurrent.Flow   Reactive Streams TCK 1.0.4
2.5.x        2.5.y (backports, bug fixes)     Java 11, java.util.concurrent.Flow   Reactive Streams TCK 1.0.4
1.x          1.x.y (sunset, rare bug fixes)   Java 8, Reactive Streams 1.0.4       Reactive Streams TCK 1.0.4

✨ Contributing

See the contributing guidelines.

Mutiny is an open project; feel free to contribute.

👋 Discussions and support

For anything related to the usage of Mutiny in Quarkus, please refer to the Quarkus support

For more general discussions about Mutiny, you can:

🧪 Publications

Julien Ponge, Arthur Navarro, Clément Escoffier, and Frédéric Le Mouël. 2021. Analysing the Performance and Costs of Reactive Programming Libraries in Java. In Proceedings of the 8th ACM SIGPLAN International Workshop on Reactive and Event-Based Languages and Systems (REBLS ’21), October 18, 2021, Chicago, IL, USA. ACM, New York, NY, USA, 10 pages. (PDF)

smallrye-mutiny's People

Contributors

acanda, alexandreguidin, andreas-eberle, andrezimmermann, avivmu, cescoffier, dependabot-preview[bot], dependabot[bot], edeandrea, fromage, geoand, gwenneg, heubeck, inego, jponge, kenfinnigan, lhauspie, machi1990, manofthepeace, nmcl, nryanov, oliver-brm, ozangunalp, pcasaes, radcortez, rgmz, sanne, smallrye-ci, stepan-romankov, stuartwdouglas

smallrye-mutiny's Issues

Consider adding a `onItem().ifNotNull()` group

That would provide symmetric API for ifNull().

We should also consider changing map to be only called if the item is non-null. This would ease migration from RX Maybe or Reactor Mono.

RxJava's PublishSubject in Mutiny

What is the idiomatic way to recreate RxJava's PublishSubject in Mutiny?
(http://reactivex.io/RxJava/javadoc/io/reactivex/subjects/PublishSubject.html)
Suppose we need to have a Multi<Long> that starts broadcasting ticks every second upon its creation, without any subscribers, so that subsequent subscribers start receiving the ongoing ticks.

Multi<Long> multi = Multi
                .createFrom().ticks().every(Duration.ofSeconds(1))
                .broadcast().toAllSubscribers();

TimeUnit.SECONDS.sleep(3);

multi.subscribe().with(tick -> System.out.println("tick " + tick));

Here, even after waiting for 3 seconds, the printed out ticks start from 0, obviously because that's when the subscription happens. How can I make multi a hot broadcast without any subscribers?
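
One way to picture the hot behaviour asked about here is a source that keeps emitting whether or not anyone listens, so a late subscriber only sees later ticks. The sketch below is plain JDK code, not Mutiny's API: `HotTicker`, `emit` and `subscribe` are hypothetical names, and ticks are emitted by hand rather than by a timer to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical hot source: the tick counter advances even with no subscribers.
class HotTicker {
    private final List<Consumer<Long>> subscribers = new CopyOnWriteArrayList<>();
    private long next = 0;

    // Register a listener; it only receives ticks emitted from now on.
    void subscribe(Consumer<Long> subscriber) {
        subscribers.add(subscriber);
    }

    // Emit the next tick to whoever is currently subscribed (possibly nobody).
    void emit() {
        long tick = next++;
        for (Consumer<Long> subscriber : subscribers) {
            subscriber.accept(tick);
        }
    }

    // Three ticks are emitted before anyone subscribes, then two more afterwards.
    static List<Long> demo() {
        HotTicker ticker = new HotTicker();
        ticker.emit();
        ticker.emit();
        ticker.emit();
        List<Long> seen = new ArrayList<>();
        ticker.subscribe(seen::add);
        ticker.emit();
        ticker.emit();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [3, 4]
    }
}
```

The late subscriber misses ticks 0 to 2, which is the behaviour the question expects from a hot broadcast.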

A side question. Are this repository's issues the proper place to ask questions about Mutiny? There is no "mutiny" tag on StackOverflow. Also, the official homepage does not mention such a place.

java.lang.IllegalStateException: The subscription to events has been cancelled

Hi!

I'm trying to use mutiny to achieve reactive/parallel execution of multiple operations and merge them into a single async operation as seen from caller:

  • single write to a SQL database
  • multiple writes to a Kafka topic

I've been struggling with the API (it seems a bit quirky) to achieve this, and the typing seems wrong -- I would just like a single Uni<Void> as a return type.

@ApplicationScoped
@Slf4j
public class BeaconService {

  @Inject
  BeaconDatasource beaconDatasource;

  @Inject
  PageDatasource pageDatasource;

  @Inject
  @Channel(EventsChannel.NAME)
  Emitter<AbstractBrowserEvent> eventsEmitter;

  public Uni<Uni<Void>> process(UUID sessionID, UUID uid, UUID pageID, Beacon beacon) {
    return pageDatasource.pageExists(sessionID, uid, pageID).onItem().produceUni(exists -> {
      if (!exists) {
        log.warn("Unlinked beacon sessionID={} uid={} pageId={}", sessionID, uid, pageID);
        throw Boom.badRequest().message("Unlinked beacon").exception();
      }

      Multi<Uni<Void>> beaconWrite = Multi.createFrom()
          .uni(Uni.createFrom().item(beaconDatasource.store(beacon)));

      Multi<Uni<Void>> eventWrites = Multi.createFrom().iterable(beacon.getEvents())
          .onItem()
          .apply(event -> Uni.createFrom().completionStage(eventsEmitter.send(event))
              .onFailure()
              .apply(throwable -> {
                log.error("Something went wrong while sending event to Kafka topic", throwable);
                return null;
              })
              .onItem()
              .apply(item -> null));

      return Multi
          .createBy()
          .concatenating()
          .streams(eventWrites, beaconWrite)
          .collectItems()
          .last();
    });
  }
}

Moreover, I'm seeing some errors:

2020-04-14 16:40:34,362 ERROR [org.jbo.res.res.i18n] (vert.x-eventloop-thread-22) RESTEASY002020: Unhandled asynchronous exception, sending back 500: java.lang.IllegalStateException: The subscription to events has been cancelled
	at io.smallrye.reactive.messaging.extension.EmitterImpl.verify(EmitterImpl.java:138)
	at io.smallrye.reactive.messaging.extension.EmitterImpl.send(EmitterImpl.java:112)
	at com.meemaw.rec.beacon.service.BeaconService.lambda$process$2(BeaconService.java:49)
	at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:42)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
	at io.smallrye.mutiny.operators.AbstractMulti$1.onNext(AbstractMulti.java:90)
	at io.smallrye.mutiny.subscription.SerializedSubscriber.onItem(SerializedSubscriber.java:69)
	at io.smallrye.mutiny.operators.multi.builders.IterableBasedMulti$IteratorSubscription.fastPath(IterableBasedMulti.java:118)
	at io.smallrye.mutiny.operators.multi.builders.IterableBasedMulti$BaseRangeSubscription.request(IterableBasedMulti.java:68)
	at io.smallrye.mutiny.subscription.SerializedSubscriber.request(SerializedSubscriber.java:121)
	at io.smallrye.mutiny.operators.AbstractMulti$1$1.request(AbstractMulti.java:63)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.request(MultiOperatorProcessor.java:73)
	at io.smallrye.mutiny.subscription.SerializedSubscriber.request(SerializedSubscriber.java:121)
	at io.smallrye.mutiny.operators.AbstractMulti$1$1.request(AbstractMulti.java:63)
	at io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber.setOrSwitchUpstream(SwitchableSubscriptionSubscriber.java:189)
	at io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber.onSubscribe(SwitchableSubscriptionSubscriber.java:95)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onSubscribe$0(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onSubscribe(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onSubscribe$0(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onSubscribe(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.mutiny.operators.AbstractMulti$1.onSubscribe(AbstractMulti.java:55)
	at io.smallrye.mutiny.subscription.SerializedSubscriber.onSubscribe(SerializedSubscriber.java:43)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onSubscribe(MultiOperatorProcessor.java:44)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onSubscribe$0(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onSubscribe(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onSubscribe$0(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onSubscribe(ContextPropagationMultiInterceptor.java:31)
	at io.smallrye.mutiny.operators.AbstractMulti$1.onSubscribe(AbstractMulti.java:55)
	at io.smallrye.mutiny.subscription.SerializedSubscriber.onSubscribe(SerializedSubscriber.java:43)
	at io.smallrye.mutiny.operators.multi.builders.IterableBasedMulti.subscribe(IterableBasedMulti.java:48)
	at io.smallrye.mutiny.operators.multi.builders.IterableBasedMulti.subscribe(IterableBasedMulti.java:32)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:48)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:48)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.lambda$subscribe$0(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.subscribe(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:37)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:68)
	at io.smallrye.mutiny.operators.multi.MultiMapOp.subscribe(MultiMapOp.java:24)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:48)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:48)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.lambda$subscribe$0(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.subscribe(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.mutiny.operators.multi.MultiConcatOp$ConcatArraySubscriber.onCompletion(MultiConcatOp.java:109)
	at io.smallrye.mutiny.operators.multi.MultiConcatOp.subscribe(MultiConcatOp.java:62)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:48)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:48)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.lambda$subscribe$0(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.subscribe(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:37)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:68)
	at io.smallrye.mutiny.operators.multi.MultiLastItemOp.subscribe(MultiLastItemOp.java:18)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:48)
	at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:48)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.lambda$subscribe$0(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.subscribe(ContextPropagationMultiInterceptor.java:58)
	at io.smallrye.mutiny.operators.UniCreateFromPublisher.subscribing(UniCreateFromPublisher.java:64)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
	at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:51)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:51)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
	at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
	at io.smallrye.mutiny.operators.UniFlatMapOnItem.invokeAndSubstitute(UniFlatMapOnItem.java:48)
	at io.smallrye.mutiny.operators.UniFlatMapOnItem$2.onItem(UniFlatMapOnItem.java:65)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:35)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:35)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:72)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:35)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:35)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:72)
	at io.smallrye.mutiny.operators.UniMapOnResult$1.onItem(UniMapOnResult.java:39)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:35)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:35)
	at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:72)
	at io.smallrye.mutiny.vertx.AsyncResultUni.lambda$subscribing$1(AsyncResultUni.java:34)
	at io.vertx.mutiny.pgclient.PgPool$3.handle(PgPool.java:212)
	at io.vertx.mutiny.pgclient.PgPool$3.handle(PgPool.java:209)
	at io.vertx.sqlclient.impl.SqlResultBuilder.handle(SqlResultBuilder.java:92)
	at io.vertx.sqlclient.impl.SqlResultBuilder.handle(SqlResultBuilder.java:33)
	at io.vertx.sqlclient.impl.PoolBase$1$1.lambda$schedule$0(PoolBase.java:97)
	at io.vertx.sqlclient.impl.SocketConnectionBase.handleMessage(SocketConnectionBase.java:210)
	at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$init$0(SocketConnectionBase.java:81)
	at io.vertx.core.net.impl.NetSocketImpl.lambda$new$2(NetSocketImpl.java:100)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
	at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:356)
	at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:369)
	at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:232)
	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:173)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.vertx.pgclient.impl.codec.PgEncoder.lambda$write$0(PgEncoder.java:78)
	at io.vertx.pgclient.impl.codec.PgCommandCodec.handleReadyForQuery(PgCommandCodec.java:124)
	at io.vertx.pgclient.impl.codec.PgDecoder.decodeReadyForQuery(PgDecoder.java:229)
	at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:87)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:830)

Need TupleX

Hi,

Currently, the API has Tuple from 1 to 5. It means that we can join max 5 async calls. I work on a project where I need more API calls (Async call to a Postgresql DB).
It would be great if the API were extended with Tuples up to 10 or more (6 in my case, but it could be more in the future).
I'm not able to extend Tuple5 because its constructor is package-private.
Maybe a varargs solution is possible?
Thank you
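
The varargs idea could look roughly like the sketch below. `TupleN` is a hypothetical class, not part of Mutiny; it gives up the per-position static typing of the fixed-size tuples in exchange for an arbitrary number of items, and checks the requested type at runtime instead.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical N-ary tuple: any number of items, typed access checked at runtime.
final class TupleN {
    private final List<Object> items;

    private TupleN(List<Object> items) {
        this.items = items;
    }

    // Varargs factory; the array is copied so later changes by the caller have no effect.
    static TupleN of(Object... items) {
        return new TupleN(Collections.unmodifiableList(Arrays.asList(items.clone())));
    }

    int size() {
        return items.size();
    }

    // The caller states the expected type; a mismatch raises ClassCastException.
    <T> T get(int index, Class<T> type) {
        return type.cast(items.get(index));
    }
}
```

With this shape, `TupleN.of(a, b, c, d, e, f).get(5, String.class)` reads the sixth result, at the cost of compile-time type safety.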

Blocking operation causes exception although `subscribeOn` is used.

A blocking operation causes exception although subscribeOn is used. It might be due to multiple blocking operations, but I cannot be sure.

This causes exception:

.onItem().apply(file -> blockingOperation(file))
.subscribeOn(Infrastructure.getDefaultWorkerPool())

Exception:

2020-03-17 16:31:17,831 ERROR [asc.import-aservice] () [vert.x-eventloop-thread-2] [ApiUploadMessageListener.java:115] - CollectorID 6: Exception while trying to process tlv file from a queue.: java.lang.IllegalStateException: You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.

This does not cause exception:

.onItem().produceUni(file1 -> Uni.createFrom().item(file1)
		.onItem().apply(file -> blockingOperation(file))
		.subscribeOn(Infrastructure.getDefaultWorkerPool()))
.concatenate()

The whole chain looks somewhat like this:

return Multi.createFrom().item(status)
		.transform().byFilteringItemsWith(status1 -> status1.getStatus() == Status.UPLOADED)
		.onItem().apply(status1 -> getFile(fileId))
		.subscribeOn(Infrastructure.getDefaultWorkerPool())
		.onItem().produceCompletionStage(fileEntity -> update().thenApply(aVoid -> fileEntity))
		.concatenate()
		.onItem().produceUni(file1 -> Uni.createFrom().item(file1)
				.onItem().apply(file -> process(file))
				.subscribeOn(Infrastructure.getDefaultWorkerPool()))
		.concatenate()
		.onItem().produceCompletionStage(someEntity ->
				update().thenApply(aVoid -> someEntity)
		)
		.concatenate()
		.transform().byFilteringItemsWith(SomeEntity::isValid)
		.onItem().apply(someEntity -> KafkaRecord.of(key, value))
		.onFailure(throwable -> throwable instanceof InvalidDataException)
		.recoverWithMulti(throwable -> Multi.createFrom().empty())
		.onFailure().invoke(throwable -> {
			// some processing
			if (throwable instanceof RuntimeException) {
				throw (RuntimeException) throwable;
			} else {
				throw new RuntimeException(throwable);
			}
		});
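
One possible reading of this report, offered as an assumption rather than a diagnosis: `subscribeOn` only moves the subscription to the worker pool, so once an upstream stage completes asynchronously on an I/O thread, later callbacks may run on that thread again. The sketch below illustrates the general effect with the JDK's `CompletableFuture` rather than Mutiny; the thread names and the `CallbackThreads` class are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackThreads {
    // Returns { thread of the plain callback, thread of the callback moved to the worker }.
    static String[] demo() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-thread"));
        try {
            CompletableFuture<String> source = new CompletableFuture<>();
            // Plain callback: runs on whichever thread completes the source.
            CompletableFuture<String> plain =
                    source.thenApply(v -> Thread.currentThread().getName());
            // Async callback: explicitly handed over to the worker executor.
            CompletableFuture<String> moved =
                    source.thenApplyAsync(v -> Thread.currentThread().getName(), worker);

            // Simulate an I/O thread delivering the result.
            Thread io = new Thread(() -> source.complete("file"), "io-thread");
            io.start();
            io.join();
            return new String[] { plain.join(), moved.join() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = demo();
        System.out.println("plain callback ran on: " + names[0]);
        System.out.println("moved callback ran on: " + names[1]);
    }
}
```

In Mutiny itself, `emitOn(executor)` is the operator intended for moving downstream callbacks to another executor; whether it resolves this particular chain is untested here.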

Why not RxJava for smallrye-*?

I have a question about why you are introducing another reactive library. I thought that well-adopted reactive libraries like RxJava3 were good enough. I can see that Mutiny is simpler, but is there any other reason behind your choice not to go with RxJava? I'm just trying to wrap my head around this.

smallrye-reactive-types instead of -operators?

IMHO, Uni and Multi are reactive types, not operators. Operators being filter, map, flatMap, window, debounce, etc. Reactive Streams Operators are indeed operators for already existing reactive types (Publisher etc.), but here, we mainly create new reactive types.

Thoughts?

Add subscribe method with callbacks on both Uni and Multi

Need to be added:

  • .subscribe().with((val, x) -> {}) - Uni and Multi
  • .subscribe().with((val, x) -> {}, f -> {}) - Uni and Multi
  • .subscribe().with((val, x) -> {}, f -> {}, () -> {}) - Multi
  • .subscribe().with(onsub, onval, onerr, oncomplete) - Multi
  • .subscribe().with(onsub, onval, onerr) - Uni

Can you please help me understand how to handle Uni in these cases?

Hi,
I am creating a Quarkus app and therefore I am using Mutiny. I am still getting used to it but it seems interesting.
Nevertheless, I think I am missing something, because there are two concepts I am missing.
Mutiny uses Uni and Multi, ok, fine.

1- How do I create a method that "just completes"? I would like to use something like a Completable: it has no items, but I need to know when it completes and whether it failed or not.
2- How do I generate a Uni that completes but with no item? (Uni, as defined in the documentation, can return 0 or 1 items.) I could only generate a Uni that returns a null item with Uni.createFrom().nullItem() (but that is still an item), or a Uni that never completes (Uni.createFrom().nothing()). How can I create a Uni that returns 0 items?

thanks!

PS:
Case 1 is useful for "void" methods. Let's say that I have an update method that either returns or fails.
Case 2 is harder to find an example for off the top of my head, but it should be possible since the documentation says it is possible.

thanks!

Implement the MultiOnResult group

It's the group retrieved using multi.onResult(). It should contain:

<R> Multi<R> mapToResult(Function<? super T, ? extends R> mapper)
<S> Multi<S> scan(Supplier<S> initialStateProducer, BiFunction<S, ? super T, S> scanner)
Multi<T> scan(BiFunction<T, T, T> scanner)
<R> Multi<R> castTo(Class<R> clazz)
Multi<T> delay() - same as uniDelay
Multi<T> filterWith(Predicate<? super T>)
Multi<T> filterWith(Function<? super T, Uni<Boolean>>)

MultiFlattenGroup<T> mapToMulti(Function<? super T, Multi<? extends R>> mapper) // Same with Publisher?
MultiFlattenGroup<T> mapToUni(Function<? super T, Uni<? extends R>> mapper)
MultiFlattenGroup<T> mapToIterable(Function<? super T, Iterable<? extends R>> mapper)
MultiFlattenGroup<T> mapToMulti(BiConsumer<? super T, MultiEmitter<? super R>> mapper)


FlatMapGroup<T>:

FlatMapGroup<T> withConcurrency(...)
FlatMapGroup<T> preserveOrder()
FlatMapGroup<T> awaitCompletionToFireFailure() // or just awaitCompletion ?
Multi<T> flatten()
  • Should something like keepIfNotSame be added (like distinctUntilChanged)?

Implement the MultiCollect group

It would be the group retrieved with: Multi.collect()

Here are some of the methods from this group (on a Multi<T>):

Uni<T> first(); 
Uni<T> last();
Uni<List<T>> asList(); // Accumulate the results in a list
Uni<X> with(Collector<? super T, A, ? extends X>) // see https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html
Uni<X> in(Supplier<X> producer, BiConsumer<X, T> add)
Uni<Map<K, T>> asMap(Function<? super T, ? extends K> keyProducer)
Uni<Map<K, V>> asMap(Function<? super T, ? extends K> keyProducer, Function<? super T, ? extends V> valueProducer)
Uni<Map<K, Collection<T>>> asMultiMap(...)
Uni<Map<K, Collection<V>>> asMultiMap(...)

Uni with Optional

How to map to Optionals using Unis?

  @Override
  public Uni<Optional<PageDTO>> getPage(UUID pageID, UUID sessionID, UUID deviceID) {
    Tuple values = Tuple.of(pageID, sessionID, deviceID);
    return pgPool
        .preparedQuery(SELECT_PAGE_RAW_SWL, values)
        .map(
            rowSet -> {
              if (!rowSet.iterator().hasNext()) {
                return Optional.empty();
              }
              Row row = rowSet.iterator().next();
              return Optional.of(
                  new PageDTO(
                      row.getString("org_id"),
                      row.getUUID("uid"),
                      row.getString("url"),
                      row.getString("referrer"),
                      row.getString("doctype"),
                      row.getInteger("screen_width"),
                      row.getInteger("screen_height"),
                      row.getInteger("width"),
                      row.getInteger("height"),
                      row.getInteger("compiled_timestamp")));
            })
        .onFailure()
        .invoke(
            throwable -> {
              log.error("Failed to get page id={}", pageID);
              throw new DatabaseException();
            });
  }

This code fails with the error:

Required type: Uni<Optional<PageDTO>>
Provided: Uni<Optional<?>>
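
The mismatch looks like a type-inference effect: with no target type for the mapping call, the compiler has to reconcile `Optional.empty()` and `Optional.of(...)` on its own. A common remedy is an explicit type witness on the generic mapping method. The sketch below shows the pattern with the JDK's `CompletableFuture` standing in for `Uni` (an assumption made so the example stays self-contained); `OptionalMapping` and `firstRow` are hypothetical names.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class OptionalMapping {
    // Maps a possibly empty list of rows to an Optional holding the first row.
    static CompletableFuture<Optional<String>> firstRow(CompletableFuture<List<String>> rows) {
        return rows
                // The explicit type witness pins the result type, so both
                // return statements are checked against Optional<String>.
                .<Optional<String>>thenApply(rowSet -> {
                    if (rowSet.isEmpty()) {
                        return Optional.empty();
                    }
                    return Optional.of(rowSet.get(0));
                })
                // Further chaining no longer disturbs the inferred type.
                .exceptionally(failure -> Optional.empty());
    }
}
```

The same witness syntax should apply to any generic mapping method, for example `.<Optional<PageDTO>>map(...)`; writing `Optional.<PageDTO>empty()` in the first branch is another option.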

Document difference between MultiFlatten concatenate and merge

Both methods have the same signature, the same return type, and almost the same description. They call the same MultiFlatMapOp constructor with a different concurrency parameter. It is hard to figure out what the difference is and which one to use.

Version: 0.4.0
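
A sketch of the ordering difference, assuming (as the differing concurrency parameter suggests) that concatenate processes one inner stream at a time in source order while merge listens to several at once. It uses plain `CompletableFuture`s in place of inner streams; `FlattenOrder` and its methods are hypothetical names, not Mutiny's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FlattenOrder {
    // Merge-like: listen to every inner result at once; output follows completion order.
    static <T> List<T> merged(List<CompletableFuture<T>> inners) {
        List<T> out = new ArrayList<>();
        for (CompletableFuture<T> inner : inners) {
            inner.thenAccept(out::add);
        }
        return out;
    }

    // Concatenate-like: listen to one inner result at a time; output follows source order.
    static <T> List<T> concatenated(List<CompletableFuture<T>> inners) {
        List<T> out = new ArrayList<>();
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (CompletableFuture<T> inner : inners) {
            // The next inner is only observed once the previous one has delivered.
            chain = chain.thenCompose(done -> inner.thenAccept(out::add));
        }
        return out;
    }

    // The second inner completes first; only the merge-like variant reflects that.
    static List<List<String>> demo() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = new CompletableFuture<>();
        List<String> mergeOrder = merged(List.of(slow, fast));
        List<String> concatOrder = concatenated(List.of(slow, fast));
        fast.complete("fast");
        slow.complete("slow");
        return List.of(mergeOrder, concatOrder);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[fast, slow], [slow, fast]]
    }
}
```

Everything runs on the calling thread, so the lists are fully populated by the time `demo()` returns.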

Rethink release process

  • master should use 999-SNAPSHOT
  • versions should be computed during the release process in a reliable way (right now it works by accident)
  • the documentation upload should be fixed (#58)

How to collect a list of Unis?

What is the best way to collect a list of Unis? For example, I have a List<Uni> and would like to collect those and produce a Uni<List>. Another task might be to sum all the integers and produce a single Uni.
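
For comparison, here is how the same two tasks look with the JDK's `CompletableFuture`, which is a stand-in here and not Mutiny's API. `CollectAll`, `all` and `sum` are hypothetical names; the sketch only shows the shape of the operation being asked for.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class CollectAll {
    // Turns a list of futures into one future holding all results, in list order.
    // If any input fails, the combined future fails as well.
    static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(done -> futures.stream()
                        .map(CompletableFuture::join) // does not block: every future is complete here
                        .collect(Collectors.toList()));
    }

    // Sums the results once all of them are available.
    static CompletableFuture<Integer> sum(List<CompletableFuture<Integer>> futures) {
        return all(futures)
                .thenApply(values -> values.stream().mapToInt(Integer::intValue).sum());
    }
}
```

The key design point is that the combination happens once, after all inputs have completed, so the result keeps the order of the input list regardless of completion order.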

Finalize the vocabulary for events

Right now we use:

  • result - could be element, item, value
  • failure - could be error, exception
  • completion - I don't have alternatives
  • cancellation - I don't have alternatives
  • requests - I don't have alternatives

Invoke CompletionStage

It is hard to invoke a function that returns a CompletionStage (in comparison to producing from a CompletionStage). An example would be calling reactive messaging from a stream.

I tried to simulate it like this:

.onItem().produceCompletionStage(entity -> 
    methodReturningCompletionStage()
        .thenApply(aVoid -> shiftEntity)
)
.concatenate()

Please add invoke that accepts CompletionStage.
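
The workaround above boils down to "run an asynchronous side effect, then pass the original item on". A minimal JDK-only sketch of that helper follows; `AsyncInvoke.invoke` is a hypothetical name used for illustration, not an existing Mutiny method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

class AsyncInvoke {
    // Runs an asynchronous side effect for the item, then emits the same item
    // once the side effect has completed. The action's own result is ignored.
    static <T> CompletionStage<T> invoke(
            T item, Function<? super T, ? extends CompletionStage<?>> action) {
        return action.apply(item).thenApply(ignored -> item);
    }

    public static void main(String[] args) {
        CompletionStage<String> result = invoke("entity", entity -> {
            System.out.println("sending " + entity);
            return CompletableFuture.<Void>completedFuture(null);
        });
        System.out.println(result.toCompletableFuture().join());
    }
}
```

If the side effect fails, the returned stage fails too, so the item is only propagated after a successful invocation.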

Mutiny 0.4.0

Checked exceptions in subscribe().with(...) consumer

When subscribing to an item, we provide a java.util.function.Consumer which does not declare any throws. Because of that, any calls made from that consumer which might throw checked exceptions must be wrapped in try blocks.
In RxJava, the item subscription is done by providing their own Consumer which supports calling any code that might throw checked exceptions:

/**
 * A functional interface (callback) that accepts a single value.
 * @param <T> the value type
 */
@FunctionalInterface
public interface Consumer<@NonNull T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Throwable if the implementation wishes to throw any type of exception
     */
    void accept(T t) throws Throwable;
}

I wonder whether something like this can be supported in Mutiny.
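
Until something like this exists, one workaround is a small adapter that turns a throwing consumer into a `java.util.function.Consumer`. The sketch below is an illustration under that assumption; `ThrowingConsumer` and `unchecked` are hypothetical names, not part of Mutiny or RxJava.

```java
import java.util.function.Consumer;

// Hypothetical consumer that is allowed to throw checked exceptions.
@FunctionalInterface
interface ThrowingConsumer<T> {
    void accept(T item) throws Exception;

    // Adapts to java.util.function.Consumer: unchecked exceptions pass through,
    // checked ones are rethrown wrapped in a RuntimeException.
    static <T> Consumer<T> unchecked(ThrowingConsumer<? super T> consumer) {
        return item -> {
            try {
                consumer.accept(item);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }
}
```

A call site would then read `subscribe().with(ThrowingConsumer.unchecked(item -> writeToFile(item)))`, where `writeToFile` stands for any method that declares a checked exception.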

Fix documentation update during the release process

At the moment it fails with:

2019-12-19T15:11:07.9347234Z Cloning repo
2019-12-19T15:11:07.9364989Z Cloning into 'site'...
2019-12-19T15:11:15.6335143Z Warning: Permanently added the RSA host key for IP address '192.30.253.112' to the list of known hosts.
2019-12-19T15:11:15.6939235Z git@github.com: Permission denied (publickey).
2019-12-19T15:11:15.6957046Z fatal: Could not read from remote repository.
2019-12-19T15:11:15.6958973Z Copy content
2019-12-19T15:11:15.6960007Z 
2019-12-19T15:11:15.6962044Z Please make sure you have the correct access rights
2019-12-19T15:11:15.6964436Z and the repository exists.
2019-12-19T15:11:15.6982052Z cp: target 'site' is not a directory
2019-12-19T15:11:15.6984101Z yes: standard output: Broken pipe
2019-12-19T15:11:15.6985964Z Pushing
2019-12-19T15:11:15.6987855Z .build/doc.sh: line 14: cd: site: No such file or directory
2019-12-19T15:11:15.7140754Z HEAD detached at 0.1.2
2019-12-19T15:11:15.7142481Z nothing to commit, working tree clean
2019-12-19T15:11:15.7223939Z error: src refspec gh-pages does not match any
2019-12-19T15:11:15.7226207Z error: failed to push some refs to 'https://github.com/***/***-mutiny'

This happens because the script does not use the correct Git URL (it should use the HTTPS URL with a token).

Is grouping too much?

The question was briefly discussed in #1, but it is better to have a dedicated issue for it.

Right now, we do something like this:

getSome() // produce an uni
   .onNoResult().after(ofHours(1)).fail()
   .onResult().mapToResult(i -> i*2)
   .onFailure().recoverWith(42)

Should we consider removing the grouping to be like:

getSome() // produce an uni
   .onTimeout(ofHours(1)).fail()
   .onResultMap(i -> i*2)
   .onFailureRecoverWith(42)

The second approach has the advantage of using a single class, which could make the API a bit easier to use than the current chaining. Most of the methods would start with onX, where X is the type of event triggering the behavior. It would also slightly reduce object creation (at assembly time).
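The trade-off can be illustrated with a toy, synchronous stand-in for Uni. Everything in this sketch is hypothetical (the Value class, its OnResult group, and the method names mirror the snippets above only loosely); it shows where the extra object is allocated in the grouped style, not how Mutiny is implemented.

```java
import java.util.function.Function;

// Hypothetical, synchronous stand-in for a Uni, used only to compare API shapes.
final class Value<T> {
    private final T item;

    Value(T item) {
        this.item = item;
    }

    T get() {
        return item;
    }

    // Grouped style: onResult() allocates an intermediate group object
    // that exposes the operators reacting to "result" events.
    OnResult<T> onResult() {
        return new OnResult<>(this);
    }

    // Flat style: the operator lives directly on the main class,
    // so no intermediate group object is created.
    <R> Value<R> onResultMap(Function<? super T, ? extends R> mapper) {
        return new Value<>(mapper.apply(item));
    }

    static final class OnResult<T> {
        private final Value<T> upstream;

        OnResult(Value<T> upstream) {
            this.upstream = upstream;
        }

        <R> Value<R> mapToResult(Function<? super T, ? extends R> mapper) {
            return upstream.onResultMap(mapper);
        }
    }

    public static void main(String[] args) {
        System.out.println(new Value<>(21).onResult().mapToResult(i -> i * 2).get());
        System.out.println(new Value<>(21).onResultMap(i -> i * 2).get());
    }
}
```

Both calls compute the same value; the grouped form costs one extra allocation per operator but keeps the main class small, while the flat form puts every onX method on a single class.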

So, to be discussed....

Renaming to Mutiny

The name has been chosen: SmallRye Mutiny it will be.

  • Rename repository
  • Rename package
  • Rename artifactId and groupId

Converters switch from SPI to converter method

Currently we handle conversions between types by requiring that an SPI be implemented and registered as a service.

We want to switch to a model whereby we can do something like:

Uni<T> uni = Uni.createFrom().converter(mySingleToUniConverter, instance)
Single<T> single = uni.adapt().with(myUniToSingleConverter)

Implement Multi.onTimeout

Same method as UniOnTimeOut, but first the group access must be clarified between:

  • onTimeout().of(Duration).on(Executor).... and onTimeout().before(Uni<X>)...
  • onNoResult().after(Duration).on(Executor)... and onNoResult().before(Uni<X>)...

Implement the MultiOnEmpty group

Group retrieved using multi.onCompletion().ifEmpty() with the following methods:

  • failWith (Throwable, Supplier<Throwable>)
  • continueWith (T, Supplier<T>)
  • switchTo (Multi<T>, Supplier<Multi<T>>)

Add Uni.flatMap

A flatMap method providing the default traditional behavior should be added.
