repeater's People

Contributors

0xflotus, brainkim, dependabot[bot], hwanders, jedwards1211

repeater's Issues

Array like helper functions (combinator module)

I think it would be handy to have array-like helper functions in the core or utils/helpers package.

const numbersChannel = new Channel<number>(() => {});

const oddNumbersChannel = filterChannel(numbersChannel, item => item % 2 !== 0);
const evenNumbersChannel = filterChannel(numbersChannel, item => item % 2 === 0);

const stringOddNumbersChannel = mapChannel(oddNumbersChannel, item => item.toString());

I can't think of any others off the top of my head. These should probably be functions and not instance-level methods, so the API doesn't differ from async iterators.
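
A minimal sketch of what such helpers could look like, written against plain async iterables rather than the library's Channel class. The names filterAsync, mapAsync, fromArray and collect are hypothetical and are not part of the package:

```javascript
// Hypothetical helpers, not part of the library's API.
// They accept any async iterable, so channels and async generators are treated alike.
async function* filterAsync(source, predicate) {
  for await (const item of source) {
    if (predicate(item)) yield item;
  }
}

async function* mapAsync(source, fn) {
  for await (const item of source) {
    yield fn(item);
  }
}

// Stand-in source used only for demonstration.
async function* fromArray(items) {
  yield* items;
}

// Drains an async iterable into an array.
async function collect(source) {
  const out = [];
  for await (const item of source) out.push(item);
  return out;
}
```

For example, collect(mapAsync(filterAsync(fromArray([1, 2, 3, 4, 5]), (n) => n % 2 !== 0), (n) => n.toString())) resolves to ["1", "3", "5"].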

Aggressive memory leak in Repeater.race

Reported by @elderapo.
When Repeater.race is called with an iterator and a promise which doesn’t settle, memory usage grows until the process crashes.

Reproduction:

import * as crypto from "crypto";
import {Repeater} from "@repeaterjs/repeater";
async function randomString(): Promise<string> {
  await new Promise((resolve) => setTimeout(resolve, 1));
  return crypto.randomBytes(10000).toString("hex")
}

async function *createStrings() {
  while (true) {
    yield randomString();
  }
}

function map<T, U>(iter: AsyncIterable<T>, fn: (value: T) => U): AsyncGenerator<U> {
  return new Repeater(async (push, stop) => {
    for await (const value of Repeater.race([iter, stop])) {
      push(fn(value as T) as U);
    }

    stop();
  });
}


(async function main() {
  let i = 0;
  for await (const value of map(createStrings(), (value) => value.split(""))) {
    if (i++ % 1000 === 0) {
      const usage = process.memoryUsage();
      const rssMiB = Math.round(usage.rss / (1024 ** 2) * 100) / 100;
      console.log(`Resident Set: ${rssMiB} MiB`);
    }
  }
})();

This will log an increasing resident set size until the process runs out of memory. It seems to be exacerbated by what is passed through the iterator which is racing the stop promise.

TypeError: Invalid attempt to iterate non-iterable instance.

I'm attempting to use repeaters in a React Native TypeScript project. When I try to for await any iterator, it throws this exception:

TypeError: Invalid attempt to iterate non-iterable instance.

This happens running the example verbatim from the documentation. Not really sure what's going on here. It must be something about my typescript configuration?

"@repeaterjs/repeater": "^3.0.4",

tsconfig.json

{
    "compilerOptions": {
        "sourceMap": true,
        "allowJs": false,
        "resolveJsonModule": true,
        "allowSyntheticDefaultImports": true,
        "esModuleInterop": true,
        "experimentalDecorators": true,
        "jsx": "react",
        "lib": ["es2019"],
        "moduleResolution": "node",
        "noErrorTruncation": true,
        "noImplicitReturns": true,
        "target": "ESNext",
        "strict": true,
        "noImplicitAny": true,
        "noFallthroughCasesInSwitch": true,
        "noEmit": true,
        "skipLibCheck": true,
        "types": ["node", "jest"],
        "typeRoots": ["./node_modules/@types"]
    },
    "include": ["src"],
    "exclude": [
        "**/node_modules",
        "node_modules",
        "**/node_modules/**/*",
        "**/.*/",
        "babel.config.js",
        "metro.config.js",
        "jest.config.js",
        "ios",
        "android",
        ".history",
        ".tsbuild"
    ]
}

Redesign Repeater.prototype.throw

Repeater.prototype.throw was initially implemented just to have parity with async generator objects in #2. It was designed so that it called stop on the repeater with an error and returned the final value, allowing consumers to catch the error, but after some thinking I’m wondering if we can also give producers a chance to catch the error as well. Specifically, what if calling throw caused the stop promise to reject? I was initially hesitant to implement throw in this way because I was worried about unhandled promise rejections, but after using repeaters for a while I think all responsible/non-toy repeaters involve some cleanup where the stop promise is awaited, and if the executor awaits the stop promise, throwing an error into the repeater should have the same effect. This means that the repeater would not be stopped if the executor caught the stop rejection, mimicking the behavior where generators can continue to yield values after a thrown error using try/catch/finally.
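
For reference, here is a small sketch of the async generator behavior this proposal wants to mimic. It uses only a plain async generator, not the library, and the name resilient is made up for illustration:

```javascript
// A generator that catches an error thrown into it and keeps yielding.
async function* resilient() {
  try {
    yield 0;
  } catch (err) {
    // The producer sees the error and decides to continue.
    yield 1;
  }
  yield 2;
}

async function demo() {
  const gen = resilient();
  const first = await gen.next(); // { value: 0, done: false }
  const afterThrow = await gen.throw(new Error("from consumer")); // { value: 1, done: false }
  const last = await gen.next(); // { value: 2, done: false }
  return [first, afterThrow, last];
}
```

Because the generator catches the error, the call to throw resolves with the next yielded value instead of rejecting.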

Way of checking if channel is closed

Currently, there is no easy, synchronous way to check whether a channel is closed. This would be very useful for debugging. The best current solution I can think of is checking whether the stop promise has resolved, but this seems hacky, and there is no synchronous way to do it.

There is, however, a question of whether allowing this is a good idea. I think it might make it easier to write buggy or leaky code. For example, I was merging some channels in a very specific way and needed to act accordingly when other channels got closed, etc.

The API I suggest is channel.isClosed/channel.isClosed().
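
Until something like this exists, one workaround is a consumer-side wrapper that records when iteration has finished. This is only a sketch: trackClosed is a hypothetical name, and the flag reflects what the consumer has observed, not the producer's internal state.

```javascript
// Hypothetical wrapper: exposes a synchronous isClosed flag for any async iterable.
function trackClosed(iterable) {
  const inner = iterable[Symbol.asyncIterator]();
  const wrapper = {
    isClosed: false,
    async next(value) {
      try {
        const result = await inner.next(value);
        if (result.done) wrapper.isClosed = true;
        return result;
      } catch (err) {
        // An iterator that throws is finished as well.
        wrapper.isClosed = true;
        throw err;
      }
    },
    async return(value) {
      wrapper.isClosed = true;
      // return is optional in the iterator protocol, so check before calling it.
      return typeof inner.return === "function"
        ? inner.return(value)
        : { value, done: true };
    },
    [Symbol.asyncIterator]() {
      return wrapper;
    },
  };
  return wrapper;
}
```

The flag flips once the consumer sees done: true, catches an error from next, or calls return (which for await does on break).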

Merge close and stop arguments into a single argument

Had the realization that we can make close thenable, reducing the number of arguments passed to the executor from three to two. It never really made sense what the difference between close and stop was (any word used to describe “finishing” a channel is likely both a noun and a verb), and the naming didn’t clear up the distinction. By having close be promise-like, we can reduce the number of times we pass close but ignore the argument.

Maybe close should be renamed to stop as well. I keep running into issues where I forget to pass close but window.close exists, causing confusing errors. push and stop are both four letters too, which is a plus.

This is a super-breaking change but we’re below 1.0 so whatever.
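
A rough sketch of how a single argument can be both callable and awaitable. The name createStop is hypothetical and this is not the library's implementation, just an illustration of the thenable idea:

```javascript
// Hypothetical sketch: a function that is also promise-like.
function createStop() {
  let resolve;
  const promise = new Promise((r) => {
    resolve = r;
  });
  // Calling stop() settles the underlying promise.
  const stop = (value) => resolve(value);
  // Adding a then method makes the function itself awaitable.
  stop.then = (onFulfilled, onRejected) => promise.then(onFulfilled, onRejected);
  return stop;
}

async function demo() {
  const stop = createStop();
  let settled = false;
  const waiting = (async () => {
    await stop; // waits until stop() is called
    settled = true;
  })();
  const before = settled;
  stop();
  await waiting;
  return { before, after: settled };
}
```

An executor would then receive just push and stop, calling stop() to finish and awaiting stop to learn when the consumer is done.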

Question: Convert Observable to Repeater and back

Hey,

I just wanted to play with repeater, and I was curious whether it is possible, for example, to convert from Rx.Observable to a repeater and back.

Maybe you can point me to some example.

Thanks!

Lazy initialization should be configurable

Lazy initialization is a cool feature but not always needed/good. There are situations where it leads to a less predictable outcome or just makes things harder.

I think it should remain on by default but should be configurable, for example via new Channel<T>(..., { lazy: false }).

WeakMap error at Repeater.return

Running into an issue with repeaterjs 3.0.4, my repeater is crashing when I try to stop it by calling return.

 Error: WeakMap error
      at Repeater.return 
    .....my app stack

Not a lot to go on here. I'm calling return() to stop a repeater and create a new one when some input conditions change. I'm wrapping a react-native-firebase firestore onSnapshot listener in a repeater and my query can change so I need to stop and re-create the listener and repeater when that happens.

Making push synchronous

I am not sure why push is asynchronous. From a quick look at the code, it seems like there are only wrappers like Promise.resolve(value).

It complicates things because you have to either await every push, which leads to more complex code (you have to consider that other things might execute while a push to the channel is pending), or omit the await and treat push as a synchronous call, which makes ESLint complain because it doesn't allow floating promises.

Question: observables and callback hell

I am reviewing the library, and while running through the documentation, I came across this:

Being a callback-based API makes using observables with async/await and promises awkward; in fact, observables suffer from the same issue of “callback hell” which promises were designed to solve. Observable libraries are aware of this and provide “higher-order observable operators,” which work on observables of observables, but these solutions are seldom used and virtually incomprehensible to human beings, who are unaccustomed to thinking in extradimensional spaces.

I have been using observables for a while and I wonder what you meant by callback hell occurring also with observables. Do you have an example? If by higher-order observable operators, you meant flatMap or switchMap and the likes, I have actually been using those pretty often, and I would disagree that they are incomprehensible to human beings. That is quite a strong statement.

Aside from developer experience, my next question is: is there something that you can write with repeaters that you cannot write with observables? In other words, are repeaters more expressive than observables? Backpressure seems like a possible candidate, but I haven't been able to work out the answer from the documentation, so I'm asking here.

Thanks!!

Implement async generator combination functions similar to the ones from rx.js

  • implement Channel.race
  • implement Channel.merge
  • implement Channel.zip
  • implement Channel.latest
  • add variadic type parameters for these methods
  • write tests
  • test throwing behavior
  • add documentation

Channel.race calls Promise.race on each iteration, Channel.merge merges all async iterators into a single iterator, Channel.zip calls Promise.all on each iteration, and Channel.latest returns an iterator which iterates with the latest values when any individual iterator updates.
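
To illustrate the "Promise.all on each iteration" idea, here is a sketch of zip over plain async iterables. The name zipAsync is hypothetical and this is not the library's Channel.zip:

```javascript
// Hypothetical zipAsync: advances every input together with Promise.all.
async function* zipAsync(...iterables) {
  const iterators = iterables.map((it) => it[Symbol.asyncIterator]());
  try {
    while (true) {
      // One step: ask every iterator for its next value at the same time.
      const results = await Promise.all(iterators.map((it) => it.next()));
      // Stop as soon as any input is exhausted.
      if (results.some((r) => r.done)) return;
      yield results.map((r) => r.value);
    }
  } finally {
    // Let every input clean up, whether we finished or the consumer stopped early.
    await Promise.all(iterators.map((it) => it.return && it.return()));
  }
}
```

Zipping a source of 1, 2, 3 with a source of "a", "b" yields [1, "a"] and [2, "b"], then finishes because the second source is exhausted.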

Unwrap promises passed to push

Allow push calls to pass a promise rather than a naked value. Mimic the async generator behavior where generators unwrap yielded promises before iteration resolves. This makes types like Channel<Promise<T>> impossible to create, but I think that’s a good thing. It’s similarly impossible to create a promise of a promise Promise<Promise<T>>.

The one thing we need to figure out is what to do if the passed promise rejects. I think it’s reasonable to close the channel and finish the iterator (clear the buffer) if this happens, because it implies that the error has been received before any pending values.
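
For comparison, this is how a plain async generator treats yielded promises, including a rejected one. The sketch does not use the library; the function names are made up:

```javascript
async function* values() {
  yield Promise.resolve(1); // the consumer receives 1, not a promise
  yield Promise.reject(new Error("boom")); // the rejection surfaces from next()
  yield 3; // never reached: the generator is finished by the uncaught rejection
}

async function demo() {
  const gen = values();
  const first = await gen.next(); // { value: 1, done: false }
  let error = null;
  try {
    await gen.next();
  } catch (err) {
    error = err; // Error: boom
  }
  const last = await gen.next(); // { value: undefined, done: true }
  return { first, error, last };
}
```

So for async generators, a rejected yield ends the iteration, which matches the proposal to close the channel when a pushed promise rejects.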

Event loop can die if stop() not called

Opening a separate issue from #49 for the sake of people googling why their process is just dying on them.

It seems a bit too easy to accidentally cause the event loop to die:

const { Repeater } = require('@repeaterjs/repeater')

const test = new Repeater((push, stop) => {
  push(1)
  // stop()
})

;(async () => {
  for await (const value of test) {
    console.log(value)
  }
  // event loop dies before this point unless you uncomment stop() above
  console.log('done')
})()

Run execution synchronously

Currently, channels will run the executor in a promise after next is called for the first time. It would be better if the executor is run synchronously during the first call so we don’t defer to the microtask queue.
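
The ordering difference can be seen with plain promises. This sketch does not use the library; it only contrasts code deferred to the microtask queue with code run synchronously, the way a Promise constructor runs its executor:

```javascript
function runOrder() {
  const order = [];
  // Deferred: the callback has to wait for the microtask queue.
  const deferred = Promise.resolve().then(() => {
    order.push("deferred executor");
  });
  // Synchronous: a Promise constructor calls its executor immediately.
  new Promise((resolve) => {
    order.push("synchronous executor");
    resolve();
  });
  order.push("caller continues");
  // Resolves to the recorded order once the deferred callback has run.
  return deferred.then(() => order);
}
```

The recorded order is "synchronous executor", "caller continues", "deferred executor".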

Off-by-one issue with next parameter and push promise

Currently we resolve push results to the value passed to next. However, the repeater implementation is off-by-one in the sense that the first push call resolves to the first value passed to next, the second push to the second value and so on. This behavior differs from generators, which will resume yield with the value passed to the next call to next, dropping the first value passed to next entirely. To mimic async generators, we should be resolving the first push to the second value passed to next, and so on.

In other words, the following test is wrong:

  test("pushes resolve to value passed to next", async () => {
    let push: (value: number) => Promise<number | void>;
    const repeater = new Repeater(async (push1) => {
      push = push1;
    });
    const next1 = repeater.next(-1);
    const push1 = push!(1);
    await expect(push1).resolves.toEqual(-1);
    await expect(next1).resolves.toEqual({ value: 1, done: false });
    const push2 = push!(2);
    await expect(repeater.next(-2)).resolves.toEqual({ value: 2, done: false });
    await expect(push2).resolves.toEqual(-2);
  });

The correct version of this test should be:

    let push: (value: number) => Promise<number>;
    const repeater = new Repeater(async (push1) => {
      push = push1;
    });
    const next1 = repeater.next(-1);
    const next2 = repeater.next(-2);
    const next3 = repeater.next(-3);
    const next4 = repeater.next(-4);
    const push1 = push!(1);
    const push2 = push!(2);
    const push3 = push!(3);
    const push4 = push!(4);
    await expect(next1).resolves.toEqual({ value: 1, done: false });
    await expect(next2).resolves.toEqual({ value: 2, done: false });
    await expect(next3).resolves.toEqual({ value: 3, done: false });
    await expect(next4).resolves.toEqual({ value: 4, done: false });
    await expect(push1).resolves.toEqual(-2);
    await expect(push2).resolves.toEqual(-3);
    await expect(push3).resolves.toEqual(-4);
    await expect(
      Promise.race([
        push4,
        new Promise((resolve) => setTimeout(() => resolve(-1000), 1)),
      ]),
    ).resolves.toEqual(-1000);
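
The generator behavior being mimicked can be checked with a plain async generator. This sketch does not use the library, and the name recorder is made up:

```javascript
// Records every value that a yield expression resumes with.
async function* recorder(seen) {
  seen.push(yield 1);
  seen.push(yield 2);
}

async function demo() {
  const seen = [];
  const gen = recorder(seen);
  await gen.next(-1); // starts the generator; -1 is dropped
  await gen.next(-2); // resumes the first yield with -2
  await gen.next(-3); // resumes the second yield with -3
  return seen; // [-2, -3]
}
```

The first value passed to next never reaches the generator body, which is the off-by-one the corrected test accounts for.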

Stop promise not entirely compatible with Promise.race

Hi, I'm noticing in my tests that when I try to use the stop promise in a call to Promise.race, it's not prioritising this promise even though it's clearly settled. I'm guessing this is happening because stop is not a "native" promise, and Promise.race is not seeing it as immediately settled?

To demonstrate, here's some code that runs to completion:

    async function* increment() {
      let count = 0;
      for (;;) {
        ++count;
        await promisify(nextTick)(); // this line is required or we never exit
        yield count;
      }
    }

    const repeater = new Repeater<number>(async (push, stop) => {
      const iter = increment()[Symbol.asyncIterator]();
      //const stop2 = stop.then((a) => a); // workaround
      for (;;) {
        const result = await Promise.race([stop, iter.next()]); // workaround: use stop2 here instead of stop
        if (result === undefined) {
          // stopped by our caller
          await iter.return();
          break;
        }
        if (result.done) {
          // source iterator is done
          stop();
          break;
        }

        await push(result.value);
      }
    });

    for await (const count of repeater) {
      if (count >= 5) {
        break;
      }
    }

If you now try running this but this time comment out the "nextTick" line, you'll note the code hangs. What's actually happening is the implicit call to end the repeater on the final loop's break never ends, as the code is stuck in the repeater loop with the call to Promise.race always returning the promise result for iter.next, despite stop having clearly resolved and being listed first.

To "fix" this behaviour I can uncomment the workaround line and use stop2 in the race call. This then causes race to resolve to the stop2 promise and we therefore exit the loop.

Please let me know if there's a better way to structure this code (dealing with never ending source async iterators) to avoid this entirely.

Repeater.race typings are incorrect

Promises which are passed to Repeater.race are added to the inferred yield type of the returned iterator. They should be added to the return type instead somehow.

Move utility packages to their own repositories

The yarn monorepo format is a pain to deal with, is difficult to integrate with github/javascript/typescript tools and hasn’t really been useful in any meaningful way. I’ve twice accidentally published dependent packages without updating internal dependencies because packages are symlinked together. I have a github org with lots of space, so why not use it to its full potential and allow for better discoverability?

Consider renaming the library

A “Channel” is a generic and widely used concept in programming. There are lots of libraries (Google’s BrowserChannel, all the csp Channel libraries that this library is in direct competition with) and DOM APIs (MessageChannel, RTCDataChannel, BroadcastChannel), so I feel it’s in our interest, both for SEO and for clarity, to rename the base class and the package for this library.

I want something that communicates that this is THE constructor for custom async iterators, especially when dealing with callbacks, so maybe a synonym of iterator, with connotations that it is mutable, ephemeral, reactive, asynchronous, concurrent.

4.0 changes

3.0 was released last November and I’m itching to do some breaking changes. Here’s what I’m planning:

  1. Move the combinator functions to their own module. EDIT: I’m starting to think the static methods should stay on the class, even if we implement the other combinator functions.
    I initially wrote combinators as static methods (Repeater.race) on the exported Repeater class, both on the assumption that they would be commonly used and because it draws a nice parallel with Promise.race. If repeaters ever end up becoming a standard thing, this might be how the functions are defined, but as a library, having the combinator functions exposed as static methods messes with the bundle size, and is not tree-shakeable. Additionally, it’s been brought to my attention (#48) that unlike my previous assumption, there actually may be a need for repeaters in the implementation of combinators like map and filter and creating a module for these functions all defined in one place would be nice. I’ve checked out some other combinator libraries and it seems like many of them leak memory and cause deadlocks, so there is going to be value in creating a combinator module, especially for combinators like switchMap, which require a high degree of concurrency.

  2. Kill the monorepo. Package monorepos are annoying to deal with. I have made the mistake of not updating the internal versions of packages multiple times, there’s duplicated tooling across the packages, and I hate having to navigate a multi-level directory structure for what are essentially single-file modules. My plan is to get all the modules exported under the main repeater package and deprecate the other packages.

  3. Kill pubsub. I’m probably the only one using the pubsub module right now, but after a year of async iterator usage, what I believe is that you should always start with an EventTarget or callback-based abstraction when writing custom classes. Async iterators are high-level, always async abstractions and we shouldn’t encourage people to write abstractions which exclusively use them.

  4. Rename some functions. One thing I’m noticing is that the function names in limiters and timers are squatting on the expected name for the actual repeater object (const ??? = timeout(1000)). Most of the time you inline the function call directly in the for await statement, but for cases where you need to assign the repeater to a variable the naming can be kind of annoying. I’d like to maybe adopt some kind of consistent naming pattern like createTimeout, genTimeout or maybe sourceTimeout.

  5. Stop repeaters when the executor settles. EDIT: April 2021 I think this one also might not be worth it. I’m reading through code which uses Repeaters in the wild, and it might be a pain in the butt to have to fix all that code. This change strikes me as being less explicit and more confusing. (#49) Related to implementing combinators, I want to implicitly stop repeaters when the executor function returns. Whenever I write combinators I consistently feel compelled to write stop calls directly before the return statement or in a finally block. This is probably one of the more disruptive breaking changes as it will cause repeaters to stop unexpectedly if you didn’t await anything in the executor, but you can reconstruct the old behavior by awaiting or returning the stop promise. If your repeater executor exits synchronously, I encourage you to somehow await or return the stop promise to prepare for this change.

Some of the more fragile tests are failing in newer versions of node

yarn run v1.22.4
$ jest --config ./jest.config.js --rootDir ./ --color --no-colors
PASS src/__tests__/buffers.ts
FAIL src/__tests__/repeater.ts
  ● Repeater › throw method caught in async function

    expect(received).resolves.toEqual()

    Received promise rejected instead of resolved
    Rejected to value: [Error: throw method caught in async function]

      1207 |     });
      1208 |     await expect(r.next()).resolves.toEqual({ value: 0, done: false });
    > 1209 |     await expect(r.throw(error)).resolves.toEqual({
           |           ^
      1210 |       value: 1,
      1211 |       done: false,
      1212 |     });

      at expect (../../node_modules/expect/build/index.js:134:15)
      at Object.<anonymous> (src/__tests__/repeater.ts:1209:11)

FAIL src/__tests__/combinators.ts (12.27 s)
  ● combinators › Repeater.race › generator vs Promise.resolve

    expect(received).resolves.toEqual(expected) // deep equality

    - Expected  - 2
    + Received  + 2

      Object {
    -   "done": false,
    -   "value": 1,
    +   "done": true,
    +   "value": "z",
      }

      40 |         Promise.resolve("z"),
      41 |       ]);
    > 42 |       await expect(iter.next()).resolves.toEqual({ value: 1, done: false });
         |                                          ^
      43 |       await expect(iter.next()).resolves.toEqual({ value: "z", done: true });
      44 |       await expect(iter.next()).resolves.toEqual({ done: true });
      45 |     });

      at Object.toEqual (../../node_modules/expect/build/index.js:198:20)
      at Object.<anonymous> (src/__tests__/combinators.ts:42:42)

  ● combinators › Repeater.merge › Promise.resolve vs generator

    expect(received).toEqual(expected) // deep equality

    - Expected  - 1
    + Received  + 1

      Array [
    -   1,
        -1,
    +   1,
        2,
        3,
        4,
        5,
      ]

      257 |         }
      258 |       } while (!result.done);
    > 259 |       expect(nums).toEqual([1, -1, 2, 3, 4, 5]);
          |                    ^
      260 |       await expect(iter.next()).resolves.toEqual({ done: true });
      261 |     });
      262 | 

      at Object.<anonymous> (src/__tests__/combinators.ts:259:20)

Test Suites: 2 failed, 1 passed, 3 total
Tests:       3 failed, 140 passed, 143 total
Snapshots:   0 total
Time:        15.389 s
Ran all test suites.
error Command failed with exit code 1.
info Visit https://yarnpkg.com/en/docs/cli/run for documentation about this command.

Seems like some race conditions in tests which rely on async generators running in a specific order compared to async functions, as well as throwing promises not working.

Help with map transducer?

#48 (comment)

See https://github.com/yaacovCR/graphql-executor/blob/c254cdcc7254b07f583c6391d2103fe8e2761a52/src/execution/mapAsyncIterator.ts#L30-L38

which I changed to make sure the map does not hang.

...but there are a lot of failing tests now in yaacovCR/graphql-executor#54

I am not so concerned right now about the tests in https://github.com/yaacovCR/graphql-executor/blob/reform-map/src/execution/__tests__/mapAsyncIterator-test.ts because I am not sure how correct that file is.

But there are functional differences it seems in the order of next/return calls vs next/throw calls in the tests here https://github.com/yaacovCR/graphql-executor/blob/c254cdcc7254b07f583c6391d2103fe8e2761a52/src/execution/__tests__/subscribe-test.ts#L889-L976 where the first passes, and the second fails -- I am not sure which should be correct, but I would think they should be the same.

Doesn't quite match AsyncIterable interface?

I don't know if this causes problems in TS, but I noticed it because I'm trying to convert the defs to Flow, so I wanted to point it out.

The AsyncGenerator interface is:

// es2018.asyncgenerator.d.ts
interface AsyncGenerator<T = unknown, TReturn = any, TNext = unknown> 
    extends AsyncIterator<T, TReturn, TNext> {
    // NOTE: 'next' is defined using a tuple to ensure we report the correct assignability errors
    // in all places.
    next(...args: [] | [TNext]): Promise<IteratorResult<T, TReturn>>;
    return(value: TReturn): Promise<IteratorResult<T, TReturn>>;
    throw(e: any): Promise<IteratorResult<T, TReturn>>;
    [Symbol.asyncIterator](): AsyncGenerator<T, TReturn, TNext>;
}

next and return don't accept PromiseLike the way they do in Repeater.

push*(repeater) type API

One of the kind of cool things about zen-observable is you can

new Observable(observer => {
  checkSomethingSynchronously()
  return someOtherObservable.subscribe(observer)
})

I'm looking for an elegant way to do this with repeater. Seems like right now I'd have to use for..await:

new Repeater(async (push, stop) => {
  await checkSomethingAsynchronously()
  for await (const x of someOtherRepeater) push(x)
})

The syntax isn't bad really, but didn't we come to the conclusion awhile back that using for..await like this is dangerous? I can never remember the caveats exactly.
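
For comparison, plain async generators get this kind of delegation from yield*. The sketch below is not a repeater API, and guarded is a hypothetical name:

```javascript
// Runs an asynchronous check, then hands the rest of the iteration to another source.
async function* guarded(check, source) {
  await check();
  // yield* forwards next, return and throw to the delegated iterator.
  yield* source;
}

async function* numbers() {
  yield 1;
  yield 2;
  yield 3;
}

async function demo() {
  const out = [];
  for await (const x of guarded(async () => {}, numbers())) out.push(x);
  return out; // [1, 2, 3]
}
```

Because yield* forwards return, breaking out of the consuming loop also finishes the delegated source, which is the part a manual for await plus push loop has to handle by hand.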

Implement `Channel.prototype.throw`

The AsyncIterator defines an optional throw method that can be used to make the iterator throw an error. This is useful when you have nested async iteration loops and you don’t want to wait for the child loop to terminate before continuing the parent loop, but you still want to propagate errors to avoid Promise rejections. Example:

for await (const a of iterator) {
  if (a.subscribe) {
    (async () => {
      for await (const b of a.subscribe()) {
        console.log(b);
      }
    // propagate errors to the parent
    })().catch((err) => iterator.throw(err));
  }
}

Figure out the rejection semantics for throw and how it interacts with next and return. My plan is for throw to close the channel with the error and await return. If the channel is already closed, should throw reject?

Not that great for implementing transducers?

Since the API is so similar to Observable, I find it great for consuming observable-like things but I'm not as sold for using it on non-observable-like things, like transducers or other async iterator combiners.

It seems almost simpler to implement transducers like map correctly using a raw async iterator protocol than using Repeater.

The naive assumption (in repeaterMap below) is to use a for await loop, but this keeps consuming input after the caller returns, which would be especially bad if the input is infinite. Whereas a raw async iterator can immediately forward return and throw calls to the input.

A less naive attempt (awkwardRepeaterMap) is to do the for await in a separate function and break once stop is detected. However more calls to next() still go through than with the raw async iterator.

Thoughts?

Code

const { Repeater } = require('@repeaterjs/repeater')

const range = n => ({
  i: 0,
  n,
  async next() {
    console.log('next()')
    const i = ++this.i
    if (i >= this.n) return { done: true }
    return { value: i, done: false }
  },
  async return() {
    console.log('return()')
    this.i = this.n
    return { done: true }
  },
  async throw(error) {
    console.log('throw()')
    this.i = this.n
    return { done: true }
  },
  [Symbol.asyncIterator]() {
    return this
  },
})

const repeaterMap = (iterable, iteratee) =>
  new Repeater(async (push, stop) => {
    for await (const value of iterable) push(iteratee(value))
    await stop
  })

const rawMap = (iterable, iteratee) => ({
  iterator: iterable[Symbol.asyncIterator](),
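  async next() {
    const { value, done } = await this.iterator.next()
    // Don't run the iteratee on the final result, which carries no input value.
    if (done) return { value, done }
    return { value: iteratee(value), done }
  },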
  return() {
    return this.iterator.return()
  },
  throw(error) {
    return this.iterator.throw(error)
  },
  [Symbol.asyncIterator]() {
    return this
  },
})


const awkwardRepeaterMap = (iterable, iteratee) =>
  new Repeater(async (push, stop) => {
    let stopped = false
    async function next() {
      for await (const value of iterable) {
        if (stopped) break
        push(iteratee(value))
      }
    }
    next()
    await stop
    stopped = true
  })

async function go() {
  console.log('rawMap')
  for await (const i of rawMap(range(10), i => i * 2)) {
    console.log(i)
    if (i >= 5) break
  }
  console.log('\nrepeaterMap')
  for await (const i of repeaterMap(range(10), i => i * 2)) {
    console.log(i)
    if (i >= 5) break
  }
  console.log('\nawkwardRepeaterMap')
  for await (const i of awkwardRepeaterMap(range(10), i => i * 2)) {
    console.log(i)
    if (i >= 5) break
  }
}

go()

Output

rawMap
next()
2
next()
4
next()
6
return()

repeaterMap
next()
next()
next()
2
next()
next()
next()
4
next()
next()
6
next()
next()

awkwardRepeaterMap
next()
next()
next()
2
next()
next()
next()
4
next()
next()
6
next()
return()

Repeater.prototype.return should accept a PromiseLike<TReturn>

The return method should accept a thenable which resolves to the final value as well as a final value (see spec). The typescript AsyncIterator interface requires return to be passed PromiseLike<TReturn> | TReturn. We should handle the case where a PromiseLike is passed to return in code and update types.

Dedicated errors

I think it would be cool to create custom errors for easier error handling. I suggest the ts-custom-error library. It allows you to do something like:

const someInternalCode = () => {
  if (xxx) {
    throw new CannotReadFromEmptyBuffer()
  }
}


try {
  someInternalCode();
} catch (ex) {
  if (ex instanceof CannotReadFromEmptyBuffer) {
    console.error("You just tried to read from an empty buffer...")
    return;
  }

  console.error("Some error happened", ex)
}
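
A self-contained version of the same idea, using a plain class that extends Error instead of ts-custom-error. The class and function names here are hypothetical, not existing library exports:

```javascript
// Hypothetical dedicated error type.
class CannotReadFromEmptyBuffer extends Error {
  constructor(message = "Cannot read from an empty buffer") {
    super(message);
    this.name = "CannotReadFromEmptyBuffer";
  }
}

// Stand-in for internal code that may fail in a well-known way.
function readFrom(buffer) {
  if (buffer.length === 0) throw new CannotReadFromEmptyBuffer();
  return buffer.shift();
}

// Callers can branch on the error type instead of parsing messages.
function describeRead(buffer) {
  try {
    return `read ${readFrom(buffer)}`;
  } catch (ex) {
    if (ex instanceof CannotReadFromEmptyBuffer) return "empty buffer";
    throw ex; // unknown errors keep propagating
  }
}
```

Here describeRead([]) returns "empty buffer" and describeRead([7]) returns "read 7".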

Question about merging two Repeaters

Hi guys! I was trying out your library and ran into behaviour that I don't understand.

I have two Repeaters that randomly fetch jokes from APIs, and I want these two Repeaters merged.

When run separately, each Repeater yields 3 jokes as expected.

For some reason, this code returns only 4 jokes when I expect 6 of them. Can someone explain what I am doing wrong here?

Here's the code:

const axios = require('axios');
const { Repeater } = require('@repeaterjs/repeater');

function getDadJokes(n) {
    return new Repeater(async (push) => {
        for (i = 0; i < n; i++) {
            const { data: { joke } } = await axios.request({
                url: 'https://v2.jokeapi.dev/joke/Any?type=single'
            });
            push(`${joke}`);
        }
    })
}

function getChuckNorrisJokes(n) {
    return new Repeater(async (push) => {
        for (i = 0; i < n; i++) {
            const { data: { value } } = await axios.request({
                url: 'https://api.chucknorris.io/jokes/random'
            });
            push(`${value}`);
        }
    });
}

(async function () {
    const merged = Repeater.merge([
        getChuckNorrisJokes(3),
        getDadJokes(3),
    ]);

    for await (const joke of merged) {
        console.log(`\n${joke}\n`);
    }
})();
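
One possible explanation, offered as a guess rather than a confirmed diagnosis: the loop counter i is never declared, so both executors share a single implicit global. The sketch below reproduces that effect without the library or the network; shared stands in for the undeclared i, and Promise.resolve() stands in for the HTTP request:

```javascript
// One counter shared by every loop, like an undeclared `i`.
let shared = 0;

async function sharedLoop(label, n, out) {
  for (shared = 0; shared < n; shared++) {
    await Promise.resolve(); // stand-in for the HTTP request
    out.push(label);
  }
}

// Same loop with a properly scoped counter.
async function localLoop(label, n, out) {
  for (let i = 0; i < n; i++) {
    await Promise.resolve();
    out.push(label);
  }
}

async function demo() {
  const withShared = [];
  await Promise.all([sharedLoop("chuck", 3, withShared), sharedLoop("dad", 3, withShared)]);
  const withLocal = [];
  await Promise.all([localLoop("chuck", 3, withLocal), localLoop("dad", 3, withLocal)]);
  return { sharedCount: withShared.length, localCount: withLocal.length };
}
```

In this sketch the shared counter produces 4 items and the scoped counter produces 6, so declaring the counter with let in both executors is worth trying first.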

Redesign Repeater.prototype.return and the stop promise.

After some initial work on redesigning Repeater.prototype.throw to close #35, I think we should redesign Repeater.prototype.return and the stop promise as well, insofar as current behavior creates additional discrepancies between repeaters and async generators. Current behavior is problematic for the following reasons:

  1. The stop promise resolves to the value passed to Repeater.prototype.return.

This is behavior which we cannot replicate in an equivalent async generator, insofar as there is no way with try/finally blocks to inspect the original return value, only overwrite it. By exposing the value passed to return, we create opportunities for abstraction leaks where you have to treat calls to return on a repeater differently from calls to return on an async generator.

  2. Overwriting the return value of repeaters in the case of early return is probably a bad idea anyways.

Let’s say that we actually wanted this extra ability to inspect the value passed to return via stop. After all, with repeaters, you can race the stopping/returning of the repeater with another promise to ensure timely returns, which is something you cannot do within an async generator. However, even if we wanted to inspect the value passed to return with the stop promise, the only way it could be used would be to overwrite the final result of the iterator. All code which runs in the executor after the stop promise settles in the case of early return is equivalent to code within a finally block in an async generator, and established best practices dictate that we should not put any control flow statements in finally blocks. We already have no means of putting yield statements in a repeater’s “finally” block, insofar as push becomes a no-op function once the repeater is stopped, so it makes sense that we should also disallow the equivalent of return. Using the return value of the executor in the case of early return is like promoting the usage of return statements in finally blocks for async generators, which is something we don’t want to do.

Therefore, making the value passed to the return method inspectable becomes unnecessary. The executor’s return value should only be used in the case of a repeater’s normal completion. The only situation where we should overwrite the result of an early return is in the case of the executor throwing an error.

  3. Trying to figure out what to return from the executor is annoying.

In the implementation of various functions like the Repeater static method combinators, as well as the implementation of repeater utilities, I often wondered if we should return the stop promise from the executor to preserve the return behavior of async generators. By simply ignoring normal completions in the case of early returns, we get the default behavior of async generators for free, and we can stop worrying about returning the stop promise or using its value.
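For reference, the async generator behavior described in points 1 and 2 can be sketched with plain generators and no repeater code. `counting`, `overwriting`, and `demo` are hypothetical names used only for this illustration.

```javascript
// The finally block runs on early return, but it has no way to read
// the value that the caller passed to return().
async function* counting(log) {
  try {
    yield 1;
    yield 2;
  } finally {
    log.push("cleanup");
  }
}

// A return statement inside finally replaces the caller's value.
// This is the pattern the issue argues should not be encouraged.
async function* overwriting() {
  try {
    yield 1;
  } finally {
    return "overwritten";
  }
}

async function demo() {
  const log = [];

  const a = counting(log);
  await a.next();
  const first = await a.return("early"); // { value: "early", done: true }

  const b = overwriting();
  await b.next();
  const second = await b.return("early"); // { value: "overwritten", done: true }

  return { log, first, second };
}
```

In the first generator the caller's value passes through untouched, which is the default the proposal wants repeaters to share. The second shows the only thing a finally block can do with the result: overwrite it blindly.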

TS errors when using "module": "NodeNext" and module type is commonjs

If I have "module": "nodenext" in my tsconfig and my module type is commonjs (the default) then I get a TS error on import { Repeater } from "@repeaterjs/repeater":

The current file is a CommonJS module whose imports will produce 'require' calls; however, the referenced file is an ECMAScript module and cannot be imported with 'require'. Consider writing a dynamic 'import("@repeaterjs/repeater")' call instead.

This is because it resolves to the repeater.d.ts listed in the "types" field of your package.json, but because your package.json declares "type": "module", that repeater.d.ts is ESM.

CJS/ESM is kind of a disaster but fortunately all it takes to fix this is an export map (I'll make a PR):

  "exports": {
    ".": {
      "require": {
        "types": "./cjs/repeater.d.ts",
        "default": "./cjs/repeater.js"
      },
      "types": "./repeater.d.ts",
      "default": "./repeater.js"
    }
  },

Should the Repeater automatically stop when its executor fulfills?

Wrote some code in #48 that was like:

function map(iterable, fn) {
  return new Repeater(async (push, stop) => {
    const iter = iterable[Symbol.asyncIterator]();
    let finalIteration;
    stop.then(() => {
      finalIteration = typeof iter.return === "function" ? iter.return() : {done: true};
    });
    while (!finalIteration) {
      const iteration = await iter.next();
      if (iteration.done) {
        stop();
        return iteration.value;
      }
      await push(fn(iteration.value));
    }
    // there’s no need to return finalIteration’s value here because when repeaters are returned, the return value will just be whatever was passed into the `return` method.
    await finalIteration;
  });
}

Note that we have to call stop manually before performing an early return in the repeater executor (stop(); return;). I’m starting to think that the stop could just be implicit when we return from the executor.

We automatically stop the executor when the repeater execution rejects or throws, so why don’t we automatically stop the executor when it returns or fulfills? I think I initially did not implement repeaters in this way because of tests where I would push in a sync function, but I have never written any actual real-life repeaters which don’t await stop somehow.

This would be a breaking change. It would require people who returned early from repeaters to await stop, but I’m not sure anyone uses repeaters like that.
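For comparison, here is the same map written as a plain async generator, where returning from the function body ends iteration without any explicit stop call. This is only a sketch of the behavior the proposal would bring repeaters closer to, not repeater code; `collect` is a hypothetical helper for reading the final return value.

```javascript
async function* map(iterable, fn) {
  const iter = iterable[Symbol.asyncIterator]();
  try {
    while (true) {
      const iteration = await iter.next();
      if (iteration.done) {
        // Returning is enough: the generator finishes implicitly.
        return iteration.value;
      }
      yield fn(iteration.value);
    }
  } finally {
    // Runs on normal completion and on early return alike.
    if (typeof iter.return === "function") {
      await iter.return();
    }
  }
}

// Drains an async iterator, keeping both the yielded values
// and the final return value.
async function collect(iter) {
  const values = [];
  let iteration = await iter.next();
  while (!iteration.done) {
    values.push(iteration.value);
    iteration = await iter.next();
  }
  return { values, returned: iteration.value };
}
```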

RepeatOrNot

This is an idea for a new feature, either in Repeater or as a separate class in the same project.

The idea: The object returned by new Repeater() should have an additional .then method, which will be a one-time-use variant of .next.

Background: Repeater combinators, Crank JS components etc. can cleverly accept Promises in place of Async Iterables. I would like to write a library that does the reverse of this: provide values that other peoples’ code can use as either a Promise or an Async Iterable.

Example: Say I want to implement a file read() function which can be used in two ways:

const result = await read(name); // Buffered read

and

for await (const chunk of read(name)) { ... } // Streaming read

A good API to help me write this function might be:

function read(filename) {
  return new RepeatOrNot(async (push, stop) => {
    const emitter = getDataEventEmitter(filename);
    const buffer = [];

    emitter.on('data', (chunk) => {
      if (stop.oneTime) buffer.push(chunk);
      else push(chunk);
    });

    emitter.on('close', () => {
      if (stop.oneTime) push(buffer.join(''));
      else stop();
    });

    await stop;
    emitter.closeAndCleanUp();
  });
}

This could be implemented with the following additional behavior on top of Repeater:

When .then() is called on the returned object:

  • we remove [Symbol.asyncIterator], .next, .return etc. from the object. We would do the same to .then if .next were called first.
  • the executor is invoked, with an additional property on stop (named oneTime above) set to true.
  • When .push() is called by the executor, we resolve the stop promise immediately.
  • If the executor throws or calls stop with an argument before pushing a value, reject the promise.
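To make the consumer-facing shape concrete, here is a minimal sketch of an object that is both a thenable and an async iterable. It is not the proposed RepeatOrNot class and does not use Repeater at all; `readChunks` is a hypothetical function, and an in-memory array stands in for the data source.

```javascript
function readChunks(chunks) {
  return {
    // Buffered use: `await readChunks(...)` resolves to the joined data.
    then(onFulfilled, onRejected) {
      return Promise.resolve(chunks.join("")).then(onFulfilled, onRejected);
    },
    // Streaming use: `for await (const chunk of readChunks(...))`.
    async *[Symbol.asyncIterator]() {
      for (const chunk of chunks) {
        yield chunk;
      }
    },
  };
}

async function demo() {
  const whole = await readChunks(["ab", "cd"]);
  const parts = [];
  for await (const chunk of readChunks(["ab", "cd"])) {
    parts.push(chunk);
  }
  return { whole, parts };
}
```

One design consequence worth keeping in mind: any object with a then method is unwrapped when it is returned from an async function or passed to Promise.resolve, so a value like this cannot cross an async function boundary without collapsing into its buffered form.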

Question about the abilities of async iteration

A few months ago I had to deal with large tree structures and started learning about iteration, and I have been using it a lot since then (can't wait for the iterator-helpers proposal).
While reading up on Node.js streams, I found this line in the docs (in the section on their compatibility):

With the support of async generators and iterators in JavaScript, async generators are effectively a first-class language-level stream construct at this point.

After some learning and experimenting, I pretty much replaced Stream classes with async iterators/generators for all simpler use cases, because I was never the biggest fan of the Node.js EventEmitter implementation.

I randomly stumbled upon this project and quite like the things it does, and the detailed explanations on the website taught me some new things. So I wanted to ask:
What are your thoughts on using modern async iteration to create an alternative EventEmitter with similar capabilities?

Node already exposes two functions for working with promises and async iteration on EventEmitters:

function once (emitter: EventEmitter, event: string | symbol): Promise<any[]>
function on (emitter: EventEmitter, event: string | symbol): AsyncIterator<any[]>
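A short usage sketch of those two functions, using only Node's built-in events module. `takeEvents` and `demo` are hypothetical names for this example.

```javascript
const { EventEmitter, once, on } = require("events");

// Collects the first `count` events of the given name.
// Each item produced by on() is the array of arguments passed to emit().
async function takeEvents(emitter, name, count) {
  const received = [];
  for await (const args of on(emitter, name)) {
    received.push(args);
    if (received.length >= count) break; // leaving the loop removes the listeners
  }
  return received;
}

async function demo() {
  const emitter = new EventEmitter();

  // Both calls attach their listeners synchronously,
  // so the emits below are not missed.
  const ready = once(emitter, "ready");
  const firstTwo = takeEvents(emitter, "data", 2);

  emitter.emit("ready", "ok");
  emitter.emit("data", 1);
  emitter.emit("data", 2);

  return { ready: await ready, data: await firstTwo };
}
```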

I did some experimenting around creating such a class and had some pretty good results, ending up with quite simple code that implemented many of the safety features and other capabilities of the Node.js EventEmitter.

And now that I have found this project, I wanted to ask someone who knows a bit more about this topic than me.
The Pub/Sub sub-package already implements some similar things.

Allow the `delay` function to be reused and make `timeout` return a channel and not a promise.

It feels like a waste to create a channel that fires only once and closes, which is what the delay function from @channel/timers does. Figure out a way to reuse delay timers.

We should probably also make timeout return a channel. It doesn’t make sense for a channel package function to return a promise.

Adding this behavior to delay/timeout will make it useful with Channel.race to time out each iteration of a target async iterator.
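As a rough illustration of what timing out each iteration means, here is a sketch built on plain Promise.race and setTimeout rather than on Channel.race or @channel/timers. `withTimeout` is a hypothetical helper, and creating a fresh timer per iteration is exactly the cost the reusable delay would aim to avoid.

```javascript
async function* withTimeout(iterable, ms) {
  const iter = iterable[Symbol.asyncIterator]();
  try {
    while (true) {
      let timer;
      // A fresh timer per iteration; it rejects if the source is too slow.
      const timeout = new Promise((_, reject) => {
        timer = setTimeout(
          () => reject(new Error(`iteration timed out after ${ms}ms`)),
          ms,
        );
      });
      let iteration;
      try {
        iteration = await Promise.race([iter.next(), timeout]);
      } finally {
        clearTimeout(timer);
      }
      if (iteration.done) {
        return iteration.value;
      }
      yield iteration.value;
    }
  } finally {
    // Best-effort cleanup, not awaited, so a stalled source cannot block us.
    if (typeof iter.return === "function") {
      Promise.resolve(iter.return()).catch(() => {});
    }
  }
}
```

Consumers see the source's values until one iteration takes longer than `ms`, at which point the loop throws the timeout error.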

Synchronous channels

Sometimes, I want to create synchronous iterators from synchronous callback-based APIs. For instance, the TypeScript compiler exposes a ts.forEachChild function which takes a node and a callback and walks the TypeScript AST. It would be nice to be able to consume this and other sync callbacks with a for…of loop, and it would be really nice if the Channel class could polymorphically return a synchronous iterator. We would return a sync iterator when the executor does not return a Promise or PromiseLike, and throw errors if push is called after the executor returns for a sync Channel. We could also make sure that the executor in the synchronous case does not read the stop argument, because it doesn’t really make sense for synchronous iteration. I’m not entirely sure this is possible to do in TypeScript, and it might need something like conditional types.

This is related to #18 in that “Channel” seems to imply asynchrony. One possibility is that we rename Channel to X and make the current Channel class AsyncX or something, but this seems less ideal if we can figure out a polymorphic solution.
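A minimal sketch of the synchronous case, assuming the simplest possible strategy: run the executor eagerly, buffer everything it pushes, and hand back the buffer's iterator. This gives up laziness and is not how the Channel class works; `syncFromCallback` is a hypothetical name, and `forEachChild` below is a toy stand-in for ts.forEachChild over plain objects.

```javascript
function syncFromCallback(executor) {
  const buffer = [];
  let finished = false;
  const push = (value) => {
    if (finished) {
      // Mirrors the idea of throwing when push is called too late.
      throw new Error("push called after the executor returned");
    }
    buffer.push(value);
  };
  executor(push);
  finished = true;
  return buffer[Symbol.iterator]();
}

// Toy stand-in for a sync callback API such as ts.forEachChild.
function forEachChild(node, callback) {
  for (const child of node.children || []) {
    callback(child);
  }
}

const tree = { name: "root", children: [{ name: "a" }, { name: "b" }] };
const names = [];
for (const child of syncFromCallback((push) => forEachChild(tree, push))) {
  names.push(child.name);
}
```

Buffering keeps the sketch short, but it means the whole callback walk finishes before the first value is consumed, so breaking out of the for…of loop early saves no work.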

Create a logo

Renaming the library to repeater meant the old logo, which I hacked together in Figma, didn’t make sense any more, so I removed it. It would be nice to have a new logo, maybe using the same theme of a radio antenna.

Old Logo:
logo
