
Comments (6)

MattiasBuelens commented on May 29, 2024

Cool idea! 😁

A few thoughts on buffering:

  • With a ReadableStream, a consumer can start multiple read()s without waiting for the previous read to complete. This is already handled for you by the async generator machinery: calling next() again while the async generator is still busy will queue up the call.
  • With a ReadableStream, a producer can enqueue multiple chunks without waiting for the previous chunks to be read by a consumer. Async generators don't support this: after you yield a value, you have to wait until the consumer calls next() again before you can yield the next one. For this, you're going to need to manage two buffers (one for each iterator) manually.

The solution from StackOverflow seems like a good starting point. However, with async iterators, you also have to handle the case where source.next() was already called but hasn't resolved yet, so it's not as simple as slapping some async and await keywords on it.

Garbage collection is indeed a concern. For ReadableStream.tee(), when one of the streams is cancelled, we make sure not to enqueue any more chunks to it.

  • To cancel an async iterator, you use its return() method. You can use a try..finally inside your async generator to perform cleanup logic for when this happens, so that seems like the ideal place to mark yourself as "cancelled" and discard any buffered chunks.
  • Alternatively, you can put the buffers inside a WeakRef, and have the two async iterators hold the only strong reference of their respective buffer. When one of the async iterators is no longer used, its buffer can be garbage collected.
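The return()-plus-try..finally pattern can be shown in a few lines. This is a minimal sketch of my own (not polyfill code, and the names are made up):

```typescript
// An async generator that cleans up when the consumer calls return().
// The finally block is where a tee'd branch could mark itself
// "cancelled" and discard its buffered chunks.
async function* numbers(): AsyncGenerator<number> {
  const buffered = [1, 2, 3];
  try {
    while (buffered.length > 0) {
      yield buffered.shift()!;
    }
  } finally {
    // Runs on normal completion AND when the consumer calls return().
    buffered.length = 0; // discard any buffered chunks
    console.log('cleaned up');
  }
}

const it = numbers();
console.log(await it.next());            // { value: 1, done: false }
console.log(await it.return(undefined)); // logs 'cleaned up', then resolves { value: undefined, done: true }
console.log(await it.next());            // { value: undefined, done: true }
```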

I'm thinking:

  • Have two buffers, one for each async iterator.
  • Have a shared read promise, initially undefined.
  • Each async iterator consists of an infinite loop. In each iteration:
    1. If our buffer contains an element, shift() it and yield it.
    2. Otherwise, if we aren't already reading from source, create a new promise that calls source.next() and stores the result in both buffers.
    3. Otherwise, await the current read promise. Afterwards, we're guaranteed to have at least one element in our buffer, which we'll yield in the next iteration. (Okay, well, at least if next() didn't return { done: true }. I guess we also need a shared done flag... 😅)

I must say, you have sparked my interest. If I find some time this evening, I'll give it a try myself. 😛

from web-streams-polyfill.

jimmywarting commented on May 29, 2024

Async generators don't support this: after you yield a value, you have to wait until the consumer calls next() again before you can yield the next one

I'm aware of that. I basically won't keep a bucket/queue/sink filled up with multiple reads, for performance reasons. Consumers would have to call next() to advance.

I'm planning to do something like stream.Readable.from(iterator, { highWaterMark }) afterwards to solve this issue
(or not at all if the pipe chain is short)
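That buffering layer could look roughly like this (a sketch; the generator and values are made up, but stream.Readable.from and its highWaterMark option are real Node.js APIs):

```typescript
import { Readable } from 'node:stream';

// Keep the async generator itself unbuffered, and let the Node stream's
// internal queue do the read-ahead via highWaterMark.
async function* source(): AsyncGenerator<string> {
  yield 'a';
  yield 'b';
  yield 'c';
}

// objectMode so highWaterMark counts chunks rather than bytes.
const readable = Readable.from(source(), { objectMode: true, highWaterMark: 2 });

const chunks: string[] = [];
for await (const chunk of readable) {
  chunks.push(chunk);
}
console.log(chunks); // [ 'a', 'b', 'c' ]
```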

Alternatively, you can put the buffers inside a WeakRef
Have two buffers, one for each async iterator.

I have been thinking of using WeakRef, but it's so new and more complicated :P

I was hoping I wouldn't have to use two buffers (arrays) that call shift and push.
If you briefly study how these two libraries work,

you will see that the common goal is to have a Node that has a "branch" to the next/prev Node, without an array.
If I could implement a similar strategy that only links to the next item, then maybe I could have two (tee'd) iterators that simply read from the same object in a forward-only fashion

// Enqueued or yielded the following values:
// a,b,c,d,e,f

// Would result in something like this:
const branch = {
  value: 'a',
  next: {
    value: 'b',
    next: {
      value: 'c',
      next: promise
    }
  }
}

If tee'd iterator 1 is on value b (branch.next) and iterator 2 is on value c (branch.next.next),
then a (and the branch variable) can be GC'ed with ease.

The two iterators would basically read from the same source (the branch object) but be at different positions
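A sketch of that forward-only linked idea (my own naming, not code from either library): each node pulls exactly one value from the shared source, both branches walk the same chain, and nodes behind both readers become unreachable and can be GC'ed.

```typescript
type ChainNode<T> = {
  result: Promise<IteratorResult<T>>;
  next?: ChainNode<T>;
};

function teeLinked<T>(iterable: AsyncIterable<T>): [AsyncIterable<T>, AsyncIterable<T>] {
  const source = iterable[Symbol.asyncIterator]();

  // Each node pulls exactly one value from the shared source.
  const makeNode = (): ChainNode<T> => ({ result: source.next() });

  async function* branch(node: ChainNode<T>): AsyncGenerator<T> {
    while (true) {
      const result = await node.result;
      if (result.done) return;
      yield result.value;
      // Extend the chain at most once; the slower branch reuses the
      // node (the check-and-assign is synchronous, so there is no race).
      node.next ??= makeNode();
      node = node.next;
    }
  }

  const head = makeNode(); // note: pulls the first value eagerly
  return [branch(head), branch(head)];
}
```

A branch only ever holds a reference to its current node, so once both branches have moved past a node, nothing references it any more.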

I have also speculated about a long (recursive) promise chain, something like

currentValue = it.next()
currentValue = currentValue.then(yield_value).then(() => it.next())

(wrapping it.next in an arrow function, since passing it bare would detach it from the iterator's this)

I must say, you have sparked my interest. If I find some time this evening, I'll give it a try myself. 😛

Looking forward to it. Hope you can come up with something cool


jimmywarting commented on May 29, 2024

Sorry if this is totally unrelated to the stream specification.
It's not really an issue with your source code.

I just hoped I could code-golf a little with you, since you have worked so much with iterators, generators and streams. I just wanted to do something like what ReadableStream.prototype.tee does, but with async generators.

Maybe this can lead to some .tee() performance breakthrough that turns out to be more GC-friendly? 🤷‍♂️
(if it isn't already GC-friendly somehow)

Like:

Garbage collection is indeed a concern. For ReadableStream.tee(), when one of the streams is cancelled, we make sure not to enqueue any more chunks to it.

What if the stream isn't closed or cancelled? We stumbled upon an issue where developers would simply not care about one of the two readable streams after cloning it (but this was with node:streams). It was just left in the dark to be GC'ed at some point.


MattiasBuelens commented on May 29, 2024

I'm aware of that. I basically won't keep a bucket/queue/sink filled up with multiple reads, for performance reasons. Consumers would have to call next() to advance.

I'm planning to do something like stream.Readable.from(iterator, { highWaterMark }) afterwards to solve this issue
(or not at all if the pipe chain is short)

Makes sense. 👍

you will see that the common goal is to have a Node that has a "branch" to the next/prev Node, without an array.
If I could implement a similar strategy that only links to the next item, then maybe I could have two (tee'd) iterators that simply read from the same object in a forward-only fashion

That might work.

The polyfill already uses a linked list of bounded arrays of chunks for its queues. It .push()es new chunks onto the current array, and if that array becomes too large it creates a new node with a fresh empty array. When popping a chunk off the queue, it merely clears the element from the array and increments a pointer, to avoid having to repeatedly shrink the array. This works well for a queue with a single write head and a single read head, but it wouldn't work for the case of tee(). Although, perhaps it's feasible to make a version with two read heads...? 🤔
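A simplified sketch of that kind of queue (my own code, not the polyfill's; the chunk bound here is tiny just to exercise the node-switching logic):

```typescript
const CHUNK_SIZE = 4; // the real implementation uses a much larger bound

type QueueNode<T> = {
  elements: (T | undefined)[];
  next?: QueueNode<T>;
};

class SimpleQueue<T> {
  private front: QueueNode<T> = { elements: [] };
  private back: QueueNode<T> = this.front;
  private offset = 0; // read position within the front node's array
  private count = 0;

  get size(): number {
    return this.count;
  }

  push(element: T): void {
    // Link a fresh node when the tail array reaches its bound.
    if (this.back.elements.length === CHUNK_SIZE) {
      const node: QueueNode<T> = { elements: [] };
      this.back.next = node;
      this.back = node;
    }
    this.back.elements.push(element);
    this.count++;
  }

  shift(): T | undefined {
    if (this.count === 0) return undefined;
    if (this.offset === CHUNK_SIZE) {
      // Front array exhausted: drop the node and move to the next one.
      this.front = this.front.next!;
      this.offset = 0;
    }
    const element = this.front.elements[this.offset] as T;
    // Clear the slot instead of shrinking the array, so the chunk can
    // be GC'ed and dequeuing stays O(1).
    this.front.elements[this.offset] = undefined;
    this.offset++;
    this.count--;
    return element;
  }
}

const q = new SimpleQueue<number>();
for (let i = 0; i < 6; i++) q.push(i); // spans two nodes (4 + 2)
const drained: number[] = [];
while (q.size > 0) drained.push(q.shift()!);
console.log(drained); // [ 0, 1, 2, 3, 4, 5 ]
```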

Maybe this can lead to some .tee() performance breakthrough that turns out to be more GC-friendly? 🤷‍♂️
(if it isn't already GC friendly somehow)

Chromium uses a dedicated HeapDeque for QueueWithSizes, and even a dedicated TeeEngine for all of the teeing logic. IIRC Gecko and WebKit also use optimized data structures in their implementations. So we're not going to beat C++ here. 😛

What if the stream isn't closed or cancelled? We stumbled upon an issue where developers would simply not care about one of the two readable streams after cloning it (but this was with node:streams). It was just left in the dark to be GC'ed at some point.

Good question. Right now, the streams standard doesn't do anything in that case.

One could argue that if the ReadableStream is only reachable through its controller (i.e. only its underlying source is alive), then it can no longer be consumed and we should prevent further enqueues. Perhaps we should then automatically cancel or error the stream? I'm not sure what the consequences of that would be, though. 🤔


MattiasBuelens commented on May 29, 2024

All right, here's my first naive implementation. No optimizations whatsoever, but it can serve as a good starting point. 🙂

(I wrote it in TypeScript. If that bothers you, just run it through tsc or the TypeScript playground first. 😛)

/// <reference lib="es2018.asyncgenerator" />

type Pair<T> = [T, T];

function teeAsyncIterator<T>(iterable: AsyncIterable<T>): [AsyncIterable<T>, AsyncIterable<T>] {
  const source = iterable[Symbol.asyncIterator]();

  const buffers: Pair<T[] | undefined> = [[], []];
  let reading = false;
  let done = false;
  let currentRead: Promise<void>;

  async function next(): Promise<void> {
    reading = true;
    const result = await source.next();
    if (result.done) {
      done = true;
    } else {
      buffers[0]?.push(result.value);
      buffers[1]?.push(result.value);
    }
    reading = false;
  }

  async function* branch(i: 0 | 1): AsyncGenerator<T, undefined, undefined> {
    try {
      while (true) {
        if (buffers[i]!.length) {
          yield buffers[i]!.shift()!;
        } else if (done) {
          return;
        } else {
          if (!reading) {
            currentRead = next();
          }
          await currentRead;
        }
      }
    } finally {
      buffers[i] = undefined;
    }
  }

  return [branch(0), branch(1)];
}
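To make it easy to try, here is the same implementation again in condensed, self-contained form with a quick usage check (the letters generator is just a stand-in source):

```typescript
// Condensed copy of teeAsyncIterator so this snippet runs on its own.
function teeAsyncIterator<T>(iterable: AsyncIterable<T>): [AsyncIterable<T>, AsyncIterable<T>] {
  const source = iterable[Symbol.asyncIterator]();
  const buffers: Array<T[] | undefined> = [[], []];
  let reading = false;
  let done = false;
  let currentRead: Promise<void> | undefined;

  async function next(): Promise<void> {
    reading = true;
    const result = await source.next();
    if (result.done) {
      done = true;
    } else {
      // Every chunk goes into BOTH buffers (unless a branch was cancelled).
      buffers[0]?.push(result.value);
      buffers[1]?.push(result.value);
    }
    reading = false;
  }

  async function* branch(i: 0 | 1): AsyncGenerator<T> {
    try {
      while (true) {
        const buffer = buffers[i]!;
        if (buffer.length) {
          yield buffer.shift()!;
        } else if (done) {
          return;
        } else {
          // Share a single in-flight read between both branches.
          if (!reading) currentRead = next();
          await currentRead;
        }
      }
    } finally {
      buffers[i] = undefined; // drop this branch's buffer on early exit
    }
  }

  return [branch(0), branch(1)];
}

async function* letters(): AsyncGenerator<string> { yield 'a'; yield 'b'; yield 'c'; }

const [left, right] = teeAsyncIterator(letters());
const seenLeft: string[] = [];
for await (const value of left) seenLeft.push(value);   // reads ahead; chunks buffer up for `right`
const seenRight: string[] = [];
for await (const value of right) seenRight.push(value); // drains its buffer
console.log(seenLeft, seenRight); // [ 'a', 'b', 'c' ] [ 'a', 'b', 'c' ]
```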

EDIT: I moved it to a Gist, to make it a bit easier to track changes. 😉


jimmywarting commented on May 29, 2024

Cool, closing and moving the discussion to the gist!

