r-lib / coro
Coroutines for R
Home Page: https://coro.r-lib.org/
License: Other
To chain generators. yield_from() creates a loop that yields values from an iterator until it is done. This is a useful construct for implementing a readiness-based await(), which loops calls to an awaitable function until the value is ready.
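The chaining described above can already be approximated with a plain for loop inside a generator, since for loops in coro generators accept iterators. A minimal sketch, assuming the current coro API (generator(), yield(), loop()); gen_abc and gen_chain are hypothetical names, and this is not the proposed yield_from() itself:

```r
library(coro)

# Inner generator: yields three letters.
gen_abc <- generator(function() {
  for (x in c("a", "b", "c")) {
    yield(x)
  }
})

# Outer generator: re-yields every value from the inner iterator until it
# is exhausted, then continues with values of its own.
gen_chain <- generator(function() {
  for (x in gen_abc()) {
    yield(x)
  }
  for (x in c("d", "e")) {
    yield(x)
  }
})

# Drain the chained iterator into a character vector.
out <- character()
loop(for (x in gen_chain()) {
  out <- c(out, x)
})
```

A dedicated yield_from() would mainly save the boilerplate of the first inner loop.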
Hi @lionel-,
I am just playing with coro and it was perhaps surprising to me that, when looping over an iterator with loop() and for, the yields from the iterator are different than if I call collect(). Is this behaviour expected? Am I misunderstanding/misusing these functions? Should iterators only ever return single-element objects?
collect() returns a list with an element for each column value of a given row, while loop()/for returns the expected value, which is a single row of data.
library(R6)
library(coro)
Foo <- R6Class(
classname = "Foo",
public = list(
data = mtcars,
.length = function() {
nrow(self$data)
},
.getitem = function(i) {
self$data[i, ]
}
)
)
gen <- generator(
function() {
x <- Foo$new()
for (i in 1:x$.length()) {
yield(x$.getitem(i))
}
}
)
iter <- gen()
collect(iter, 1)
#> [[1]]
#> [1] 21
#>
#> [[2]]
#> [1] 6
#>
#> [[3]]
#> [1] 160
#>
#> [[4]]
#> [1] 110
#>
#> [[5]]
#> [1] 3.9
#>
#> [[6]]
#> [1] 2.62
#>
#> [[7]]
#> [1] 16.46
#>
#> [[8]]
#> [1] 0
#>
#> [[9]]
#> [1] 1
#>
#> [[10]]
#> [1] 4
#>
#> [[11]]
#> [1] 4
loop(for (i in iter) {
print(i)
})
#> mpg cyl disp hp drat wt qsec vs am gear carb
#> Mazda RX4 Wag 21 6 160 110 3.9 2.875 17.02 0 1 4 4
#> mpg cyl disp hp drat wt qsec vs am gear carb
#> Datsun 710 22.8 4 108 93 3.85 2.32 18.61 1 1 4 1
#> mpg cyl disp hp drat wt qsec vs am gear carb
#> Hornet 4 Drive 21.4 6 258 110 3.08 3.215 19.44 1 0 3 1
#> mpg cyl disp hp drat wt qsec vs am gear carb
#> Hornet Sportabout 18.7 8 360 175 3.15 3.44 17.02 0 0 3 2
#> mpg cyl disp hp drat wt qsec vs am gear carb
# << TRUNCATED >>
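One way to confirm that the iterator itself yields whole rows is to accumulate the values by hand, which sidesteps whatever combining collect() performs. A minimal sketch, assuming only generator() and loop() from coro; the generator is a trimmed variant of the one above (no R6), and rows is a hypothetical name:

```r
library(coro)

# Yield one row of mtcars at a time.
gen <- generator(function() {
  for (i in seq_len(nrow(mtcars))) {
    yield(mtcars[i, ])
  }
})

# Append each yielded value as a single list element, so a one-row data
# frame stays a data frame instead of being spliced column by column.
rows <- list()
loop(for (row in gen()) {
  rows[[length(rows) + 1L]] <- row
})
```

Each element of rows is then a one-row data frame, matching what loop() prints in the report.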
It would be nice to programmatically create an async function, such as when you want to convert an expression into an anonymous function.
An example interface could be:
as_async_function(
{ await(foo) }
)
At the moment we can only pass anonymous functions to coro::async because of the substitute call in coro:::assert_lambda. Technically I could use coro:::generator0(fn, type = "async"), but it seems it's not exported for a reason.
You can get halfway there with rlang::inject, but it's a bit hacky and you can't use it to set the formals of the anonymous function.
Do you think it makes sense to support a %do% operator like in foreach?
@skeydan suggested something like this:
foreach(x=as_iterator(1:3)) %do%
sqrt(x)
Necessary:
person(given = "Posit Software, PBC", role = c("cph", "fnd"))
use_mit_license()
use_tidy_logo()
usethis::use_tidy_coc()
usethis::use_tidy_github_actions()
Optional:
pak::pak("org/pkg") over devtools::install_github("org/pkg") in README
use_tidy_dependencies() and/or replace compat files with use_standalone()
use_standalone("r-lib/rlang", "types-check") instead of home grown argument checkers
Too similar to recipes step functions as @topepo pointed out at the group meeting.
This is more like a bug in the user's code, but in the following code chunk flowery won't break as expected.
library(flowery)
iter <- generator({
for (x in letters) yield(x)
})
iterate(for(x in iter) {
print(x)
break
})
It's worth noting that this works as expected:
iterate(for(x in iter) {
print(x)
if (x == "b")
break
})
They currently return the exhaustion sentinel which is not thenable:
fn <- async(function() while (FALSE) NULL)
fn()
#> exhausted
promises::then(fn(), ~ NULL)
#> Error in as.promise.default(promise) :
#> Don't know how to convert object of class name into a promise
For technical reasons this is only an issue in async functions that don't use await(). Low priority.
Hello,
Nice R package that I love to use now!
I would like to use coro in an R script, but when I run the example https://coro.r-lib.org/reference/async.html with Rscript, it ends immediately.
How can I wait for the promise to terminate before R exits?
The only (and dirty) solution I found is:
# mycoroScript.R
# see https://coro.r-lib.org/reference/async.html
promises::promise_all( async_count_down(5), async_count_up(5) ) %>% then( function(value) {
writeLines("q( status = 0)", f <- file("/tmp/fifo", raw=T) ); close(f)
})
To run it from bash:
$ ( mkfifo /tmp/fifo; cat mycoroScript.R; cat /tmp/fifo) | R --interactive --no-save
Thanks for your advice!
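A less intrusive option may be to keep servicing the later event loop at the end of the script until no callbacks remain. A minimal sketch, assuming later::loop_empty() and later::run_now() behave as documented; sleep_async and count_up are hypothetical stand-ins for the helpers in the reference example:

```r
library(coro)
library(promises)

# Hypothetical helper: a promise that resolves after `seconds`.
sleep_async <- function(seconds) {
  promise(function(resolve, reject) {
    later::later(function() resolve(NULL), delay = seconds)
  })
}

# Hypothetical async function: counts up with a short pause between steps.
count_up <- async(function(n) {
  for (i in seq_len(n)) {
    await(sleep_async(0.1))
    message("Up ", i)
  }
  "done"
})

result <- NULL
then(count_up(3), function(value) result <<- value)

# Run pending callbacks until the event loop is empty, so that Rscript
# does not exit before the promise settles.
while (!later::loop_empty()) {
  later::run_now(timeoutSecs = 0.1)
}
```

With this loop as the last statement, the script can be run directly with Rscript, without the FIFO and interactive R session.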
Return values from yield() to create a push/pull coroutine instead of a pull-only generator.
iterate() will always return a NULL when finishing the iteration. Can we make this invisible?
library(flowery)
iterate(
for (x in as_iterator(1:3)) {
print(x)
}
)
#> [1] 1
#> [1] 2
#> [1] 3
#> NULL
Created on 2019-04-17 by the reprex package (v0.2.1)
Document the behavior, at least. In general we would need to think about best practices for obtaining and releasing locks in generators, and also global context in general, e.g. default graphics device, options, etc.
Hi all, I'm trying to use generators inside a library, but after declaring a generator and using it this happens:
f_iterator <- coro::generator(function(ret, from, to) {
for (id in seq(length(from))){
coro::yield(list(
from = from[[id]],
to = to[[id]],
distance = ret[[id]]
))
}
})
f_iterator : <anonymous>: no visible binding for global variable ‘generator_env’
f_iterator : <anonymous>: no visible binding for global variable ‘exits’
Undefined global functions or variables:
exits generator_env id
What is the right way to handle this?
Usually, with rcmdcheck, declaring global variables is more of a workaround than a proper fix.
Thx!
Hi @lionel- Thanks for the awesome package! 🚀
I have a question regarding adding coro to "Suggests" in my package openalexR. The idea is to allow the user to crawl through one OpenAlex record at a time (PR). My current implementation is at https://github.com/ropensci/openalexR/blob/coro/R/coro.R. Do you have any suggestion on how to write tests for this generator function oa_generate? I tried using skip_if_not_installed("coro") and am still getting errors...
The test file itself runs fine, but devtools::check(cran = TRUE) complains:
Error in oar() : could not find function "oar"
Execution halted
Any advice is much appreciated! Thanks again! 💯
e.g. as.symbol(".__exhausted__."). This symbol should never appear literally in code or be stored in static objects. This way we can loop over symbols, e.g. in a namespace, without risking unexpected exhaustion.
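For context, this is how the exhaustion sentinel is consumed when an iterator is driven by hand. A minimal sketch, assuming the exported as_iterator() and is_exhausted() helpers; it tests through is_exhausted() and so does not depend on how the sentinel is represented:

```r
library(coro)

iter <- as_iterator(1:3)

out <- integer()
repeat {
  value <- iter()
  # Stop at the sentinel rather than comparing against a literal symbol.
  if (is_exhausted(value)) {
    break
  }
  out <- c(out, value)
}
```

Code written this way keeps working if the sentinel's underlying symbol changes.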
Using coro 1.0.3, promises 1.2.0.1, and future 1.28.0, I get the following unexpected behavior. Running the following code, the error is raised in the main thread and is not handled by the onRejected error handler; execution does not even reach the part of the code that defines the handlers:
library(coro)
library(promises)
f <- async(\() {
stop(":(")
})
p <- f()
p %>% then(
onFulfilled = \(value) { print(value) },
onRejected = \(error) { print(error$message) }
)
The same goes if I have some await statement after stop(.). Of course, if I await a promise before the stop(.), all goes well.
It seems to me that whatever compiles this down to promises makes the first block of synchronous instructions execute immediately, which is also how things are implemented in other languages/execution environments (NodeJS for instance).
However, everything that happens inside an asynchronous function should be handled by declared handlers, and eventually, if some errors are not handled those should raise some warnings about unhandled rejections.
I am not sure if that should also be the case for promises as defined in the promises package, and maybe that is where the error originates from. In JavaScript, for example, they have made the decision to also nicely handle direct errors raised within a Promise executor function (although one could argue the only proper way to raise an error for a promise in JavaScript is to call the reject handler).
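A possible workaround, independent of how coro treats the first synchronous block, is to start the async function from inside a then() callback, because the promises package turns errors thrown in callbacks into rejections. A minimal sketch under that assumption; msg is a hypothetical variable used only to capture the handler's input:

```r
library(coro)
library(promises)

f <- async(function() {
  stop(":(")
})

msg <- NULL

# Call f() from a then() callback: an error thrown there rejects the
# chained promise instead of propagating to the top level.
p <- then(promise_resolve(NULL), function(value) f())
then(p, onRejected = function(error) msg <<- conditionMessage(error))

# Run the event loop so the callbacks actually execute.
while (!later::loop_empty()) {
  later::run_now(timeoutSecs = 0.1)
}
```

This does not change the behaviour reported above; it only makes sure the handlers are attached before the function body runs.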
It looks like the function environment where the generator is created and used is never released, even if the generator itself is no longer in scope.
Here's a reprex:
g <- function() {
hello <- sample(1:1e7) # large object to easily verify the leak
generate_abc <- coro::generator(function() {
for (x in letters[1:3]) {
coro::yield(x)
}
})
coro::loop(for(x in generate_abc()) {
y <- x
})
}
for (x in 1:10) {
g()
gc()
print(lobstr::mem_used())
}
#> 84,001,952 B
#> 124,300,760 B
#> 164,343,576 B
#> 204,386,400 B
#> 244,429,208 B
#> 284,472,048 B
#> 324,514,856 B
#> 364,557,664 B
#> 404,600,472 B
#> 444,643,344 B
cc @gaborcsardi
Hi @lionel- . Thanks for the great pkg!
I found some weird behaviour:
async_test = function (seconds = 1) {
promises::promise(function(resolve, reject) {
later::later(~resolve(42L), delay = seconds)
})
}
my_async = async(function() {
res <- tryCatch(
await(async_test()),
error = function(err) NULL
)
message(res)
res
})
p1 = my_async()
# 3
not sure why result is 3...
my_async = async(function() {
res <- await(async_test())
message(res)
res
})
p2 = my_async()
# 42
here 42 as expected
Apart from that, if I use = for assignment (res = await(async_test())), then await throws an exception:
my_async = async(function() {
res = await(async_test())
message(res)
res
})
p2 = my_async()
# Error: `await()` can't be called directly or within function arguments.
I have the following code, where the generator and the loop over the generated function are executed in parallel for different values.
Now the lock file (indicated in the code) is sometimes not deleted after completion.
Therefore my question: is the usage of coro in a parallel processing environment safe, i.e. is coro thread safe?
Thanks,
Rainer
pbmcapply::pbmclapply(
years,
function(y) {
output_path <- file.path(SOME_BASEDIR, y) ######### <<<<<------- EDIT HERE
oar <- openalexR::oa_generate(
...
)
# set <- vector("list", set_size)
set <- NULL
set_no <- 0
file.create(file.path(output_path, "00_in_progress_00")) ## <- process lock file
coro::loop(
for (x in oar) {
set <- c(set, list(x))
if ((length(set) >= set_size) | isTRUE(x == coro::exhausted())) {
saveRDS(set, file.path(output_path, paste0("set_", set_no, ".rds")))
# set <- vector("list", set_size) # reset recs
set <- list()
set_no <- set_no + 1
}
}
)
### and save the last one
saveRDS(set, file.path(output_path, paste0("set_", set_no, ".rds")))
file.create(file.path(output_path, "00_complete_00"))
file.rename(
file.path(output_path, "00_in_progress_00"),
file.path(output_path, "00_complete_00")
)
},
mc.cores = mc_cores,
mc.preschedule = FALSE
)
Eg.
library(flowery)
library(magrittr)
make_iter <- function(x) {
generator({
for (elt in x) yield(elt)
})
}
iter <- c("foo", "bar") %>% make_iter()
flowery::iterate(for(x in iter) {
print(x)
})
#> <iterator>
#> function ()
#> {
#> evalq(env, expr = {
#> while (TRUE) {
#> switch(`_state`, `1` = {
#> `_for_iter_2` <- x
#> if (base::is.factor(`_for_iter_2`)) {
#> `_for_iter_2` <- base::as.character(`_for_iter_2`)
#> }
#> `_for_iter_2` <- flowery::as_iterator(`_for_iter_2`)
#> if (flowery::is_done(`_for_iter_2`)) {
#> `_for_iter_2`()
#> }
#> `_goto`("2")
#> }, `2` = {
#> if (flowery::advance(`_for_iter_2`)) {
#> elt <- flowery::deref(`_for_iter_2`)
#> `_goto`("3")
#> } else {
#> `_goto`("4")
#> }
#> }, `3` = {
#> `_pause`("2", elt)
#> }, `4` = {
#> return(invisible(NULL))
#> }, `5` = {
#> .Primitive("return")(invisible(NULL))
#> })
#> }
#> })
#> }
#> <environment: 0x7ff665992400>
#> [1] TRUE
#> NULL
Created on 2019-04-16 by the reprex package (v0.2.1)
It works as expected when creating the iterator without using the pipe:
library(flowery)
library(magrittr)
make_iter <- function(x) {
generator({
for (elt in x) yield(elt)
})
}
iter <- make_iter(c("foo", "bar"))
flowery::iterate(for(x in iter) {
print(x)
})
#> [1] "foo"
#> [1] "bar"
#> NULL
Created on 2019-04-16 by the reprex package (v0.2.1)
In torch, in order to closely follow the Python implementation, we implemented iterators as R6 classes with .iter() and .next() methods.
A minimal example would be something like this:
library(coro)
Range <- R6::R6Class(
classname = "Range",
public = list(
n = NULL,
i = 0,
initialize = function(n) {
self$n <- n
},
.next = function() {
self$i <- self$i + 1
if (self$i <= self$n)
return(self$i)
coro::exhausted()
}
)
)
as_iterator.Range <- function(x) {
force(x)
coro::as_iterator(function() {
x$.next()
})
}
coro::iterate(for (i in as_iterator.Range(Range$new(3))) {
print(i)
})
This works nicely, but it would be nice to avoid the explicit as_iterator.Range call:
coro::iterate(for (i in Range$new(3)) {
print(i)
})
The $.iter() method can be used to decouple different strategies of iteration or simply to avoid the need to reinitialize the class. In torch, the context is something like this: the dl instance can be used for every epoch without the need to recreate the object. This would be possible by abusing as_iterator and implementing a method for Dataloader instances that returns a fresh iterator each time.
dl <- Dataloader$new(dataset)
for (i in 1:epochs) {
iterate(for(batch in dl) {
# train model
})
}
Here's torch's code: https://github.com/mlverse/torch/blob/master/R/utils-data-dataloader.R#L84
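The "fresh iterator each time" idea can be sketched with an S3 method for as_iterator(), which coro documents as a generic that is called on the right-hand side of for loops in loop(). This is a minimal sketch under that assumption; demo_range and new_range are hypothetical stand-ins for torch's Dataloader, and a plain classed list is used instead of R6 to keep the example dependency-free:

```r
library(coro)

# Hypothetical stand-in for a dataloader: a classed list holding a size.
new_range <- function(n) {
  structure(list(n = n), class = "demo_range")
}

# Each call returns a new iterator function with its own counter, so the
# same object can be looped over once per epoch.
as_iterator.demo_range <- function(x) {
  i <- 0L
  function() {
    if (i >= x$n) {
      return(exhausted())
    }
    i <<- i + 1L
    i
  }
}

# Register the method explicitly so that dispatch does not depend on the
# environment in which this code is evaluated.
registerS3method(
  "as_iterator", "demo_range", as_iterator.demo_range,
  envir = asNamespace("coro")
)

rng <- new_range(3)
seen <- integer()
for (epoch in 1:2) {
  loop(for (i in rng) {
    seen <- c(seen, i)
  })
}
```

Because the method builds a new closure on every call, the second epoch starts again from the first element without recreating rng.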
This seems like a great package, I'd love to incorporate it into usage. Are there any plans to publish this to CRAN?
Not sure if you want to support that, but the following does not work:
g <- R6::R6Class(
"generator",
public = list(
x = coro::generator(function() {
for (i in 1:10)
yield(i)
})
)
)
instance <- g$new()
instance$x
#> <generator>
#> Error in env_get(fn_env(x), "fn", inherit = TRUE): argument "default" is missing, with no default
instance$x()
#> Error in dots_split(..., .n_unnamed = 0:1): object '_parent' not found
R6 changes the function environments to the class env. It's probably also removing important information from the generator function.
Is the following behaviour expected? I am trying to yield while iterating over a generator. Something like this, where 1:10 could be any iterator created with as_iterator().
library(coro)
g <- coro::generator(function() {
coro::iterate(for (x in 1:10) {
if (x < 5)
yield(x)
else
yield(-1)
})
})
g <- g()
g()
#> Error: `yield()` can't be called directly or within function arguments
Created on 2020-11-16 by the reprex package (v0.3.0)
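Since for loops inside a generator accept iterators directly, the iterate() wrapper may not be needed there at all. A minimal sketch of the same generator written with a plain for loop, assuming the current coro API; out is a hypothetical accumulator:

```r
library(coro)

g <- generator(function() {
  # Inside a generator, `for` can loop over an iterator directly, so
  # yield() stays at the generator's own level.
  for (x in as_iterator(1:10)) {
    if (x < 5) {
      yield(x)
    } else {
      yield(-1)
    }
  }
})

out <- c()
loop(for (value in g()) {
  out <- c(out, value)
})
```

The error in the report arises because yield() sits inside the argument of another function call, which the generator cannot suspend from.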