reactive-extensions / rx-node Goto Github PK
View Code? Open in Web Editor NEWRxJS Bindings for Node.js and io.js
License: Other
RxJS Bindings for Node.js and io.js
License: Other
I run the sample below and see nothing in console
var Rx = require('rx');
var RxNode = require('rx-node');
var source = Rx.Observable.range(0, 5);
var subscription = RxNode.writeToStream(source, process.stdout, 'utf8');
├── [email protected]
├─┬ [email protected]
│ └── [email protected]
Did I miss something?
The docs have a fromEvent
method, but rx-node
doesn't include it. In any case, should it be fromEventEmitter
, given that there is toEventEmitter
?
Looks like that update was never pushed to npm, fyi.
I am trying to run an interval stream until some input comes from sdtin, however this code:
var RxNode = require('rx-node');
const Rx = require('rxjs');
Rx.Observable
.interval(100)
.takeUntil(RxNode.fromStream(process.stdin))
.subscribe(console.log);
results in:
TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
at Object.subscribeToResult (.../node_modules/rxjs/util/subscribeToResult.js:73:27)
at new TakeUntilSubscriber (.../node_modules/rxjs/operator/takeUntil.js:65:38)
at TakeUntilOperator.call (.../node_modules/rxjs/operator/takeUntil.js:51:33)
at Observable.subscribe (.../node_modules/rxjs/Observable.js:42:22)
at Object.<anonymous> (.../test.js:7:4)
at Module._compile (module.js:571:32)
at Object.Module._extensions..js (module.js:580:10)
at Module.load (module.js:488:32)
at tryModuleLoad (module.js:447:12)
at Function.Module._load (module.js:439:3)
How do I make RxNode Observables behave like RxJs?
pause() does not exist on a writeable stream
I put together a method for converting an Observable to a readable stream. It looks something like this:
function toReadableStream(observable) {
var stream = new Readable({
objectMode: true
});
stream._read = function() {
observable.subscribe(
this.push.bind(this),
this.emit.bind(this, 'error'),
this.push.bind(this, null)
)
};
return stream;
}
I have another one called subscribeCallback
, which "subscribes" an observable to a node-style callback:
function subscribeCallback(observable, callback) {
return observable.
// Only take the last item, to ensure we aren't calling `callback`
// multiple times.
last().
subscribe(
function(x) { callback(null, x); },
function(err) { callback(err); }
);
};
My use case is that I'm working with a library where we're trying to keep all of our interfaces node-standard (callbacks or streams), but I'd still like to use RxJS for implementations. So I have a lot of functions that look like this
function fetchAThing(callback) {
// Bridge another node-style callback function to an Observable
var getter = Observable.fromNodeCallback(getSomething);
// Do my observable stuff
var obs = getter.
map(...).
filter(...);
// Bridge back to node-style callbacks
subscribeCallback(obs, callback);
}
function fetchABunchOfThings() {
var obs = RxNode.fromReadbleStream(getAStreamOfThings());
// Do my observable stuff
var obs2 = obs.
map(...).
filter(...);
// Bridge back to a node stream
return toReadableStream(obs2);
}
Is this something that you would see as in the scope of the RxNode library? If so, I can look at submitting a pull-request.
This issue is moved from: Reactive-Extensions/RxJS#508
http://nodejs.org/api/stream.html#stream_readable_pause
I tried to request only two items from a Readable Node stream with Rx
and I observed the following behaviour:
Rx subscribe
next emits only the expected two itemsRx Observable
reads every item from the stream, doesn't pause the stream after twoRx
var source = Rx.Node.fromStream(dbUserStream).controlled();
source.subscribe(..);
source.request(2);
or
Rx.Node.fromStream(dbUserStream)
.take(2)
.subscribe(..);
I think Rx should support some backpressure here and pause the Readable stream. Would be great for take
also.
In Highland it works in the following way:
_(dbUserStream)
.take(2)
.toArray(function (users) { ... });
Did I miss something?
Hi
I have see the pull request about RxJS 5, but it looks that finally there is no a version of rx-node that supports the new RxJs 5.
Is there some plans to support RxJS 5?
Thanks
Why does writeToStream transform everything to a string? I would consider this a bug and it should be aligned with the typical node idiom that relies on an encoding to indicate string writing and on the actual type of the thing being written.
It is impossible to use it to write Buffers into a file.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.