Comments (34)
@tamtakoe this is because you must implement a ._read
method on the readable stream. e.g.
var Readable = require('stream').Readable;
var myStream = new Readable();
myStream._read = function (n) { };
myStream.push('some data');
// or even
myStream._read = function (n) {
this.push('some data');
};
myStream.push(null); // will cause an `end` event
myStream.pipe(...)
or inherit:
var Readable = require('stream').Readable;
var inherits = require('util').inherits;
function MyStream(options) {
Readable.call(this, options);
}
MyStream.prototype._read = function (n) {
this.push('some data');
this.push(null);
};
var myStream = new MyStream();
myStream.pipe(...)
It is worth mentioning that a Writable
stream needs a _write()
method in the same way.
var Writable = require('stream').Writable;
var w = new Writable();
w._write = function (data, encoding, next) {
// do something with data chunk
next(/* error if any */);
};
A Duplex
stream needs _read
and _write
methods.
And a Transform
stream needs only a _transform
method:
var Transform = require('stream').Transform;
var t = new Transform();
t._transform = function (data, encoding, next) {
// do something with data chunk
next(/* error or.. */, data); // push chunk on to readable side
};
from through2.
I think the issue is with your counting, and stuff happening async over the next ticks.
through2 also takes a second argument. This is a _flush
function, which happens right at the end of the stream, after the last chunk has passed through. This is when you want to call .push(null)
.
Try this:
compileProject(config.project1.modules)
.pipe(gulp.dest(buildPath)));
function compileProject(modules) {
var Readable = require('stream').Readable;
var generalStream = new Readable({ objectMode: true });
generalStream._read = function (n) {};
function oncomplete() {
generalStream.push(null);
}
Object.keys(modules).forEach(key, i) {
var flush;
if ((i + 1) === modules.length) {
flush = oncomplete;
}
compileModule(key, modules[key])
.pipe(through.obj(function (file, enc, callback) {
generalStream.push(file);
callback(null, file);
}, flush));
});
return generalStream;
}
from through2.
F.e. I generate new file with config or some module (without loading any resources) and I want to work with it as with a stream (use it in gulp).
Now I use
var myStream = require('event-stream').readable(function (count, callback) {
this.emit('data', 'some data');
this.emit('end');
callback();
});
myStream
.pipe(process)
.pipe(save)
Can I use through2
for this purpose?
from through2.
if I use require('stream').Readable
, I will have problems with using stream 3? I will need to rewrite code?
from through2.
Thank!
P.S. stream
and readable-stream
doesn't work if I run stream.push(null) asynchronously (Error: not implemented)
from through2.
fyi from2 is a pretty sweet module to create readable streams and avoid all the subclassing noise
from through2.
@tamtakoe streams3 are my preferred option and can be installed by npm install [email protected]
.
Streams2 are mainly an issue if you are trying to listen to both readable
and data
events on a single stream.
from through2.
yeah it would be cool, and not too much of a change to do:
var readable = new Readable({
read: function (n) {
this.push('data')
this.push(null)
}
});
from through2.
Many thanks to the proposed solutions! It is very useful to me!
Why not add a wrapper methods for creating streams in through2?
from through2.
Through2 is more about making a Through / Transform stream.
You could certainly make one for a Readable, and I actually wrote one for Writable using much of the same Ideas from this module.
But as mentioned above there are other options for readables. My thoughts are more towards io.js streams directly, to make them more user friendly.
from through2.
doesn't work...
var Readable = require('stream').Readable;
var generalStream = new Readable();
generalStream._read = function (n) { };
...
modules.forEach(function(module) {)
gulp.src(modules.files)
....
.pipe(through.obj(function(file, enc, callback) {
generalStream.push(file);
//From this point the code doesn't run (why?)
callback(null, file);
if (isLastModule) {
generalStream.push(null);
}
}));
})
return generalStream;
from through2.
Alas! the one thing i wondered if i needed to include in my example but didn't
from through2.
It works!
myStream._read = function (n) { }
looks strange. Why node.js developers didn't implement noop function as default value of _read
?
from through2.
I try from2
function read(n, next) {
console.log(n);
//var modulesInCounter = 0;
//var modulesOutCounter = 0;
var generalStream = this;
for (var key in modules) {
//modulesInCounter++;
compileModule(key, modules[key], base)
.pipe(through.obj(function (file, enc, callback) {
//modulesOutCounter++;
generalStream.push(file);
callback(null, file);
//if (modulesOutCounter >= modulesInCounter) {
// generalStream.push(null);
// next();
//}
}));
}
}
return from.obj(read);
- Why the
n
(argument of read) is always 16 - Why it works without
generalStream.push(null)
ornext()
?
from through2.
a1) I have never used from2, but I imagine 16 is the default (hughsk/from2/index.js#L83). I wouldn't worry about that too much, in objectMode it is usually void anyway, since usually only one object is passed through at a time.
It is specific usually to the amount of bytes to read from the internal buffer on .read()
a2) I have a feeling this might be because _read()
gets called only when a stream is consuming generalStream
, if nothing is waiting for data from it via readable
or data
event then this will be waiting to be consumed before it tries to _read
again.
(Obviously I can't see the rest of your code so I am making some assumptions here)
The result of what you have there is completely different to what you had before, this is the equivalent of:
var Readable = require('stream').Readable;
var generalStream = new Readable();
generalStream._read = function (n) {
console.log(n);
//var modulesInCounter = 0;
//var modulesOutCounter = 0;
var generalStream = this;
for (var key in modules) {
//modulesInCounter++;
compileModule(key, modules[key], base)
.pipe(through.obj(function (file, enc, callback) {
//modulesOutCounter++;
generalStream.push(file);
callback(null, file);
//if (modulesOutCounter >= modulesInCounter) {
// generalStream.push(null);
// next();
//}
}));
}
};
...
return generalStream;
from through2.
It is not equivalent. If you leave comments, _read
works in an infinite loop. If uncomment code _read
works several times.
My code. I use new Readable
two times: for generalStream and for headStream and I have a problem with a streams that were created by me.
compileProject(config.project1.modules)
.pipe(gulp.dest(buildPath)));
function compileProject(modules) {
var Readable = require('stream').Readable;
var generalStream = new Readable({ objectMode: true });
generalStream._read = function (n) {
var modulesInCounter = 0;
var modulesOutCounter = 0;
var generalStream = this;
for (var key in modules) {
modulesInCounter++;
compileModule(key, modules[key])
.pipe(through.obj(function (file, enc, callback) {
modulesOutCounter++;
generalStream.push(file);
callback(null, file);
if (modulesOutCounter >= modulesInCounter) {
generalStream.push(null);
}
}));
}
};
return generalStream;
}
function compileModule(name, params) {
var Readable = require('stream').Readable;
var headStream = new Readable({ objectMode: true });
headStream._read = function (n) {
var dependencies = params.deps.length ? '"' + params.deps.join('", "') + '"' : '';
var content = 'angular.module("' + params.moduleName + '", [' + dependencies + ']);';
var file = new File({
path: 'head.js',
contents: new Buffer(content)
});
this.push(file);
this.push(null);
};
var scriptsStream = gulp.src(params.scripts)
.pipe(g.uglify({
mangle: false
}))
var templatesStream = gulp.src(params.templates)
.pipe(g.angularTemplatecache(name + '/templates.js', {
module: params.moduleName,
base: path.join(base, params.root, '/'),
templateHeader: TEMPLATE_HEADER,
templateFooter: TEMPLATE_FOOTER
}))
var moduleStream = series(headStream, scriptsStream, templatesStream)
.pipe(g.concat(name + '.js'))
return moduleStream;
}
from through2.
When you pass the function to from2 this is what happens:
Class.prototype._from = read
Class.prototype._read = function(size) {
var self = this
if (this._reading || this.destroyed) return
this._reading = true
this._from(size, check)
function check(err, data) {
if (self.destroyed) return
if (err) return self.destroy(err)
if (data === null) return self.push(null)
self._reading = false
if (self.push(data)) self._read()
}
}
https://github.com/hughsk/from2/blob/master/index.js#L45-L59
So actually rather than calling push
you could use the second argument since you are pass ._read(<number>, <Function>)
and the Function takes an error and data, similar to the callback in a Transform stream (although in streams3 it can be undefined
, it doesn't have to be null
).
The issue with the above is that ._read()
will only happen once, because after you call push(null)
it signifies the end of the stream. Is that the desired effect?
from through2.
or even:
compileProject(config.project1.modules)
.pipe(gulp.dest(buildPath)));
function compileProject(modules) {
var Readable = require('stream').Readable;
var generalStream = new Readable({ objectMode: true });
var moduleKeys = Object.keys(modules);
var count = 0;
var total = moduleKeys.length;
function compileModule_(key, index) {
/* // streams3
compileModule(key, modules[key])
.on('data', function (file) {
generalStream.push(file);
if ((index + 1) === total) {
generalStream.push(null);
}
});
*/
// streams2
var mod = compileModule(key, modules[key]);
mod.on('readable', function () {
generalStream.push(mod.read());
if ((index + 1) === total) {
generalStream.push(null);
}
});
}
generalStream._read = function (n) {
compileModule_(moduleKeys[count], count);
count++;
};
return generalStream;
}
from through2.
It work correct! There is just one more little problem... It is very difficult (this code is at a lower level of abstraction than that used in the program).
I found how gulp.src creates stream using only through2 https://github.com/wearefractal/glob-stream/blob/master/index.js#L27-L41
I have successfully used it for headStream
var headStream = through.obj();
headStream.write(file);
headStream.end();
But it does not work for generalStream (works if you replace a string on the comments)
var generalStream = through.obj();
generalStream._read = function (n) {};
function oncomplete() {
generalStream.end(); //generalStream.push(null);
}
var moduleKeys = Object.keys(modules);
var count = 0;
var total = moduleKeys.length;
for (var key in modules) {
var flush;
count++;
if (count >= total) {
flush = oncomplete;
}
compileModule(key, modules[key], base)
.pipe(through.obj(function (file, enc, callback) {
generalStream.write(file); //generalStream.push(file);
callback(null, file);
}, flush));
}
return generalStream;
from through2.
if you are using through.obj you do not want to set ._read
remove that line as it will mess with the Transform stream internals.
from through2.
it is really up to you they kind of do the same thing, except that the through stream is adding writable stream functionality to the generalStream
rather than it only being readable.
It's kinda of like using through streams as a hammer to solve all problems, but if it is simpler for you to understand your code it's probably more important, and since it looks like it is build code rather than production deployed application code.
It just means generalStream
could be piped into, although i don't suggest that as it would make your code even more complicated.
from through2.
Clear code is very important for me. I'm writing a build script, so performance and accuracy does not matter. Ideally, you do not even know about the internal implementation of stream, and perform only simple actions with streams (create, merge, fork, save, ...).
It remains to find out why it works without flush
/generalStream.end()
/generalStream.push(null)
/etc. :-)
//It works! why?!
var generalStream = through.obj();
for (var key in modules) {
compileModule(key, modules[key], base)
.pipe(through.obj(function (file, enc, callback) {
generalStream.write(file);
callback(null, file);
}));
}
return generalStream;
from through2.
Probably because all the files have been handled, the script just ends, nothing is waiting in the event loop.
But by never ending the stream you would never clear up the stream internals. So this would not be great for code running on a server.
from through2.
How I can create chain of through2 streams?
I try
stream
.pipe(throughStream1)
.pipe(throughStream2)
//must be equivalent
throughStream = throughStream1.pipe(throughStream2)
stream
.pipe(throughStream)
But it doesn't work
from through2.
I am confused 😕
throughStream = throughStream1.pipe(throughStream2)
// will yield => throughStream === throughStream2
So by doing this:
throughStream = throughStream1.pipe(throughStream2)
stream
.pipe(throughStream)
you are actually doing this:
stream.pipe(throughStream2)
although throughStream1
is still set up to pipe into throughStream2
as well
It is not like a Promise
where the following would create 4 promises and give you the result of them all:
return new Promise(() => {
return new Promise(resolve => resolve({ data: true })
})
.then(result => {
return new Promise(resolve => resolve(result.data)
});
pipe
returns the stream that is being pipe
d too:
readable.pipe(throughStream) // => throughStream
so
stream
.pipe(throughStream1)
.pipe(throughStream2)
is the same as doing:
throughStream1.pipe(throughStream2)
stream.pipe(throughStream1)
from through2.
Hmm... Other way. Can I set stream data? Not chunk, but data as stream param?
stream
.pipe(through.noop, function(callback) {
this.data = {foo:bar};
callback();
}))
.pipe(through.noop, function(callback) {
console.log(this.data); //{foo:bar};
callback();
}))
this.data
doesn't work. It may be possible in another way?
from through2.
No not in that way.
I'm not sure what you are trying to do here, because the second argument to .pipe
is an object
not a function
:
stream.pipe(writable, { end: true|false })
so the function will not even be called.
It depends on how you are using this data
object
, couldn't you do something like:
;(function () {
var data = {};
stream
.pipe(through2.obj(function(data, encoding, callback) {
data[foo] = bar;
callback(null, data);
}))
.pipe(through2.obj(function(data, encoding, callback) {
console.log(data); // { foo:bar } ;
callback(null, data);
}));
}());
from through2.
I thought about this method, but came up with a other way, it will not have make logic out of the stream.
//storage.js
var data = {};
module.export = function(key, value){
return throught.obj(function(file, enc, callback) {
if (value) {
data[key] = value;
} else {
file.contents = new Buffer(data[key]);
}
callback(null, file)
})
}
//gulpfile.js
var storage = require('storage')
stream
.pipe(storage('foo','bar'))
.pipe(storage('foo'))
from through2.
New case with new stream
Windows, node.js v 0.12.2
File structure:
test1 |- script1.js |- script2.js |- ... test2 ... test5
gulpfile.js:
gulp.task('test', function() {
var streamsArr = [];
for (var stream, i = 1; i < 5; i++) {
stream = gulp.src(path.join(__dirname, 'test' + i, '**/*'))
.pipe(through.obj(function(file, enc, callback) {
setTimeout(function() {
callback(null, file);
}, Math.ceil(Math.random()*100));
}))
.pipe(require('gulp-concat')('script.js'))
.on('end', (function(i) {
console.log('end of the stream ' + i);
}).bind(null, i));
streamsArr.push(stream);
}
require('gulp-merge')(streamsArr) //or stream-series, or merge-stream, etc. — all make new stream
.pipe(through.obj(function(file, enc, callback){
console.log('merged stream transform:', path.relative(__dirname, file.path));
callback(null, file);
}, function(callback) {
console.log('merged stream flush');
callback();
}))
.on('end', function() {
console.log('end of the merged stream');
});
});
console:
end of the stream 4 end of the stream 3 merged stream transform: test1\script.js end of the stream 1 merged stream transform: test2\script.js end of the stream 2
Somehow, part of a stream is lost. Also did not work flush
and end
event of merged stream.
from through2.
You have to drain the stream in your gulp-merge
function since you are pushing data to the readable part of the stream (callback(null, file)
). You can do this by piping it somewhere or calling .resume()
require('gulp-merge')(streamsArr) //or stream-series, or merge-stream, etc. — all make new stream
.pipe(through.obj(function(file, enc, callback){
console.log('merged stream transform:', path.relative(__dirname, file.path));
callback(null, file);
}, function(callback) {
console.log('merged stream flush');
callback();
}))
.on('end', function() {
console.log('end of the merged stream');
})
.resume() // add this
from through2.
@mafintosh resume()
doesn't work. I get something like that in console:
end of the stream 2 end of the stream 4 merged stream transform: test1\script.js end of the stream 1 end of the stream 3
I did something like this, but it is too complicated and verbose
var through = require('through2');
var files = [];
for (var stream, i = 1; i < 5; i++) {
stream = gulp.src(path.join(__dirname, 'test' + i, '**/*'))
.pipe(through.obj(function(file, enc, callback) {
setTimeout(function() {
callback(null, file);
}, Math.ceil(Math.random()*100));
}))
.pipe(through.obj(function(file, enc, callback) {
files.push(file);
if (i === 4) {
createStream(files);
}
}))
}
function createStream(files) {
var stream = through.obj();
stream.pipe(through.obj(function(file, enc, callback) {
callback();
}, function(callback) {
this.push.apply(this, files);
callback();
}))
stream.end();
}
from through2.
@tamtakoe you aren't calling resume in that example?
from through2.
@mafintosh I copy your example in my example
...
require('gulp-merge')(streamsArr)
...
.resume() // add this
...
but it is not work correct. Maybe I did something wrong?
from through2.
@mafintosh There was a problem in the gulp-concat. I used v. 2.4.3, but in 2.5.2 it was fixed
from through2.
Related Issues (20)
- [QUESTION] through.obj remove duplicate files using path HOT 3
- Security risk
- Object `Transform`s not working well with `stream.compose`
- Update readable-stream module as it's dependent module safe-buffer marked as EOL
- TypeError: Cannot read property 'ended' of undefined HOT 2
- please use a well known open source license HOT 3
- Async issue HOT 1
- package.json parse error
- Readme should describe the transformFunction's "encoding" argument.
- Why only get to the first 16 matched files HOT 3
- how do I test a gulp plugin used througn2 with async callback?
- additional collab?
- Es7 Cannot read property 'push' of undefined HOT 3
- multi column postgres Copy using through2 HOT 1
- Hard to understand the code HOT 2
- 'end' emitted after 'finish' HOT 6
- Create stream with independent objectModes for readableState and writeableState? HOT 2
- v2.0.4 breaking change for gulp packages when using Node 0.10 HOT 17
- BC of new version HOT 1
- `this.push` method call other object stream function HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from through2.