rxreplayingshare's Introduction

RxJava Replaying Share

ReplayingShare is an RxJava 3 transformer which combines the replay(1), publish(), and refCount() operators.

Unlike traditional combinations of these operators, ReplayingShare caches the last emitted value from the upstream observable or flowable only when one or more downstream subscribers are connected. This allows expensive upstream sources to be shut down when no one is listening while also replaying the last value seen by any subscriber to new ones.

Behavior                                                                          replayingShare()  replay(1).refCount()  publish().refCount()  replay(1).autoConnect(1)
Disconnects from upstream when there are no subscribers                           yes               yes                   yes                   no
Replays the latest value to new subscribers when other subscribers are active     yes               yes                   no                    yes
Replays the latest value to new subscribers when no other subscribers are active  yes               no                    no                    yes
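
The three behaviors compared above can be modelled without RxJava. What follows is a minimal, single-threaded sketch in plain Java; the names Source, Shared, and demo are hypothetical and introduced only for illustration, and this is not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified, single-threaded model of replayingShare() semantics; not the library's code. */
class ReplayingShareModel {
    /** Hypothetical stand-in for an upstream that can be connected and disconnected. */
    static final class Source {
        int connections = 0;      // number of times the upstream was (re)connected
        boolean connected = false;
    }

    static final class Shared<T> {
        private final Source source;
        private final List<Consumer<T>> subscribers = new ArrayList<>();
        private T lastSeen;       // survives periods with zero subscribers

        Shared(Source source) { this.source = source; }

        /** Subscribes and returns a handle that unsubscribes when run. */
        Runnable subscribe(Consumer<T> subscriber) {
            if (lastSeen != null) subscriber.accept(lastSeen); // replay the cached value first
            subscribers.add(subscriber);
            if (subscribers.size() == 1) {                     // first subscriber connects upstream
                source.connected = true;
                source.connections++;
            }
            return () -> {
                subscribers.remove(subscriber);
                if (subscribers.isEmpty()) source.connected = false; // refCount-style disconnect
            };
        }

        /** Called by the upstream while it is connected. */
        void emit(T value) {
            lastSeen = value;
            for (Consumer<T> s : new ArrayList<>(subscribers)) s.accept(value);
        }
    }

    static List<String> demo() {
        Source source = new Source();
        Shared<String> shared = new Shared<>(source);
        List<String> log = new ArrayList<>();

        Runnable a = shared.subscribe(v -> log.add("A:" + v));
        shared.emit("1");
        Runnable b = shared.subscribe(v -> log.add("B:" + v)); // replayed while A is active
        a.run();
        b.run();
        log.add("connected=" + source.connected);              // upstream shut down
        Runnable c = shared.subscribe(v -> log.add("C:" + v)); // replayed with no one else active
        log.add("connections=" + source.connections);
        c.run();
        return log;
    }

    public static void main(String[] args) {
        // Prints: A:1, B:1, connected=false, C:1, connections=2 (one per line)
        demo().forEach(System.out::println);
    }
}
```

Subscriber C receives "1" even though the upstream was disconnected in between, which is the row where replay(1).refCount() differs.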

marble diagram

Usage

Apply with compose to an upstream Observable or Flowable and cache the resulting instance for all new subscribers.

@Singleton class Chart {
  private final Observable<Bitmap> chart;

  @Inject Chart(Observable<List<Data>> data) {
    chart = data.debounce(1, SECONDS)
        .map(list -> bigExpensiveRenderChartToBitmapFunction(list))
        .compose(ReplayingShare.instance());
  }

  Observable<Bitmap> data() {
    return chart;
  }
}

Kotlin users can use the operator via an extension function.

@Singleton class Chart
@Inject constructor(data: Observable<List<Data>>) {
  val chart: Observable<Bitmap> = data.debounce(1, SECONDS)
      .map { list -> bigExpensiveRenderChartToBitmapFunction(list) }
      .replayingShare()
}

Note: This operator is designed for composition with infinite or extremely long-lived streams. Any terminal event will clear the cached value.

Download

Gradle:

implementation 'com.jakewharton.rx3:replaying-share:3.0.0'
// Optional:
implementation 'com.jakewharton.rx3:replaying-share-kotlin:3.0.0'

Maven:

<dependency>
  <groupId>com.jakewharton.rx3</groupId>
  <artifactId>replaying-share</artifactId>
  <version>3.0.0</version>
</dependency>
<!-- Optional: -->
<dependency>
  <groupId>com.jakewharton.rx3</groupId>
  <artifactId>replaying-share-kotlin</artifactId>
  <version>3.0.0</version>
</dependency>

Snapshots of the development version are available in Sonatype's snapshots repository.

RxJava 2.x

Gradle:

implementation 'com.jakewharton.rx2:replaying-share:2.2.0'
// Optional:
implementation 'com.jakewharton.rx2:replaying-share-kotlin:2.2.0'

Maven:

<dependency>
  <groupId>com.jakewharton.rx2</groupId>
  <artifactId>replaying-share</artifactId>
  <version>2.2.0</version>
</dependency>
<!-- Optional: -->
<dependency>
  <groupId>com.jakewharton.rx2</groupId>
  <artifactId>replaying-share-kotlin</artifactId>
  <version>2.2.0</version>
</dependency>

RxJava 1.x

Gradle:

implementation 'com.jakewharton.rx:replaying-share:1.0.1'
// Optional:
implementation 'com.jakewharton.rx:replaying-share-kotlin:1.0.1'

Maven:

<dependency>
  <groupId>com.jakewharton.rx</groupId>
  <artifactId>replaying-share</artifactId>
  <version>1.0.1</version>
</dependency>
<!-- Optional: -->
<dependency>
  <groupId>com.jakewharton.rx</groupId>
  <artifactId>replaying-share-kotlin</artifactId>
  <version>1.0.1</version>
</dependency>

License

Copyright 2016 Jake Wharton

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

rxreplayingshare's People

Contributors

dependabot[bot], jakewharton, kxfang, oldergod, zacsweers

rxreplayingshare's Issues

PublishSubject does not emit latest item

I've got a PublishSubject which emits items when the State of a Service changes.

stateSubject = PublishSubject.create()
stateSubject.compose(ReplayingShare.instance());
void onStateChange(State newState) {
   stateSubject.onNext(newState);
}

In my Views (Presenters) I subscribe to that subject, and as the Views get destroyed and attached again I want to receive the latest state when subscribing again (hence ReplayingShare).

interactor.getServiceStateObservable()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext(countdownServiceState -> {
      onStateUpdate(countdownServiceState);
    })
    .subscribe();

But I don't get any value when subscribing again.

Did I get something wrong?

LastSeenObserver does not check downstream disposal

While investigating #30, I noticed that LastSeenObserver does the following:

        @Override
        public void onSubscribe(Disposable d) {
            downstream.onSubscribe(d);

            T value = lastSeen.value;
            if (value != null) {
                downstream.onNext(value);
            }
        }

Is there a reason why we don't check d.isDisposed() before calling downstream.onNext(value)? A downstream operator might call d.dispose() in its onSubscribe, in which case we probably shouldn't emit another value, right?
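
The guard being asked about might look like the following sketch. It uses hypothetical stand-ins (SimpleDisposable, SimpleObserver, subscribeWithReplay) rather than the RxJava types so that it runs without the library. It only illustrates the question and is not the library's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class DisposalCheckSketch {
    /** Hypothetical minimal stand-in for an RxJava Disposable. */
    static final class SimpleDisposable {
        private final AtomicBoolean disposed = new AtomicBoolean();
        void dispose() { disposed.set(true); }
        boolean isDisposed() { return disposed.get(); }
    }

    /** Hypothetical minimal stand-in for an RxJava Observer. */
    interface SimpleObserver<T> {
        void onSubscribe(SimpleDisposable d);
        void onNext(T value);
    }

    /** Replays the cached value only if downstream did not dispose during onSubscribe. */
    static <T> void subscribeWithReplay(SimpleObserver<T> downstream, T cached) {
        SimpleDisposable d = new SimpleDisposable();
        downstream.onSubscribe(d);
        if (cached != null && !d.isDisposed()) {
            downstream.onNext(cached);
        }
    }

    /** Returns the values received by a downstream that may dispose inside onSubscribe. */
    static List<String> run(boolean disposeImmediately) {
        List<String> received = new ArrayList<>();
        subscribeWithReplay(new SimpleObserver<String>() {
            @Override public void onSubscribe(SimpleDisposable d) {
                if (disposeImmediately) d.dispose();
            }
            @Override public void onNext(String value) {
                received.add(value);
            }
        }, "cached");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [cached]
        System.out.println(run(true));  // []
    }
}
```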

new subscription gets last value from earlier subscription

I'm not sure whether this is the expected behavior or a bug. When I run the following code:

final Observable<Object> observable = Observable.create(new Observable.OnSubscribe<String>() {

    @Override
    public void call(final Subscriber<? super String> subscriber) {

        subscriber.onNext("1");
        subscriber.onNext("2");
        subscriber.onNext("3");
        subscriber.onCompleted();
    }
}).compose(ReplayingShare.instance());

observable
    .subscribe(new Action1<Object>() {

                   @Override
                   public void call(final Object text) {
                       Log.i(TAG, "onNext1: " + text);
                   }
               },
               new Action1<Throwable>() {

                   @Override
                   public void call(final Throwable throwable) {
                       Log.e(TAG, "onError1", throwable);
                   }
               },
               new Action0() {

                   @Override
                   public void call() {
                       Log.i(TAG, "onCompleted1");
                   }
               });

new Thread(new Runnable() {

    @Override
    public void run() {

        try {
            Thread.sleep(200);
        }
        catch (InterruptedException e) {
        }

        observable
            .subscribe(new Action1<Object>() {

                           @Override
                           public void call(final Object text) {
                               Log.i(TAG, "onNext2: " + text);
                           }
                       },
                       new Action1<Throwable>() {

                           @Override
                           public void call(final Throwable throwable) {
                               Log.e(TAG, "onError2", throwable);
                           }
                       },
                       new Action0() {

                           @Override
                           public void call() {
                               Log.i(TAG, "onCompleted2");
                           }
                       });
    }
}).start();

I get this output:

MainActivity: onNext1: 1
MainActivity: onNext1: 2
MainActivity: onNext1: 3
MainActivity: onCompleted1
MainActivity: onNext2: 3
MainActivity: onNext2: 1
MainActivity: onNext2: 2
MainActivity: onNext2: 3
MainActivity: onCompleted2

The second subscription gets the last value from the first subscription. But I think the output should be:

MainActivity: onNext1: 1
MainActivity: onNext1: 2
MainActivity: onNext1: 3
MainActivity: onCompleted1
MainActivity: onNext2: 1
MainActivity: onNext2: 2
MainActivity: onNext2: 3
MainActivity: onCompleted2

Clear lastSeen on disposal of upstream

I found that if I use RxReplayingShare to reconnect to a previously disposed upstream (i.e. after all original subscriptions were disposed), it returns the previously cached, now invalid value of the dead upstream before querying the new upstream for data.

IMHO

val lastSeen = LastSeen<T>()
LastSeenObservable<T>(
  upstream
    .doOnNext(lastSeen)
    .share(),
  lastSeen
)

should become

val lastSeen = LastSeen<T>()
LastSeenObservable<T>(
  upstream
    .doOnNext(lastSeen)
    .doOnDispose { lastSeen.value = null }
    .share(),
  lastSeen
)

This fixed the issue for me. What do you think?
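
The difference the proposal makes can be illustrated with a small library-free model. This is a sketch under simplifying assumptions (single-threaded, no real upstream); Shared, clearOnDisconnect, and lateSubscriber are hypothetical names, and the flag stands in for the doOnDispose line above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ClearOnDisposeSketch {
    /** Hypothetical single-threaded model of a ref-counted share with a last-value cache. */
    static final class Shared<T> {
        private final boolean clearOnDisconnect;
        private final List<Consumer<T>> subscribers = new ArrayList<>();
        private T lastSeen;

        Shared(boolean clearOnDisconnect) { this.clearOnDisconnect = clearOnDisconnect; }

        /** Subscribes and returns a handle that unsubscribes when run. */
        Runnable subscribe(Consumer<T> subscriber) {
            if (lastSeen != null) subscriber.accept(lastSeen); // replay cached value, if any
            subscribers.add(subscriber);
            return () -> {
                subscribers.remove(subscriber);
                // Models doOnDispose { lastSeen.value = null }: the upstream is disposed
                // when the last subscriber leaves.
                if (subscribers.isEmpty() && clearOnDisconnect) lastSeen = null;
            };
        }

        void emit(T value) {
            lastSeen = value;
            for (Consumer<T> s : new ArrayList<>(subscribers)) s.accept(value);
        }
    }

    /** Returns what a late subscriber receives after all earlier subscribers are gone. */
    static List<String> lateSubscriber(boolean clearOnDisconnect) {
        Shared<String> shared = new Shared<>(clearOnDisconnect);
        Runnable first = shared.subscribe(v -> { });
        shared.emit("stale");
        first.run(); // last subscriber leaves, so the upstream would be disposed here

        List<String> received = new ArrayList<>();
        shared.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriber(false)); // [stale]
        System.out.println(lateSubscriber(true));  // []
    }
}
```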

Only unsubscribe from source after no subscribers + time delay conditions are met

Would this fit within the scope of this library?

Example usage: source.replayingShare(1, TimeUnit.SECONDS)

The above source would only be unsubscribed from after both of the following conditions are met:

  • source has no subscribers
  • source has had no subscribers for 1 second

I could imagine this would be pretty useful for caching values such as disk loads across configuration changes.

About section inaccurately refers to RxJava 2

The about section of this repo, and consequently the Google search snippet, calls ReplayingShare "An RxJava 2 transformer", while the readme calls it "an RxJava 3 transformer". The About section should be updated to match the readme.

Default value

It'd be dope to get a default value, a kind of startWith that would be executed only once in the lifetime of the replayingShare().

Right now, say I have

api.doStuff()
  .startWith(defaultData)
  .replayingShare()

the defaultData will be emitted to every new first subscriber.

Can't import lib

Hey guys!

I put implementation 'com.jakewharton.rx2:replaying-share-kotlin:2.1.0' in app/build.gradle but AS 3.3 can't resolve imports. What could be wrong? AS can see replaying-share-kotlin in External Libraries.

Possible missed emission when racing with subscribers

There appears to be a race condition associated with RxReplayingShare which isn't present when using .replay(1).refCount(). I can't seem to reliably reproduce the issue when running a JVM unit test on my laptop, but can do so fairly reliably when running on an Android emulator.

I have this test function run when the app starts:

private fun runTest() {
    val observable = Observable.just(Unit)
        .delay(30, TimeUnit.MILLISECONDS)
        .replay(1)
        .refCount()
    val events = Collections.synchronizedList(mutableListOf<String>())
    repeat(1000) { i ->
        events += "Iteration: $i"
        observable
            .doOnSubscribe { events += "Subscribe: $i" }
            .doOnError { events += "Error: $i $it" }
            .doOnDispose { events += "Dispose: $i" }
            .doOnComplete { events += "Complete: $i" }
            .doFinally { events += "Finally: $i" }
            .subscribe { events += "Next: $i" }
    }
    Thread.sleep(10_000L)
    File(filesDir, "logs.txt").printWriter().use { writer ->
        for (event in events) {
            writer.println(event)
        }
    }
}

The logs.txt file is used in place of println/Log, since logcat seemed to be missing some of the output, and a file was more reliable.

When I run this, I get 5000 log entries, which is expected: One for each Iteration, Subscribe, Next, Complete, and Finally (order varies).

However, if I change .replay(1).refCount() to .replayingShare(), I get significantly fewer Next emissions - sometimes missing up to 100 emissions. My hypothesis is that this is happening because the following sequence of events occurs every once in a while:

  1. Subscriber A subscribes to the replayingShare, no cached value is available, so one is not emitted
  2. The replayingShare makes the first subscription to the share, and it opens the connection
  3. Subscriber B subscribes to the replayingShare, no cached value is available, so one is not emitted
  4. The upstream observable (Observable.just(Unit).delay(30, TimeUnit.MILLISECONDS)) makes the first emission: this is cached to the LastSeen, but too late for subscriber B to see it
  5. The replayingShare makes the second subscription to the share - the upstream for which has no values left to emit
  6. The stream terminates having emitted the value only to subscriber A.

If my assumption is correct, this is similar to the race condition that occurs when using share by itself.

Given that this operator presents itself as comparable to .replay(1).refCount() with the added feature of retaining the latest emission while dormant, I don't think that this behavior should be expected.

As a quick workaround, I've tried replacing the .share() in RxReplayingShare's implementation with .replay(1).refCount(). This works for my use cases, but does end up producing many duplicate emissions (also about 100 in this test with my hardware on the emulator I'm using).

Subscription happening later than a previous stream termination

We’re using replayingShare() for our location provider service with the idea of caching the last location (beyond release of costly GPS radio resources etc.)

class LocationService {
    private val locationSource: Flowable<Location> by lazy {
        Observable.compose(MyFancyRxLocationLibraryConfigurator())
            ...
            .doOnSubscribe { Timber.log 1 }
            .replayingShare()
    }

    // public api
    fun locations(): Flowable<Location> = locationSource
}

In my use case, at another point of location interest, I’m trying to see if at least one location data point is available. If so, I’d like to end the current stream and proceed with some business logic. So I do something like this:

locationService
        .locations()
        .take(1)
        .doOnSubscribe { 
            Timber.log 2  // this is called
        }
        .subscribe(
            { 
                onNext() // do some BL
            } ,
            { onError() },
            { 
                onComplete()
                Timber.log 3 // onComplete triggered courtesy .take
            }

        )

If we’ve never spun up the GPS radio and acquired a location, then all works well:

// locationService onSubscribe Timber.log 2 seen
// locationSource onSubscribe Timber.log 1 seen
// locationService onComplete Timber.log 3 received

I noticed, though, that if I immediately perform the at-least-one-location check again (the take(1) use case mentioned above), I instead see the following:

// locationService onSubscribe Timber.log 2 seen
// locationService onComplete Timber.log 3 received
// locationSource onSubscribe Timber.log 1 seen

🤔

The reason this gets tricky is because the GPS radio is now constantly being pinged since it happens to see the subscription a tad bit later.

Clearing the upstream on any termination event, by using a customized .replayingShare() as suggested in this other issue, does the trick and makes my use case “work”. But I’m trying to understand why the onSubscribe call is triggered later than the onComplete when this check is done rapidly.

JW surmised a race condition. So I tried removing all the subscribeOn + observeOn in this chain and I'm still noticing the behavior.

Possible issue with certain Observables

An issue appeared with the Bluetooth package RxAndroidBle that might be of interest. The authors have deprecated their own ConnectionSharingAdapter for sharing BLE connections, and now recommend the use of RxReplayingShare. This is my understanding:

The package has a method Observable<RxBleConnection> establishConnection( ... ). This Observable is somewhat unusual in that it emits a single item (the connection), and never completes. Unsubscribing from it terminates the connection, and if the connection is lost due to a Bluetooth fault, the subscription is canceled with an onError() exception.

Apparently the issue is that RxReplayingShare will continue to emit the "stale" cached connection to subscribers even after the original connection is broken. According to the OP, the problem is that ReplayingShare does not react to finalizations. He describes a fix that might be worth considering. Comments appreciated.

Question: doesn't this implementation support only one stream?

ReplayingShare keeps track of a static instance. This instance then keeps track of the subscribers.

Am I correct to understand that this basically is limited to only one stream?

If I have a stream of clicks and a stream of ticks, I am unable to use this implementation for both, correct?

public Observable<View> getClickObservable() {
    return RxView.clicks(mButton).compose(ReplayingShare.instance());
}

public Observable<Long> getTickObservable() {
    return Observable.interval(1, TimeUnit.SECONDS).compose(ReplayingShare.instance());
}

@Override
public void onStart() {
    super.onStart();

    getTickObservable().subscribe(view -> Log.i(TAG, "Got a tick!"));
    getClickObservable().subscribe(view -> Log.i(TAG, "Got a click!"));
}

The syntax may be off but should convey a point. Won't ReplayingShare emit both clicks and ticks to both of the subscribers?
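
One way to reason about the question is the library-free sketch below. It assumes that the shared transformer object is itself stateless and that the cache and subscriber bookkeeping are allocated each time it is applied to a stream. That assumption is not confirmed anywhere in this thread. Transformer, Shared, and demo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class SingletonTransformerSketch {
    /** Per-stream state; a fresh one is allocated for every stream. */
    static final class Shared<T> {
        private final List<T> delivered = new ArrayList<>();
        void emit(T value) { delivered.add(value); }
        List<T> delivered() { return delivered; }
    }

    /** Stateless singleton: holds no subscribers and no cached value itself. */
    static final class Transformer {
        private static final Transformer INSTANCE = new Transformer();
        private Transformer() { }
        static Transformer instance() { return INSTANCE; }

        /** Models compose(...): every call creates independent state. */
        <T> Shared<T> apply() { return new Shared<>(); }
    }

    static String demo() {
        Shared<String> clicks = Transformer.instance().apply();
        Shared<Long> ticks = Transformer.instance().apply();
        clicks.emit("click");
        ticks.emit(1L);
        ticks.emit(2L);
        return clicks.delivered() + " " + ticks.delivered();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [click] [1, 2]
    }
}
```

Under that assumption, reusing one transformer instance for both the click stream and the tick stream does not mix their emissions, because nothing is stored on the instance itself.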
