Git Product home page Git Product logo

parallel-collectors's Introduction

Java Stream API Virtual-Threads-enabled Parallel Collectors - overcoming limitations of standard Parallel Streams

build pitest Maven Central

Stargazers over time

Parallel Collectors is a toolkit easing parallel collection processing in Java using Stream API... but without limitations imposed by standard Parallel Streams.

list.stream()
  .collect(parallel(i -> blockingOp(i), toList()))
    .orTimeout(1000, MILLISECONDS)
    .thenAcceptAsync(System.out::println, executor)
    .thenRun(() -> System.out.println("Finished!"));

They are:

  • lightweight, defaulting to Virtual Threads (an alternative to Project Reactor for scenarios where a lighter solution is preferred)
  • powerful (the combined power of Stream API and CompletableFutures, allowing for timeout specification, composition with other CompletableFutures, and asynchronous processing)
  • configurable (flexibility with customizable Executors and parallelism levels)
  • non-blocking (eliminates the need to block the calling thread while awaiting results)
  • short-circuiting (if one of the operations raises an exception, remaining tasks will get interrupted)
  • non-invasive (they are just custom implementations of Collector interface, no magic inside, zero-dependencies, no Stream API internals hacking)
  • versatile (enables easy integration with existing Stream API Collectors)

Maven Dependencies

JDK 21+:

<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>3.0.0</version>
</dependency>

JDK 8+:

<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>2.6.0</version>
</dependency>
Gradle

JDK 21+:

compile 'com.pivovarit:parallel-collectors:3.0.0'`

JDK 8+:

compile 'com.pivovarit:parallel-collectors:2.6.0'`

Philosophy

Parallel Collectors are intentionally unopinionated, leaving responsibility to users for:

  • Proper configuration of provided Executors and their lifecycle management
  • Choosing appropriate parallelism levels
  • Ensuring the tool is applied in the right context

Review the API documentation before deploying in production.

Basic API

The main entrypoint is the com.pivovarit.collectors.ParallelCollectors class - which follows the convention established by java.util.stream.Collectors and features static factory methods returning custom java.util.stream.Collector implementations spiced up with parallel processing capabilities.

By design, it's obligatory to supply a custom Executor instance and manage its lifecycle.

All parallel collectors are one-off and must not be reused.

Available Collectors:

  • CompletableFuture<Stream<T>> parallel(Function) (uses Virtual Threads)

  • CompletableFuture<Collection<T>> parallel(Function, Collector) (uses Virtual Threads)

  • CompletableFuture<Stream<T>> parallel(Function, Executor, parallelism)

  • CompletableFuture<Collection<T>> parallel(Function, Collector, Executor, parallelism)

  • Stream<T> parallelToStream(Function) (uses Virtual Threads)

  • Stream<T> parallelToOrderedStream(Function) (uses Virtual Threads)

  • Stream<T> parallelToStream(Function, Executor, parallelism)

  • Stream<T> parallelToOrderedStream(Function, Executor, parallelism)

Batching Collectors

By default, all ExecutorService threads compete for each task separately - which results in a basic form of work-stealing, which, unfortunately, is not free, but can decrease processing time for subtasks with varying processing time.

However, if the processing time for all subtasks is similar, it might be better to distribute tasks in batches to avoid excessive contention:

Batching alternatives are available under the ParallelCollectors.Batching namespace.

Leveraging CompletableFuture

Parallel Collectors™ expose results wrapped in CompletableFuture instances which provides great flexibility and possibility of working with them in a non-blocking fashion:

CompletableFuture<List<String>> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), executor));

This makes it possible to conveniently apply callbacks, and compose with other CompletableFutures:

list.stream()
  .collect(parallel(i -> foo(i), toSet(), executor))
  .thenAcceptAsync(System.out::println, otherExecutor)
  .thenRun(() -> System.out.println("Finished!"));

Or just join() if you just want to block the calling thread and wait for the result:

List<String> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), executor))
  .join();

What's more, since JDK9, you can even provide your own timeout easily.

Examples

1. Apply i -> foo(i) in parallel on a custom Executor and collect to List
Executor executor = ...

CompletableFuture<List<String>> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), executor));
2. Apply i -> foo(i) in parallel on a custom Executor with max parallelism of 4 and collect to Set
Executor executor = ...

CompletableFuture<Set<String>> result = list.stream()
  .collect(parallel(i -> foo(i), toSet(), executor, 4));
3. Apply i -> foo(i) in parallel on a custom Executor and collect to LinkedList
Executor executor = ...

CompletableFuture<List<String>> result = list.stream()
  .collect(parallel(i -> foo(i), toCollection(LinkedList::new), executor));
4. Apply i -> foo(i) in parallel on a custom Executor and stream results in completion order
Executor executor = ...

list.stream()
  .collect(parallelToStream(i -> foo(i), executor))
  .forEach(i -> ...);
5. Apply i -> foo(i) in parallel on a custom Executor and stream results in original order
Executor executor = ...

list.stream()
  .collect(parallelToOrderedStream(i -> foo(i), executor))
  .forEach(i -> ...);

Rationale

Stream API is a great tool for collection processing, especially if you need to parallelize execution of CPU-intensive tasks, for example:

public static void parallelSetAll(int[] array, IntUnaryOperator generator) {
    Objects.requireNonNull(generator);
    IntStream.range(0, array.length).parallel().forEach(i -> { array[i] = generator.applyAsInt(i); });
}

However, Parallel Streams execute tasks on a shared ForkJoinPool instance.

Unfortunately, it's not the best choice for running blocking operations even when using ManagedBlocker - as explained here by Tagir Valeev) - this could easily lead to the saturation of the common pool, and to a performance degradation of everything that uses it.

For example:

List<String> result = list.parallelStream()
  .map(i -> foo(i)) // runs implicitly on ForkJoinPool.commonPool()
  .toList();

In order to avoid such problems, the solution is to isolate blocking tasks and run them on a separate thread pool... but there's a catch.

Sadly, Streams can only run parallel computations on the common ForkJoinPool which effectively restricts the applicability of them to CPU-bound jobs.

However, there's a trick that allows running parallel Streams in a custom FJP instance... but it's not considered reliable (and can still induce oversubscription issues while competing with the common pool for resources)

Note, however, that this technique of submitting a task to a fork-join pool to run the parallel stream in that pool is an implementation "trick" and is not guaranteed to work. Indeed, the threads or thread pool that is used for execution of parallel streams is unspecified. By default, the common fork-join pool is used, but in different environments, different thread pools might end up being used.

Says Stuart Marks on StackOverflow.

Not even mentioning that this approach was seriously flawed before JDK-10 - if a Stream was targeted towards another pool, splitting would still need to adhere to the parallelism of the common pool, and not the one of the targeted pool [JDK8190974].

Dependencies

None - the library is implemented using core Java libraries.

Limitations

  • Upstream Stream is always evaluated as a whole, even if the following operation is short-circuiting. This means that none of these should be used for working with infinite streams. This limitation is imposed by the design of the Collector API.

  • Never use Parallel Collectors with Executors with RejectedExecutionHandler that discards tasks - this might result in a deadlock.

Good Practices

  • Consider providing reasonable timeouts for CompletableFutures in order to not block for unreasonably long in case when something bad happens (how-to)
  • Name your thread pools - it makes debugging easier (how-to)
  • Limit the size of a working queue of your thread pool (source)
  • Limit the level of parallelism (source)
  • A no-longer-used ExecutorService should be shut down to allow reclamation of its resources
  • Keep in mind that CompletableFuture#then(Apply|Combine|Consume|Run|Accept) might be executed by the calling thread. If this is not suitable, use CompletableFuture#then(Apply|Combine|Consume|Run|Accept)Async instead, and provide a custom Executor instance.

Words of Caution

While Parallel Collectors and Virtual Threads make parallelization easy, it doesn't always mean it's the best choice. Platform threads are resource-intensive, and parallelism comes with a cost.

Before opting for parallel processing, consider addressing the root cause through alternatives like DB-level JOIN statements, batching, data reorganization, or... simply selecting a more suitable API method.


See CHANGELOG.MD for a complete version history.

parallel-collectors's People

Contributors

armandino avatar dependabot-preview[bot] avatar dependabot[bot] avatar pivovarit avatar robpiwowarek avatar the-alchemist avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

parallel-collectors's Issues

Implement blocking alternatives

Blocking ParallelCollectors should not return CompletableFutures but results directly:

Set<String> result = Stream.of(1, 2, 3)
  .collect(inParallelToSet(i -> foo(), executor));

instead of:

CompletableFuture<Set<String>> result = Stream.of(1, 2, 3)
  .collect(inParallelToSet(i -> foo(), executor));

Implement collectors accepting mappers as parameters

public static <T, R> Collector<T, List<CompletableFuture<R>>, CompletableFuture<List<R>>> toListInParallel(Function<T, R> mapper, Executor executor)

So that users are not forced to implement an intermediate operation of creating Supplier instances.

Improve ThrottlingParallelCollector implementation

Internal ThrottlingParallelCollector could be improved.

For now, it firstly drains the source stream into a working queue and only then starts the whole processing. This could be problematic if we are dealing with a slow producer.

The consumption could start immediately without a need to wait for the whole stream to get consumed first.

Should unlimited parallelism level be allowed?

Currently, ParallelCollectors facade exposes Collector implementation that cap the parallelism level at Integer.MAX_VALUE which might be problematic if the tool ends up being used by someone that just skimmed over the documentation.

I'm thinking we should deprecate those static factories method and always force users to provide parallelism explicitly. Additionally, we could benefit from a simplified API (6 instead of 12 factory methods).

What do you think?
Another option is to still allow that but provide some reasonable defaults.

Optimize parallelism == 1 edge cases

If we have a look at base benchmarks(attached), you can see that throughput suffers a lot when dealing with non-batching cases when parallelism == 1, and in this case we're dealing with one big batch effectively.

This is quite a peculiar scenario since one would expect parallelism > 1 in most cases, however, it's still a valid scenario if someone wants to just perform the computation asynchronously without orchestrating things themselves.

Since there's no parallelism involved, we could easily just rely on a different Collector implementation that doesn't bring that much overhead.


/* 972ffbb @ Intel i7-4980HQ (8) @ 2.80GHz, 8u222
Benchmark                               (parallelism)   Mode  Cnt      Score     Error  Units
Bench.parallel_batch_collect                        1  thrpt    5  10218.766 ± 131.633  ops/s
// ...
Bench.parallel_batch_streaming_collect              1  thrpt    5  10715.432 ±  78.383  ops/s
// ...
Bench.parallel_collect                              1  thrpt    5     67.891 ±   1.357  ops/s
// ...
Bench.parallel_streaming                            1  thrpt    5     50.920 ±   1.569  ops/s
// ...
 */

Technical Writing

  • JavaDocs
  • README.md with examples
  • "Parallel Collection Processing in Java without Parallel Streams" @4comprehension.com
  • social media announcement
    • Reddit
    • Twitter
    • LinkedIn

Remove UnboundedDispatcher

And replace its usage with ThrottlingDispatcher with parallelism defaulting to Runtime.getRuntime().availableProcessors()

Support spring cloud sleuth

Is your feature request related to a problem? Please describe.
I would like to use parallel-collectors in a spring boot application with spring cloud sleuth for tracing requests.
The problem is that the code executed on parallel-collectors loses the request tracing information. I think it happens because com.pivovarit.collectors.Dispacher executes the tasks with a newLazySingleThreadExecutor() that spring cloud is not able to instrument.

Describe the solution you'd like
It would be very helpful if com.pivovarit.collectors.Dispacher allows us to configure the Executor that runs the tasks instead of creating and using a newLazySingleThreadExecutor() that spring cloud sleuth cannot instrument.
I think that spring cloud sleuth is using org.springframework.cloud.sleuth.instrument.async.TraceableExecutorService to be able to support tracing in Executors.

Describe alternatives you've considered
I've tried to remove the newLazySingleThreadExecutor() from Dispacher and use the executor to execute the tasks. It seems to work properly, but some tests have failed so it seems to be a bad solution.

Additional context
I've attached a simple spring boot with spring cloud sleuth application.
spring-boot-sleuth-parallel-collectors.zip

Start the application with ./mvnw spring-boot:run
Go to http://localhost:8080 in your navigator and check the application logs.

A message is written to the log when transforming some numbers with parallel-collectors, but the spring cloud sleuth "trace-id" is different for every thread:
Selection_721

The expected behavior would be to have the same "trace-id" for every thread:
Selection_722

Thank you so much for creating parallel-collectors!

MavenGate (CVE)

XFrog triggers an alert on package: com.pivovarit:parallel-collectors

Looks like com.pivovarit is not registered hence groupId can be claimed by malicious user

Introduce API methods that allow reusing native Java Collectors

  • <T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(Collector<R, ?, RR> c, Function<T, R> m, Executor e, int parallelism)
  • <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> parallel(Function<T, R> m, Executor e, int parallelism)

A signature like this would allow reducing API surface by getting rid of *toList/toMap/toSet factories and replacing them with one API that allows users to pass their own Java Collectors.

So, instead of:

  • .collect(parallelToSet(i -> i + 2, executor, 2));
  • .collect(parallelToList(i -> i + 2, executor, 7));

One would expect:

  • CF<Set<Integer>> r = ...collect(parallel(toSet(), i -> i + 2, executor, 2));
  • CF<List<Integer>> r = ...collect(parallel(toList(), i -> i + 2, executor, 2));

Collector> parameter could be omitted and would result in a CF containing unprocessed Stream instances:

  • CF<Stream<Integer>> r = ...collect(parallel(i -> i + 2, executor, 2));
  • CF<Stream<Integer>> r = ...collect(parallel(i -> i + 2, executor, 2));

The difference for users is subtle but would allow to greatly simplify the implementation.

Extract common logic to ParallelDispatcher

public class ParallelDispatcher<T> {

    final ExecutorService dispatcher = newSingleThreadExecutor(new CustomThreadFactory());
    final Executor executor;
    final Queue<Supplier<T>> workingQueue;
    final Queue<CompletableFuture<T>> pendingQueue;

    public ParallelDispatcher(Executor executor, Queue<Supplier<T>> workingQueue, Queue<CompletableFuture<T>> pendingQueue) {
        this.executor = executor;
        this.workingQueue = workingQueue;
        this.pendingQueue = pendingQueue;
    }
}

Implement ParallelCollectors.toCombinedFuture

Let's add a method returning a simple collector that collects a stream of futures into a single future.

public static void main(String[] args) {
        CompletableFuture<List<Integer>> collect = Stream.of(1, 2, 3)
          .map(i -> CompletableFuture.supplyAsync(() -> i))
          .collect(toFuture());
    }

    public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> toFuture() {
        return Collectors.collectingAndThen(Collectors.toList(), list -> {
            CompletableFuture<List<T>> future = CompletableFuture
              .allOf(list.toArray(new CompletableFuture[0]))
              .thenApply(__ -> list.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));

            for (CompletableFuture<T> f : list) {
                f.whenComplete((integer, throwable) -> {
                    if (throwable != null) {
                        future.completeExceptionally(throwable);
                    }
                });
            }

            return future;
        });
    }

Add an example integrated with Spring Boot

Is your feature request related to a problem? Please describe.

I would like to add an example integrated in a Spring Boot Application

Describe the solution you'd like

I would like to add an Example explained as Use Case with the library.

Many thanks in advance.

Remove repetition from tests

Currently, most tests are super repetitive copy-pastes of each other.

All because of the time and pain related to working with Java generics... but I still believe this can be simplified to avoid all this form of copy-pastes.

Inconsistent behavior when parallel=1

To parallelize some side-effects I wrote following code which works as expected on my local machine:

  public static void main( String[ ] args ) {
        var executor = Executors.newFixedThreadPool(10 );
        try {
            var list = List.of( "A", "B" );

            list.stream( )
                    .collect( ParallelCollectors.parallel( TestCase::processRecord, executor) )
                    .join( );
        } finally {
            executor.shutdown( );
        }
    }

    private static String processRecord( String record ) {
        System.out.println( Thread.currentThread( ).getName( ) );
        return record;
    }
$ docker run --rm testcase
pool-1-thread-2
pool-1-thread-1

However it took me a while to figure out why it doesn't when deployed on our cluster (Mesos & K8S). Eventually, I figured out that since release 2.3.0, parallel-collectors has an inconstent behavior when parallel=1 and parallel>1.

$ docker run --rm -cpuset-cpus=0 testcase
$ docker run --rm -cpuset-cpus=0,1 testcase
$ docker run --rm -cpuset-cpus=0,1,2 testcase
pool-1-thread-2
pool-1-thread-1

When parallel=1, mapping function is only invoked when the stream returned by parallel().join() is consumed by a terminal operation (some like count won't as the mapping function can be optimized away).

  public static void main( String[ ] args ) {
        var executor = Executors.newFixedThreadPool( 10 );
        try {
            var list = List.of( "A", "B" );

            System.out.println( "count" );
            list.stream( )
                    .collect( ParallelCollectors.parallel( TestCase::processRecord, executor ) )
                    .join( )
                    .count( );

            System.out.println( "toList" );
            list.stream( )
                    .collect( ParallelCollectors.parallel( TestCase::processRecord, executor ) )
                    .join( )
                    .collect( Collectors.toList( ) );
        } finally {
            executor.shutdown( );
        }
    }
$ docker run --cpuset-cpus=0  --rm testcase
count
toList
main
main

When parallel>1, mapping is always performed in provider executor in an eager way (no need to consume the stream).

This seems a bug as Javadoc says that parallel computation will be performed on a custom executor which is not true in this case. It is also dangerous as it is seems too easy to write the same bogus code than me.

I would propose to revert to 2.2.0 behaviour.

Make it possible to configure the dispatcher instance in the Dispatcher

Is your feature request related to a problem? Please describe.
dispatcher in Dispatcher is used to wrap the executor that is passed in which makes it hard to use Spring Security Concurrency Support
Describe the solution you'd like
Make it possible to configure the dispatcher implementation (or maybe some other solution that would fix this)

Describe alternatives you've considered
I don't see any alternatives

Additional context
The default (and recommended) implementation of security context holder strategy uses thread locals to keep track of the current security context so by default it doesn't get passed on to code executing in the threadpools so that's where the concurrency support comes in: it injects the security context from the current thread to the new one.
So when we use parallel-collectors, the security context is passed on to the dispatcher thread which does not pass it on to the executor thread.

Please avoid suggesting parallel N+1 database interaction patterns

The current readme suggests this example here:

list.stream()
  .collect(parallelToList(i -> fetchFromDb(i), executor, 2)).orTimeout(1000, MILLISECONDS)
  .thenAccept(System.out::println)
  .thenRun(() -> System.out.println("Finished!"));

And it reiterates on the example later on. Please do not suggest fetching individual values from the database one by one, by ID, especially when an ID set is available, which there obviously is, in the presence of such an ID list / stream.

A lot of junior developers will think "hey we can speed up our app by distributing our trivial query across several threads". The result is saturated threads both on the client and in the database, when batch and/or bulk SQL operations would have been much better for both systems.

As API / tool vendors, it is our responsibility to show simple examples that do not misguide junior developers into thinking that a quick fix will really solve their problems.

Short-circuit when exception gets thrown

In case an exception gets thrown, collectors should immediately terminate operations and return a future completed exceptionally.

@Test
void shouldCollectToCollectionAndShortCircuitOnException() {

    // given
    executor = threadPoolExecutor(1);
    LongAdder counter = new LongAdder();

    try {
        IntStream.generate(() -> 42).boxed().limit(10)
          .map(i -> supplier(() -> {
              counter.increment();
              throw new IllegalArgumentException();
          }))
          .collect(parallelToList(executor, 1))
          .join();
    } catch (Exception e) {
    }

    assertThat(counter.longValue()).isOne();
}

Is Filtering/FlatMap supported?

Hi,

Its a very nice project, and I want to use it.
I have a question,
can I use filtering and flatMap functionality that is available in streams in Parallel-collectors?

Implement parallelConsume()

public static <T> Collector<Supplier<T>, List<CompletableFuture<T>>, CompletableFuture<Void>> parallelConsumeOn(Executor executor) {
        requireNonNull(executor, "executor can't be null");
        // ...
    }

    public static <T> Collector<Supplier<T>, List<CompletableFuture<T>>, CompletableFuture<Void>> parallelConsumeOn(Executor executor, int parallelism) {
        requireNonNull(executor, "executor can't be null");
        // ...
    }

Enable batching scheduling strategy

For now, all ParallelCollectors schedule each Stream item to be processed separately.

This works just fine in most cases since it provides a natural work-stealing by letting all threads compete for each job separately, but can be problematic if the original Stream contains significant number of elements.

Describe the solution you'd like
Introduce API methods that allow choosing a scheduling strategy where batching is one of them.

This could be achieved by introducing a static inner class Batching into ParallelCollectors to provide natural namespacing:

ParallelCollectors.Batching.parallel(i -> i, executor)

This should be a preferred option since adding more parameters into existing methods would make them unacceptable.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.