#RxJava8 - Reactive Extensions using Java 8
##Overview
Implementation of core features of reactive extension using Java 8. It is inspired by Microsoft's Rx library at https://rx.codeplex.com/, but it doesn't support all of their APIs.
##Building
git clone [email protected]:bhatti/RxJava8.git
- Compile and build jar file using
./gradlew jar
- For now, you will have to copy and add jar file manually in your application.
##Version
- 0.1 : experimental
##License
- MIT
##How To Guide
Creating Observable from Array of objects
Observable.from("Erica", "Matt", "John", "Mike").subscribe(System.out::println,
Throwable::printStackTrace, () -> System.out.println("done"));
Creating Observable from Collection
List<String> names = Arrays.asList("Erica", "Matt", "John", "Mike",
"Scott", "Alex", "Jeff", "Brad");
Observable.from(names).subscribe(System.out::println,
Throwable::printStackTrace, () -> System.out.println("done"));
Creating Observable from Stream
Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike",
"Scott", "Alex", "Jeff", "Brad");
// note third argument for onComplete is optional
Observable.from(names).subscribe(name -> System.out.println(name),
error -> error.printStackTrace());
Creating Observable from Iterator
Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike",
"Scott", "Alex", "Jeff", "Brad");
Observable.from(names.iterator()).subscribe(name -> System.out.println(name),
error -> error.printStackTrace());
Creating Observable from Spliterator
List<String> names = Arrays.asList("Erica", "Matt", "John", "Mike",
"Scott", "Alex", "Jeff", "Brad");
Observable.from(names.spliterator()).subscribe(System.out::println,
Throwable::printStackTrace);
Creating Observable from a single object
Observable.just("value").subscribe(v -> System.out.println(v),
error -> error.printStackTrace());
// if a single object is collection, it would be treated as a single entity, e.g.
Observable.just(Arrays.asList(1, 2, 3)).subscribe( num -> System.out.println(num),
error -> error.printStackTrace());
Creating Observable for an error
Observable.throwing(new Error("test error")).subscribe(System.out::println,
error -> System.err.println(error));
// this will print error
Creating Observable from a consumer function
Observable.create(observer -> {
for (String name : names) {
observer.onNext(name);
}
observer.onCompleted();
}).subscribe(System.out::println, Throwable::printStackTrace);
Creating Observable from range
// Creates range of numbers starting at from until it reaches to exclusively
Observable.range(4, 8).subscribe(num -> System.out.println(num),
error -> error.printStackTrace());
// will print 4, 5, 6, 7
Creating Observable for integer numbers
// Creates infinite integers starting at given number and incremented by 1
Observable.integers(4).limit(4).subscribe(num -> System.out.println(num),
error -> error.printStackTrace());
// will print 4, 5, 6, 7
Creating empty Observable - it would call onCompleted right away
Observable.empty().subscribe(System.out::println,
Throwable::printStackTrace, () -> System.out.println("Completed"));
Creating never Observable - it would not call any of call back methods
Observable.never().subscribe(System.out::println, Throwable::printStackTrace);
Changing Scheduler
By default Observable notifies observer asynchronously using thread-pool scheduler but you can change default scheduler as follows:
Using thread-pool scheduler
// We are creating thread pool of size 4 here
Observable.from("Erica", "Matt", "John").subscribeOn(Scheduler.newThreadPoolScheduler(4)).
subscribe(System.out::println, Throwable::printStackTrace);
Using new-thread scheduler - it will create new thread
Observable.from("Erica", "Matt", "John").subscribeOn(Scheduler.newNewThreadScheduler()).
subscribe(System.out::println, Throwable::printStackTrace);
Using timer thread with interval - it will notify at each interval
Observable.from("Erica", "Matt", "John").subscribeOn(Scheduler.newTimerSchedulerWithMilliInterval(1000)).
subscribe(System.out::println, Throwable::printStackTrace);
// this will print each name every second
Using immediate scheduler
This scheduler will call callback functions right away on the same thread. You can use this scheduler for a smaller amount of data that you want to consume synchronously. However, you cannot unsubscribe as it runs on the same thread.
Observable.from("Erica", "Matt", "John").
subscribeOn(Scheduler.newImmediateScheduler()).
subscribe(System.out::println, Throwable::printStackTrace);
// this will print each name every second
Counting
Count method stores number of elements that is then passed to the subscriber
Count
Observable.from("Erica", "Matt", "John").count().
subscribe(System.out::println, Throwable::printStackTrace);
Transforming
Observables keep sequence of items as streams and they support map/flatMap operation as supported by standard Stream class, e.g.
Map
Observable.from("Erica", "Matt", "John").map(name -> name.hashCode()).
subscribe(System.out::println, Throwable::printStackTrace);
FlatMap
FlatMap merges list of lists into a single list when doing transformation, e.g.
Stream<List<Integer>> integerListStream = Stream.of( Arrays.asList(1, 2),
Arrays.asList(3, 4), Arrays.asList(5));
Observable.from(integerListStream).flatMap(integerList -> integerList.stream()).
subscribe(System.out::println, Throwable::printStackTrace);
Filtering
Observables supports basic filtering support as provided by Java Streams, e.g.
Filter
Observable.from("Erica", "Matt", "John", "Mike", "Scott",
"Alex", "Jeff", "Brad").filter(name -> name.startsWith("M")).
subscribe(System.out::println, Throwable::printStackTrace);
// This will only print Matt and Mike
Skip - skips given number of elements
Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike",
"Scott", "Alex", "Jeff", "Brad");
Observable.from(names).skip().subscribe(System.out::println,
Throwable::printStackTrace);
// This will skip Erica and John
Limit
Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike",
"Scott", "Alex", "Jeff", "Brad");
Observable.from(names).limit(2).subscribe(System.out::println,
Throwable::printStackTrace);
// This will only print first two names
Distinct - removes duplicates
Stream<String> names = Stream.of("Erica", "Matt", "John", "Erica");
Observable.from(names).distinct.subscribe(System.out::println,
Throwable::printStackTrace);
// This will print Erica only once
Merge - concates two observable data
Observable<Integer> observable1 = Observable.from(Stream.of(1, 2, 3));
Observable<Integer> observable2 = Observable.from(Stream.of(4, 5, 6));
observable1.merge(observable2).subscribe(System.out::println,
Throwable::printStackTrace);
// This will print 1, 2, 3, 4, 5, 6
Zip - pushes data from observable data as a tuple
Observable<String> observable1 = Observable.from("One", "Two", "Three");
Observable<Integer> observable2 = Observable.from(1, 2, 3);
observable1.zip(observable2).subscribe(System.out::println,
Throwable::printStackTrace);
// This will pass instance of Tuple object to System.out.println, which would print each tuple as [One, 1], [Two, 2], [Three, 3]
Parallel - parallel processing internal stream
Observable<Integer> observable = Observable.range(1, 101)
.subscribeOn(Scheduler.newNewThreadScheduler())
.parallel().subscribe(System.out::println,
Throwable::printStackTrace);
// This will print 1, 2, 3, ... 100
toList - returns internal objects as list
List<Integer> list = Observable.from(1, 2).merge(Observable.from(3, 4)).toList();
// This will return list of 1, 2, 3, 4
toSet - returns internal objects as set
Set<Integer> set = Observable.from(1, 2).merge(Observable.from(3, 4)).merge(Observable.just(3)).toSet();
// This will return set containg 1, 2, 3, 4 (unordered and without any duplicates)
API Doc and Samples
Support or Contact
Email bhatti AT plexobject DOT com for any questions or suggestions.