
cyclops's Introduction

Getting Cyclops X (10)

Tutorial :

What's new in Cyclops X (cyclops 10)

cyclops-data-types

  • Fast purely functional data structures (Vector, Seq / List, LazySeq / LazyList, NonEmptyList, HashSet, TreeSet, TrieSet, HashMap, LinkedMap, MultiMap, TreeMap, BankersQueue, LazyString, Discrete Interval Encoded Tree, Zipper, Range, Tree, DifferenceList, HList, Dependent Map)
  • Structural Pattern Matching API (deconstruct algebraic product and sum types)
  • Improved type safety via the removal of unsafe APIs. For example, unlike Optional, Option has no get method (Optional.get throws a NoSuchElementException when empty), and the new data structures do not support operations that would throw exceptions (you can't call head on an empty list, for example)
  • Eager and Lazy alternatives for most data structures (Option is eager, Maybe is lazy + reactive)
  • Improved naming of types (Function1-8 rather than Fn1-8, Either not Xor)
  • Group id is changed to com.oath.cyclops
  • Versioning between cyclops-react and cyclops is merged on cyclops versioning scheme (version 10 = Cyclops X)
  • Lightweight dependencies: reactive-streams API, KindedJ & Agrona
  • JVM Polyglot Higher Kinded Types Support with KindedJ

Modules

Gradle

where x.y.z represents the latest version

implementation 'com.oath.cyclops:cyclops:x.y.z'

Maven

<dependency>
    <groupId>com.oath.cyclops</groupId>
    <artifactId>cyclops</artifactId>
    <version>x.y.z</version>
</dependency>


Powerful Streams and functional data types for building modern Java 8 applications. We extend JDK interfaces where possible for maximum integration.

This is the 10.x branch; for the 2.x branch, click the link below.

License

cyclops is licensed under the Apache 2.0 license.

http://www.apache.org/licenses/LICENSE-2.0

Thanks to our Sponsors

cyclops's People

Contributors

0xflotus, atapin, awturner, c0nscience, chenzhang22, colinfkennedy, dmitraver, earlzero, felixwiemuth, gitter-badger, greg2001, h3xstream, jijisv, jlorenzen, johnmcclean, kwonglau, lauralaureus, lizhoulee, lukaseder, morrowgi, pepijno, quike, ramswaroop, scr-oath, seanf, shisheng-1, tkountis, wyang14, zeouterlimits


cyclops's Issues

Status getMillis method is not correct

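Currently it returns the elapsed nanoseconds multiplied by 10000000, but it should divide instead (there are 1,000,000 nanoseconds in a millisecond):

    public final long getElapsedMillis() {
        return elapsedNanos / 1000000; // nanoseconds -> milliseconds
    }

Integer division truncates, so any fraction of a millisecond is discarded. This rounding error may or may not matter.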
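A minimal sketch of the corrected conversion, showing the truncating division from the issue, the equivalent JDK TimeUnit conversion, and a round-half-up alternative. ElapsedStatus is a hypothetical stand-in for the Status class; only the elapsedNanos field is modelled.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the Status class in the issue; only elapsed time is modelled.
class ElapsedStatus {
    private final long elapsedNanos;

    ElapsedStatus(long elapsedNanos) {
        this.elapsedNanos = elapsedNanos;
    }

    // Truncating conversion: 1 ms = 1_000_000 ns, fractional milliseconds are dropped.
    public final long getElapsedMillis() {
        return elapsedNanos / 1_000_000;
    }

    // Equivalent JDK conversion, which also truncates.
    public final long getElapsedMillisViaTimeUnit() {
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }

    // Rounds half up instead of truncating (valid for non-negative durations).
    public final long getElapsedMillisRounded() {
        return (elapsedNanos + 500_000) / 1_000_000;
    }

    public static void main(String[] args) {
        ElapsedStatus status = new ElapsedStatus(1_999_999L);
        System.out.println(status.getElapsedMillis());            // 1
        System.out.println(status.getElapsedMillisViaTimeUnit()); // 1
        System.out.println(status.getElapsedMillisRounded());     // 2
    }
}
```

If sub-millisecond accuracy matters, the rounded variant halves the worst-case error; otherwise TimeUnit.NANOSECONDS.toMillis states the intent most clearly.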

Zip should zip the underlying Streams of CompletableFutures

Zip currently relies on the Seq implementation, which takes an iterator over each of the two top-level Streams. This means that Zip populates the new zipped Stream in the order in which results arrive from those Streams.

We should keep this functionality but make it available under a different name.

I think that when zipping two Streams it makes more sense to maintain the original order of the data. We can do this by zipping the underlying Streams of CompletableFutures; the new Stream can still be populated asynchronously.
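A minimal JDK-only sketch of zipping by original position, assuming each Stream is represented as a List of CompletableFutures (FutureZip is a hypothetical name, not a cyclops API). Each output future pairs the i-th future from each side with thenCombine, so the output order follows the original order even when later futures complete first, and nothing blocks while the pairs are built.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical helper: zips two sequences of futures by their original position.
class FutureZip {

    static <A, B, R> List<CompletableFuture<R>> zip(List<CompletableFuture<A>> left,
                                                   List<CompletableFuture<B>> right,
                                                   BiFunction<? super A, ? super B, ? extends R> combiner) {
        int size = Math.min(left.size(), right.size()); // stop at the shorter side
        List<CompletableFuture<R>> zipped = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            // Completes once both i-th futures complete; no waiting happens here.
            zipped.add(left.get(i).thenCombine(right.get(i), combiner));
        }
        return zipped;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> first = new CompletableFuture<>(); // completed last
        CompletableFuture<Integer> second = CompletableFuture.completedFuture(2);

        List<CompletableFuture<String>> zipped = zip(
                Arrays.asList(first, second),
                Arrays.asList(CompletableFuture.completedFuture("a"), CompletableFuture.completedFuture("b")),
                (n, s) -> n + s);

        first.complete(1); // completing out of order does not change positions
        zipped.forEach(f -> System.out.println(f.join())); // 1a, then 2b
    }
}
```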

LazyFutureStream : issue when using filter with forEach / collect /toList etc

It works with run, runOnCurrent, etc.
The issue occurs when filtering in a stream that includes an operation that makes use of queues (e.g. flatMap):

    LazyReact.parallelCommonBuilder()
             .iterateInfinitely("", last -> nextFile())
             .limit(100)
             .map(this::readFileToString)
             .map(this::parseJson)
             .batchByTime(1, TimeUnit.MICROSECONDS)
             .peek(batch -> System.out.println("batched : " + batch))
             .filter(c -> !c.isEmpty())
             .map(this::processOrders)
             .forEach(next -> count2 = count2 + next.size());

Potential Operators for 1.0

  • startsWith : true if stream starts with value, values or Stream : by result position & Future position
  • endsWith : true if stream ends with value, values or Stream : by result & Future position
  • timeInterval : map to time interval : with operators based on result & Future position
  • timestamp : add a time stamp to each event
  • slidingByTime : sliding window by time
  • difference : difference between two streams : by result & Future position
  • intersection : intersection between two streams : by result & Future position
  • combinations : combinations present in a Stream : by result & Future position
  • permutations : permutations present in a Stream : by result & Future position
  • isInfinite : test if infinite stream
  • indexOf : index of occurrence of value, values, Stream or positive test : by result & Future position
  • prepend : prepend value, values or Stream : by result position & Future position
  • insertAt : insert value, values or Stream at index : by result position & Future position
  • skipLast, limitLast : skip and limit from the right : by result & Future position
  • xOf : batch after set number of operations completed
  • cycleWhile : repeat while condition holds
  • cycleUntil : repeat until condition holds
  • deleteBetween : delete between index : by result & Future position
  • custom : inject a custom, user defined operator

Offer EagerFutureStream's Future-based operator methods on LazyFutureStream where it makes sense.
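The right-hand operators in the list above (skipLast, limitLast) need a buffer, because an element's distance from the end is not known until later elements arrive. A minimal eager sketch over a plain JDK Stream, by result position only (RightOperators is a hypothetical name, not a cyclops API; results are collected into Lists rather than returned lazily, and ArrayDeque does not accept null elements):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helpers: eager, result-position versions of limitLast / skipLast.
class RightOperators {

    // limitLast: keep only the final n elements, holding at most n in memory.
    static <T> List<T> limitLast(Stream<T> stream, int n) {
        if (n < 0) throw new IllegalArgumentException("n must be >= 0");
        Deque<T> buffer = new ArrayDeque<>();
        stream.forEachOrdered(t -> {
            if (n == 0) return;
            if (buffer.size() == n) buffer.removeFirst(); // evict the oldest element
            buffer.addLast(t);
        });
        return new ArrayList<>(buffer);
    }

    // skipLast: emit an element only once n newer elements have arrived behind it.
    static <T> List<T> skipLast(Stream<T> stream, int n) {
        if (n < 0) throw new IllegalArgumentException("n must be >= 0");
        Deque<T> buffer = new ArrayDeque<>();
        List<T> out = new ArrayList<>();
        stream.forEachOrdered(t -> {
            buffer.addLast(t);
            if (buffer.size() > n) out.add(buffer.removeFirst());
        });
        return out; // the last n elements remain in the buffer and are dropped
    }

    public static void main(String[] args) {
        System.out.println(limitLast(Stream.of(1, 2, 3, 4, 5), 2)); // [4, 5]
        System.out.println(skipLast(Stream.of(1, 2, 3, 4, 5), 2));  // [1, 2, 3]
    }
}
```

A lazy version would emit from inside the buffering step instead of collecting, which is how skipLast can work on infinite streams; limitLast inherently needs the stream to end.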

Operator organisation

simple-react v0.99 will introduce a new operator that returns a set of asynchronous terminal operations (the futureOperations operator).

A similar set of organisational operators could be used to organise access to other operators - in particular to separate access to operators that act on bare metal futures and those that operate on results.

  • operateOnFutures : switch to a mode where all available operators act directly on the underlying futures
  • standardOperation : return to the standard view from the operateOnFutures view

Add a generate method to LazyReact

LazyReact builder = new LazyReact(100,100);

Currently, to process a generative data supply, we have to simulate generate-with-limit using range. E.g.

builder.range(0, max)
       .map(i -> dataPool.next())
       .forEach(data -> work(data));

Generate with a limit would be more intuitive

builder.generate(() -> dataPool.next())
       .limit(max)
       .forEach(data -> work(data));
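For comparison, the JDK's own Stream already supports the generate-with-limit shape proposed here. A minimal sketch using java.util.stream.Stream.generate rather than LazyReact (dataPool and process are hypothetical stand-ins for the names in the issue, and collecting into a list stands in for work(data)):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;

class GenerateWithLimit {

    // Hypothetical data source standing in for dataPool.next().
    static Supplier<Integer> dataPool() {
        AtomicInteger next = new AtomicInteger();
        return next::getAndIncrement;
    }

    static List<Integer> process(Supplier<Integer> pool, long max) {
        List<Integer> processed = new ArrayList<>();
        Stream.generate(pool)              // infinite, lazily evaluated supply
              .limit(max)                  // short-circuits after max elements
              .forEach(processed::add);    // stand-in for work(data); sequential, so ArrayList is safe
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(process(dataPool(), 3)); // [0, 1, 2]
    }
}
```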

Merge EagerFutureStream and SimpleReactStream

Create a single eager 'stream' (not a JDK Stream). Passing data to a queue for aggregation doesn't work as well with eager Streams, so the new Stream should not use operations that result in batching: it should be a small, simple API primarily for handling blocking I/O asynchronously and in parallel.

Performance enhancement : add autoOptimize option for LazyReact

LazyFutureStreams with multi-threaded executors can 'fan out' operations across all of an executor's threads. Subsequent operations should then occur on the calling thread for speed. Some more advanced operations require data sharing across threads and make use of a Many Producer / Single Consumer Queue; fan-out should occur again once data has been collected from the queue.

An autoOptimize option should manage this fan-out automatically.

Implement Reactive Streams for JDK Streams and jOOλ Seq

  • FutureStreamSubscriber can already produce JDK 8 Streams from a Reactive Streams subscription, it just needs to be exposed.
  • The FutureStreamPublishers can be adapted to convert JDK 8 Streams / jOOλ Seqs into FutureStreams before publication.

Optimize terminal operations

Terminal operations typically run on a single thread (excluding the parallel reduction options, which make use of JDK parallel Streams).

Any processing in terminal operations (such as anyMatch, xMatch and noneMatch) that can be moved onto the Futures, and isn't already, should be (this is currently done for forEach).
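A minimal sketch of moving anyMatch onto the Futures themselves, assuming the Stream is represented as a List of CompletableFutures. FutureMatch is a hypothetical name, and treating failed futures as non-matching is an assumption. The result completes as soon as any future produces a match, instead of one thread waiting on every result; allMatch and noneMatch can be derived the same way.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

class FutureMatch {

    // Completes with true as soon as any future yields a matching value,
    // or with false once every future has completed without a match.
    // Assumption: futures that fail are treated as non-matching.
    static <T> CompletableFuture<Boolean> anyMatch(List<CompletableFuture<T>> futures,
                                                   Predicate<? super T> test) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.complete(false);
            return result;
        }
        AtomicInteger remaining = new AtomicInteger(futures.size());
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                boolean matched;
                try {
                    matched = error == null && test.test(value);
                } catch (RuntimeException e) {
                    result.completeExceptionally(e); // surface predicate failures
                    return;
                }
                if (matched) {
                    result.complete(true); // later completions become no-ops
                } else if (remaining.decrementAndGet() == 0) {
                    result.complete(false);
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> neverCompletes = new CompletableFuture<>();
        CompletableFuture<Boolean> found = anyMatch(
                Arrays.asList(neverCompletes, CompletableFuture.completedFuture(42)),
                n -> n > 40);
        System.out.println(found.join()); // true, without waiting for neverCompletes
    }
}
```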

LazyFutureStream

Hi

First, congratulations on this great tool.

I want to run a batch process in parallel for each file in the Files.walk stream. How can I combine the two?
Something like:

    LazyFutureStream.parallel(Files.walk(dir))
            .forEach(id -> { //do work here
                        System.out.println(id + "\t" + Thread.currentThread());
                    });

Any ideas?

Best regards
Bruno
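For reference, a plain-JDK way to get the behaviour asked about, sketched without cyclops. ParallelWalk and processAll are hypothetical names, and directories are filtered out, which the snippet in the question did not do. Files.walk is consumed on the calling thread and each regular file is submitted to a fixed-size executor; the call then waits for all tasks.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ParallelWalk {

    // Submits one task per regular file to a fixed-size pool and waits for all of them.
    static int processAll(Path dir, int threads) throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // Files.walk keeps directory handles open, so close the Stream with try-with-resources.
        try (Stream<Path> paths = Files.walk(dir)) {
            List<CompletableFuture<Void>> tasks = paths
                    .filter(Files::isRegularFile)
                    .map(path -> CompletableFuture.runAsync(
                            () -> System.out.println(path + "\t" + Thread.currentThread()), // do work here
                            pool))
                    .collect(Collectors.toList());
            CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[0])).join();
            return tasks.size();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws IOException {
        processAll(Paths.get(args.length > 0 ? args[0] : "."), 4);
    }
}
```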

Performance Optimisation : Investigate replacing CompletableFuture with a custom, optimised Future implementation

Currently simple-react is built on top of a JDK 8 Stream of CompletableFutures. This involves the Stream creating or managing a CompletableFuture for each element in the Stream. On top of this, the Stream passes the execution pipeline to each element (via the CompletableFuture interface and methods such as thenApply or thenApplyAsync). Each of these composition methods returns a new CompletableFuture instance, so we create an additional set of Objects proportional to the number of operations applied per element in the Stream. This is ripe for optimisation.

A simple benchmark that measures the throughput of repeated application of the identity function on a MacBook Pro shows throughput falling from 330 million applications per second to 137 million (less than half) when the process is allowed to run long enough for the garbage collector to kick in. A large portion of the Objects that build up in memory are CompletableFuture instances.

The execution pipeline is (logically) shared, so it need only be defined once. This change alone could reduce the number of new Objects to 1 per element.

There is a finite number of Future tasks active per Stream at any given point, so recycling Future instances via Object pooling would also significantly reduce the number of Objects created. For a large Stream of data, the total number of Objects (Futures + elements) would trend towards the number of elements alone, as the number of elements dwarfs the number of reused Future objects.
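A minimal sketch of the "define the pipeline once" idea using only JDK types (SharedPipeline and its methods are hypothetical names, not simple-react code). perStage creates one additional CompletableFuture per stage for each element, as thenApply chaining does; compose builds the pipeline once with Function.andThen, so each element needs only a single future.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class SharedPipeline {

    // Per-stage chaining: every thenApply call allocates a new CompletableFuture, per element.
    static <T> CompletableFuture<T> perStage(T element, List<Function<T, T>> stages) {
        CompletableFuture<T> future = CompletableFuture.completedFuture(element);
        for (Function<T, T> stage : stages) {
            future = future.thenApply(stage);
        }
        return future;
    }

    // Shared pipeline: compose the stages once; it can then be reused for every element.
    static <T> Function<T, T> compose(List<Function<T, T>> stages) {
        Function<T, T> pipeline = Function.identity();
        for (Function<T, T> stage : stages) {
            pipeline = pipeline.andThen(stage);
        }
        return pipeline;
    }

    public static void main(String[] args) {
        List<Function<Integer, Integer>> stages = Arrays.asList(x -> x + 1, x -> x * 2, x -> x - 3);
        Function<Integer, Integer> pipeline = compose(stages); // defined once

        List<CompletableFuture<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int element = i;
            // One future per element, regardless of the number of stages.
            results.add(CompletableFuture.supplyAsync(() -> pipeline.apply(element)));
        }
        results.forEach(f -> System.out.println(f.join())); // -1, 1, 3
    }
}
```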

Add explicit async operator equivalents where explicit sync operators exist today

Such as

  • thenAsync
  • peekAsync
  • filterAsync

Overloaded versions that accept a different Executor should also be provided. This could be especially useful for Streams which fan out and in at different stages, e.g. using async() to fan out and distribute work across threads before continuing with sync methods. Advanced operators force a fan-in via a Many Producer / Single Consumer Queue, and user code currently has to fan back out, again by switching between async() and sync(). Being able to write

    lazyReact.react(() -> service1, () -> service2, () -> service3)
             .thenSync(this::process)
             .flatMap(Collection::stream)
             .thenAsync(this::save)
             .run();

is a little bit simpler than

    lazyReact.react(() -> service1, () -> service2, () -> service3)
             .sync()   // continue on the calling thread
             .map(this::process)
             .flatMap(Collection::stream)
             .async()  // resubmit to an Executor
             .map(this::save)
             .run();
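The JDK's CompletableFuture already follows the naming pattern proposed here: each composition method has a sync form, an Async form that uses the default executor, and an Async overload that takes an Executor. A minimal sketch of the three (plain JDK, not the simple-react API; SyncAsyncPairs is a hypothetical name):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SyncAsyncPairs {

    static String run() {
        ExecutorService io = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture.supplyAsync(() -> "data", io) // fan out onto the io pool
                    // sync form: runs on the thread that completed the previous stage (or the caller)
                    .thenApply(String::toUpperCase)
                    // async form: submitted to the default executor (the common ForkJoinPool)
                    .thenApplyAsync(s -> s + "!")
                    // async form with an explicitly supplied Executor
                    .thenApplyAsync(s -> s + "?", io)
                    .join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // DATA!?
    }
}
```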

Issue constructing a LazyFutureStream from a single CompletableFuture

Constructing a stream from a single CompletableFuture works with SimpleReact, but not with LazyReact.

The following works with SimpleReact; the equivalent LazyReact code does not:

    new SimpleReact().from(
            this.restClient.postForEntity(apiURL, new HttpEntity(query, headers), Result.class))
        .peek(System.out::println)
        .then(action -> asyncResponse.resume(action))
        .peek(System.out::println)
        .onFail(error -> { error.printStackTrace(); return asyncResponse.resume(error.getCause()); })
        .peek(status -> bus.post(RequestEvents.finish(query, correlationId)));

Objects can be read and written from the Object Pool on multiple threads

FuturePool is written to support single-threaded reads and writes, but multiple threads can write to the pool. The main pathway is where the thread used by the underlying Stream both reads from and writes to the pool. When a toQueue operation is completed, however, a Future can return itself to the pool when it completes, and this happens on a different thread.

FuturePool needs to use a thread-safe collection such as an AtomicReferenceArray to store pooled objects.

This is on master, but not in any released version of simple-react.
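The issue suggests AtomicReferenceArray; the sketch below swaps in java.util.concurrent.ConcurrentLinkedQueue, a lock-free unbounded queue, as another thread-safe option. It is a hypothetical minimal pool (ConcurrentPool, acquire and release are illustrative names, not the simple-react FuturePool API) showing an object being returned from a thread other than the one that acquired it.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical minimal object pool; not the simple-react FuturePool.
class ConcurrentPool<T> {
    // Lock-free queue, so acquire and release are safe from any thread.
    private final ConcurrentLinkedQueue<T> free = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    ConcurrentPool(Supplier<T> factory) {
        this.factory = factory;
    }

    // Reuse a pooled instance if one is available, otherwise create a new one.
    T acquire() {
        T pooled = free.poll();
        return pooled != null ? pooled : factory.get();
    }

    // May be called from a different thread, e.g. a completion callback.
    void release(T instance) {
        free.offer(instance);
    }

    int available() {
        return free.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentPool<StringBuilder> pool = new ConcurrentPool<>(StringBuilder::new);
        StringBuilder first = pool.acquire();
        Thread completer = new Thread(() -> pool.release(first)); // returned on another thread
        completer.start();
        completer.join();
        System.out.println(pool.acquire() == first); // true: the instance was recycled
    }
}
```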

Merge EagerFutureStream and SimpleReactStream, remove time based operators from EagerFutureStream

Eager Streams (EagerFutureStream and SimpleReactStream) perform a terminal operation on the underlying Stream at each phase. This means that any (more complex) operators that require data to flow through a queue (see the list of operators that are batched for EagerFutureStream) need to pull all data in from that queue before the subsequent stages can be started; in other words, data must flow through the queue to the next stage as a batch.

This makes time-based operators such as batchByTime, jitter, debounce, chunked etc. less useful than they are for LazyFutureStreams. Reducing the operator set for EagerFutureStream would reduce the gap between it and SimpleReactStream, and perhaps they should be merged, leaving 2 streams:

  • LazyFutureStream, for infinite async streaming
  • EagerFutureStream, with a simpler API, for processing small batches

Allow pluggable strategies for non-blocking queues, when queue bound reached

Producers of NonBlocking Queues should be able to either

  1. drop data
  2. spin / dominate current thread until space available in Queue
  3. sleep / park for configurable time period before retry
  4. Some combination of the above

Consumers should be able to sleep for a configurable time period or spin until data available.
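A minimal sketch of pluggable producer strategies for the four options listed above. OfferStrategy and its factory methods are hypothetical, not an existing cyclops or simple-react type, and java.util.concurrent.ArrayBlockingQueue is used only as a bounded stand-in whose offer() method never blocks.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical strategy for producers writing to a bounded, non-blocking queue.
@FunctionalInterface
interface OfferStrategy<T> {
    // Returns true if the value was enqueued, false if it was dropped.
    boolean offer(Queue<T> queue, T value);

    // 1. Drop: try once; if the queue is full the value is discarded.
    static <T> OfferStrategy<T> drop() {
        return (queue, value) -> queue.offer(value);
    }

    // 2. Spin: busy-retry on the current thread until space is available.
    static <T> OfferStrategy<T> spin() {
        return (queue, value) -> {
            while (!queue.offer(value)) {
                // busy wait, dominating the current thread
            }
            return true;
        };
    }

    // 3. Park: sleep for a configurable period between retries.
    static <T> OfferStrategy<T> park(long time, TimeUnit unit) {
        long nanos = unit.toNanos(time);
        return (queue, value) -> {
            while (!queue.offer(value)) {
                LockSupport.parkNanos(nanos);
            }
            return true;
        };
    }

    // 4. Combination: spin a bounded number of times, then fall back to another strategy.
    static <T> OfferStrategy<T> spinThen(int attempts, OfferStrategy<T> fallback) {
        return (queue, value) -> {
            for (int i = 0; i < attempts; i++) {
                if (queue.offer(value)) return true;
            }
            return fallback.offer(queue, value);
        };
    }
}

class OfferStrategyDemo {
    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayBlockingQueue<>(1); // capacity 1
        OfferStrategy<Integer> strategy = OfferStrategy.spinThen(3, OfferStrategy.drop());
        System.out.println(strategy.offer(queue, 1)); // true
        System.out.println(strategy.offer(queue, 2)); // false: queue full, value dropped
    }
}
```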

Clean up QueueFactory class

NonBlockingQueues implement the BlockingQueue interface, which is confusing.
Create a new interface that wraps both, and break the implementations out into separate top-level classes.

EagerFutureStream should use xxxFutures methods by default

LazyFutureStream can perform complex operations on results asynchronously, because evaluation is lazy (e.g. map (CompletableFuture::completedFuture) doesn't have to be completed immediately before continuing).

Because EagerFutureStream materializes a collection at each phase, any operation that requires mapping concrete results back to CompletableFutures needs to be completed before the phase definition can be complete. This creates a bottleneck where EagerFutureStream behaves synchronously.

The xxxxFutures methods should become the default behaviour for EagerFutureStream by swapping them with their results-based equivalents. The results-based methods can then be called xxxxOnResults.

Improve javadoc

Include javadoc for each operator (can't rely on @see to cyclops-sequence-api for example). Fix formatting on each operator.

Integrate cyclops-streams

LazyFutureStream should extend (cyclops-streams) SequenceM, which extends (jOOλ) Seq, which extends (JDK) Stream. This takes advantage of functionality available in the Streams ecosystem across 3 other projects, and will add all the SequenceM operators to LazyFutureStream.

SequenceM should also be made available as a top-level Stream.

SequenceM - fast sequential Stream, can execute asynchronously on a separate (single) thread, supports Reactive Streams and connectable hot streams.

LazyFutureStream - advanced stream functionality particularly useful for multi-threaded I/O, based on FastFuture. Supports Reactive Streams and connectable hot streams. Can be used as a sequential Stream, but SequenceM would be much faster (conversely, LazyFutureStream is much more performant for multi-threaded blocking I/O).

SimpleReactStream - merged SimpleReact and EagerFutureStreams, simpler API, eager behaviour, for blocking I/O, based on CompletableFuture (allOf / anyOf also exposed)
