Git Product home page Git Product logo

rxjava8's Introduction

#RxJava8 - Reactive Extensions using Java 8

##Overview

Implementation of core features of reactive extension using Java 8. It is inspired by Microsoft's Rx library at https://rx.codeplex.com/, but it doesn't support all of their APIs.

##Building

  • Download and install Gradle.
  • Download and install Java 8.
  • Checkout code using
git clone [email protected]:bhatti/RxJava8.git
  • Compile and build jar file using
./gradlew jar
  • For now, you will have to copy and add jar file manually in your application.

##Version

  • 0.1 : experimental

##License

  • MIT

##How To Guide

Creating Observable from Array of objects

   Observable.from("Erica", "Matt", "John", "Mike").subscribe(System.out::println, 
      Throwable::printStackTrace, () -> System.out.println("done"));

Creating Observable from Collection

   List<String> names = Arrays.asList("Erica", "Matt", "John", "Mike", 
      "Scott", "Alex", "Jeff", "Brad"); 
   Observable.from(names).subscribe(System.out::println, 
      Throwable::printStackTrace, () -> System.out.println("done"));

Creating Observable from Stream

   Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike", 
      "Scott", "Alex", "Jeff", "Brad"); 
   // note third argument for onComplete is optional
   Observable.from(names).subscribe(name -> System.out.println(name), 
      error -> error.printStackTrace());

Creating Observable from Iterator

   Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike", 
      "Scott", "Alex", "Jeff", "Brad"); 
   Observable.from(names.iterator()).subscribe(name -> System.out.println(name), 
      error -> error.printStackTrace());

Creating Observable from Spliterator

   List<String> names = Arrays.asList("Erica", "Matt", "John", "Mike", 
      "Scott", "Alex", "Jeff", "Brad"); 
   Observable.from(names.spliterator()).subscribe(System.out::println, 
      Throwable::printStackTrace);

Creating Observable from a single object

   Observable.just("value").subscribe(v -> System.out.println(v), 
      error -> error.printStackTrace());
   // if a single object is collection, it would be treated as a single entity, e.g.
   Observable.just(Arrays.asList(1, 2, 3)).subscribe( num -> System.out.println(num), 
      error -> error.printStackTrace());

Creating Observable for an error

   Observable.throwing(new Error("test error")).subscribe(System.out::println, 
      error -> System.err.println(error));
   // this will print error 

Creating Observable from a consumer function

   Observable.create(observer -> {
      for (String name : names) {
         observer.onNext(name);
      }
      observer.onCompleted();
   }).subscribe(System.out::println, Throwable::printStackTrace);

Creating Observable from range

   // Creates range of numbers starting at from until it reaches to exclusively
   Observable.range(4, 8).subscribe(num -> System.out.println(num), 
      error -> error.printStackTrace());
   // will print 4, 5, 6, 7

Creating Observable for integer numbers

   // Creates infinite integers starting at given number and incremented by 1
   Observable.integers(4).limit(4).subscribe(num -> System.out.println(num), 
      error -> error.printStackTrace());
   // will print 4, 5, 6, 7

Creating empty Observable - it would call onCompleted right away

   Observable.empty().subscribe(System.out::println, 
      Throwable::printStackTrace, () -> System.out.println("Completed"));

Creating never Observable - it would not call any of call back methods

   Observable.never().subscribe(System.out::println, Throwable::printStackTrace);

Changing Scheduler

By default Observable notifies observer asynchronously using thread-pool scheduler but you can change default scheduler as follows:

Using thread-pool scheduler

   // We are creating thread pool of size 4 here
   Observable.from("Erica", "Matt", "John").subscribeOn(Scheduler.newThreadPoolScheduler(4)).
      subscribe(System.out::println, Throwable::printStackTrace);

Using new-thread scheduler - it will create new thread

   Observable.from("Erica", "Matt", "John").subscribeOn(Scheduler.newNewThreadScheduler()).
      subscribe(System.out::println, Throwable::printStackTrace);

Using timer thread with interval - it will notify at each interval

   Observable.from("Erica", "Matt", "John").subscribeOn(Scheduler.newTimerSchedulerWithMilliInterval(1000)).
      subscribe(System.out::println, Throwable::printStackTrace);
   // this will print each name every second

Using immediate scheduler

This scheduler will call callback functions right away on the same thread. You can use this scheduler for a smaller amount of data that you want to consume synchronously. However, you cannot unsubscribe as it runs on the same thread.

   Observable.from("Erica", "Matt", "John").
      subscribeOn(Scheduler.newImmediateScheduler()).
      subscribe(System.out::println, Throwable::printStackTrace);
   // this will print each name every second

Counting

Count method stores number of elements that is then passed to the subscriber

Count

   Observable.from("Erica", "Matt", "John").count().
      subscribe(System.out::println, Throwable::printStackTrace);

Transforming

Observables keep sequence of items as streams and they support map/flatMap operation as supported by standard Stream class, e.g.

Map

   Observable.from("Erica", "Matt", "John").map(name -> name.hashCode()).
      subscribe(System.out::println, Throwable::printStackTrace);

FlatMap

FlatMap merges list of lists into a single list when doing transformation, e.g.

   Stream<List<Integer>> integerListStream = Stream.of( Arrays.asList(1, 2), 
      Arrays.asList(3, 4), Arrays.asList(5));
   Observable.from(integerListStream).flatMap(integerList -> integerList.stream()).
      subscribe(System.out::println, Throwable::printStackTrace);

Filtering

Observables supports basic filtering support as provided by Java Streams, e.g.

Filter

   Observable.from("Erica", "Matt", "John", "Mike", "Scott", 
      "Alex", "Jeff", "Brad").filter(name -> name.startsWith("M")).
      subscribe(System.out::println, Throwable::printStackTrace);
   // This will only print Matt and Mike

Skip - skips given number of elements

   Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike", 
      "Scott", "Alex", "Jeff", "Brad"); 
   Observable.from(names).skip().subscribe(System.out::println, 
      Throwable::printStackTrace);
   // This will skip Erica and John

Limit

   Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike", 
      "Scott", "Alex", "Jeff", "Brad"); 
   Observable.from(names).limit(2).subscribe(System.out::println, 
      Throwable::printStackTrace);
   // This will only print first two names

Distinct - removes duplicates

   Stream<String> names = Stream.of("Erica", "Matt", "John", "Erica");
   Observable.from(names).distinct.subscribe(System.out::println, 
      Throwable::printStackTrace);
   // This will print Erica only once

Merge - concates two observable data

   Observable<Integer> observable1 = Observable.from(Stream.of(1, 2, 3));
   Observable<Integer> observable2 = Observable.from(Stream.of(4, 5, 6));
   observable1.merge(observable2).subscribe(System.out::println, 
      Throwable::printStackTrace);
   // This will print 1, 2, 3, 4, 5, 6

Zip - pushes data from observable data as a tuple

   Observable<String> observable1 = Observable.from("One", "Two", "Three");
   Observable<Integer> observable2 = Observable.from(1, 2, 3);
   observable1.zip(observable2).subscribe(System.out::println, 
      Throwable::printStackTrace);
   // This will pass instance of Tuple object to System.out.println, which would print each tuple as [One, 1], [Two, 2], [Three, 3]

Parallel - parallel processing internal stream

   Observable<Integer> observable = Observable.range(1, 101)
                   .subscribeOn(Scheduler.newNewThreadScheduler())
                   .parallel().subscribe(System.out::println, 
      Throwable::printStackTrace);
   // This will print 1, 2, 3, ... 100

toList - returns internal objects as list

   List<Integer> list = Observable.from(1, 2).merge(Observable.from(3, 4)).toList();
   // This will return list of 1, 2, 3, 4

toSet - returns internal objects as set

   Set<Integer> set = Observable.from(1, 2).merge(Observable.from(3, 4)).merge(Observable.just(3)).toSet();
   // This will return set containg 1, 2, 3, 4 (unordered and without any duplicates)

API Doc and Samples

Support or Contact

Email bhatti AT plexobject DOT com for any questions or suggestions.

rxjava8's People

Contributors

bhatti avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar

Forkers

xgrommx

rxjava8's Issues

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.