softwaremill / ox
Safe direct style concurrency and resiliency for Scala on the JVM
Home Page: https://ox.softwaremill.com
License: Apache License 2.0
What is the license for this library?
Please add a license file to the root dir.
Recent ox releases include a boundary-break implementation specialised to Eithers, which allows you to "unwrap" an Either within a specified boundary, for example:
def lookupUser(id1: Int): Either[String, User] = ???
def lookupOrganization(id2: Int): Either[String, Organization] = ???
val result: Either[String, Assignment] = either:
  val user = lookupUser(1).value
  val org = lookupOrganization(2).value
  Assignment(user, org)
Here, .value is used for the "unwrapping". However, one alternative would be to use the Rust-inspired .?. On the one hand, @lbialy argues that .value is clash-prone, along with @Ichoran who says .value suggests that it's a safe accessor.
On the other, symbolic operators historically didn't work out that well, plus, as @alexandru points out, it looks awkward and doesn't play well with fewer-braces.
What's your take? Vote with a 👍 on a comment with your pick, or propose your own!
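For readers unfamiliar with the mechanism under discussion, here is a minimal sketch of how an Either-specialised boundary could be built on scala.util.boundary from the Scala 3.3+ standard library. It is an illustration under assumptions, not ox's actual implementation; the parse and sum helpers are hypothetical.

```scala
import scala.util.boundary
import scala.util.boundary.{Label, break}

// Runs `body` inside a boundary; a normal result is wrapped in Right.
def either[E, A](body: Label[Either[E, A]] ?=> A): Either[E, A] =
  boundary[Either[E, A]](Right(body))

extension [E, A](e: Either[E, A])
  // Returns the Right value, or exits the enclosing `either` block with the Left.
  def value(using Label[Either[E, Nothing]]): A = e match
    case Right(a)  => a
    case Left(err) => break[Either[E, Nothing]](Left(err))

// Hypothetical helpers, for illustration only.
def parse(s: String): Either[String, Int] =
  s.toIntOption.toRight(s"not a number: $s")

def sum(a: String, b: String): Either[String, Int] = either:
  parse(a).value + parse(b).value
```

Note that Left and Right already have a value field of their own, which is the kind of name clash the discussion above refers to.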
Check asSourceWithContext and SourceWithContext.fromTuples for inspiration.
Chunk is now just a thin wrapper for an Array. It should be optimized to avoid unnecessary copying of data. Operations like concatenation, drop, take, splitAt etc. should be reimplemented. Such an approach has been taken by libs like fs2 and zio, as well as Pekko's ByteString. All these libraries use their own representations of a byte array, with performance optimizations that keep stream processing efficient.
ZIO: https://github.com/zio/zio/blob/76a463192ef27c5f1ccc038cf93fe35b42ad4869/core/shared/src/main/scala/zio/Chunk.scala
fs2: https://github.com/typelevel/fs2/blob/main/core/shared/src/main/scala/fs2/Chunk.scala
Pekko: https://github.com/apache/pekko/blob/d98942dd3b021874ac324464f3fd788f9be046df/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala
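To illustrate the kind of optimization meant here, below is a minimal sketch of a chunk that is a view (offset and length) over a shared backing array, so that drop, take and splitAt do not copy elements. SliceChunk and its methods are hypothetical names, not part of ox; the libraries linked above use more elaborate representations, and cheap concatenation is not covered by this sketch.

```scala
// Hypothetical sketch: a read-only view over a shared backing array.
final class SliceChunk[A] private (backing: Array[A], offset: Int, val length: Int):
  def apply(i: Int): A =
    if i < 0 || i >= length then throw new IndexOutOfBoundsException(i.toString)
    backing(offset + i)

  // drop/take/splitAt only adjust offset and length; no elements are copied.
  def drop(n: Int): SliceChunk[A] =
    val k = math.max(0, math.min(n, length))
    new SliceChunk(backing, offset + k, length - k)

  def take(n: Int): SliceChunk[A] =
    val k = math.max(0, math.min(n, length))
    new SliceChunk(backing, offset, k)

  def splitAt(n: Int): (SliceChunk[A], SliceChunk[A]) = (take(n), drop(n))

  def toList: List[A] = List.tabulate(length)(i => apply(i))

object SliceChunk:
  // The array is shared, not copied, so the caller must not mutate it afterwards.
  def fromArray[A](array: Array[A]): SliceChunk[A] =
    new SliceChunk(array, 0, array.length)
```

The trade-off of this design is that a small slice keeps the whole backing array reachable, so a compacting copy may still be needed for long-lived chunks.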
The current retry mechanism allows providing only a single policy for retrying the operation. Once this policy is "exhausted", i.e. when it's done retrying without success, it gives up.
We'd like to be able to compose multiple policies, so that more than one can be tried in the attempt to run the operation. For example, we might want to make a couple of immediate retries (with no delay) and, if still unsuccessful, retry indefinitely with exponential backoff.
The above could be written with a hypothetical fallbackTo combinator like:
val policy: RetryPolicy = RetryPolicy.immediate(3).fallbackTo(RetryPolicy.backoffForever(100.millis))
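One way to think about such a combinator is sketched below, assuming a policy can be modelled as a (possibly infinite) lazy sequence of delays before each retry. Policy, its constructors and retry are hypothetical stand-ins written for this sketch; they are not ox's RetryPolicy API.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration.*
import scala.util.{Failure, Success, Try}

// Hypothetical model: a policy is the sequence of delays before each retry.
final case class Policy(delays: LazyList[FiniteDuration]):
  // Once this policy's delays are exhausted, continue with the other policy's.
  def fallbackTo(other: Policy): Policy =
    Policy(delays.lazyAppendedAll(other.delays))

object Policy:
  def immediate(maxRetries: Int): Policy =
    Policy(LazyList.fill(maxRetries)(Duration.Zero))
  def backoffForever(initial: FiniteDuration): Policy =
    Policy(LazyList.iterate(initial)(_ * 2L))

// Runs `op`, retrying on exceptions until it succeeds or the delays run out.
def retry[T](policy: Policy)(op: => T): T =
  @tailrec
  def loop(remaining: LazyList[FiniteDuration]): T =
    Try(op) match
      case Success(value) => value
      case Failure(error) =>
        remaining match
          case delay #:: rest =>
            Thread.sleep(delay.toMillis)
            loop(rest)
          case _ => throw error
  loop(policy.delays)
```

With this model, Policy.immediate(3).fallbackTo(Policy.backoffForever(100.millis)) yields three zero delays followed by 100 ms, 200 ms, 400 ms and so on.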
The current retry mechanism only supports a single retry policy for a successful/erroneous result of a given operation, which can only be customized by providing a ResultPolicy that allows for re-defining success or failing fast on given errors. The limitation of this approach is that it doesn't support different retry strategies for different types of errors.
We'd like to be able to define a multi-policy, i.e. one that assigns different retry strategies to different errors, so that e.g. we retry immediately for some errors, but use exponential backoff for others.
A multi-policy could be thought of as a total function E => RetryPolicy, where E is the error type for the operation, e.g.
sealed trait Error
object Error:
  case class WithCode(code: Int) extends Error
  case object Other extends Error
val policy = MultiPolicy {
  case Error.WithCode(code) if code == 42 => RetryPolicy.immediate(3)
  case Error.WithCode(_) => RetryPolicy.delay(4, 1.second)
  case Error.Other => RetryPolicy.backoff(5, 100.millis)
}
One thing to consider is whether we want a notion of "failing fast" within the multi-policy, since it can already be handled by a ResultPolicy(isWorthRetrying = _ => false). Allowing it to be defined within a multi-policy as well, although possibly useful, might introduce ambiguity.
Once websockets get implemented, the following flow should be modelled:
As described in KillSwitches
It seems the new artifacts published recently are compiled against a Scala 3.4 release candidate, which makes them not usable from Scala 3.3:
[error] error while loading resource$package,
[error] class file ox/resource$package.class is broken, reading aborted with class dotty.tools.tasty.UnpickleException
[error] TASTy signature has wrong version.
[error] expected: {majorVersion: 28, minorVersion: 3}
[error] found : {majorVersion: 28, minorVersion: 4 [unstable release: 1]}
[error]
[error] This TASTy file was produced by an unstable release.
[error] To read this TASTy file, your tooling must be at the same version.
[error] The TASTy file was produced by Scala 3.4.0-RC1-bin-20230825-2616c8b-NIGHTLY-git-2616c8b.
Would it be possible to publish artifacts for Scala 3.3?
The ox.ratelimiter.test.RateLimiterTest fails randomly and needs to be fixed.
source.toFile should work for Source[Chunk[Byte]] and Source[String].
Add delay and now functions. Add a test runtime with an emulated clock, to test code with delays without spending actual time on delays.
This would be invaluable for testing asynchronous code. I'm not sure if this is even possible with the current Loom design, but it might be worth a try.
There is a similar feature in cats-effect, a separate effect evaluator with emulated time: https://typelevel.org/cats-effect/docs/core/test-runtime#mocking-time
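As a rough sketch of the idea, assuming the code under test obtains time through an injected abstraction instead of calling Thread.sleep directly, an emulated clock can simply advance a counter. Clock, SystemClock and EmulatedClock are hypothetical names introduced here; this single-threaded sketch does not address the harder problem of coordinating virtual time across multiple forks.

```scala
import scala.concurrent.duration.*

// Hypothetical abstraction over time, to be injected into code that needs delays.
trait Clock:
  def now(): FiniteDuration // time elapsed since an arbitrary origin
  def delay(d: FiniteDuration): Unit

// Production implementation: really blocks the current thread.
object SystemClock extends Clock:
  def now(): FiniteDuration = System.nanoTime().nanos
  def delay(d: FiniteDuration): Unit = Thread.sleep(d.toMillis)

// Test implementation: a delay only advances the virtual time and returns at once.
final class EmulatedClock extends Clock:
  private var current: FiniteDuration = Duration.Zero
  def now(): FiniteDuration = synchronized(current)
  def delay(d: FiniteDuration): Unit = synchronized { current = current + d }
```

A test can then "wait" for an hour by calling delay(1.hour) on an EmulatedClock and assert on now() without spending any real time.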
Ox offers a nice way to manage resources in scopes:
supervised {
  val serverBinding = useInScope(NettySyncServer().addEndpoints(endpoints).start())(_.stop())
  //...
  never
}
The release clause (_.stop()) is run only if the scope fails due to an unhandled exception. We often deal with long-living resources which should get released on server shutdown, which typically means a TERM signal to the JVM, handled by a shutdown hook.
This is handled in Cats Effect by IOApp, where users can compose resources into a final effect IO[ExitCode]. In case of runtime shutdown, the main fiber will be canceled and resource finalizers will be called.
Maybe we can somehow achieve a similar effect in Ox? There is no notion of App, main fiber etc., but I'm leaving this issue as a remark that using resources in scopes has this particular limitation.
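One possible direction, sketched below using only JDK APIs, is a shutdown hook that interrupts the thread running the scope and then waits for it to finish. The helper name interruptOnShutdown is hypothetical, and the sketch assumes that the body reacts to interruption by unwinding and running its finalizers; whether a given scope does so has to be verified separately.

```scala
// Hypothetical helper: runs `body` on the current thread. On JVM shutdown
// (e.g. a TERM signal) the thread is interrupted so that cleanup code can run.
def interruptOnShutdown[T](body: => T): T =
  val runner = Thread.currentThread()
  val hook = new Thread(() => {
    runner.interrupt() // unblocks calls such as Thread.sleep in the body
    runner.join()      // delays JVM exit until the body has finished cleaning up
  })
  Runtime.getRuntime.addShutdownHook(hook)
  try body
  finally
    // removeShutdownHook throws IllegalStateException once shutdown has started.
    try
      Runtime.getRuntime.removeShutdownHook(hook)
      ()
    catch case _: IllegalStateException => ()
```

A caveat of this design: calling System.exit from within the body itself would deadlock, because the exiting thread waits for the hook while the hook waits for that thread.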
At present, when an error happens during element emission, the source fails. How about:
.decodeUtf8 to Source[Chunk[Byte]] to convert it to a Source[String]
.encodeUtf8 to Source[String] to convert it to a Source[Byte] or Source[Chunk[Byte]]. In fs2 the default encoding produces Source[Byte], and a supplementary encodeC produces a source of chunks. Not sure which approach we should take as the default.
_.chunk and _.unchunk for conversions between Source[Chunk[A]] and Source[A]
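The subtle part of a chunk-wise decodeUtf8 is that a multi-byte character can be split across two chunks. The sketch below shows one way to handle this with the JDK's CharsetDecoder, carrying undecoded trailing bytes over to the next chunk. Utf8ChunkDecoder is a hypothetical helper operating on plain byte arrays, not an ox API, and malformed input is replaced with U+FFFD as an assumed default.

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.{CodingErrorAction, StandardCharsets}

// Hypothetical helper: decodes a sequence of byte chunks, carrying incomplete
// multi-byte sequences over from one chunk to the next.
final class Utf8ChunkDecoder:
  private val decoder = StandardCharsets.UTF_8
    .newDecoder()
    .onMalformedInput(CodingErrorAction.REPLACE)
    .onUnmappableCharacter(CodingErrorAction.REPLACE)
  private var leftover: Array[Byte] = Array.emptyByteArray

  private def run(chunk: Array[Byte], endOfInput: Boolean): String =
    val in = ByteBuffer.wrap(leftover ++ chunk)
    // UTF-8 never decodes to more chars than there are input bytes.
    val out = CharBuffer.allocate(in.remaining() + 1)
    decoder.decode(in, out, endOfInput)
    if endOfInput then
      decoder.flush(out)
      decoder.reset()
    // Bytes of a trailing, incomplete sequence remain in `in`; keep them for later.
    leftover = new Array[Byte](in.remaining())
    in.get(leftover)
    out.flip()
    out.toString

  // Decodes the next chunk; may return fewer characters than the chunk contains.
  def decode(chunk: Array[Byte]): String = run(chunk, endOfInput = false)

  // Call once after the last chunk; a truncated sequence becomes U+FFFD.
  def finish(): String = run(Array.emptyByteArray, endOfInput = true)
```

For simplicity the sketch copies the leftover bytes together with each chunk; an optimized version would avoid that copy.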
Hey SoftwareMill,
Since the introduction of Project Loom there has been a lot of interest in going back from monads to direct-style code. Yet, most of the industry is still using Scala 2. Structured concurrency and scoped values will probably be GA in one of the next JVM versions, much sooner than the industry will move to Scala 3 (if at all).
Please consider adding Scala 2.13 support.
Thanks.
Inspiration could be taken from here. Note that sub-channels created through groupBy should eventually be subject to a merge. How? When? TBC
While the retry mechanism allows for repeating an operation - according to a Schedule - in case of error, we'd also like to have a schedule-aware repeat, which would run an operation according to a Schedule or perhaps until a stop condition is met.
Some things to consider:
repeat and retry?
Schedule like UntilResult[T](p: T => Boolean) or Until(p: => Boolean), or both?
We'd like to have a retry mechanism supporting:
f: => T,
Either, i.e. f: => Either[E, T]
Try, i.e. f: => Try[T]
Heavily inspired by https://github.com/softwaremill/retry
import ox.retry
import scala.concurrent.duration.*
import scala.util.Try
def foo: Int = ???
def bar: Either[String, Int] = ???
def baz: Try[Int] = ???
// various logic definitions
retry.directly(3)(foo)
retry.directly(3).either(bar)
retry.directly(3).tried(baz) // can't use "try"
// various retry schedules
retry.directly(3)(foo)
retry.delay(3, 100.millis)(foo)
retry.backoff(3, 100.millis)(foo) // optional jitter defaults to Jitter.None
// Either with non-default jitter
retry.backoff(3, 100.millis, Jitter.Full).either(bar) // other jitters are: Equal, Decorrelated
retry, e.g.
retry(delay(3, 100 millis), foo) // however, how would we handle Try/Either?
f: => T and f: Try[T]/f: Either[E, T] without using dedicated functions like tried/either?
The following code was not working as expected. (ox version: 0.0.24)
import ox.supervised
import ox.channels.*
class Stateful:
  private var counter: Int = 0
  def increment(delta: Int): Int =
    counter += delta
    Thread.sleep(100)
    println(counter)
    counter
supervised {
  val ref = Actor.create(new Stateful)
  ref.tell(_.increment(5))
}
expected results: prints 5
actual results: prints nothing
When Thread.sleep(100) line was removed, it worked as expected.
Check grouped for inspiration
As I discussed with Adam on X, I opened this issue because I tried integrating my library raise4s with Ox. First, I tried to create a custom ErrorMode:
class RaiseErrorMode[E] extends ErrorMode[E, [T] =>> Raise[E] ?=> T] {
  private val evaluations: scala.collection.mutable.Map[Raise[E] ?=> Any, Either[E, Any]] =
    scala.collection.mutable.Map.empty
  override def isError[T](f: Raise[E] ?=> T): Boolean = {
    if (!evaluations.contains(f)) {
      val result: Either[E, T] = Raise.either(f)
      evaluations.put(f, result)
    }
    evaluations(f).isLeft
  }
  override def getError[T](f: Raise[E] ?=> T): E =
    evaluations(f) match
      case Left(error) => error
      case Right(_) => throw new IllegalStateException("The raise execution is not an error")
  override def getT[T](f: Raise[E] ?=> T): T =
    evaluations(f) match
      case Right(value) => value.asInstanceOf[T]
      case Left(_) => throw new IllegalStateException("The raise execution is an error")
  override def pure[T](t: T): Raise[E] ?=> T = t.succeed
  override def pureError[T](e: E): Raise[E] ?=> T = e.raise[T]
}
It works well with the par DSL. However, it doesn't respect the semantics of the race DSL.
val result: String raises Int =
  race(
    {
      sleep(200.millis)
      println("Lambda 1")
      "42"
    }, {
      sleep(100.millis)
      println("Lambda 2")
      Raise.raise(-1)
    }
  )
Raise.fold(
  block = result,
  recover = error => println(s"Error: $error"),
  transform = v => println(s"Transform $v")
)
The above code randomly executes the two lambdas. It does not take into consideration the real execution of the body of the lambda. The issue is in the actual implementation of the race method:
def race[E, F[_], T](em: ErrorMode[E, F])(fs: Seq[() => F[T]]): F[T] =
  unsupervised {
    val result = new ArrayBlockingQueue[Try[F[T]]](fs.size)
    fs.foreach(f => forkUnsupervised(result.put(Try(f()))))
    // (rest omitted)
The application f() in the case of a lambda defined in the context Raise[E] ?=> A is the Raise[E] ?=> A lambda itself and not the result of its execution.
So, to integrate the library raise4s with ox, I need to define a custom raceRaise method that uses the Either type:
def raceRaise[E, T](f1: T raises E, f2: T raises E): T raises E =
  raceR(EitherMode[E])(List(() => Raise.either(f1), () => Raise.either(f2))).bind()
Feel free to let me know if you need any more information.
When the retry mechanism performs another attempt to run the operation, it might be useful to execute a side effect - like logging or updating metrics.
Some things to consider:
Hi! First of all, thank you for a great async library! I tried the examples and worked through the exercises using the documentation. It was a lot of fun!
My suggestion is to provide rate control primitives (e.g. a simple rate limit, or smarter algorithms such as leaky bucket / token bucket).
Sometimes I need rate control in production concurrent programming (e.g. batch processing against an HTTP API).
Of course, we can implement simple control using par() and Source.tick(). But for production use, I think out-of-the-box rate control primitives are preferred.
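For reference, the token bucket algorithm mentioned above can be sketched as follows. TokenBucket is a hypothetical, non-blocking illustration and not an ox primitive; the time source is injected as an assumption so that the logic can be tested deterministically.

```scala
// Hypothetical sketch of a token bucket: up to `capacity` tokens are stored and
// refilled continuously at `refillPerSecond`; each permitted operation costs one.
final class TokenBucket(
    capacity: Int,
    refillPerSecond: Double,
    nowNanos: () => Long = () => System.nanoTime()
):
  private var tokens: Double = capacity.toDouble
  private var lastRefill: Long = nowNanos()

  // Returns true and consumes a token if one is available, false otherwise.
  def tryAcquire(): Boolean = synchronized {
    val now = nowNanos()
    val elapsedSeconds = (now - lastRefill) / 1e9
    tokens = math.min(capacity.toDouble, tokens + elapsedSeconds * refillPerSecond)
    lastRefill = now
    if tokens >= 1.0 then
      tokens -= 1.0
      true
    else false
  }
```

The capacity bounds the size of a burst, while the refill rate bounds the sustained throughput; a blocking variant would wait for the next token instead of returning false.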
Hello,
I have a question about the scoped function, as I'm learning the library.
For example, in Java:
var scope = new StructuredTaskScope.ShutdownOnFailure();
scope.fork(() -> longRunningTask()); // placeholder: a task that takes a long time
scope.fork(() -> failingTask()); // placeholder: a task that throws an exception
scope.join();
scope.throwIfFailed();
When the second task throws an exception, the first task is cancelled.
But with the scoped function in the ox library, I need to join the two forks myself.
How do I write the equivalent of the Java example?
If one of the forks throws an exception, I need the other forks to be cancelled.
Thank you in advance.
Best Regards,
Richie.