Comments (6)
Cool idea!
A few thoughts on buffering:
- With a `ReadableStream`, a consumer can start multiple `read()`s without waiting for the previous read to complete. This is already handled for you by the async generator machinery: calling `next()` again while the async generator is still busy will queue up the call.
- With a `ReadableStream`, a producer can enqueue multiple chunks without waiting for the previous chunks to be read by a consumer. Async generators don't support this: after you `yield` a value, you have to wait until the consumer calls `next()` again before you can yield the next one. For this, you're going to need to manage two buffers (one for each iterator) manually.
The solution from StackOverflow seems like a good starting point. However, with async iterators, you also have to handle the case where `source.next()` was already called but hasn't resolved yet, so it's not as simple as slapping some `async` and `await` keywords on it.
Garbage collection is indeed a concern. For `ReadableStream.tee()`, when one of the streams is cancelled, we make sure not to enqueue any more chunks to it.
- To cancel an async iterator, you use its `return()` method. You can use a `try..finally` inside your async generator to perform cleanup logic for when this happens, so that seems like the ideal place to mark yourself as "cancelled" and discard any buffered chunks.
- Alternatively, you can put the buffers inside a `WeakRef`, and have the two async iterators hold the only strong reference to their respective buffer. When one of the async iterators is no longer used, its buffer can be garbage collected.
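The `return()`-based cleanup could look something like this (a minimal sketch; the `buffered` generator and its internal `buffer` array are illustrative, not part of any API):

```typescript
async function* buffered(): AsyncGenerator<number> {
  let buffer: number[] | undefined = [1, 2, 3];
  try {
    while (buffer!.length > 0) {
      yield buffer!.shift()!;
    }
  } finally {
    // Runs both on normal completion and when the consumer calls
    // return() early: drop the buffer so it can be garbage collected.
    buffer = undefined;
  }
}

async function main() {
  const it = buffered();
  console.log((await it.next()).value); // 1
  await it.return(undefined); // cancel early; triggers the finally block
  console.log((await it.next()).done); // true
}

main();
```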
I'm thinking:
- Have two buffers, one for each async iterator.
- Have a shared read promise, initially `undefined`.
- Each async iterator consists of an infinite loop. In each iteration:
  - If our buffer contains an element, `shift()` it and yield it.
  - Otherwise, if we aren't already reading from `source`, create a new promise that calls `source.next()` and stores the result in both buffers.
  - Otherwise, await the current read promise. Afterwards, we're guaranteed to have at least one element in our buffer, which we'll yield in the next iteration. (Okay, well, at least if `next()` didn't return `{ done: true }`. I guess we also need a shared `done` flag... 😅)
I must say, you have sparked my interest. If I find some time this evening, I'll give it a try myself.
from web-streams-polyfill.
> Async generators don't support this: after you `yield` a value, you have to wait until the consumer calls `next()` again before you can yield the next one
I'm aware of that. I'm basically not going to have a bucket/queue/sink filled up with multiple reads, for performance reasons. The consumer would have to call `next()` to advance.
I'm planning on doing something like `stream.Readable.from(iterator, { highWaterMark })` afterwards to solve this issue (or not at all if the pipe chain is short).
> Alternatively, you can put the buffers inside a `WeakRef`

> Have two buffers, one for each async iterator.
I have been thinking of using `WeakRef`, but it's so new and more complicated :P
I was hoping I wouldn't have to use two buffers (arrays) that call `shift` and `push`.
If you briefly study how these two libraries work:
- https://github.com/sindresorhus/yocto-queue
- https://github.com/WebReflection/linkedom/blob/HEAD/deep-dive.md

you will see that the common goal is to have a `Node` that has a "branch" to the next/prev `Node`, without an array.
If I could implement a similar strategy that only links to the next item, then maybe I could have two (tee'd) iterators that simply read from the same object in a "forward-only" fashion.
```js
// Enqueued or yielded the following values:
// a, b, c, d, e, f
// Would result in something like this:
const branch = {
  value: 'a',
  next: {
    value: 'b',
    next: {
      value: 'c',
      next: promise
    }
  }
}
```
If the tee'd iterator 1 is on value `b` (`branch.next`) and iterator 2 is on value `c` (`branch.next.next`), then `a` (and the variable `branch`) can be GC'ed with ease.
The two iterators would basically read from the same source (the `branch` object), but be at different positions.
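That forward-only walk could be sketched roughly like this (hypothetical `chainFrom`/`walk`/`teeViaChain` helpers, not from any library; each node's `next` is a memoized thunk, so two readers pulling the same link share a single `source.next()` call):

```typescript
type ChainNode<T> = { value: T; next: () => Promise<ChainNode<T> | null> };

// Returns a memoized thunk: the first call pulls one value from the
// source; later calls (e.g. from the other tee'd iterator) reuse it.
function chainFrom<T>(source: AsyncIterator<T>): () => Promise<ChainNode<T> | null> {
  let memo: Promise<ChainNode<T> | null> | undefined;
  return () => {
    if (memo === undefined) {
      memo = source.next().then(r =>
        r.done ? null : { value: r.value, next: chainFrom(source) }
      );
    }
    return memo;
  };
}

// Walk the chain forward, holding on to only the current link,
// so links behind both iterators become garbage automatically.
async function* walk<T>(link: () => Promise<ChainNode<T> | null>): AsyncGenerator<T> {
  let node = await link();
  while (node !== null) {
    yield node.value;
    node = await node.next();
  }
}

function teeViaChain<T>(iterable: AsyncIterable<T>): [AsyncIterable<T>, AsyncIterable<T>] {
  const head = chainFrom(iterable[Symbol.asyncIterator]());
  return [walk(head), walk(head)];
}
```

Note that if one branch races far ahead, the slower branch still retains the whole chain between the two positions, so this trades bounded buffering for GC-friendliness.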
I have also speculated about a long (recursive) promise chain, something like:

```js
currentValue = it.next()
currentValue = currentValue.then(yield_value).then(it.next)
```
> I must say, you have sparked my interest. If I find some time this evening, I'll give it a try myself.
😛
Looking forward to it. Hope you can come up with something cool
Sorry if this is totally unrelated to the streams specification. It's not really an issue with your source.
I just hoped I could code golf a little bit with you, since you have worked so much with iterators, generators and streams. I just wanted to do something like `ReadableStream.prototype.tee` does, but with async generators.
Maybe this can lead to some `.tee()` performance breakthrough that could turn out to be more GC-friendly? (if it isn't already GC-friendly somehow)
Like:
> Garbage collection is indeed a concern. For `ReadableStream.tee()`, when one of the streams is cancelled, we make sure not to enqueue any more chunks to it.

What if the stream isn't closed or cancelled? We stumbled upon an issue where developers would simply not care about one of the two readable streams after cloning it (but this was for `node:streams`). It was just left in the dark to be GC'ed at some point.
> I'm aware of that. I will basically not have a bucket/queue/sink filled up with multiple reads, for performance. They would have to call `next()` to advance.
> I'm planning on doing something like `stream.Readable.from(iterator, { highWaterMark })` afterwards to solve this issue (or not at all if the pipe chain is short)
Makes sense.
> you will see that the common goal is to have a `Node` that has a "branch" to the next/prev `Node`, without an array.
> If I could implement a similar strategy that only links to the next item, then maybe I could have two (tee'd) iterators that simply read from the same object in a "forward-only" fashion.
That might work.
The polyfill already uses a linked list of bounded arrays of chunks for its queues. It `.push()`es new chunks onto the current array, and if that array becomes too large it creates a new node with a fresh empty array. When popping a chunk off the queue, it merely clears the element from the array and increments a pointer, to avoid having to repeatedly shrink the array. This works well for a queue with a single write head and a single read head, but it wouldn't work for the case of `tee()`. Although, perhaps it's feasible to make a version with two read heads...?
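That scheme could be sketched roughly like this (illustrative only; the polyfill's actual queue implementation differs in details, and `LinkedArrayQueue`/`CHUNK_SIZE` are made-up names):

```typescript
const CHUNK_SIZE = 4; // kept small here for illustration

interface QueueNode<T> {
  elements: (T | undefined)[];
  next: QueueNode<T> | undefined;
}

class LinkedArrayQueue<T> {
  private front: QueueNode<T> = { elements: [], next: undefined };
  private back: QueueNode<T> = this.front;
  private cursor = 0; // read offset within the front node
  private _size = 0;

  push(element: T): void {
    // If the back node's array is full, start a fresh node
    // instead of letting the array grow without bound.
    if (this.back.elements.length === CHUNK_SIZE) {
      const node: QueueNode<T> = { elements: [], next: undefined };
      this.back.next = node;
      this.back = node;
    }
    this.back.elements.push(element);
    this._size++;
  }

  // Caller must check `size` first; popping an empty queue is undefined.
  shift(): T {
    const node = this.front;
    const element = node.elements[this.cursor] as T;
    // Clear the slot instead of splicing, so the array is never resized.
    node.elements[this.cursor] = undefined;
    this.cursor++;
    this._size--;
    if (this.cursor === CHUNK_SIZE && node.next !== undefined) {
      // The front node is exhausted: drop it as a whole for the GC.
      this.front = node.next;
      this.cursor = 0;
    }
    return element;
  }

  get size(): number {
    return this._size;
  }
}
```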
> Maybe this can lead to some `.tee()` performance breakthrough that could turn out to be more GC-friendly? 🤷‍♂️
> (if it isn't already GC-friendly somehow)
Chromium uses a dedicated `HeapDeque` for `QueueWithSizes`, and even a dedicated `TeeEngine` for all of the teeing logic. IIRC Gecko and WebKit also use optimized data structures in their implementations. So we're not going to beat C++ here.
> What if the stream isn't closed or cancelled? We stumbled upon an issue where developers would simply not care about one of the two readable streams after cloning it (but this was for `node:streams`). It was just left in the dark to be GC'ed at some point.
Good question. Right now, the streams standard doesn't do anything in that case.
One could argue that if the `ReadableStream` is only reachable through its controller (i.e. only its underlying source is alive), then it can no longer be consumed and we should prevent further enqueues. Perhaps we should then automatically cancel or error the stream? I'm not sure what the consequences of that would be, though.
All right, here's my first naive implementation. No optimizations whatsoever, but it can serve as a good starting point.
(I wrote it in TypeScript. If that bothers you, just run it through `tsc` or the TypeScript playground first.)
```ts
/// <reference lib="es2018.asyncgenerator" />

type Pair<T> = [T, T];

function teeAsyncIterator<T>(iterable: AsyncIterable<T>): [AsyncIterable<T>, AsyncIterable<T>] {
  const source = iterable[Symbol.asyncIterator]();
  const buffers: Pair<T[] | undefined> = [[], []];
  let reading = false;
  let done = false;
  let currentRead: Promise<void>;

  async function next(): Promise<void> {
    reading = true;
    const result = await source.next();
    if (result.done) {
      done = true;
    } else {
      buffers[0]?.push(result.value);
      buffers[1]?.push(result.value);
    }
    reading = false;
  }

  async function* branch(i: 0 | 1): AsyncGenerator<T, undefined, undefined> {
    try {
      while (true) {
        if (buffers[i]!.length) {
          yield buffers[i]!.shift()!;
        } else if (done) {
          return;
        } else {
          if (!reading) {
            currentRead = next();
          }
          await currentRead;
        }
      }
    } finally {
      buffers[i] = undefined;
    }
  }

  return [branch(0), branch(1)];
}
```
EDIT: I moved it to a Gist, to make it a bit easier to track changes.
Cool, closing and moving the discussion to the gist!