Comments (22)

akarnokd avatar akarnokd commented on June 5, 2024 5

7) Parallel processing

Reactive datasources are sequential in nature; they form a processing pipeline that runs in FIFO order, even if source values are available while processing earlier values.

Still, sometimes you want to go parallel, for example, execute many service calls at once or distribute some processing over the available CPUs.

There are some experiments with such reactive stream parallelization, but the effect itself can be achieved today with common operators.

Using flatMap

One of the ways is to use flatMap for the job:

range(1, 1_000)
.flatMap(v -> just(v).subscribeOn(computation).map(Object::hashCode))

If there are a lot of source values, scheduling them one-by-one adds quite an overhead. Instead, you can buffer some of them and act on them in a batched fashion:

range(1, 1_000_000)
.buffer(256)
.flatMap(list -> 
    just(list).subscribeOn(computation)
    .map(v -> 
        v.stream().map(Object::hashCode).collect(toList())
    )
)
.flatMapIterable(c -> c)

If the final order still matters, you can use concatMapEager instead of flatMap.
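
For readers without a reactive library at hand, the batching idea can be sketched with plain JDK classes. This is only an analogy, not the reactive operators themselves: the class and method names are made up for illustration, CompletableFuture stands in for the scheduled inner sequences, and joining the futures in submission order plays the role of concatMapEager.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BatchedParallelSketch {

    // Splits the range [1, count] into batches of batchSize, hashes each batch
    // on the pool, and joins the batches in their original order.
    static List<Integer> hashInBatches(int count, int batchSize, ExecutorService pool) {
        List<CompletableFuture<List<Integer>>> futures = new ArrayList<>();
        for (int start = 1; start <= count; start += batchSize) {
            final int from = start;
            final int to = Math.min(count, start + batchSize - 1);
            // One task per batch instead of one task per value.
            futures.add(CompletableFuture.supplyAsync(() -> {
                List<Integer> out = new ArrayList<>(to - from + 1);
                for (int v = from; v <= to; v++) {
                    out.add(Integer.valueOf(v).hashCode());
                }
                return out;
            }, pool));
        }
        // All batches are already running; joining in submission order keeps the output ordered.
        List<Integer> result = new ArrayList<>(count);
        for (CompletableFuture<List<Integer>> f : futures) {
            result.addAll(f.join());
        }
        return result;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Integer> hashes = hashInBatches(1_000, 256, pool);
            System.out.println(hashes.size() + " values, first = " + hashes.get(0));
        } finally {
            pool.shutdown();
        }
    }
}
```

Unlike the reactive version, this sketch has no backpressure: all batches are submitted up front.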

Using window

The operator window creates a nested Publisher<Publisher<T>> sequence, which can be batched similarly to buffer, but the values themselves become available immediately in the inner Publisher.

We can use this operator with some mapping and flattening to get the parallel effect:

range(1, 1_000_000)
.window(256)
.map(w -> 
    w.observeOn(computation)
    .map(Object::hashCode)
)
.concatMapEager(w -> w);

Note the use of observeOn here: the source inner window is no longer constant like above but emits onNext events as they arrive from the source. Applying subscribeOn would be useless.

Using groupBy

The operator groupBy also creates nested Publisher instances based on some key selector to determine which value goes into which group; i.e., routing or dispatching values to various inner windows.

One way is to use the current value's hashCode modulo some integer to select a "bucket" for it:

range(1, 1_000_000)
.groupBy(v -> v.hashCode() % 8)
.flatMap(g -> g.observeOn(computation).map(Object::hashCode))

Given a well-distributed hashCode, this should yield fairly balanced parallel processing of the values.

Alternatively, you can do round-robin dispatching by using a counter instead:

long[] counter = { 0 };

range(1, 1_000_000)
.groupBy(v -> (counter[0]++) & 7)
.concatMapEager(g -> g.observeOn(computation).map(Object::hashCode))
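
To see which value lands in which group, here is a small plain-Java sketch of the two key selectors, outside of any reactive library. The class and method names are hypothetical. One assumption worth flagging: the sketch uses Math.floorMod rather than %, because % can return a negative key for a negative hashCode, which would create more groups than intended.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BucketDispatchSketch {

    // Hash-based routing; floorMod keeps the bucket index non-negative
    // even for negative hash codes.
    static Map<Integer, List<Integer>> byHash(List<Integer> values, int buckets) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (Integer v : values) {
            int bucket = Math.floorMod(v.hashCode(), buckets);
            groups.computeIfAbsent(bucket, k -> new ArrayList<>()).add(v);
        }
        return groups;
    }

    // Round-robin routing with a running counter; the bit mask only works
    // when the bucket count is a power of two.
    static Map<Integer, List<Integer>> roundRobin(List<Integer> values, int powerOfTwoBuckets) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        long counter = 0;
        for (Integer v : values) {
            int bucket = (int) (counter++ & (powerOfTwoBuckets - 1));
            groups.computeIfAbsent(bucket, k -> new ArrayList<>()).add(v);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>();
        for (int i = 1; i <= 16; i++) {
            values.add(i);
        }
        System.out.println(byHash(values, 8));
        System.out.println(roundRobin(values, 8));
    }
}
```

Round-robin gives every bucket the same share regardless of the values, while the hash-based variant keeps equal values in the same bucket.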

from reactive-streams-commons.

akarnokd avatar akarnokd commented on June 5, 2024 4

3) FlatMap as map or filter

The operator flatMap is very versatile and comes up frequently when one wants to join different mapped sequences.

The mapper function lets you return any Publisher instance, including an empty or single-element one (just).

We can utilize this property and implement map with flatMap:

Function<T, R> mapper = ...

map(mapper) == flatMap(v -> just(mapper.apply(v)));

Given the current value from source, we apply the mapper function and return a constant just Publisher.

We can apply the same methodology with filter: return empty if the predicate doesn't hold and return just if it does hold:

Predicate<T> predicate = ...

filter(predicate) == flatMap(v -> predicate.test(v) ? just(v) : empty());
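
The two identities above are not specific to reactive types. As a hedged illustration, here they are with java.util.stream, where Stream.of and Stream.empty play the roles of just and empty; the class and method names are invented for this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlatMapAsMapOrFilter {

    // map(mapper) expressed as flatMap returning a single-element inner stream.
    static <T, R> List<R> mapViaFlatMap(List<T> source, Function<T, R> mapper) {
        return source.stream()
                .flatMap(v -> Stream.of(mapper.apply(v)))
                .collect(Collectors.toList());
    }

    // filter(predicate) expressed as flatMap returning either one element or none.
    static <T> List<T> filterViaFlatMap(List<T> source, Predicate<T> predicate) {
        return source.stream()
                .flatMap(v -> predicate.test(v) ? Stream.of(v) : Stream.<T>empty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4);
        System.out.println(mapViaFlatMap(values, v -> v * 10));
        System.out.println(filterViaFlatMap(values, v -> v % 2 == 0));
    }
}
```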

Note, however, that using flatMap as such incurs more overhead than a direct map or filter. Modern reactive libraries still optimize for these cases because flatMap lets you short-circuit a sequence by emitting an error Publisher:

source.flatMap(v -> {
    try {
        performIO(v);
        return just(v);
    } catch (IOException ex) {
        return error(ex);
    }
})


akarnokd avatar akarnokd commented on June 5, 2024 4

14) Errors as non-terminal events

By default, reactive protocols treat onError events as fatal and terminal events, tearing down the whole chain you delicately assembled.

Many times, you'd want to treat an error like any other value and keep the sequence running. For this, you have to hide the error from the library: instead of calling onError, create a holder class for both normal and error values and emit those. Libraries often support this via classes such as Notification<T> or Signal<T>, which are very similar to Java 8's Optional<T> but cover values, errors and emptiness:

EmitterProcessor<Signal<Integer>> bus = EmitterProcessor.create();

bus.dematerialize().consume(System.out::println, Throwable::printStackTrace);
bus.subscribe(System.out::println, Throwable::printStackTrace);

bus.onNext(Signal.next(1));
bus.onNext(Signal.error(new RuntimeException()));
bus.onNext(Signal.next(2));
bus.onNext(Signal.next(3));

You can use dematerialize to turn those notifications back into regular calls to the onXXX methods on your Subscriber, yet have the original source still active.
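
If your library has no such holder class, a minimal one is easy to write. The following is a sketch only: this Signal type is a hypothetical stand-in, not Reactor's Signal or RxJava's Notification, and the drain method merely shows that an error carried as a value does not stop the consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SignalSketch {

    // Hypothetical holder: carries either a value or an error, never both.
    static final class Signal<T> {
        final T value;
        final Throwable error;

        private Signal(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <T> Signal<T> next(T value) {
            return new Signal<>(value, null);
        }

        static <T> Signal<T> error(Throwable error) {
            return new Signal<>(null, error);
        }

        boolean isError() {
            return error != null;
        }
    }

    // Consumes every signal; an error signal is logged but does not end the loop.
    static List<String> drain(List<Signal<Integer>> signals) {
        List<String> log = new ArrayList<>();
        for (Signal<Integer> s : signals) {
            log.add(s.isError() ? "error: " + s.error.getMessage() : "value: " + s.value);
        }
        return log;
    }

    public static void main(String[] args) {
        List<Signal<Integer>> signals = Arrays.asList(
                Signal.next(1),
                Signal.<Integer>error(new RuntimeException("boom")),
                Signal.next(2),
                Signal.next(3));
        drain(signals).forEach(System.out::println);
    }
}
```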


smaldini avatar smaldini commented on June 5, 2024 4

17) Add a timed gap between elements

The operator concatMap has the interesting property of preserving the FIFO sequence. You can use that to shift sequence items by a specific time or by any arbitrary Publisher such as interval. Since concurrent execution of the mapped timer or delay is limited to one, the shift is applied equally to all elements:

range(1, 1_000)
.concatMap(t -> Mono.delay(1000).map( v -> t ))
.consume(System.out::println) 
// prints 1, ..... 2, ...... 3 ..... N  with ...... =1 second

==

interval(1000)
.map( i )
.consume(System.out::println)

You can also choose to shift forward relatively by using flatMap, which lets many concurrent shifts run in parallel, moving the whole sequence forward:

range(1, 1_000)
.flatMap(t -> Mono.delay(1000).map( v -> t ))
.consume(System.out::println) 
// prints .... 1,  2,  3 , [..] C ..... N  with ...... = 1 second and C = max concurrency 
// (shifting groups of items)


akarnokd avatar akarnokd commented on June 5, 2024 3

1) The merge/flatMap identity

The operators merge and flatMap are closely related and you can implement one with the other:

Given a nested Publisher, merging its inner sequences is equivalent to applying flatMap with an identity function:

Publisher<Publisher<T>> sources = ...

merge(sources) == sources.flatMap(source -> source);

Given a Publisher and a mapping function T -> Publisher<R>, you can map the source with the function and merge the resulting nested sequence:

Publisher<T> source = ...
Function<T, Publisher<R>> mapper = ...

source.flatMap(mapper) == merge(source.map(mapper));

This identity is also true for other mapping operators:

  • concat and concatMap
  • concatEager and concatMapEager
  • switchOnNext and switchMap

This is a general property of the operation and works for non-reactive but functional APIs as well. This can come in handy if some library offers only one of the two methods.
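
As a non-reactive illustration of the same identity, here is a sketch with java.util.stream over nested lists. The class and method names are made up for the example; List::stream serves as the "identity" because a List has to be turned into a Stream before it can be flattened.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

final class MergeFlatMapIdentity {

    // "merge" for nested lists: flatMap with an identity-like function.
    static <T> List<T> merge(List<List<T>> sources) {
        return sources.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    // flatMap expressed as map followed by merge.
    static <T, R> List<R> flatMapViaMerge(List<T> source, Function<T, List<R>> mapper) {
        List<List<R>> nested = source.stream().map(mapper).collect(Collectors.toList());
        return merge(nested);
    }

    // The direct flatMap, for comparison.
    static <T, R> List<R> flatMapDirect(List<T> source, Function<T, List<R>> mapper) {
        return source.stream()
                .flatMap(v -> mapper.apply(v).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Function<Integer, List<Integer>> mapper = v -> Arrays.asList(v, v * 10);
        List<Integer> source = Arrays.asList(1, 2, 3);
        System.out.println(flatMapViaMerge(source, mapper));
        System.out.println(flatMapDirect(source, mapper));
    }
}
```

Both calls in main produce the same list. Keep in mind that Java Streams are sequential here, so this corresponds more closely to the concat/concatMap pair than to a truly concurrent merge.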


akarnokd avatar akarnokd commented on June 5, 2024 3

2) Compose at subscription time

Modern reactive libraries offer you fluent conversion operators: extend, to, as or compose. You can then apply your own transformative function, which runs at assembly time and lets you customize a sequence with preset operators:

Function<Publisher<T>, Publisher<T>> addSchedulers = o -> 
    o.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

range(1, 10).compose(addSchedulers).subscribe(System.out::println);

The function addSchedulers is executed at assembly time, that is, when range(1, 10).compose(addSchedulers) is called.

However, sometimes you want stateful operators in a sequence, for example, a counting map that updates some "global" state. Clearly, such an operation doesn't work properly with multiple concurrent Subscribers to a sequence.

Luckily, you can use the composition operators above in conjunction with defer to shift the execution of the function to subscription time, allowing per-Subscriber state in the applied operators:

range(1, 10).compose(o -> defer(() -> {
    int[] counter = new int[] { 0 };
    return o.map(v -> ++counter[0]);
})).subscribe(System.out::println);


akarnokd avatar akarnokd commented on June 5, 2024 3

8) Executing an action if the source is empty

Sometimes you want to execute some action, such as logging, when it turns out the source Publisher is empty.

You should be familiar with the switchIfEmpty operator which switches to a new Publisher if the main Publisher is empty. We can then switch to an empty publisher and use doOnComplete with the desired action:

Publisher<T> source = ...

source.switchIfEmpty(empty().doOnComplete(() -> System.out.println("Empty source!")));


akarnokd avatar akarnokd commented on June 5, 2024 3

9) Processing elements pairwise

Sometimes you want to process subsequent elements of a Publisher pairwise (in triplets, etc.). A possible way of doing this is by using buffer with skip option:

range(1, 9)
.buffer(2, 1)
.filter(b -> b.size() == 2)
.map(b -> b.get(0) + b.get(1))
.subscribe(System.out::println);

Since buffer(2, 1) starts a new buffer at every element, the last buffer contains only the final element, so you have to filter out such partial buffers (otherwise, the map will blow up with an IndexOutOfBoundsException).

An alternative way is to use the publish overload taking a Function<Publisher<T>, Publisher<R>>. This overload, unlike the regular publish, returns a Publisher and is in fact a cold operator. For the duration of a Subscriber, it turns the source into a hot Publisher to which you can attach as many operators as you like via the Function callback; thus, the source events are shared among different "paths" without subscribing to the source multiple times.

We can use this publish with skip and zip to get pairs:

range(1, 9)
.publish(o -> zip(o, o.skip(1), (a, b) -> a + b))
.subscribe(System.out::println);

There is no need for filtering as there is no list anymore and we get the pairs nicely via lambda parameters. Naturally, it works with triplets as well:

range(1, 9)
.publish(o -> zip(o, o.skip(1), o.skip(2), (a, b, c) -> a + b + c))
.subscribe(System.out::println);
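
To check what output to expect from the pairwise examples, the same computation can be sketched on a plain List. This is an illustrative equivalent with a hypothetical class name, not how the operators are implemented: index i is paired with index i + 1, just like zip pairs o with o.skip(1).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class PairwiseSketch {

    // Sums each element with its successor; a list of n elements yields n - 1 sums.
    static List<Integer> pairwiseSums(List<Integer> values) {
        return IntStream.range(0, Math.max(0, values.size() - 1))
                .mapToObj(i -> values.get(i) + values.get(i + 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(pairwiseSums(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9)));
    }
}
```

For the values 1 to 9 this yields the eight sums 3, 5, 7, 9, 11, 13, 15 and 17.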


akarnokd avatar akarnokd commented on June 5, 2024 3

13) Awaiting completion of many sources

Sometimes you have a bunch of Publishers doing various things and you want to wait until they all complete; their emitted values are irrelevant in your case.

You may think of zip or combineLatest along with ignoreElements for doing this; however, zip and combineLatest won't work with most source combinations as you'd expect. These operators terminate eagerly: if one of the sources completes without elements, they complete immediately and cancel the rest of the outstanding source sequences. In addition, if zip encounters a shorter sequence, it will also terminate eagerly.

You can simply use flatMap in conjunction with ignoreElements() to join the sources' termination:

Publisher<String> source1 = just("Hello");
Publisher<Integer> source2 = range(1, 3);
Publisher<Long> source3 = timer(1, TimeUnit.MILLISECONDS);

just(source1, source2, source3)
.flatMap(o -> o.ignoreElements())
.doOnComplete(() -> System.out.println("Done"))

Some libraries like Reactor also use Mono#when(Mono<T>....) to coordinate the completion and the data from many sources.


akarnokd avatar akarnokd commented on June 5, 2024 3

18) Inner-join with flatMap

In SQL, we are used to writing inner joins: the ones that pit one table against another and form all sorts of pairs. We can do such joins with flatMap:

Publisher<Integer> main = range(1, 5);
Publisher<String> slave = fromArray("a", "bb", "ccc");

main.flatMap(len -> slave.filter(s -> s.length() == len));

For each element of main, this streams through the slave Publisher and keeps only those elements of slave whose length equals the current value from main.

Some libraries offer an overload that takes a two-argument function, which receives these pairs and should produce some value out of them:

main.flatMap(v -> slave, (m, s) -> m == s.length());

Of course, one can return a Publisher from the second function and apply flatMap again to flatten out the resulting inner sequences, allowing us to "drop" irrelevant values from the final sequence:

main.flatMap(v -> slave, (m, s) -> m == s.length() ? just(s) : empty())
.flatMap(v -> v)
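
The first variant above translates almost literally to java.util.stream, which makes it easy to try out without a reactive library. The sketch below uses a made-up class name and plain lists in place of the two Publishers.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class InnerJoinSketch {

    // For each length in main, keep the strings in other that have exactly that length.
    static List<String> join(List<Integer> main, List<String> other) {
        return main.stream()
                .flatMap(len -> other.stream().filter(s -> s.length() == len))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> main = Arrays.asList(1, 2, 3, 4, 5);
        List<String> other = Arrays.asList("a", "bb", "ccc");
        System.out.println(join(main, other));
    }
}
```

The lengths 4 and 5 have no matching string, so they contribute nothing to the result, just as unmatched rows drop out of an SQL inner join.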


akarnokd avatar akarnokd commented on June 5, 2024 3

21) Caching and clearing

If we want to execute some code once and then hand out the generated values, such as login tokens or results of a network call, we usually can go for cache(), replay(), AsyncProcessor, etc.

However, sometimes the data gets outdated and there is no way of clearing the structures above. But we can restart the whole process and make sure new subscribers get the fresh data if we cache the cache itself and use defer to get the current caching source:

final AtomicReference<Mono<Long>> cache = new AtomicReference<>(getSource());

public Mono<Long> getSource() {
    return Mono.fromCallable(System::currentTimeMillis).cache();
}

public Mono<Long> get() {
    return Mono.defer(() -> cache.get());
}

public void reset() {
    cache.set(getSource());
}
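
The "cache the cache" idea also works outside of reactive types. Here is a rough JDK-only sketch under a few assumptions: ResettableCache is a hypothetical name, CompletableFuture stands in for the cached Mono, and, unlike Mono.fromCallable(...).cache(), supplyAsync starts computing right away instead of waiting for the first subscriber.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class ResettableCache<T> {

    private final Supplier<T> loader;
    // The "cache of the cache": swapping the reference discards the old result.
    private final AtomicReference<CompletableFuture<T>> cache;

    ResettableCache(Supplier<T> loader) {
        this.loader = loader;
        this.cache = new AtomicReference<>(newSource());
    }

    // Starts a fresh computation whose result is remembered by the returned future.
    private CompletableFuture<T> newSource() {
        return CompletableFuture.supplyAsync(loader);
    }

    // Callers always see whichever cached computation is current.
    CompletableFuture<T> get() {
        return cache.get();
    }

    // Replaces the cached computation; holders of the old future keep the old value.
    void reset() {
        cache.set(newSource());
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        ResettableCache<Integer> tokens = new ResettableCache<>(calls::incrementAndGet);
        System.out.println(tokens.get().join());
        System.out.println(tokens.get().join());
        tokens.reset();
        System.out.println(tokens.get().join());
    }
}
```

The loader runs once per reset, no matter how many callers ask for the value in between.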


akarnokd avatar akarnokd commented on June 5, 2024 2

5) When subscribeOn and observeOn/publishOn act sometimes the same

One of the common confusions with reactive operators centers around subscribeOn and observeOn (publishOn). In short, subscribeOn acts at subscription time over subscribe() calls and its effects travel upstream, whereas observeOn acts at runtime over onNext|onError|onComplete calls and its effects travel downstream.

However, with many sequential data sources, such as just, empty, error, fromArray, fromIterable and range, applying either subscribeOn or observeOn will yield the same output: events signalled on the specified Scheduler's thread.

just(1).subscribeOn(scheduler) == just(1).observeOn(scheduler)

range(1, 5).subscribeOn(scheduler) == range(1, 5).publishOn(scheduler)

fromArray("a", "b", "c").subscribeOn(scheduler) == fromArray("a", "b", "c").observeOn(scheduler)

// etc.

The reason for this equivalence is that these constant sources don't really have subscription side-effects and requests to them will trigger emissions on the Scheduler's thread anyway.

Most modern libraries already exploit this with respect to just; both patterns are transformed into a custom scheduled Publisher instance.

In addition, using observeOn may give better performance with the multi-valued sources due to micro-operator-fusion in its front. However, we don't fully see the ramifications of an automatic subscribeOn -> observeOn swap when a sequence is assembled due to the so-called strong pipelining effect when both operators are applied to the sequence:

range(1, 10).subscribeOn(schedulerA).observeOn(schedulerB);


akarnokd avatar akarnokd commented on June 5, 2024 2

6) FlatMap as concatMap

The operator flatMap lets you run multiple sources at once while concatMap runs one sequence at a time.

However, flatMap usually offers a parameter that lets you limit its concurrency level. The smallest allowed concurrency level is 1, which will functionally act the same as concatMap:

flatMap(mapper, 1) == concatMap(mapper)

This identity helped many times to rule out possible bugs in either flatMap's or concatMap's implementation as a cause of some weird behavior or exception.

Note that in many implementations of concatMap, the operator prefetches its source values (2 elements in RxJava, 32 in Reactor) whereas a concurrency-constrained flatMap will prefetch exactly one source value and thus trigger sequence dependent side-effects at different times.


akarnokd avatar akarnokd commented on June 5, 2024 2

10) Executing a function asynchronously

You may often want to execute some Supplier or Runnable/Action0 asynchronously. Maybe it's something blocking or just long-running and you don't want to block or hold up your current thread.

Many modern libraries offer the fromCallable(Callable<T>) source factory where you can use a regular old Callable to execute some action. With Java 8 lambdas, it's easy to convert your callbacks into Callables:

Supplier<Integer> supplier = () -> 1;
fromCallable(supplier::get).subscribeOn(computation);

Runnable run = () -> System.out.println("Hello!");
fromCallable(() -> { run.run(); return null; }).subscribeOn(computation);

If for some reason fromCallable is not available, you can use a combination of just and map to get the same effect:

just("whatever")
.subscribeOn(computation)
.map(ignored -> supplier.get());

Of course, you can swap the order of subscribeOn and map here; they are functionally equivalent. However, as we saw with Gem # 5, just().subscribeOn() is usually optimized in modern libraries and an intermediate just().map().subscribeOn() would most likely prevent some optimizations.


akarnokd avatar akarnokd commented on June 5, 2024 2

11) Caching the last element and clearing it on demand

A possible use for BehaviorSubject/BehaviorProcessor (EmitterProcessor#replayLast in Reactor), which remembers the last value and emits it to new Subscribers at the beginning of a sequence, is to use it as a single element cache - also known as a "reactive property".

Sometimes, the contents of this cache can become outdated and shouldn't be emitted to new Subscribers until a proper fresh value is generated. Unfortunately, most implementations don't offer a clear() method, so the old value stays until a new one gets assigned to it via onNext.

We will use this onNext in fact to clear out the current value. To do this, we have to establish a protocol where a special value from the Subject's value type indicates emptiness and instructs Subscribers to ignore it if encountered:

Integer CLEAR = new Integer(0);

BehaviorProcessor<Integer> cache = new BehaviorProcessor<>(CLEAR);

Publisher<Integer> front = cache.filter(v -> v != CLEAR);

front.subscribe(System.out::println);

cache.onNext(10);
cache.onNext(CLEAR);
cache.onNext(20);

front.subscribe(System.out::println);

In the example, we create a new (!) Integer instance and use it for reference comparison to determine if the cache is "empty".

Sometimes, however, you can't just create such an "empty" instance of the type you are working with. In this case, you have to revert to the lowest common denominator type: Object and downcast anything else back to your type that isn't the indicator:

Object CLEAR = new Object();

BehaviorProcessor<Object> cache = new BehaviorProcessor<>();

Publisher<String> front = cache.filter(v -> v != CLEAR).cast(String.class);

cache.onNext("abc");
cache.onNext(CLEAR);
cache.onNext("def");

front.subscribe(System.out::println);

Of course, directly exposing BehaviorProcessor<Object> is a welcoming sign for all kinds of types. You can re-establish the type safety by wrapping the code into some class and allowing only type-correct onNext calls:

public final class Cache<T> implements Observer<T>, Publisher<T> {
    static final Object CLEAR = new Object();
    final BehaviorProcessor<Object> cache = new BehaviorProcessor<>(CLEAR);

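    @Override
    @SuppressWarnings("unchecked")
    public void subscribe(Subscriber<? super T> subscriber) {
        // Only T values pass the filter, so the unchecked cast back to T is safe.
        cache.filter(v -> v != CLEAR).map(v -> (T) v).subscribe(subscriber);
    }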

    @Override
    public void onNext(T t) {
        cache.onNext(t);
    }

    @Override
    public void onError(Throwable t) {
        cache.onError(t);
    }

    @Override
    public void onComplete() {
        cache.onComplete();
    }

    public void clear() {
        cache.onNext(CLEAR);
    }
}

Remember, Processors (and Subjects) require non-concurrent calls to their onXXX methods, so you should only call them in a serialized fashion (such as from a GUI thread or a single-threaded event loop).


akarnokd avatar akarnokd commented on June 5, 2024 2

19) Eager concatenation

Sometimes, one would like to merge concurrently running sources and keep their order at the same time. The usual flatMap doesn't keep the order and the usual concatMap doesn't start a source until the previous one has completed.

Many reactive libraries have an operator for this: concatMapEager (and concatEager), which gives the middle ground. In case your library doesn't support this operator, you can achieve a similar effect by concatenating pre-started sources with the help of a UnicastProcessor:

range(1, 10)
.map(v -> {
    Flux<T> o = getSource(v);
    UnicastProcessor<T> up = UnicastProcessor.create();
    o.subscribe(up);
    return up;
})
.concatMap(v -> v)
.subscribe(System.out::println, Throwable::printStackTrace);

Here, values 1..10 are mapped to a source which is then started by subscribing a UnicastProcessor to it. UnicastProcessor caches values (unbounded) until a subscriber, such as the one inside concatMap, subscribes to it, and then dutifully replays/relays the items.

In case you don't have a UnicastProcessor available, there are alternatives to it:

  1. By using ReplayProcessor instead, but note that it will retain all values until the whole sequence gets GC'd.
.map(v -> {
    Flux<T> o = getSource(v);
    ReplayProcessor<T> rp = ReplayProcessor.create();
    o.subscribe(rp);
    return rp;
})
  2. By using replay() and connect()
.map(v -> {
    Flux<T> f = getSource(v);
    ConnectableFlux<T> cf = f.replay();
    cf.subscribe(e -> { }, Throwable::printStackTrace);
    cf.connect();
    return cf;
})

That extra subscribe is necessary because replay may not start actually running its source unless there are Subscriber(s) waiting for the items already.

  3. By using replay() and autoConnect() or cache()
.map(v -> {
    Flux<T> f = getSource(v);
    Flux<T> cf = f.cache();
    cf.subscribe(e -> { }, Throwable::printStackTrace);
    return cf;
})

By default, cancellation won't cancel the prestarted sources. You have to manually wire up the end Subscriber with all participants:

List<Cancellation> cancellations = ...

// ...
.map(v -> {
    Flux<T> f = getSource(v);
    UnicastProcessor<T> up = UnicastProcessor.create();
    cancellations.add(f.subscribe(up));
    return up;
})

// ...
.map(v -> {
    Flux<T> f = getSource(v);
    ConnectableFlux<T> cf = f.replay();
    cf.subscribe(e -> { }, Throwable::printStackTrace);
    cancellations.add(cf.connect());
    return cf;
})

// ...
.map(v -> {
    Flux<T> f = getSource(v);
    Flux<T> cf = f.replay().autoConnect(1, cancellations::add);
    cf.subscribe(e -> { }, Throwable::printStackTrace);
    return cf;
})


akarnokd avatar akarnokd commented on June 5, 2024 2

20) defer via using

The operator defer lets you generate a source Observable/Publisher for each Subscriber, whereas using lets you generate a resource per Subscriber and then use it to generate an Observable/Publisher.

Therefore, you can imitate defer via a using setup where you create and ignore a resource and just create the source:

Supplier<Publisher<Integer>> s = () -> range(1, 10);

defer(s) == using(() -> "whatever", v -> s.get(), v -> { });

However, you could also create the source itself as the resource and use identity-mapping on it:

using(s, v -> v, v -> { }) 

using(s::get, v -> v, v -> { })

The first shorter case is applicable if your library's using takes a Supplier and the second if it takes some () -> T generator function.

The reverse direction, namely expressing using via defer, is a bit more involved:

Supplier<R> resource = ...
Function<R, Publisher<T>> source = ...
Consumer<R> disposer = ...

defer(() -> 
    fromCallable(resource::get)
    .flatMap(r -> {
        try { 
            return source.apply(r)
            .doOnTerminate(() -> disposer.accept(r))
            .doOnUnsubscribe(() -> disposer.accept(r));
        } catch (Throwable ex) {
            disposer.accept(r);
            return error(ex);
        }
     })
);

In RxJava, a terminal event is followed by an unsubscribe call from most end-consumers (due to SafeSubscriber). In case the resource or the disposer is not idempotent, we have to make sure the dispose happens exactly once:

defer(() -> 
    fromCallable(resource::get)
    .flatMap(r -> {
        AtomicBoolean once = new AtomicBoolean();
        try { 
            return source.apply(r)
            .doOnTerminate(() -> disposeOnce(once, disposer, r))
            .doOnUnsubscribe(() -> disposeOnce(once, disposer, r));
        } catch (Throwable ex) {
            disposeOnce(once, disposer, r);
            return error(ex);
        }
     })
);

<R> void disposeOnce(AtomicBoolean once, Consumer<R> disposer, R resource) {
    if (once.compareAndSet(false, true)) {
        disposer.accept(resource);
    }
}


akarnokd avatar akarnokd commented on June 5, 2024 2

22) Compute a single value only when requested

By design, fromCallable executes the Callable immediately and doesn't wait for a downstream request to appear.

In case you want to compute only when requested, you can use the same jump-start trick with just().map() similar to Gem 10).

just("irrelevant")
.map(unused -> {
    try {
        return callable.call();
    } catch (Exception ex) {
        throw Exceptions.bubble(ex); // or Exceptions.propagate(ex)
    }
})
.subscribe(System.out::println, Throwable::printStackTrace);

Since subscribe() requests immediately, the difference from fromCallable is not obvious at first. To see the difference, we need to manually request after the subscribe() has returned. One way for demonstrating this is via TestSubscriber:

TestSubscriber<T> ts = new TestSubscriber<>(0L);

just("irrelevant")
.map(unused -> {
    System.out.println("callable.call()");
    try {
        return callable.call();
    } catch (Exception ex) {
        throw Exceptions.bubble(ex); // or Exceptions.propagate(ex)
    }
})
.subscribe(ts);

System.out.println("subscribe()");
ts.requestMore(1); // or ts.request(1); in Rsc

Now the console will first print subscribe() followed by callable.call().


akarnokd avatar akarnokd commented on June 5, 2024 1

4) Continuations with concat

Sometimes, you want to run through a sequence of values and then resume with another sequence of a completely different type once the first completed. In this case, you'd usually also ignore the first sequence's values.

Some libraries already offer operators for it: after, then, andThen, etc. but if not available, you can use concat for it:

Publisher<Integer> a = ...
Publisher<String> b = ...

Publisher<String> c = (Publisher<String>)concat(a.ignoreElements(), b);

Unfortunately, you'll need some explicit casts and @SuppressWarnings for this; Java's type system doesn't seem to be powerful enough to express it otherwise.


akarnokd avatar akarnokd commented on June 5, 2024 1

12) Processing elements of list-based values

Sometimes, an API gives you a Publisher<List<T>> where the signaled values are Lists and you want to process the elements in the list while holding the list together to be processed as a whole downstream later on.

Let's start with a common source of Lists:

Publisher<List<Integer>> source = range(1, 1_000_000).buffer(256);

In-place processing

You can simply run a for loop and manipulate the list in place if the list is mutable (buffer emits such a mutable list):

source.map(list -> {
    for (int i = 0; i < list.size(); i++) {
        list.set(i, list.get(i) + 2_000_000);
    }
    return list;
});

Or as a new list:

source.map(list -> {
    List<Integer> newList = new ArrayList<>(list.size());
    for (Integer v : list) {
        newList.add(v + 2_000_000);
    }
    return newList;
});

Stream processing

You can combine the worlds of reactive and the interactive Java 8 Stream processing:

source.map(list -> 
    list.stream()
    .map(v -> v + 2_000_000)
    .collect(toList())
);

ConcatMap

If you can't mutate the list, are stuck on Java 7 or earlier, or just don't want to look "non-functional", you can use concatMap (not flatMap, in order to keep the original list order), extract each list, process its elements and then recollect the result into another list:

source.concatMap(list ->
    fromIterable(list)
    .map(v -> v + 2_000_000)
    .toList()
);


akarnokd avatar akarnokd commented on June 5, 2024 1

15) defer expressed as flatMap

The operator defer lets you return a custom Publisher instance for each of the Subscribers. The effect can be simulated by using just with flatMap:

Func0<Observable<Integer>> supplier = () -> range((int)(System.currentTimeMillis() & 1023), 5);

defer(supplier) == just("whatever").flatMap(v -> supplier.call());

In fact you could write it with concatMap as well:

defer(supplier) == just("whatever").concatMap(v -> supplier.call());

Even though using flatMap and concatMap this way looks like too much overhead, most modern libraries optimize away just with a special defer like operator that behaves like this:

Func1<Integer, Observable<Integer>> function = v -> range(v, 2);

just(-10).flatMap(function) == defer(() -> function.call(-10));

by extracting the constant value from just and building a Func0 supplier that calls the original function with this constant and uses its returned Observable.

For this reason, using nest() with concatMap/flatMap is practically a no-op:

range(1, 5).nest().concatMap(o -> o.take(3)) == range(1, 5).take(3)


akarnokd avatar akarnokd commented on June 5, 2024 1

16) Emit elements of a list periodically

The source operator interval lets you create a periodic sequence of ever increasing Long values. Sometimes, we don't care about the values themselves and just want to act at the right time.

Then comes a requirement of emitting items from a list periodically, that is, with some fixed delay between elements. We can first map the Long values into the elements of the list, but we should also make sure we don't run out of indexes and get IndexOutOfBoundsException:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

interval(1, TimeUnit.SECONDS)
.takeUntil(t -> t == list.size() - 1)
.map(t -> list.get(t.intValue()))
.subscribe(System.out::println);

Given the interval, we take its elements until the current running value is exactly the list's size minus one. The operator takeUntil executes the predicate after the value itself has been emitted downstream, so the last index is still delivered. In the map, we then simply call List.get() with the timer's Long value narrowed to an int and get an item from the list.
