repeaterjs / repeater
The missing constructor for creating safe async iterators
Home Page: https://repeater.js.org
License: MIT License
I think it would be handy to have array-like helper functions in the core or utils/helpers package.
const numbersChannel = new Channel<number>(() => {});
const oddNumbersChannel = filterChannel(numbersChannel, item => item % 2 !== 0);
const evenNumbersChannel = filterChannel(numbersChannel, item => item % 2 === 0);
const stringOddNumbersChannel = mapChannel(oddNumbersChannel, item => item.toString());
I can't think of any others off the top of my head. These should probably be free functions rather than instance-level methods so the API doesn't differ from plain async iterators.
Reported by @elderapo.
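The proposed helpers can be sketched with plain async generators, independent of the Channel class (the names filterIter and mapIter below are hypothetical, not part of the library):

```javascript
// Hedged sketch: filter/map over any async iterable, written as plain async
// generators to illustrate the free-function API shape proposed above.
async function* filterIter(iterable, predicate) {
  for await (const item of iterable) {
    if (predicate(item)) yield item;
  }
}

async function* mapIter(iterable, fn) {
  for await (const item of iterable) {
    yield fn(item);
  }
}

async function* numbers() {
  yield* [1, 2, 3, 4, 5];
}

(async () => {
  const odds = [];
  // compose: keep odd numbers, then stringify them
  for await (const s of mapIter(filterIter(numbers(), (n) => n % 2 !== 0), String)) {
    odds.push(s);
  }
  console.log(odds); // ["1", "3", "5"]
})();
```

Because these are free functions over the async iteration protocol, they would work identically on channels and on native async generators.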
When Repeater.race is called with an iterator and a promise which doesn't settle, memory usage grows until the process crashes.
Reproduction:
import * as crypto from "crypto";
import {Repeater} from "@repeaterjs/repeater";
async function randomString(): Promise<string> {
await new Promise((resolve) => setTimeout(resolve, 1));
return crypto.randomBytes(10000).toString("hex")
}
async function *createStrings() {
while (true) {
yield randomString();
}
}
function map<T, U>(iter: AsyncIterable<T>, fn: (value: T) => U): AsyncGenerator<U> {
return new Repeater(async (push, stop) => {
for await (const value of Repeater.race([iter, stop])) {
push(fn(value as T) as U);
}
stop();
});
}
(async function main() {
let i = 0;
for await (const value of map(createStrings(), (value) => value.split(""))) {
if (i++ % 1000 === 0) {
const usage = process.memoryUsage();
const rssMiB = Math.round(usage.rss / (1024 ** 2) * 100) / 100;
console.log(`Resident Set: ${rssMiB} MiB`);
}
}
})();
This will log an increasing resident set size until the process runs out of memory. It seems to be exacerbated by the size of the values passed through the iterator which is racing the stop promise.
The readmes at
https://www.npmjs.com/package/@repeaterjs/timers
https://www.npmjs.com/package/@repeaterjs/pubsub
https://www.npmjs.com/package/@repeaterjs/limiters
point to https://repeater.js.org/, but the site has no information about these packages.
Chase that React clout 😏
I'm attempting to use repeaters in a react-native TypeScript project. When I try to for await over any iterator, it throws the exception
TypeError: Invalid attempt to iterate non-iterable instance.
This happens running the example verbatim from the documentation. Not really sure what's going on here. It must be something about my typescript configuration?
"@repeaterjs/repeater": "^3.0.4",
tsconfig.json
{
"compilerOptions": {
"sourceMap": true,
"allowJs": false,
"resolveJsonModule": true,
"allowSyntheticDefaultImports": true,
"esModuleInterop": true,
"experimentalDecorators": true,
"jsx": "react",
"lib": ["es2019"],
"moduleResolution": "node",
"noErrorTruncation": true,
"noImplicitReturns": true,
"target": "ESNext",
"strict": true,
"noImplicitAny": true,
"noFallthroughCasesInSwitch": true,
"noEmit": true,
"skipLibCheck": true,
"types": ["node", "jest"],
"typeRoots": ["./node_modules/@types"]
},
"include": ["src"],
"exclude": [
"**/node_modules",
"node_modules",
"**/node_modules/**/*",
"**/.*/",
"babel.config.js",
"metro.config.js",
"jest.config.js",
"ios",
"android",
".history",
".tsbuild"
]
}
Repeater.prototype.throw was initially implemented just to have parity with async generator objects in #2. It was designed to call stop on the repeater with an error and return the final value, allowing consumers to catch the error, but after some thinking I'm wondering if we can give producers a chance to catch the error as well. Specifically, what if calling throw caused the stop promise to reject? I was initially hesitant to implement throw in this way because I was worried about unhandled promise rejections, but after using repeaters for a while I think all responsible/non-toy repeaters involve some cleanup where the stop promise is awaited, and if the executor awaits the stop promise, throwing an error into the repeater should have the same effect. This means that the repeater would not be stopped if the executor caught the stop rejection, mimicking the behavior where generators can continue to yield values after a thrown error using try/catch/finally.
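The generator behavior being mimicked can be demonstrated with a plain async generator (no repeater involved), which keeps yielding after a thrown-in error is caught around the yield:

```javascript
// A plain async generator that survives gen.throw(): the error thrown in
// resumes at the suspended yield, is caught, and the loop continues.
async function* resilient() {
  let i = 0;
  while (true) {
    try {
      yield i++;
    } catch (err) {
      // error thrown in via .throw() lands here; keep producing values
    }
  }
}

(async () => {
  const gen = resilient();
  console.log(await gen.next());                // { value: 0, done: false }
  console.log(await gen.throw(new Error("x"))); // { value: 1, done: false }
  console.log(await gen.next());                // { value: 2, done: false }
})();
```

Under the proposal, an executor that catches the stop rejection would behave analogously to the try/catch around the yield here.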
Currently, there is no easy, synchronous way of checking whether the channel is closed. It would be very useful for debugging purposes. The best current solution I can think of is checking whether the stop promise has resolved, but this is hacky and there is no synchronous way to check it.
There is, however, a question of whether allowing this is a good idea. I think it might make it easier to write buggy/leaky code. For example, I was merging some channels in a very specific way and needed to act accordingly when other channels got closed, etc.
The API I suggest is channel.isClosed / channel.isClosed().
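In the meantime, closed state can be tracked synchronously by wrapping the iterator; withIsClosed below is a hypothetical helper sketch, not a library API:

```javascript
// Hedged sketch: wrap any async iterator and expose a synchronous
// isClosed() check, flipped once the iterator reports done or is returned.
function withIsClosed(iterator) {
  let closed = false;
  return {
    isClosed: () => closed,
    async next(value) {
      const result = await iterator.next(value);
      if (result.done) closed = true;
      return result;
    },
    async return(value) {
      closed = true;
      return iterator.return ? iterator.return(value) : { value, done: true };
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
```

Callers can then poll wrapped.isClosed() for debugging without ever touching the stop promise.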
Had the realization that we can make close thenable, reducing the number of arguments passed to the executor from three to two. It never really made sense what the difference between close and stop was (any word used to describe "finishing" a channel is likely both a noun and a verb), and the naming didn't clear up the distinction. By making close promise-like, we can reduce the number of times we pass close but ignore the argument.
Maybe close should be renamed to stop as well. I keep running into issues where I forget to pass close but window.close exists, causing confusing errors. push and stop are both four letters too, which is a plus.
This is a super-breaking change, but we're below 1.0 so whatever.
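The thenable-close idea can be sketched with a plain function that carries a then method, so the same argument works both as close() and as await close:

```javascript
// Hedged sketch of a callable-and-awaitable "stop": calling it settles the
// underlying promise, and attaching `then` makes it directly awaitable.
function makeStop() {
  let resolve;
  const promise = new Promise((r) => (resolve = r));
  const stop = (value) => resolve(value);
  stop.then = promise.then.bind(promise); // `await stop` now works
  return stop;
}

(async () => {
  const stop = makeStop();
  setTimeout(() => stop("finished"), 10);
  console.log(await stop); // "finished"
})();
```

await treats any object with a callable then as a thenable, so no Promise subclassing is needed.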
Hey,
I just wanted to play with repeater, and I was curious whether it is possible, for example, to convert from an Rx.Observable to a repeater and back.
Maybe you can point me to some example.
Thanks!
It would be embarrassing to argue that one of the key features of channels is memory safety while suffering from a memory leak bug. Figure out a way to rigorously test channels for memory growth.
Maybe something like the test from this issue: leebyron/iterall#25
The next version of TypeScript will include types for AsyncGenerator which parameterize the values passed to next and return. Blocked by microsoft/TypeScript#30790
Lazy initialization is a cool feature but not always needed or desirable. There are situations where it leads to less predictable outcomes or just makes things harder.
I think it should remain on by default but should be configurable, e.g. new Channel<T>(..., { lazy: false })
Running into an issue with repeaterjs 3.0.4: my repeater is crashing when I try to stop it by calling return.
Error: WeakMap error
at Repeater.return
.....my app stack
Not a lot to go on here. I'm calling return() to stop a repeater and create a new one when some input conditions change. I'm wrapping a react-native-firebase firestore onSnapshot listener in a repeater, and my query can change, so I need to stop and re-create the listener and repeater when that happens.
I am not sure why push is asynchronous. From a quick look at the code, it seems like there are only wrappers like Promise.resolve(value).
It complicates things because you either have to await every push, which leads to more complex code (you have to consider that other things might execute while a push to the channel is pending), or omit the await and treat it as a synchronous call, which makes eslint go nuts because it doesn't allow floating promises.
I am reviewing the library, and running through the documentation, I came across this:
Being a callback-based API makes using observables with async/await and promises awkward; in fact, observables suffer from the same issue of “callback hell” which promises were designed to solve. Observable libraries are aware of this and provide “higher-order observable operators,” which work on observables of observables, but these solutions are seldom used and virtually incomprehensible to human beings, who are unaccustomed to thinking in extradimensional spaces.
I have been using observables for a while and I wonder what you meant by callback hell occurring also with observables. Do you have an example? If by higher-order observable operators you meant flatMap or switchMap and the like, I have actually been using those pretty often, and I would disagree that they are incomprehensible to human beings. That is quite a strong statement.
Aside from developer experience, I guess my next question is: is there something that you can write with repeaters that you cannot with observables? Or to put it another way, are repeaters more expressive than observables? I can think of backpressure as a possible candidate, but I haven't been able to figure out the answer from the documentation, so I'm asking here.
Thanks!!
Channel.race
Channel.merge
Channel.zip
Channel.latest
Channel.race calls Promise.race on each iteration, Channel.merge merges all async iterators into a single iterator, Channel.zip calls Promise.all on each iteration, and Channel.latest returns an iterator which yields the latest values whenever any individual iterator updates.
Allow push calls to pass a promise rather than a naked value. Mimic the async generator behavior where generators unwrap yielded promises before iteration resolves. This makes types like Channel<Promise<T>> impossible to create, but I think that's a good thing. It is similarly impossible to create a promise of a promise (Promise<Promise<T>>).
The one thing we need to figure out is what to do if the passed promise rejects. I think it’s reasonable to close the channel and finish the iterator (clear the buffer) if this happens, because it implies that the error has been received before any pending values.
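The async generator behavior being mimicked can be shown directly: yielded promises are unwrapped before the iteration result resolves, so consumers never observe a promise of a promise:

```javascript
// Yielding a promise from an async generator delivers the unwrapped value.
async function* gen() {
  yield Promise.resolve(1); // consumer receives 1, not a Promise
  yield 2;
}

(async () => {
  for await (const value of gen()) {
    console.log(typeof value, value); // "number 1", then "number 2"
  }
})();
```

Correspondingly, yielding a rejected promise makes the iteration reject, which is the analogue of closing the channel when a pushed promise rejects.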
Opening a separate issue from #49 for the sake of people googling why their process is just dying on them.
It seems a bit too easy to accidentally cause the event loop to die:
const { Repeater } = require('@repeaterjs/repeater')
const test = new Repeater((push, stop) => {
push(1)
// stop()
})
;(async () => {
for await (const value of test) {
console.log(value)
}
// event loop dies before this point unless you uncomment stop() above
console.log('done')
})()
Currently, channels run the executor in a promise after next is called for the first time. It would be better if the executor ran synchronously during the first call so we don't defer to the microtask queue.
Currently we resolve push results to the value passed to next. However, the repeater implementation is off by one in the sense that the first push call resolves to the first value passed to next, the second push to the second value, and so on. This behavior differs from generators, which resume yield with the value passed to the next call to next, dropping the first value passed to next entirely. To mimic async generators, we should resolve the first push to the second value passed to next, and so on.
In other words, the following test is wrong:
test("pushes resolve to value passed to next", async () => {
let push: (value: number) => Promise<number | void>;
const repeater = new Repeater(async (push1) => {
push = push1;
});
const next1 = repeater.next(-1);
const push1 = push!(1);
await expect(push1).resolves.toEqual(-1);
await expect(next1).resolves.toEqual({ value: 1, done: false });
const push2 = push!(2);
await expect(repeater.next(-2)).resolves.toEqual({ value: 2, done: false });
await expect(push2).resolves.toEqual(-2);
});
The correct version of this test should be:
let push: (value: number) => Promise<number>;
const repeater = new Repeater(async (push1) => {
push = push1;
});
const next1 = repeater.next(-1);
const next2 = repeater.next(-2);
const next3 = repeater.next(-3);
const next4 = repeater.next(-4);
const push1 = push!(1);
const push2 = push!(2);
const push3 = push!(3);
const push4 = push!(4);
await expect(next1).resolves.toEqual({ value: 1, done: false });
await expect(next2).resolves.toEqual({ value: 2, done: false });
await expect(next3).resolves.toEqual({ value: 3, done: false });
await expect(next4).resolves.toEqual({ value: 4, done: false });
await expect(push1).resolves.toEqual(-2);
await expect(push2).resolves.toEqual(-3);
await expect(push3).resolves.toEqual(-4);
await expect(
Promise.race([
push4,
new Promise((resolve) => setTimeout(() => resolve(-1000), 1)),
]),
).resolves.toEqual(-1000);
Hi, I'm noticing in my tests that when I try to use the stop promise in a call to Promise.race, it's not prioritising this promise even though it's clearly settled. I'm guessing this is happening because stop is not a "native" promise, and Promise.race is not seeing it as immediately settled?
To demonstrate, here's some code that runs to completion:
async function* increment() {
let count = 0;
for (;;) {
++count;
await promisify(nextTick)(); // this line is required or we never exit
yield count;
}
}
const repeater = new Repeater<number>(async (push, stop) => {
const iter = increment()[Symbol.asyncIterator]();
//const stop2 = stop.then((a) => a); // workaround
for (;;) {
const result = await Promise.race([stop, iter.next()]); // workaround: use stop2 here instead of stop
if (result === undefined) {
// stopped by our caller
await iter.return();
break;
}
if (result.done) {
// source iterator is done
stop();
break;
}
await push(result.value);
}
});
for await (const count of repeater) {
if (count >= 5) {
break;
}
}
If you now run this with the "nextTick" line commented out, you'll note the code hangs. What's actually happening is that the implicit call to end the repeater on the final loop's break never completes: the code is stuck in the repeater loop, with the call to Promise.race always returning the result for iter.next, despite stop having clearly resolved and being listed first.
To "fix" this behaviour I can uncomment the workaround line and use stop2 in the race call. This then causes race to resolve to the stop2 promise and we therefore exit the loop.
Please let me know if there's a better way to structure this code (dealing with never ending source async iterators) to avoid this entirely.
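The underlying mechanics can be shown without the library. Promise.race wraps each entry with Promise.resolve; a native settled promise passes through unchanged, but adopting a custom thenable costs an extra microtask, so the native promise's reaction runs first even when the thenable is listed first:

```javascript
// A custom thenable that is "already settled" still loses the race to a
// native resolved promise, because adopting it takes an extra microtask.
const thenable = { then(resolve) { resolve("thenable"); } };

(async () => {
  const winner = await Promise.race([thenable, Promise.resolve("native")]);
  console.log(winner); // "native"
})();
```

This is also why the stop.then((a) => a) workaround helps: it converts the custom thenable into a native promise up front, before the race begins.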
Promises which are passed to Repeater.race are added to the inferred yield type of the returned iterator. They should somehow be added to the return type instead.
The yarn monorepo format is a pain to deal with, is difficult to integrate with github/javascript/typescript tools, and hasn't really been useful in any meaningful way. I've twice accidentally published dependent packages without updating internal dependencies because packages are symlinked together. I have a github org with lots of space, so why not use it to its full potential and allow for better discoverability.
A “Channel” is a generic and widely used concept in programming. There are lots of libraries (Google’s BrowserChannel, all the CSP channel libraries that this library is in direct competition with) and DOM APIs (MessageChannel, RTCDataChannel, BroadcastChannel), such that I feel it's in our interest, both for SEO and for clarity, to rename the base class and the package.
I want something that communicates that this is THE constructor for custom async iterators, especially when dealing with callbacks, so maybe a synonym of iterator, with connotations that it is mutable, ephemeral, reactive, asynchronous, concurrent.
3.0 was released last November and I’m itching to do some breaking changes. Here’s what I’m planning:
Move the combinator functions to their own module. EDIT: I’m starting to think the static methods should stay on the class, even if we implement the other combinator functions.
I initially wrote combinators as static methods (Repeater.race) on the exported Repeater class, both on the assumption that they would be commonly used and because it draws a nice parallel with Promise.race. If repeaters ever become a standard thing, this might be how the functions are defined, but as a library, having the combinator functions exposed as static methods inflates the bundle size and is not tree-shakeable. Additionally, it's been brought to my attention (#48) that, contrary to my previous assumption, there actually may be a need for repeaters in the implementation of combinators like map and filter, and creating a module with all of these functions defined in one place would be nice. I've checked out some other combinator libraries and it seems like many of them leak memory and cause deadlocks, so there is going to be value in creating a combinator module, especially for combinators like switchMap, which require a high degree of concurrency.
Kill the monorepo. Package monorepos are annoying to deal with. I have made the mistake of not updating the internal versions of packages multiple times, there’s duplicated tooling across the packages, and I hate having to navigate a multi-level directory structure for what are essentially single-file modules. My plan is to get all the modules exported under the main repeater package and deprecate the other packages.
Kill pubsub. I’m probably the only one using the pubsub module right now, but after a year of async iterator usage, what I believe is that you should always start with an EventTarget or callback-based abstraction when writing custom classes. Async iterators are high-level, always async abstractions and we shouldn’t encourage people to write abstractions which exclusively use them.
Rename some functions. One thing I'm noticing is that the function names in limiters and timers are squatting on the expected name for the actual repeater object (const ??? = timeout(1000)). Most of the time you inline the function call directly in the for await statement, but for cases where you need to assign the repeater to a variable, the naming can be kind of annoying. I'd like to adopt some kind of consistent naming pattern like createTimeout, genTimeout, or maybe sourceTimeout.
Stop repeaters when the executor settles. EDIT: April 2021: I think this one also might not be worth it. I'm reading through code which uses repeaters in the wild, and it might be a pain in the butt to have to fix all that code. This change strikes me as less explicit and more confusing. (#49) Related to implementing combinators, I want to implicitly stop repeaters when the executor function returns. Whenever I write combinators I consistently feel compelled to write stop calls directly before the return statement or in a finally block. This is probably one of the more disruptive breaking changes, as it will cause repeaters to stop unexpectedly if you didn't await anything in the executor, but you can reconstruct the old behavior by awaiting or returning the stop promise. If your repeater executor exits synchronously, I encourage you to await or return the stop promise to prepare for this change.
yarn run v1.22.4
$ jest --config ./jest.config.js --rootDir ./ --color --no-colors
PASS src/__tests__/buffers.ts
FAIL src/__tests__/repeater.ts
● Repeater › throw method caught in async function
expect(received).resolves.toEqual()
Received promise rejected instead of resolved
Rejected to value: [Error: throw method caught in async function]
1207 | });
1208 | await expect(r.next()).resolves.toEqual({ value: 0, done: false });
> 1209 | await expect(r.throw(error)).resolves.toEqual({
| ^
1210 | value: 1,
1211 | done: false,
1212 | });
at expect (../../node_modules/expect/build/index.js:134:15)
at Object.<anonymous> (src/__tests__/repeater.ts:1209:11)
FAIL src/__tests__/combinators.ts (12.27 s)
● combinators › Repeater.race › generator vs Promise.resolve
expect(received).resolves.toEqual(expected) // deep equality
- Expected - 2
+ Received + 2
Object {
- "done": false,
- "value": 1,
+ "done": true,
+ "value": "z",
}
40 | Promise.resolve("z"),
41 | ]);
> 42 | await expect(iter.next()).resolves.toEqual({ value: 1, done: false });
| ^
43 | await expect(iter.next()).resolves.toEqual({ value: "z", done: true });
44 | await expect(iter.next()).resolves.toEqual({ done: true });
45 | });
at Object.toEqual (../../node_modules/expect/build/index.js:198:20)
at Object.<anonymous> (src/__tests__/combinators.ts:42:42)
● combinators › Repeater.merge › Promise.resolve vs generator
expect(received).toEqual(expected) // deep equality
- Expected - 1
+ Received + 1
Array [
- 1,
  -1,
+ 1,
2,
3,
4,
5,
]
257 | }
258 | } while (!result.done);
> 259 | expect(nums).toEqual([1, -1, 2, 3, 4, 5]);
| ^
260 | await expect(iter.next()).resolves.toEqual({ done: true });
261 | });
262 |
at Object.<anonymous> (src/__tests__/combinators.ts:259:20)
Test Suites: 2 failed, 1 passed, 3 total
Tests: 3 failed, 140 passed, 143 total
Snapshots: 0 total
Time: 15.389 s
Ran all test suites.
error Command failed with exit code 1.
info Visit https://yarnpkg.com/en/docs/cli/run for documentation about this command.
Seems like some race conditions in tests which rely on async generators running in a specific order relative to async functions, as well as throwing promises not working.
which I changed to make sure the map does not hang.
...but there are a lot of failing tests now in yaacovCR/graphql-executor#54
I am not so concerned right now about the tests in https://github.com/yaacovCR/graphql-executor/blob/reform-map/src/execution/__tests__/mapAsyncIterator-test.ts because I am not sure how correct that file is.
But there are functional differences, it seems, in the order of next/return calls vs next/throw calls in the tests here https://github.com/yaacovCR/graphql-executor/blob/c254cdcc7254b07f583c6391d2103fe8e2761a52/src/execution/__tests__/subscribe-test.ts#L889-L976, where the first passes and the second fails. I am not sure which should be correct, but I would think they should be the same.
Channels are meant to be used alongside async generators, not compete with them, so we want the docs to be clear about when you’d use which.
I don't know if this causes problems in TS, but I noticed it because I'm trying to convert the defs to Flow, so I wanted to point it out.
The TypeScript AsyncGenerator interface is
// es2018.asyncgenerator.d.ts
interface AsyncGenerator<T = unknown, TReturn = any, TNext = unknown>
extends AsyncIterator<T, TReturn, TNext> {
// NOTE: 'next' is defined using a tuple to ensure we report the correct assignability errors
// in all places.
next(...args: [] | [TNext]): Promise<IteratorResult<T, TReturn>>;
return(value: TReturn): Promise<IteratorResult<T, TReturn>>;
throw(e: any): Promise<IteratorResult<T, TReturn>>;
[Symbol.asyncIterator](): AsyncGenerator<T, TReturn, TNext>;
}
next and return don't accept PromiseLike the way they do in Repeater.
One of the cool things about zen-observable is that you can
new Observable(observer => {
checkSomethingSynchronously()
return someOtherObservable.subscribe(observer)
})
I'm looking for an elegant way to do this with repeater. Seems like right now I'd have to use for await:
new Repeater(async (push, stop) => {
await checkSomethingAsynchronously()
for await (const x of someOtherRepeater) push(x)
})
The syntax isn't bad really, but didn't we come to the conclusion a while back that using for await like this is dangerous? I can never remember the caveats exactly.
The AsyncIterator interface defines an optional throw method that can be used to make the iterator throw an error. This is useful when you have nested async iteration loops and you don't want to wait for the child loop to terminate before continuing the parent loop, but you still want to propagate errors to avoid unhandled promise rejections. Example:
for await (const a of iterator) {
if (a.subscribe) {
(async () => {
for await (const b of a.subscribe()) {
console.log(b);
}
// propagate errors to the parent
})().catch((err) => iterator.throw(err));
}
}
Figure out the rejection semantics for throw and how it interacts with next and return. My plan is for throw to close the channel with the error and await return. If the channel is already closed, should throw reject?
Currently, the throttler releases all tokens at a fixed interval. This logic is slightly wrong because a caller can expend the token budget halfway through one window and then expend the budget again halfway through the next window, doubling the rate at which tokens are used. We need to use a sliding window so that tokens are released wait ms after the first call.
https://www.figma.com/blog/an-alternative-approach-to-rate-limiting/
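The sliding-window idea can be sketched without the library; slidingWindowLimiter below is a hypothetical helper with an injectable clock, not the throttler's actual API:

```javascript
// Hedged sketch: grant a token only while fewer than `limit` grants have
// occurred in the trailing `wait` ms, by remembering grant timestamps.
function slidingWindowLimiter(limit, wait, now = Date.now) {
  const grants = []; // timestamps of recent grants, oldest first
  return function tryAcquire() {
    const t = now();
    // evict grants that have aged out of the trailing window
    while (grants.length && t - grants[0] >= wait) {
      grants.shift();
    }
    if (grants.length < limit) {
      grants.push(t);
      return true;
    }
    return false;
  };
}

// With a fake clock: spending the budget mid-window blocks further grants
// until the oldest grant is `wait` ms old, not until a fixed refill tick.
let t = 0;
const tryAcquire = slidingWindowLimiter(2, 100, () => t);
console.log(tryAcquire(), tryAcquire(), tryAcquire()); // true true false
t = 50;
console.log(tryAcquire()); // false (no mid-window refill)
t = 100;
console.log(tryAcquire()); // true (first grant aged out)
```

This keeps the effective rate bounded over every trailing window, which is the property the fixed-interval release violates.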
Since the API is so similar to Observable, I find it great for consuming observable-like things but I'm not as sold for using it on non-observable-like things, like transducers or other async iterator combiners.
It seems almost simpler to implement transducers like map correctly using the raw async iterator protocol than using Repeater.
The naive approach (repeaterMap below) is to use a for await loop, but this keeps consuming input after the caller returns, which would be especially bad if the input is infinite. A raw async iterator, by contrast, can immediately forward return and throw calls to the input.
A less naive attempt (awkwardRepeaterMap) is to do the for await in a separate function and break once stop is detected. However, more calls to next() still go through than with the raw async iterator.
Thoughts?
const { Repeater } = require('@repeaterjs/repeater')
const range = n => ({
i: 0,
n,
async next() {
console.log('next()')
const i = ++this.i
if (i >= this.n) return { done: true }
return { value: i, done: false }
},
async return() {
console.log('return()')
this.i = this.n
return { done: true }
},
async throw(error) {
console.log('throw()')
this.i = this.n
return { done: true }
},
[Symbol.asyncIterator]() {
return this
},
})
const repeaterMap = (iterable, iteratee) =>
new Repeater(async (push, stop) => {
for await (const value of iterable) push(iteratee(value))
await stop
})
const rawMap = (iterable, iteratee) => ({
iterator: iterable[Symbol.asyncIterator](),
async next() {
const { value, done } = await this.iterator.next()
return { value: iteratee(value), done }
},
return() {
return this.iterator.return()
},
throw(error) {
return this.iterator.throw(error)
},
[Symbol.asyncIterator]() {
return this
},
})
const awkwardRepeaterMap = (iterable, iteratee) =>
new Repeater(async (push, stop) => {
let stopped = false
async function next() {
for await (const value of iterable) {
if (stopped) break
push(iteratee(value))
}
}
next()
await stop
stopped = true
})
async function go() {
console.log('rawMap')
for await (const i of rawMap(range(10), i => i * 2)) {
console.log(i)
if (i >= 5) break
}
console.log('\nrepeaterMap')
for await (const i of repeaterMap(range(10), i => i * 2)) {
console.log(i)
if (i >= 5) break
}
console.log('\nawkwardRepeaterMap')
for await (const i of awkwardRepeaterMap(range(10), i => i * 2)) {
console.log(i)
if (i >= 5) break
}
}
go()
rawMap
next()
2
next()
4
next()
6
return()
repeaterMap
next()
next()
next()
2
next()
next()
next()
4
next()
next()
6
next()
next()
awkwardRepeaterMap
next()
next()
next()
2
next()
next()
next()
4
next()
next()
6
next()
return()
The return method should accept a thenable which resolves to the final value, as well as a plain final value (see spec). The TypeScript AsyncIterator interface requires return to accept PromiseLike<TReturn> | TReturn. We should handle the case where a PromiseLike is passed to return in code and update the types.
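For reference, the spec behavior to mirror can be observed on a native async generator: a thenable passed to return() is awaited before the final iteration result is produced:

```javascript
// Passing a promise to an async generator's return() yields the unwrapped
// value as the final result.
async function* source() {
  yield 1;
}

(async () => {
  const gen = source();
  await gen.next(); // { value: 1, done: false }
  const result = await gen.return(Promise.resolve("final"));
  console.log(result); // { value: "final", done: true }
})();
```

Repeater.prototype.return would unwrap the PromiseLike the same way before settling the final result.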
I think it would be cool to create custom errors for easier error handling. I suggest the ts-custom-error library. It allows you to do something like:
const someInternalCode = () => {
if (xxx) {
throw new CannotReadFromEmptyBuffer()
}
}
try {
someInternalCode();
} catch (ex) {
if (ex instanceof CannotReadFromEmptyBuffer) {
console.error("You just tried to read from an empty buffer...")
return;
}
console.error("Some error happened", ex)
}
Hi guys! I was trying out your library and ran into behaviour that I don't understand.
I have two Repeaters that fetch random jokes from public APIs, and I want these two Repeaters merged.
When run separately, each Repeater yields 3 jokes as expected.
For some reason, the merged code returns only 4 jokes when I expect 6 of them. Can someone explain what I am doing wrong here?
Here's the code:
const axios = require('axios');
const { Repeater } = require('@repeaterjs/repeater');
function getDadJokes(n) {
return new Repeater(async (push) => {
for (let i = 0; i < n; i++) {
const { data: { joke } } = await axios.request({
url: 'https://v2.jokeapi.dev/joke/Any?type=single'
});
push(`${joke}`);
}
})
}
function getChuckNorrisJokes(n) {
return new Repeater(async (push) => {
for (let i = 0; i < n; i++) {
const { data: { value } } = await axios.request({
url: 'https://api.chucknorris.io/jokes/random'
});
push(`${value}`);
}
});
}
(async function () {
const merged = Repeater.merge([
getChuckNorrisJokes(3),
getDadJokes(3),
]);
for await (const joke of merged) {
console.log(`\n${joke}\n`);
}
})();
After some initial work on redesigning Repeater.prototype.throw to close #35, I think we should redesign Repeater.prototype.return and the stop promise as well, insofar as the current behavior creates additional discrepancies between repeaters and async generators. The current behavior is problematic for the following reasons:
The stop promise resolves to the value passed to Repeater.prototype.return. This is behavior which we cannot replicate in an equivalent async generator, insofar as there is no way with try/finally blocks to inspect the original return value, only to overwrite it. By exposing the value passed to return, we create opportunities for abstraction leaks where you have to treat calls to return on a repeater differently from calls to return on an async generator.
Let’s say that we actually wanted this extra ability to inspect the value passed to return via stop. After all, with repeaters, you can race the stopping/returning of the repeater with another promise to ensure timely returns, which is something you cannot do within an async generator. However, even if we wanted to inspect the value passed to return with the stop promise, the only way it could be used would be to overwrite the final result of the iterator. All code which runs in the executor after the stop promise settles in the case of an early return is equivalent to code within a finally block in an async generator, and established best practices dictate that we should not put control flow statements in finally blocks. We already have no means of putting yield statements in a repeater's “finally” block, insofar as push becomes a no-op once the repeater is stopped, so it makes sense that we should disallow the equivalent of return as well. Using the return value of the executor in the case of an early return is like promoting the use of return statements in finally blocks for async generators, which is something we don't want to do.
Therefore, making the value passed to the return method inspectable becomes unnecessary. The executor's return value should only be used in the case of a repeater's normal completion. The only situation where we should overwrite the result of an early return is when the executor throws an error.
In the implementation of various functions like the Repeater static method combinators, as well as the repeater utilities, I often wondered whether we should return the stop promise from the executor to preserve the return behavior of async generators. By simply ignoring normal completions in the case of early returns, we get the default behavior of async generators for free, and we can stop worrying about returning the stop promise or using its value.
If I have "module": "nodenext" in my tsconfig and my module type is commonjs (the default), then I get a TS error on import { Repeater } from "@repeaterjs/repeater":
The current file is a CommonJS module whose imports will produce 'require' calls; however, the referenced file is an ECMAScript module and cannot be imported with 'require'. Consider writing a dynamic 'import("@repeaterjs/repeater")' call instead.
This is because it resolves to the repeater.d.ts
listed in the "types"
field of your package.json
, but because your package.json
declares "type": "module"
, that repeater.d.ts
is ESM.
CJS/ESM is kind of a disaster but fortunately all it takes to fix this is an export map (I'll make a PR):
"exports": {
".": {
"require": {
"types": "./cjs/repeater.d.ts",
"default": "./cjs/repeater.js"
},
"types": "./repeater.d.ts",
"default": "./repeater.js"
}
},
Wrote some code in #48 that was like:
function map(iterable, fn) {
return new Repeater(async (push, stop) => {
const iter = iterable[Symbol.asyncIterator]();
let finalIteration;
stop.then(() => {
finalIteration = typeof iter.return === "function" ? iter.return() : {done: true};
});
while (!finalIteration) {
const iteration = await iter.next();
if (iteration.done) {
stop();
return iteration.value;
}
await push(fn(iteration.value));
}
// There’s no need to return finalIteration’s value here: when a repeater
// is returned early, the final value is just whatever was passed into the
// `return` method.
await finalIteration;
});
}
Note that we have to call stop manually before performing an early return in the repeater executor (stop(); return;
). I’m starting to think that the stop
could just be implicit when we return from the executor.
We automatically stop the repeater when the executor throws or its returned promise rejects, so why don’t we automatically stop it when the executor returns or its promise fulfills? I think I initially did not implement repeaters this way because of tests where I would push from a sync function, but I have never written any actual real-life repeaters which don’t await stop somehow.
This would be a breaking change: anyone who returns early from an executor but expects the repeater to stay open would now need to await stop first, but I’m not sure anyone uses repeaters like that.
This is an idea for a new feature, either in Repeater or as a separate class in the same project.
The idea: The object returned by new Repeater()
should have an additional .then
method, which will be a one-time-use variant of .next
.
Background: Repeater combinators, Crank JS components etc. can cleverly accept Promises in place of Async Iterables. I would like to write a library that does the reverse of this: provide values that other peoples’ code can use as either a Promise or an Async Iterable.
Example: Say I want to implement a file read()
function which can be used in two ways:
const result = await read(name); // Buffered read
and
for await (const chunk of read(name)) { ... } // Streaming read
A good API to help me write this function might be:
function read(filename) {
return new RepeatOrNot(async (push, stop) => {
const emitter = getDataEventEmitter(filename);
const buffer = [];
emitter.on('data', (chunk) => {
if (stop.oneTime) buffer.push(chunk);
else push(chunk);
});
emitter.on('close', () => {
if (stop.oneTime) push(buffer.join(''));
else stop();
});
await stop;
emitter.closeAndCleanUp();
});
}
This could be implemented with the following additional behavior on top of Repeater:
When .then() is called on the returned object:

- a flag on stop (named oneTime above) is set to true;
- the stop promise resolves immediately once the single buffered value has been delivered.

A few months ago I had to deal with large tree structures, started learning about iteration, and have been using it a lot since then (can't wait for the iterator-helpers proposal).
While reading up on Node.js streams, I found this line in the docs (in the section on their compatibility with async generators):
With the support of async generators and iterators in JavaScript, async generators are effectively a first-class language-level stream construct at this point.
After some learning and experimenting, I pretty much replaced the Stream classes with async iterators/generators for all simpler use cases, because I was never the biggest fan of the Node.js EventEmitter implementation.
I randomly stumbled upon this project and quite like what it does; the detailed explanations on the website taught me some new things. So I wanted to ask:
What are your thoughts on using modern async iteration to create an alternative EventEmitter with similar capabilities?
Node already exposes two functions for working with promises and async iteration on EventEmitters:
function once (emitter: EventEmitter, event: string | symbol): Promise<any[]>
function on (emitter: EventEmitter, event: string | symbol): AsyncIterator<any[]>
I did some experimenting with creating such a class and had some pretty good results, ending up with quite simple code that implemented many of the safety features and other capabilities of the Node.js EventEmitter.
Now that I've found this project, I wanted to ask someone who knows a bit more about this topic than I do.
The Pub/Sub sub-package already implements some similar things.
It feels like a waste to create a channel that fires only once and closes, which is what the delay
function from @channel/timers
does. Figure out a way to reuse delay timers.
We should probably also make timeout
return a channel. It doesn’t make sense for a channel package function to return a promise.
Adding this behavior to delay
/timeout
will make it useful to use with Channel.race
to timeout each iteration of a target async iterator.
Sometimes, I want to create synchronous iterators from synchronous callback-based APIs. For instance, the typescript compiler exposes a ts.forEachChild
function which takes a node and a callback and walks the typescript AST. It would be nice to be able to consume this and other sync-callbacks with a for…of
loop and it would be really nice if the Channel
class could polymorphically return a synchronous iterator. We would return a sync iterator based on whether or not the executor returns a Promise
or PromiseLike
, and throw errors if push
is called after the executor returns for a sync Channel
. We could also make sure that the executor in the synchronous case does not read the stop
argument, because it doesn’t really make sense for synchronous iteration. I’m not entirely sure this is possible to express in TypeScript; it might need something like conditional types.
This is related to #18 in that “Channel” seems to imply asynchrony; one possibility is that we rename Channel
to X
and make the current Channel
class AsyncX
or something, but this seems less ideal if we can figure out a polymorphic solution.
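Since a synchronous executor cannot be suspended while the callback-based API is running, a sync Channel would effectively have to buffer eagerly. A dependency-free sketch of that constraint (forEachChar here is a hypothetical stand-in for something like ts.forEachChild):

```typescript
// Stand-in for a synchronous callback-based API such as ts.forEachChild
// (hypothetical helper for illustration).
function forEachChar(s: string, cb: (ch: string) => void): void {
  for (const ch of s) cb(ch);
}

// A synchronous executor cannot be suspended mid-callback, so the best a
// sync Channel could do is buffer eagerly: every callback fires before the
// consumer sees a single value. This also shows why push would need to
// throw once the executor has returned.
function toSyncIterable<T>(executor: (push: (value: T) => void) => void): Iterable<T> {
  const buffer: T[] = [];
  let done = false;
  const push = (value: T): void => {
    if (done) throw new Error("push called after the executor returned");
    buffer.push(value);
  };
  executor(push);
  done = true;
  return buffer;
}

const chars = [...toSyncIterable<string>((push) => forEachChar("abc", push))];
// chars: ["a", "b", "c"]
```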