Git Product home page Git Product logo

coro's People

Contributors

dfalbel avatar lionel- avatar t-kalinowski avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

coro's Issues

yield_from()

To chain generators. yield_from() creates a loop that yields value from an iterator until done. Useful construct for implementing readiness-based await() which loops calls to an awaitable function until the value is ready.

Questions about yield differences between collect and loop/for

Hi @lionel-,

I am just playing with coro and it was perhaps surprising to me that when looping over an iterator with loop() and for, that the yields from the iterator are different than if I call collect(). Is this behaviour expected? Am I misunderstanding/misusing these functions? Should iterators only ever return single-element objects?

collect() returns a list with an element for each column-value for a given row, while loop()/for return the expected value which is a single row of data.

library(R6)
library(coro)

Foo <- R6Class(
  classname = "Foo",
  public = list(
    data = mtcars,
    .length = function() {
      nrow(self$data)
    },
    .getitem = function(i) {
      self$data[i, ]
    }
  )
)

gen <- generator(
  function() {
    x <- Foo$new()
    for (i in 1:x$.length()) {
      yield(x$.getitem(i))
    }
  }
)

iter <- gen()

collect(iter, 1)
#> [[1]]
#> [1] 21
#> 
#> [[2]]
#> [1] 6
#> 
#> [[3]]
#> [1] 160
#> 
#> [[4]]
#> [1] 110
#> 
#> [[5]]
#> [1] 3.9
#> 
#> [[6]]
#> [1] 2.62
#> 
#> [[7]]
#> [1] 16.46
#> 
#> [[8]]
#> [1] 0
#> 
#> [[9]]
#> [1] 1
#> 
#> [[10]]
#> [1] 4
#> 
#> [[11]]
#> [1] 4

loop(for (i in iter) {
  print(i)
})
#>               mpg cyl disp  hp drat    wt  qsec vs am gear carb
#> Mazda RX4 Wag  21   6  160 110  3.9 2.875 17.02  0  1    4    4
#>             mpg cyl disp hp drat   wt  qsec vs am gear carb
#> Datsun 710 22.8   4  108 93 3.85 2.32 18.61  1  1    4    1
#>                 mpg cyl disp  hp drat    wt  qsec vs am gear carb
#> Hornet 4 Drive 21.4   6  258 110 3.08 3.215 19.44  1  0    3    1
#>                    mpg cyl disp  hp drat   wt  qsec vs am gear carb
#> Hornet Sportabout 18.7   8  360 175 3.15 3.44 17.02  0  0    3    2
#>          mpg cyl disp  hp drat   wt  qsec vs am gear carb
# << TRUNCATED >>

programmatic async function creation

It would be nice to programmatically create an async function, such as when you want to convert an expression into an anonymous function.

An example interface could be:

as_async_function(
    { await(foo) }
)

At the moment we can only pass anonymous functions to coro::async because of the substitute call in coro::: assert_lambda. Technically I could use coro:::generator0(fn, type = "async"), but it seems it's not exported for a reason.

You can get half of the way with rlang::inject, but it's a bit hacky and you can't use it to set the formals of the anonymous function.

%do% operator?

Do you think it makes sense to support a %do% operator like in foreach?

@skeydan suggested something like this:

foreach(x=as_iterator(1:3)) %do%
  sqrt(x)

coro upkeep 2023

Necessary:

  • Update copyright holder in DESCRIPTION: person(given = "Posit Software, PBC", role = c("cph", "fnd"))
  • Double check license file uses '[package] authors' as copyright holder. Run use_mit_license()
  • Update email addresses *@rstudio.com -> *@posit.co
  • Update logo (https://github.com/rstudio/hex-stickers); run use_tidy_logo()
  • usethis::use_tidy_coc()
  • usethis::use_tidy_github_actions()

Optional:

  • Review 2022 checklist to see if you completed the pkgdown updates
  • Prefer pak::pak("org/pkg") over devtools::install_github("org/pkg") in README
  • Consider running use_tidy_dependencies() and/or replace compat files with use_standalone()
  • use_standalone("r-lib/rlang", "types-check") instead of home grown argument checkers
  • Add alt-text to pictures, plots, etc; see https://posit.co/blog/knitr-fig-alt/ for examples

iterate will not `break` if it's directly in the body

This is more like a bug in users code but... In the following code chunk, flowery won't break as expected.

library(flowery)

iter <- generator({
  for (x in letters) yield(x)
})

iterate(for(x in iter) {
  print(x)
  break
})

It's worth noting that this works as expected:

iterate(for(x in iter) {
  print(x)
  if (x == "b")
    break
})

Loops and missing else branches in `async()` functions should return promises to `NULL`

They currently return the exhaustion sentinel which is not thenable:

fn <- async(function() while (FALSE) NULL)

fn()
#> exhausted

promises::then(fn(), ~ NULL)
#> Error in as.promise.default(promise) :
#>   Don't know how to convert object of class name into a promise

For technical reasons this is only an issue in async functions that don't use await(). Low priority.

Running coro in R scripts

Hello,

Nice R package that I love to use now !

I would like to use coro in a R script, but when I run the example https://coro.r-lib.org/reference/async.html with Rscript, it ends immediatly.

How can I wait the termination of the promise before R exits ?

The only ( and dirty ) solution I found is :

# mycoroScript.R
# see https://coro.r-lib.org/reference/async.html
promises::promise_all( async_count_down(5), async_count_up(5) ) %>%  then( function(value) {
        writeLines("q( status = 0)", f <- file("/tmp/fifo", raw=T) ); close(f)
}

To run it from bash :
$ ( mkfifo /tmp/fifo; cat mycoroScript.R; cat /tmp/fifo)  | R --interactive --no-save

Thanks for your advice !

Arity-1 generators

Return values from yield() to create a push/pull coroutine instead of a pull-only generator.

Iterate could return invisibly

Iterate will always return a NULL when finishing the iteration. Can we make this invisible?

library(flowery)

iterate(
  for (x in as_iterator(1:3)) {
    print(x)
  }
)
#> [1] 1
#> [1] 2
#> [1] 3
#> NULL

Created on 2019-04-17 by the reprex package (v0.2.1)

Will need to think about `on.exit`

Document the behavior, at least. In general we would need to think about best practices for obtaining and releasing locks in generators, and also global context in general, e.g. default graphics device, options, etc.

How to pass rcmdcheck using generators

Hi all, I'm trying to use generators inside a library, but after declaring a generator and using it this happens:

f_iterator <- coro::generator(function(ret, from, to) {
    for (id in seq(length(from))){
      coro::yield(list(
        from = from[[id]],
        to = to[[id]],
        distance = ret[[id]]
      ))
    }
})
  f_iterator : <anonymous>: no visible
    binding for global variablegenerator_envf_iterator : <anonymous>: no visible
    binding for global variableexitsUndefined global functions or variables:
    exits generator_env id

What is the right way to handle this?

Usually, with rcmdcheck, use global variables are more like a workaround than a proper fix.

Thx!

coro as an optional dependency

Hi @lionel- Thanks for the awesome package! 🚀

I have a question regarding adding coro to "Suggests" in my package openalexR. The idea is to allow the user to crawl through one OpenAlex record at a time (PR). My current implementation is at https://github.com/ropensci/openalexR/blob/coro/R/coro.R. Do you have any suggestion on how to write tests for this generator function oa_generate? I tried using skip_if_not_installed("coro") and still getting errors...

The test file itself runs fine, but devtools::check(cran = TRUE) complains:

Error in oar() : could not find function "oar"
Execution halted

Any advice is much appreciated! Thanks again! 💯

Use an unlikely sentinel

e.g. as.symbol(".__exhausted__."). This symbol should never appear literally in code or be stored in static objects. This way we can loop over symbols e.g. in a namespace without risking unexpected exhaustion.

Async function errors should not be raised in the main thread

Using coro 1.0.3, promises 1.2.0.1, and future 1.28.0, I get the following unexpected behavior. Running the following code, the error is raised in the main thread (and not handled by the onRejected error handler, the execution does not reach the part of the code that defines the handlers):

f <- async(\() {
  stop(":(")
})

p <- f()

p %>% then(
  onFullfilled = \(value) { print(value) },
  onRejected = \(error) { print(error$message) }
)

The same goes if I have some await statement after stop(.). Of course, if I await a promise before the stop(.) all goes well.

It seems to me that whatever compiles this down to promises makes the first block of synchronous instructions execute immediately, which is also how things are implemented in other languages/execution environments (NodeJS for instance).

However, everything that happens inside an asynchronous function should be handled by declared handlers, and eventually, if some errors are not handled those should raise some warnings about unhandled rejections.

I am not sure if that should also be the case for promises as defined in the promises package, and maybe that is where the error originates from. In JavaScript for example, they have made the decision to also nicely handle direct errors raised within a Promise executor function (although one could argue the only proper way to raise an error for a promise in JavaScript is to call the reject handler).

Memory leak when a generator is created inside a function

It looks like that the function environment where the generator is created and used is never released, even if the generator itself is not in scope anymore.

Here's a reprex:

g <- function() {
  hello <- sample(1:1e7) # large object to easily verify the leak
  generate_abc <- coro::generator(function() {
    for (x in letters[1:3]) {
      coro::yield(x)
    }
  })
  coro::loop(for(x in generate_abc()) {
    y <- x
  })
}

for (x in 1:10) {
  g()
  gc()
  print(lobstr::mem_used())
}
#> 84,001,952 B
#> 124,300,760 B
#> 164,343,576 B
#> 204,386,400 B
#> 244,429,208 B
#> 284,472,048 B
#> 324,514,856 B
#> 364,557,664 B
#> 404,600,472 B
#> 444,643,344 B

Some weird behaviour in async

Hi @lionel- . Thanks for the great pkg!

I found some weird behaviour:

async_test = function (seconds = 1) {
  promises::promise(function(resolve, reject) {
    later::later(~resolve(42L), delay = seconds)
  })
}

my_async = async(function() {
  res <- tryCatch(
    await(async_test()),
    error = function(err) NULL
  )
  message(res)
  res
})

p1 = my_async()
# 3

not sure why result is 3...

my_async = async(function() {
  res <- await(async_test())
  message(res)
  res
})
p2 = my_async()
# 42

here 42 as expected

Apart from that if I use = as assignment (res = await(async_test())) then await throws exception:

my_async = async(function() {
  res = await(async_test())
  message(res)
  res
})
p2 = my_async()
#  Error: `await()` can't be called directly or within function arguments.

Question: is coro thread safe?

I have the following code, where the generator and the looping over the generated function is executed in parallel for different values.

now the lockfile (indicated in the code) is sometimes not deleted after the completion.

Therefore my question: is the usage of coro in a parallel processin=g environment safe, i.e. is cor thread safe?

Thanks,

Rainer

pbmcapply::pbmclapply(
            years,
            function(y) {

                    output_path <- file.path(SOME_BASEDIR, y).    ######### <<<<<------- EDIT HERE
                    oar <- openalexR::oa_generate(
                        ...
                    )

                # set <- vector("list", set_size)
                set <- NULL
                set_no <- 0

                file.create(file.path(output_path, "00_in_progress_00")). ## <- process lock file
                coro::loop(
                    for (x in oar) {
                        set <- c(set, list(x))
                        if ((length(set) >= set_size) | isTRUE(x == coro::exhausted())) {
                            saveRDS(set, file.path(output_path, paste0("set_", set_no, ".rds")))
                            # set <- vector("list", set_size) # reset recs
                            set <- list()
                            set_no <- set_no + 1
                        }
                    }
                )
                ### and save the last one
                saveRDS(set, file.path(output_path, paste0("set_", set_no, ".rds")))
                file.create(file.path(output_path, "00_complete_00"))
                file.rename(
                    file.path(output_path, "00_in_progress_00"),
                    file.path(output_path, "00_complete_00")
                )
            },
            mc.cores = mc_cores,
            mc.preschedule = FALSE
        )

iterate won't work if iterator was created in a pipe chain

Eg.

library(flowery)
library(magrittr)

make_iter <- function(x) {
  generator({
    for (elt in x) yield(elt)
  })
}

iter <- c("foo", "bar") %>% make_iter()

flowery::iterate(for(x in iter) {
  print(x)
})
#> <iterator>
#> function () 
#> {
#>     evalq(env, expr = {
#>         while (TRUE) {
#>             switch(`_state`, `1` = {
#>                 `_for_iter_2` <- x
#>                 if (base::is.factor(`_for_iter_2`)) {
#>                   `_for_iter_2` <- base::as.character(`_for_iter_2`)
#>                 }
#>                 `_for_iter_2` <- flowery::as_iterator(`_for_iter_2`)
#>                 if (flowery::is_done(`_for_iter_2`)) {
#>                   `_for_iter_2`()
#>                 }
#>                 `_goto`("2")
#>             }, `2` = {
#>                 if (flowery::advance(`_for_iter_2`)) {
#>                   elt <- flowery::deref(`_for_iter_2`)
#>                   `_goto`("3")
#>                 } else {
#>                   `_goto`("4")
#>                 }
#>             }, `3` = {
#>                 `_pause`("2", elt)
#>             }, `4` = {
#>                 return(invisible(NULL))
#>             }, `5` = {
#>                 .Primitive("return")(invisible(NULL))
#>             })
#>         }
#>     })
#> }
#> <environment: 0x7ff665992400>
#> [1] TRUE
#> NULL

Created on 2019-04-16 by the reprex package (v0.2.1)

It works as expected when creating the iterator without using the pipe:

library(flowery)
library(magrittr)

make_iter <- function(x) {
  generator({
    for (elt in x) yield(elt)
  })
}

iter <-  make_iter(c("foo", "bar"))

flowery::iterate(for(x in iter) {
  print(x)
})
#> [1] "foo"
#> [1] "bar"
#> NULL

Created on 2019-04-16 by the reprex package (v0.2.1)

Allow as_iterator extensions via S3 methods

In torch, in order to follow closely the Python implementation we implemented iterators as R6 classes with .iter() and .next() methods.

A minimal example would be something like this:

library(coro)
Range <- R6::R6Class(
  classname = "Range",
  public = list(
    n = NULL,
    i = 0,
    initialize = function(n) {
      self$n <- n
    },
    .next = function() {
      self$i <- self$i + 1
      if (self$i <= self$n)
        return(self$i)
      coro::exhausted()
    }
  )
)

as_iterator.Range <- function(x) {
  force(x)
  coro::as_iterator(function() {
    x$.next()
  })
}

coro::iterate(for (i in as_iterator.Range(Range$new(3))) {
  print(i)
})

This works nice, but it would be nice to avoid the explicit as_iterator.Range call:

coro::iterate(for (i in Range$new(3)) {
  print(i)
})

The $.iter() method can be used to decouple different strategies of iteration or simply to avoid the need of reinitializing the class. In torch, the context is something like this: The dl instance can be used for every epoch without the need to recreate the object. This would be possible by abusing as_iterator and implementing a method for Dataloader instances that returns a fresh iterator each time.

dl <- Dataloader$new(dataset)

for (i in 1:epochs) {
  iterate(for(batch in dl) {
    # train model
  })
}

Here's torch's code: https://github.com/mlverse/torch/blob/master/R/utils-data-dataloader.R#L84

Error when making generator a method of an R6 class

Not sure if you want to support that, but the following does not work:

g <- R6::R6Class(
  "generator",
  public = list(
    x = coro::generator(function() {
      for (i in 1:10)
        yield(i)
    })
  )
)

instance <- g$new()
instance$x
#> <generator>
#> Error in env_get(fn_env(x), "fn", inherit = TRUE): argument "default" is missing, with no default
instance$x()
#> Error in dots_split(..., .n_unnamed = 0:1): object '_parent' not found

R6 changes the function environments to the class env. It's probably also removing important information from the generator function.

Yield in nested iterators

Is the following behaviour expected? I am trying to yield while iterating over a generator. Something like this, where 1:10 could be any iterator created with as_iterator().

library(coro)

g <- coro::generator(function() {
  coro::iterate(for (x in 1:10) {
    if (x < 5)
      yield(x)
    else
      yield(-1)
  })
})

g <- g()
g()
#> Error: `yield()` can't be called directly or within function arguments

Created on 2020-11-16 by the reprex package (v0.3.0)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.