Comments (34)

sonewman commented on August 16, 2024

@tamtakoe this is because you must implement a ._read method on the readable stream. e.g.

var Readable = require('stream').Readable;
var myStream = new Readable();

myStream._read = function (n) { };
myStream.push('some data');

// or even
myStream._read = function (n) {
  this.push('some data');
};

myStream.push(null); // will cause an `end` event
myStream.pipe(...)

or inherit:

var Readable = require('stream').Readable;
var inherits = require('util').inherits;

function MyStream(options) {
  Readable.call(this, options);
}
MyStream.prototype._read = function (n) {
  this.push('some data');
  this.push(null);
};

var myStream = new MyStream();
myStream.pipe(...)

It is worth mentioning that a Writable stream needs a _write() method in the same way.

var Writable = require('stream').Writable;
var w = new Writable();
w._write = function (data, encoding, next) {
  // do something with data chunk
  next(/* error if any */);
};

A Duplex stream needs _read and _write methods.

And a Transform stream needs only a _transform method:

var Transform = require('stream').Transform;
var t = new Transform();
t._transform = function (data, encoding, next) {
  // do something with data chunk
  next(/* error or.. */, data); // push chunk on to readable side
};

from through2.

sonewman commented on August 16, 2024

I think the issue is with your counting, and stuff happening async over the next ticks.

through2 also takes a second argument. This is a _flush function, which happens right at the end of the stream, after the last chunk has passed through. This is when you want to call .push(null).

Try this:

compileProject(config.project1.modules)
    .pipe(gulp.dest(buildPath));

function compileProject(modules) {
    var Readable = require('stream').Readable;
    var generalStream = new Readable({ objectMode: true });

    generalStream._read = function (n) {};

    function oncomplete() {
      generalStream.push(null);
    }

    Object.keys(modules).forEach(function (key, i, keys) {
      var flush;

      if ((i + 1) === keys.length) {
        flush = oncomplete;
      }

      compileModule(key, modules[key])
        .pipe(through.obj(function (file, enc, callback) {
            generalStream.push(file);

            callback(null, file);
        }, flush));
    });

    return generalStream;
}

tamtakoe commented on August 16, 2024

E.g. I generate a new file with a config or some module (without loading any resources) and I want to work with it as a stream (use it in gulp).

Now I use

var myStream = require('event-stream').readable(function (count, callback) {
    this.emit('data', 'some data');
    this.emit('end');

    callback();
});

myStream
  .pipe(process)
  .pipe(save)

Can I use through2 for this purpose?

tamtakoe commented on August 16, 2024

If I use require('stream').Readable, will I have problems with streams3? Will I need to rewrite my code?

tamtakoe commented on August 16, 2024

Thanks!

P.S. stream and readable-stream don't work if I call stream.push(null) asynchronously (Error: not implemented)

mafintosh commented on August 16, 2024

FYI, from2 is a pretty sweet module for creating readable streams and avoiding all the subclassing noise

sonewman commented on August 16, 2024

@tamtakoe streams3 are my preferred option and can be installed by npm install [email protected].

Streams2 are mainly an issue if you are trying to listen to both readable and data events on a single stream.

sonewman commented on August 16, 2024

yeah it would be cool, and not too much of a change to do:

var readable = new Readable({
  read: function (n) {
    this.push('data')
    this.push(null)
  }
});

tamtakoe commented on August 16, 2024

Many thanks for the proposed solutions! They are very useful to me!

Why not add wrapper methods for creating streams to through2?

sonewman commented on August 16, 2024

Through2 is more about making a Through / Transform stream.

You could certainly make one for a Readable, and I actually wrote one for Writable using many of the same ideas from this module.

But as mentioned above there are other options for readables. My thoughts are more towards io.js streams directly, to make them more user friendly.

tamtakoe commented on August 16, 2024

doesn't work...

var Readable = require('stream').Readable;
var generalStream = new Readable();

generalStream._read = function (n) { };

...

modules.forEach(function(module) {
    gulp.src(modules.files)
        ....
        .pipe(through.obj(function(file, enc, callback) {
            generalStream.push(file);
            //From this point the code doesn't run (why?)

            callback(null, file);

            if (isLastModule) {
                generalStream.push(null);
            }
        }));
})

return generalStream;

sonewman commented on August 16, 2024

Alas! The one thing I wondered whether I needed to include in my example, but didn't.

tamtakoe commented on August 16, 2024

It works!

myStream._read = function (n) { } looks strange. Why didn't the node.js developers implement a noop function as the default value of _read?

tamtakoe commented on August 16, 2024

I tried from2:

function read(n, next) {
    console.log(n);

    //var modulesInCounter = 0;
    //var modulesOutCounter = 0;
    var generalStream = this;

    for (var key in modules) {
       //modulesInCounter++;

       compileModule(key, modules[key], base)
            .pipe(through.obj(function (file, enc, callback) {
                //modulesOutCounter++;
                generalStream.push(file);

                callback(null, file);

                //if (modulesOutCounter >= modulesInCounter) {
                //    generalStream.push(null);
                //    next();
                //}
            }));
    }
}

return from.obj(read);
  1. Why is n (the argument of read) always 16?
  2. Why does it work without generalStream.push(null) or next()?

sonewman commented on August 16, 2024

@tamtakoe

a1) I have never used from2, but I imagine 16 is the default (hughsk/from2/index.js#L83). I wouldn't worry about that too much; in objectMode it is usually ignored anyway, since usually only one object is passed through at a time.

It usually corresponds to the number of bytes to read from the internal buffer on .read().

a2) I have a feeling this might be because _read() only gets called when something is consuming generalStream; if nothing is waiting for data from it via a readable or data event, it will wait to be consumed before it tries to _read again.

(Obviously I can't see the rest of your code so I am making some assumptions here)

The result of what you have there is completely different from what you had before; it is the equivalent of:

var Readable = require('stream').Readable;
var generalStream = new Readable();

generalStream._read = function (n) {
    console.log(n);

    //var modulesInCounter = 0;
    //var modulesOutCounter = 0;
    var generalStream = this;

    for (var key in modules) {
       //modulesInCounter++;

       compileModule(key, modules[key], base)
            .pipe(through.obj(function (file, enc, callback) {
                //modulesOutCounter++;
                generalStream.push(file);

                callback(null, file);

                //if (modulesOutCounter >= modulesInCounter) {
                //    generalStream.push(null);
                //    next();
                //}
            }));
    }
};
...
return generalStream;

tamtakoe commented on August 16, 2024

It is not equivalent. If you leave the comments in, _read runs in an infinite loop. If you uncomment the code, _read runs several times.

My code: I use new Readable twice, for generalStream and for headStream, and I have a problem with the streams I created myself.

compileProject(config.project1.modules)
    .pipe(gulp.dest(buildPath));

function compileProject(modules) {
    var Readable = require('stream').Readable;
    var generalStream = new Readable({ objectMode: true });

    generalStream._read = function (n) {
        var modulesInCounter = 0;
        var modulesOutCounter = 0;
        var generalStream = this;

        for (var key in modules) {
            modulesInCounter++;

            compileModule(key, modules[key])
                .pipe(through.obj(function (file, enc, callback) {
                    modulesOutCounter++;
                    generalStream.push(file);

                    callback(null, file);

                    if (modulesOutCounter >= modulesInCounter) {
                        generalStream.push(null);
                    }
                }));
        }
    };

    return generalStream;
}

function compileModule(name, params) {
    var Readable = require('stream').Readable;
    var headStream = new Readable({ objectMode: true });

    headStream._read = function (n) {
        var dependencies = params.deps.length ? '"' + params.deps.join('", "') + '"' : '';
        var content = 'angular.module("' + params.moduleName + '", [' + dependencies + ']);';

        var file = new File({
            path: 'head.js',
            contents: new Buffer(content)
        });
        this.push(file);
        this.push(null);
    };

    var scriptsStream   = gulp.src(params.scripts)
        .pipe(g.uglify({
            mangle: false
        }))

    var templatesStream = gulp.src(params.templates)
        .pipe(g.angularTemplatecache(name + '/templates.js', {
            module: params.moduleName,
            base: path.join(base, params.root, '/'),
            templateHeader: TEMPLATE_HEADER,
            templateFooter: TEMPLATE_FOOTER
        }))

    var moduleStream = series(headStream, scriptsStream, templatesStream)
        .pipe(g.concat(name + '.js'))

    return moduleStream;
}

sonewman commented on August 16, 2024

When you pass the function to from2 this is what happens:

Class.prototype._from = read
Class.prototype._read = function(size) {
  var self = this

  if (this._reading || this.destroyed) return
  this._reading = true
  this._from(size, check)
  function check(err, data) {
    if (self.destroyed) return
    if (err) return self.destroy(err)
    if (data === null) return self.push(null)
    self._reading = false
    if (self.push(data)) self._read()
  }
}

https://github.com/hughsk/from2/blob/master/index.js#L45-L59

So actually, rather than calling push you could use the second argument, since you are passed ._read(<number>, <Function>), and the Function takes an error and data, similar to the callback in a Transform stream (although in streams3 it can be undefined; it doesn't have to be null).

The issue with the above is that ._read() will only happen once, because calling push(null) signifies the end of the stream. Is that the desired effect?

sonewman commented on August 16, 2024

or even:

compileProject(config.project1.modules)
    .pipe(gulp.dest(buildPath));

function compileProject(modules) {
    var Readable = require('stream').Readable;
    var generalStream = new Readable({ objectMode: true });

    var moduleKeys = Object.keys(modules);
    var count = 0;
    var total = moduleKeys.length;

    function compileModule_(key, index) {

      /* // streams3
      compileModule(key, modules[key])
        .on('data', function (file) {
          generalStream.push(file);

          if ((index + 1) === total) {
            generalStream.push(null);
          }
        });
      */

      // streams2
      var mod = compileModule(key, modules[key]);
      mod.on('readable', function () {
          generalStream.push(mod.read());

          if ((index + 1) === total) {
            generalStream.push(null);
          }
        });
    }

    generalStream._read = function (n) {
      compileModule_(moduleKeys[count], count);
      count++;
    };

    return generalStream;
}

tamtakoe commented on August 16, 2024

It works correctly! There is just one more little problem... it is very complicated (this code is at a lower level of abstraction than the rest of the program).

I found how gulp.src creates stream using only through2 https://github.com/wearefractal/glob-stream/blob/master/index.js#L27-L41

I have successfully used it for headStream:

var headStream = through.obj();
headStream.write(file);
headStream.end();

But it does not work for generalStream (it works if you swap in the commented-out lines):

var generalStream = through.obj();

generalStream._read = function (n) {};

function oncomplete() {
    generalStream.end(); //generalStream.push(null);
}

var moduleKeys = Object.keys(modules);
var count = 0;
var total = moduleKeys.length;

for (var key in modules) {
    var flush;
    count++;

    if (count >= total) {
        flush = oncomplete;
    }

    compileModule(key, modules[key], base)
        .pipe(through.obj(function (file, enc, callback) {
            generalStream.write(file); //generalStream.push(file);

            callback(null, file);
        }, flush));
}

return generalStream;

sonewman commented on August 16, 2024

If you are using through.obj, you do not want to set ._read; remove that line, as it will mess with the Transform stream internals.

sonewman commented on August 16, 2024

It is really up to you; they do much the same thing, except that the through stream adds writable-stream functionality to generalStream rather than it being only readable.

It's kind of like using through streams as a hammer to solve all problems, but if it makes your code simpler to understand, that's probably more important, since this looks like build code rather than production application code.

It just means generalStream could be piped into, although I don't suggest that, as it would make your code even more complicated.

tamtakoe commented on August 16, 2024

Clear code is very important to me. I'm writing a build script, so performance and precision do not matter much. Ideally, you would not even need to know about the internal implementation of streams, and would perform only simple actions on them (create, merge, fork, save, ...).

It remains to find out why it works without flush/generalStream.end()/generalStream.push(null)/etc. :-)

//It works! why?!
var generalStream = through.obj();

for (var key in modules) {
    compileModule(key, modules[key], base)
        .pipe(through.obj(function (file, enc, callback) {
            generalStream.write(file);

            callback(null, file);
        }));
}

return generalStream;

sonewman commented on August 16, 2024

Probably because once all the files have been handled, the script just ends; nothing is waiting in the event loop.

But by never ending the stream you never clean up the stream internals, so this would not be great for code running on a server.

tamtakoe commented on August 16, 2024

How can I create a chain of through2 streams?

I tried

stream
    .pipe(throughStream1)
    .pipe(throughStream2)

//must be equivalent
throughStream = throughStream1.pipe(throughStream2)

stream
    .pipe(throughStream)

But it doesn't work

sonewman commented on August 16, 2024

I am confused 😕

throughStream = throughStream1.pipe(throughStream2)
// will yield => throughStream === throughStream2

So by doing this:

throughStream = throughStream1.pipe(throughStream2)

stream
    .pipe(throughStream)

you are actually doing this:

stream.pipe(throughStream2)

although throughStream1 is still set up to pipe into throughStream2 as well

It is not like a Promise where the following would create 4 promises and give you the result of them all:

return new Promise(() => {
  return new Promise(resolve => resolve({ data: true }))
})
.then(result => {
  return new Promise(resolve => resolve(result.data))
});

pipe returns the stream that is being piped to:

readable.pipe(throughStream) // => throughStream

so

stream
    .pipe(throughStream1)
    .pipe(throughStream2)

is the same as doing:

throughStream1.pipe(throughStream2)
stream.pipe(throughStream1)

tamtakoe commented on August 16, 2024

Hmm... another way: can I set data on the stream? Not a chunk, but data as a stream property?

stream
    .pipe(through.noop, function(callback) {
        this.data = {foo:bar};

        callback();
    }))
    .pipe(through.noop, function(callback) {
        console.log(this.data); //{foo:bar};

        callback();
    }))

this.data doesn't work. Is it possible another way?

sonewman commented on August 16, 2024

No, not in that way.
I'm not sure what you are trying to do here, because the second argument to .pipe is an options object, not a function:

stream.pipe(writable, { end: true|false })

so the function will not even be called.
It depends on how you are using this data object; couldn't you do something like:

;(function () {
  var data = {};

  stream
      .pipe(through2.obj(function(file, encoding, callback) {
          data.foo = 'bar';

          callback(null, file);
      }))
      .pipe(through2.obj(function(file, encoding, callback) {
          console.log(data); // { foo: 'bar' }

          callback(null, file);
      }));
}());

tamtakoe commented on August 16, 2024

I thought about that method, but came up with another way; it avoids moving the logic out of the stream.

//storage.js
var data = {};

module.exports = function(key, value){
    return through.obj(function(file, enc, callback) {
        if (value) {
            data[key] = value;
        } else {
            file.contents = new Buffer(data[key]);
        }
        callback(null, file);
    });
};
//gulpfile.js
var storage = require('storage')

stream
    .pipe(storage('foo','bar'))
    .pipe(storage('foo'))

tamtakoe commented on August 16, 2024

A new case with a new stream.
Windows, node.js v0.12.2

File structure:

test1
|- script1.js
|- script2.js
|- ...
test2
...
test5

gulpfile.js:

gulp.task('test', function() {
    var streamsArr = [];

    for (var stream, i = 1; i < 5; i++) {
        stream = gulp.src(path.join(__dirname, 'test' + i, '**/*'))
            .pipe(through.obj(function(file, enc, callback) {
                setTimeout(function() {
                    callback(null, file);

                }, Math.ceil(Math.random()*100));
            }))
            .pipe(require('gulp-concat')('script.js'))
            .on('end', (function(i) {
                console.log('end of the stream ' + i);
            }).bind(null, i));

        streamsArr.push(stream);
    }

    require('gulp-merge')(streamsArr) //or stream-series, or merge-stream, etc. — all make new stream
        .pipe(through.obj(function(file, enc, callback){
            console.log('merged stream transform:', path.relative(__dirname, file.path));

            callback(null, file);

        }, function(callback) {
            console.log('merged stream flush');

            callback();
        }))
        .on('end', function() {
            console.log('end of the merged stream');
        });
});

console:

end of the stream 4
end of the stream 3
merged stream transform: test1\script.js
end of the stream 1
merged stream transform: test2\script.js
end of the stream 2

Somehow, part of the stream is lost. Also, the flush and end events of the merged stream did not fire.

mafintosh commented on August 16, 2024

You have to drain the stream in your gulp-merge function since you are pushing data to the readable part of the stream (callback(null, file)). You can do this by piping it somewhere or calling .resume()

  require('gulp-merge')(streamsArr) //or stream-series, or merge-stream, etc. — all make new stream
        .pipe(through.obj(function(file, enc, callback){
            console.log('merged stream transform:', path.relative(__dirname, file.path));

            callback(null, file);

        }, function(callback) {
            console.log('merged stream flush');

            callback();
        }))
        .on('end', function() {
            console.log('end of the merged stream');
        })
        .resume() // add this

tamtakoe commented on August 16, 2024

@mafintosh resume() doesn't work. I get something like this in the console:

end of the stream 2
end of the stream 4
merged stream transform: test1\script.js
end of the stream 1
end of the stream 3

I did something like this, but it is too complicated and verbose

var through = require('through2');
var files = [];

for (var stream, i = 1; i < 5; i++) {
    stream = gulp.src(path.join(__dirname, 'test' + i, '**/*'))
        .pipe(through.obj(function(file, enc, callback) {
            setTimeout(function() {
                callback(null, file);

            }, Math.ceil(Math.random()*100));
        }))
        .pipe(through.obj(function(file, enc, callback) {
            files.push(file);

            if (i === 4) {
                createStream(files);
            }
        }))
}

function createStream(files) {
    var stream = through.obj();
    stream.pipe(through.obj(function(file, enc, callback) {

        callback();
    }, function(callback) {
        this.push.apply(this, files);

        callback();
    }))
    stream.end();
}

mafintosh commented on August 16, 2024

@tamtakoe you aren't calling resume in that example?

tamtakoe commented on August 16, 2024

@mafintosh I copied your example into my example

...
require('gulp-merge')(streamsArr)
    ...
    .resume() // add this
...

but it does not work correctly. Maybe I did something wrong?

tamtakoe commented on August 16, 2024

@mafintosh There was a problem in gulp-concat. I used v2.4.3; it was fixed in 2.5.2.
