Git Product home page Git Product logo

rxjs's Introduction

import * as Rx from 'rxjs';

/**
 * A collection of small, runnable RxJS 5 demos.
 *
 * Each method is an independent example that logs to the console. The
 * constructor runs exactly one of them; comment/uncomment the calls there
 * to try a different demo.
 */
export class rxjsExmaples {
/** Runs the currently selected demo. */
constructor() {
  // this.creatObservable();          // new Rx.Observable, subscribe
  // this.createIntervalObservable(); // Observable.interval, unsubscribe, next, complete
  // this.tryMyTakeFilter();
  // this.bultInOperators();
  // this.subjects();
  // this.hotVsCold();
  // this.builtInOperators2();
  // this.combineObservables();
  this.errorHandling();
}

title = 'app works!';

/**
 * Builds an observable by hand and subscribes to it twice.
 *
 * The observable is cold: the producer function runs once per subscription,
 * so 'creating observable' is logged for each subscriber and the second
 * subscriber (added after 2s) receives its own full sequence.
 */
creatObservable() {
  const observable = new Rx.Observable(observer => {
    console.log('creating observable');
    setTimeout(() => {
      observer.next('an item');
      setTimeout(() => {
        observer.next('another item');
        observer.complete();
      }, 500);
    }, 1000);
  });

  // next / error / complete must be three separate arguments to subscribe().
  // Placing the last two inside the body of the first callback only creates
  // and discards two functions, so errors and completion go unreported.
  observable.subscribe(
    (item) => console.log(item),
    (error) => console.error(error),
    () => console.log('completed')
  );

  setTimeout(() => {
    observable.subscribe(
      (item) => console.log(item),
      (error) => console.error(error),
      () => console.log('completed')
    );
  }, 2000);
}

/**
 * Subscribes to a one-second interval with an observer object and
 * unsubscribes after ten seconds, which stops the emissions.
 */
createIntervalObservable() {
  const observable = Rx.Observable.interval(1000);
  const observer = {
    next: (index) => console.log(index)
  };
  const subscription = observable.subscribe(observer);
  setTimeout(() => {
    subscription.unsubscribe();
  }, 10000);
}

/**
 * Hand-written equivalent of the `take` operator.
 *
 * An operator simply wraps the source observable in a new observable that
 * filters or transforms the stream coming from the inner one.
 *
 * @param observable source stream to take items from
 * @param amount maximum number of items to forward; zero or less yields an
 *               observable that completes without emitting
 * @returns an observable that forwards at most `amount` items and completes
 *          as soon as the last of them has been forwarded (or earlier, when
 *          the source completes or errors first)
 */
take$<T>(observable: Rx.Observable<T>, amount: number): Rx.Observable<T> {
  if (amount <= 0) {
    // Nothing to take: never subscribe to the source at all.
    return Rx.Observable.empty<T>();
  }

  return new Rx.Observable<T>(observer => {
    let count = 0;
    const subscription = observable.subscribe({
      next: (item) => {
        observer.next(item);
        // Complete right after the final item instead of waiting for one
        // extra emission from the source.
        if (++count >= amount) {
          observer.complete();
        }
      },
      error: (error) => observer.error(error),
      complete: () => observer.complete()
    });

    // Teardown: runs when the outer subscription is unsubscribed and
    // releases the subscription to the inner observable.
    return () => subscription.unsubscribe();
  });
}

/** Exercises take$ on a one-second interval, keeping the first five values. */
tryMyTakeFilter() {
  this.take$(Rx.Observable.interval(1000), 5)
    .finally(() => { console.log('take filter finished') })
    .subscribe((item) => { console.log('from take filter :' + item) });
}

/**
 * Demonstrates the static creation operators: interval, timer, of, from,
 * throw, empty, never, range and defer.
 */
bultInOperators() {
  // interval + take
  Rx.Observable.interval(1000).take(3).subscribe((index) => { console.log(index) });

  // timer
  Rx.Observable.timer(1000).subscribe(
    (index) => console.log('timer : ' + index),
    (err) => console.log(err),
    () => console.log('timer completed')
  );

  // of: emits each argument as one item, so an array argument is emitted whole
  Rx.Observable.of('hello world').subscribe((item) => console.log('Of :' + item));
  Rx.Observable.of(['hello world', 3, 4]).subscribe((item) => console.log('Of With Array :' + item));

  // from: emits the elements of the array one by one
  Rx.Observable.from(['hello', 'world', 3, 4]).subscribe((item) => console.log('From : ' + item));

  // throw: emits no items and errors immediately
  Rx.Observable.throw(new Error('this is an error')).subscribe({ error: item => console.log(item) });

  // empty: emits no items and completes immediately
  Rx.Observable.empty().subscribe({ complete: () => console.log('empty : completed') });

  // never: emits no items and never completes
  Rx.Observable.never().subscribe(() => console.log('never'));

  // range(start, count): emits 20 values starting at 10, i.e. 10 to 29
  Rx.Observable.range(10, 20).subscribe((index) => console.log('range :' + index));

  // defer: the factory function is invoked on every subscription
  const defer$ = Rx.Observable.defer(() => {
    return Rx.Observable.of('hi');
  });
  defer$.subscribe((item) => console.log('Defer : ' + item));

  //fs.readdir('../app',(err,files)=>console.log(files));
}

/**
 * Demonstrates Subject, BehaviorSubject and ReplaySubject.
 *
 * A subject is both an observable and an observer: we control when it emits
 * by calling next()/complete() on it ourselves.
 */
subjects() {
  const subject$ = new Rx.Subject();

  subject$.subscribe({
    next: (item) => console.log('subject :' + item),
    complete: () => console.log('subject completed!')
  });

  subject$.next('emit1');
  subject$.next('emit2');
  subject$.complete();

  // Example with interval: a subject has next and complete methods, so it can
  // be passed as the observer of the interval observable.
  const interval$ = Rx.Observable.interval(500).take(5);
  const intervalSubject$ = new Rx.Subject();
  interval$.subscribe(intervalSubject$);

  intervalSubject$.subscribe((item) => console.log('subject Interval sub 1 : ' + item));
  intervalSubject$.subscribe((item) => console.log('subject Interval sub 2 : ' + item));
  // NOTICE - the subject produces values whether or not somebody is listening,
  // so this late subscriber only sees what is emitted after it joins.
  setTimeout(() => {
    intervalSubject$.subscribe((item) => console.log('subject Interval suspended sub : ' + item));
  }, 1000);

  // BehaviorSubject: takes an initial value, and every new subscriber
  // immediately receives the most recent value as its first emission.
  const behaviourSubject$ = new Rx.BehaviorSubject<string>('hello');
  behaviourSubject$.subscribe(s => console.log('behavioirSubject : said :' + s));

  behaviourSubject$.next('hi');
  behaviourSubject$.next('hey');
  behaviourSubject$.next('bye');

  setTimeout(() => {
    behaviourSubject$.subscribe(s => console.log('behaviorSubject SUB2 :' + s));
  }, 2000);

  // A BehaviorSubject only hands the LAST value to a new subscriber. When a
  // new subscriber should receive several previous values, use ReplaySubject:
  // its constructor argument is the number of past values to replay.
  const replaySubject$ = new Rx.ReplaySubject<number>(2);
  replaySubject$.next(1);
  replaySubject$.next(2);
  replaySubject$.next(3);

  // Subscribing after 3 emits. OUTPUT: reaplysubject :2, reaplysubject :3
  replaySubject$.subscribe(c => console.log('reaplysubject :' + c));

  // Without a buffer size every previous value is replayed.
  const replaysubject2$ = new Rx.ReplaySubject<string>();

  replaysubject2$.next('hello');
  replaysubject2$.next('hello');
  replaysubject2$.next('hello');
  replaysubject2$.next('hello');
  // Subscribing after 4 emits. OUTPUT: all four previous values, then the new ones
  replaysubject2$.subscribe((s) => console.log('ReplaySubject<string> :' + s));
  replaysubject2$.next('bye');
  replaysubject2$.next('blah');
  //https://stackoverflow.com/questions/34376854/delegation-eventemitter-or-observable-in-angular2
}

/**
 * Contrasts cold observables (each subscription gets its own producer) with
 * hot ones (subscribers share a single running producer), using publish(),
 * publishLast() and publishReplay().refCount().
 */
hotVsCold() {
  // HOT - you are not expected to receive history data:
  // const keyUps$ = Rx.Observable.fromEvent(document.body, 'keyup');
  // COLD - values are produced when you subscribe, and you expect to receive
  // all values from start to finish:
  // const interval$ = Rx.Observable.interval(400);
  // Every time we subscribe to the interval we get a new timer.

  // interval$.subscribe(i => console.log('one :' + i));

  // setTimeout(function () {
  //   interval$.subscribe(i => console.log('two :' + i));
  // }, 1000);
  // OUTPUT : one : 0, one : 1 ..2 then one:3 ,two:0 , one:4 ,two:1
  //--------------------------------------------------------------------------------------------------

  // Making interval a HOT observable: publish() returns a connectable
  // observable, and connect() starts the emissions even if nobody subscribed.
  const interval2$ = Rx.Observable.interval(100).take(5).publish();
  interval2$.connect();
  setTimeout(() => {
    interval2$.subscribe(i => console.log('sub1 :' + i));
  }, 300);
  setTimeout(() => {
    interval2$.subscribe(i => console.log('sub2 : ' + i));
  }, 400);

  // OUTPUT : sub1 : 2, sub1 : 3, sub2 : 3, sub1 : 4, sub2 : 4
  // NOTICE the subscribers get the same values.

  //--------------------------------------------------------------------------------------------------

  // Example of when to use this:
  const chatmessages$ = new Rx.Observable(observer => {
    observer.next(1);
    observer.next(2);
    setTimeout(() => {
      observer.complete();
    }, 5000);
  }).publishLast(); // OR publish()
  // publishLast() makes a hot observable that, no matter when you subscribe,
  // emits only the last value seen before completion - and only once the
  // source has completed.
  const connection = chatmessages$.connect();
  // Report errors rather than silently discarding them with an empty handler.
  const sub1 = chatmessages$.subscribe(
    (i) => { console.log('chat Sub 1 :' + i) },
    (err) => console.error(err),
    () => console.log('chat sub 1 completed')
  );
  const sub2 = chatmessages$.subscribe(
    (i) => { console.log('chat Sub 2 :' + i) },
    (err) => console.error(err),
    () => console.log('chat sub 2 completed')
  );
  setTimeout(() => {
    sub1.unsubscribe();
    sub2.unsubscribe();

    // Because it is a hot observable we also need to dispose the connection
    // (the subscription to the source itself).
    connection.unsubscribe();
  }, 6000);
  // OUTPUT : after 5000ms - 'chat Sub 1 :2', 'chat Sub 2 :2',
  //          'chat sub 1 completed', 'chat sub 2 completed'
  // NOTE - use publishReplay to hand a subscriber more of the previous values.
  //--------------------------------------------------------------------------------------------------
  // refCount handles connect() and connection.unsubscribe() for you: it
  // connects on the first subscription and disconnects once every subscriber
  // has unsubscribed.

  const observable$ = new Rx.Observable(observer => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
    return () => console.log('disposed refcount observable');
  });
  const publish$ = observable$.publishReplay(2).refCount();

  const subs1 = publish$.subscribe((i) => console.log(`refCount sub1 :${i}`));
  const subs2 = publish$.subscribe((i) => console.log(`refCount sub2 :${i}`));

  subs1.unsubscribe();
  subs2.unsubscribe();
  publish$.subscribe((i) => console.log(`refCount sub3 :${i}`));
  // NOTE observable$.share() = observable$.publish().refCount()
}

/**
 * Scratchpad for the instance operators. Only the final skipUntil/takeUntil
 * chain is live; the other examples are kept commented out so they can be
 * enabled one at a time.
 */
builtInOperators2() {
  //------ do, map, finally
  // Rx.Observable.range(1, 10)
  //   .finally(() => console.log('completed!!')) // runs on completion
  //   .do((i) => console.log('from do : ' + i))  // side effect only, does not affect the stream
  //   .map(i => i * 2)
  //   .subscribe(i => console.log(i))
  // OUTPUT: from do : 1, 2, from do : 2, 4 ...... completed!!
  //--------------------------------------------------------------------------------------------------

  //------ filter
  // Rx.Observable.interval(100)
  //   .startWith(-10)
  //   .filter(i => i % 2 === 0)
  //   .subscribe(i => console.log(i));
  // OUTPUT: -10, 0, 2, 4 ..
  //--------------------------------------------------------------------------------------------------

  //------ merge - merges the two observables into one stream of data
  // Rx.Observable.interval(1000)
  //   .merge(Rx.Observable.interval(500))
  //   .subscribe(i => console.log(i)); // OUTPUT: 0, 0, 1, 2, 1, 3, 4, 2
  // or
  // Rx.Observable.merge(
  //   Rx.Observable.interval(1000).map(i => 'second :' + i),
  //   Rx.Observable.interval(500).map(i => 'half second :' + i),
  // ).take(10).subscribe(console.log);
  //--------------------------------------------------------------------------------------------------

  //------ concat - plays the streams one after another
  // Rx.Observable.concat(
  //   Rx.Observable.interval(100).take(5),
  //   Rx.Observable.range(10, 3)
  // ).subscribe(console.log); // OUTPUT: 0,1,2,3,4,10,11,12
  //--------------------------------------------------------------------------------------------------

  //------ mergeMap - projects each source value to an observable which is
  //       merged into the output observable
  // let getTracks = () => {
  //   return new Promise((res, rej) => {
  //     setTimeout(function () {
  //       res(['track1', 'track2'])
  //     }, 2000);
  //   })
  // }
  // Rx.Observable.fromPromise(getTracks()).subscribe(console.log); // OUTPUT: ['track1','track2']
  // let observable$: Rx.Observable<Array<string>> = Rx.Observable.fromPromise(getTracks())
  // observable$.mergeMap(tracks => Rx.Observable.from(tracks)).subscribe(console.log) // OUTPUT: 'track1','track2'

  // let querypromise = (query: string): Promise<string> => {
  //   return new Promise((res, rej) => {
  //     setTimeout(function () {
  //       res('THE QUERY IS :' + query);
  //     }, 1000);
  //   })
  // }

  // Rx.Observable.of("my query")
  //   .do(() => console.log('before merge'))
  //   .mergeMap((q) => querypromise(q))
  //   .do(() => console.log('after merge'))
  //   .subscribe(console.log); // OUTPUT: before merge, after merge, THE QUERY IS :my query
  //--------------------------------------------------------------------------------------------------

  //------ switchMap - like mergeMap, but only the inner observable of the
  //       latest source value stays subscribed; earlier ones are dropped
  // const source = Rx.Observable.timer(0, 5000); // emit immediately, then every 5s
  // const example = source.switchMap(() => Rx.Observable.interval(500));
  // // output: 0,1,2,3,4,5,6,7,8...0,1,2,3,4,5,6,7,8
  // // with mergeMap the old and new inner intervals would interleave instead
  // const subscribe = example.subscribe(val => console.log(val));
  //--------------------------------------------------------------------------------------------------

  //------ reduce + scan
  // reduce emits a single value, once the source has completed
  // Rx.Observable.range(1, 10)
  //   .reduce((prev, current) => prev + current)
  //   .subscribe((i) => console.log('reduce :' + i)); // OUTPUT: reduce :55
  // scan is the same accumulation but emits every intermediate result
  // Rx.Observable.range(1, 10)
  //   .scan((prev, current) => prev + current)
  //   .subscribe(i => console.log(`scan ${i}`)); // OUTPUT: scan 1, scan 3, scan 6 .... scan 55
  // so scan works on a never-ending source and prints a value per source item
  // Rx.Observable.interval(100).scan((p, c) => p + c).subscribe(console.log);
  // whereas reduce waits for the source to finish and therefore prints nothing
  // Rx.Observable.interval(100).reduce((p, c) => p + c).subscribe(i => console.log('reduce' + i)); // OUTPUT: NONE
  //--------------------------------------------------------------------------------------------------

  //------ buffer + toArray
  // Rx.Observable.interval(100).bufferCount(10)
  //   .subscribe(console.log); // OUTPUT: [0,1,2....9], [10,11....19], ...

  // Rx.Observable.interval(100).bufferTime(300)
  //   .subscribe(console.log); // OUTPUT: one array per 300ms window holding the values emitted in it

  // buffer gathers the items emitted by the source into an array and flushes
  // it whenever the notifier observable passed to it emits
  // const stopSubject$ = new Rx.Subject();
  // Rx.Observable.interval(100).buffer(stopSubject$).subscribe(console.log); // OUTPUT: [0,1,2,3,4]
  // setTimeout(function() {
  //   stopSubject$.next();
  // }, 500);

  // toArray collects the whole stream into one array and passes it on when
  // the observable completes
  // Rx.Observable.range(1,10)
  //   .toArray().subscribe(console.log); // OUTPUT: [1,2...10]
  //--------------------------------------------------------------------------------------------------

  //------ first, last, skip, take, takeWhile, takeUntil, skipUntil
  // NOTE - cold observable that emits 1..4 and deliberately never completes
  const observable$ = Rx.Observable.create(observer => {
    [1, 2, 3, 4].forEach(value => observer.next(value));
  });

  // observable$.first().subscribe(console.log);  // OUTPUT: 1 - NOTE: errors if no element exists
  // observable$.last().subscribe(console.log);   // OUTPUT: none - it waits for completion - NOTE: errors if no element exists
  // observable$.single().subscribe(console.log); // OUTPUT: error - there are multiple elements, not a single one

  // observable$.skip(2).subscribe(console.log); // OUTPUT: 3,4
  // observable$.take(3).subscribe(console.log); // OUTPUT: 1,2,3

  // takeUntil expects a notifier observable; a predicate belongs in takeWhile:
  // observable$.takeWhile(i => i < 3).subscribe(console.log); // OUTPUT: 1,2

  // Rx.Observable.interval(100).skipWhile(i => i < 4).takeWhile(i => i < 10).subscribe(console.log); // OUTPUT: 4...9

  // Drop everything until the 350ms timer fires, then stop when the 1000ms
  // timer fires. OUTPUT: 3...8
  const ticks$ = Rx.Observable.interval(100);
  const startSignal$ = Rx.Observable.timer(350);
  const stopSignal$ = Rx.Observable.timer(1000);
  ticks$
    .skipUntil(startSignal$)
    .takeUntil(stopSignal$)
    .subscribe(console.log);
}

/**
 * Demonstrates operators that combine two observables: zip and
 * combineLatest (withLatestFrom is kept as a commented-out variant).
 */
combineObservables() {
  // http://rxmarbles.com/#zip
  // zip pairs the n-th item of each observable, so it emits at the pace of
  // the slower one. left = items of the first observable, right = items of
  // the second.
  Rx.Observable.range(1, 10)
    .zip(Rx.Observable.interval(1000), (left, right) => {
      return `got ${left} value , after ${right} secondes`;
    })
    .subscribe(console.log);
  // OUTPUT - 'got 1 value , after 0 secondes', 'got 2 value , after 1 secondes' ...

  // http://rxmarbles.com/#withLatestFrom
  // Rx.Observable.interval(200).withLatestFrom(Rx.Observable.interval(500)).subscribe(console.log);
  // OUTPUT - [1,0], [2,1], [3,1], [4,2], [5,2]

  // http://rxmarbles.com/#combineLatest
  // Similar to withLatestFrom, but it emits whenever EITHER observable emits.
  Rx.Observable.interval(200).combineLatest(Rx.Observable.interval(500)).subscribe(console.log);
  // OUTPUT - [1,0], [2,0], [3,0], --> [4,0], [4,1] <--
}

/**
 * Demonstrates recovering from errors with catch and retry.
 */
errorHandling() {
  // If we don't handle errors, the observable stops streaming and unsubscribes:
  // Rx.Observable.concat(
  //   Rx.Observable.of(42),
  //   Rx.Observable.throw(new Error('blah')),
  //   Rx.Observable.of(10)
  // ).subscribe(console.log); // 42, then the error is raised; 10 is never delivered
  //-----------------------------

  // Simulates a failing API call: a promise that rejects after 100ms.
  const getapi = () => {
    return new Promise((res, rej) => {
      setTimeout(() => {
        rej(new Error('ERORR!!!'));
      }, 100);
    });
  };

  // catch swaps the failed stream for a fallback observable, so the
  // subscriber receives the error object as a regular item.
  Rx.Observable.fromPromise(getapi())
    .catch(err => {
      console.log(err);
      return Rx.Observable.of(err);
    })
    .subscribe(item => console.log('ITEM :' + item));
  // OUTPUT - the logged Error object, then 'ITEM :Error: ERORR!!!'
  //---------------------------------

  // Simulates a failing API call as an observable that errors after 100ms.
  const getapiObservable = () => {
    return new Rx.Observable((observer) => {
      console.log('Getting Api');
      const timer = setTimeout(() => {
        observer.error(new Error('ERORR **'));
      }, 100);
      // Cancel the pending timeout if the subscriber leaves early.
      return () => clearTimeout(timer);
    });
  };

  getapiObservable()
    .retry(3) // resubscribe up to 3 more times when the request fails
    .catch(err => {
      console.log('Error Raised :' + err);
      return Rx.Observable.of(err);
    })
    .subscribe(i => { console.log('item :' + i) });
  // OUTPUT : 'Getting Api' once per attempt (the initial attempt plus 3 retries),
  //          'Error Raised :Error: ERORR **', 'item :Error: ERORR **'
  // NOTE - without the catch it still retries, but the error is then thrown
  //        to the console.
}
}

rxjs's People

Contributors

kareemlashin avatar

Watchers

James Cloos avatar  avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Something interesting about the web. A new door to the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Something interesting about games, to make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.