Ox

Safe direct style concurrency and resiliency for Scala on the JVM. Requires JDK 21 & Scala 3. The areas that we'd like to cover with Ox are:

  • concurrency: developer-friendly structured concurrency, high-level concurrency operators, safe low-level primitives, communication between concurrently running computations
  • error management: retries, timeouts, a safe approach to error propagation, safe resource management
  • scheduling & timers
  • resiliency: circuit breakers, bulkheads, rate limiters, backpressure

All of the above should allow for observability of the orchestrated business logic. We aim to enable writing simple, expression-oriented code in functional style. We'd like to keep the syntax overhead to a minimum, preserving developer-friendly stack traces, and without compromising performance.

Some of the above are already addressed in the API, some are coming up in the future. We'd love your help in shaping the project!

To try Ox, add the following dependency, using either sbt:

"com.softwaremill.ox" %% "core" % "0.1.0"

Or scala-cli:

//> using dep "com.softwaremill.ox::core:0.1.0"

Documentation is available at https://ox.softwaremill.com, ScalaDocs can be browsed at https://javadoc.io.

Example

import ox.*
import ox.either.ok
import ox.channels.*
import ox.resilience.*
import scala.concurrent.duration.*

// run two computations in parallel
def computation1: Int = { sleep(2.seconds); 1 }
def computation2: String = { sleep(1.second); "2" }
val result1: (Int, String) = par(computation1, computation2)
// (1, "2")

// timeout a computation
def computation: Int = { sleep(2.seconds); 1 }
val result2: Either[Throwable, Int] = catching(timeout(1.second)(computation))

// structured concurrency & supervision
supervised {
  forkUser {
    sleep(1.second)
    println("Hello!")
  }
  forkUser {
    sleep(500.millis)
    throw new RuntimeException("boom!")
  }
}
// on exception, ends the scope & re-throws

// retry a computation
def computationR: Int = ???
retry(RetryPolicy.backoff(3, 100.millis, 5.minutes, Jitter.Equal))(computationR)

// create channels & transform them using high-level operations
supervised {
  Source.iterate(0)(_ + 1) // natural numbers
          .transform(_.filter(_ % 2 == 0).map(_ + 1).take(10))
          .foreach(n => println(n.toString))
}

// select from a number of channels
val c = Channel.rendezvous[Int]
val d = Channel.rendezvous[Int]
select(c.sendClause(10), d.receiveClause)

// unwrap eithers and combine errors in a union type
val v1: Either[Int, String] = ???
val v2: Either[Long, String] = ???

val result: Either[Int | Long, String] = either:
  v1.ok() ++ v2.ok()

More examples in the docs!

Other projects

The wider goal of direct style Scala is enabling teams to deliver working software quickly and with confidence. Our other projects, including sttp client and tapir, also include integrations directly tailored towards direct style.

Also check out the gears project, an experimental multi-platform library that covers direct style Scala as well.

Contributing

All suggestions welcome :)

To compile and test, run:

sbt compile
sbt test

See the list of issues and pick one! Or report your own.

If you have doubts about why or how something works, don't hesitate to ask a question on Discourse or via GitHub. This probably means that the documentation, ScalaDocs or code is unclear and can be improved for the benefit of all.

In order to develop the documentation, you can use the doc/watch.sh script, which runs Sphinx using Python. Use doc/requirements.txt to set up your Python environment with pip. Moreover, you can use the compileDocumentation sbt task to verify that all code snippets compile properly.

Project sponsor

We offer commercial development services. Contact us to learn more about us!

Copyright

Copyright (C) 2023-2024 SoftwareMill https://softwaremill.com.

ox's People

Contributors

adamw, ahoy-jon, amorfis, bishabosha, dybekk, efemelar, geminicaprograms, ghik, kciesielski, kluen, rucek, scalway, sethtisue, softwaremill-ci, windymelt

ox's Issues

Add `groupBy` operator

Inspiration could be taken from here. Note that sub-channels created through groupBy should eventually be merged. How? When? TBC

Add License

What is the license for this library?
Please add a license file to the root dir.

Add a schedule-aware repeat

While the retry mechanism allows for repeating an operation according to a Schedule in case of error, we'd also like to have a schedule-aware repeat, which would run an operation according to a Schedule, or perhaps until a stop condition is met.

Some things to consider:

  • what happens if the operation fails before the scheduled number of repetitions? Maybe this could be expressed with a multi-policy (see: #55) that combines repeat and retry?
  • perhaps the stop condition could be thought of as yet another Schedule like UntilResult[T](p: T => Boolean) or Until(p: => Boolean), or both?
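
To make the idea above concrete, here is a minimal sketch of a schedule-aware repeat in plain Scala 3. It is not the Ox API: the Schedule alias, fixedRate and repeat are hypothetical names, and a schedule is modelled simply as the sequence of delays between repetitions, with the stop condition passed as a predicate on the result.

```scala
import scala.concurrent.duration.*

// Hypothetical model (not the Ox API): a schedule is the sequence of delays
// to wait before each subsequent repetition.
type Schedule = LazyList[FiniteDuration]

// n further repetitions, each preceded by the same delay.
def fixedRate(n: Int, interval: FiniteDuration): Schedule =
  LazyList.fill(n)(interval)

// Runs `operation` once, then once more per schedule entry, stopping early
// as soon as `until` holds for the latest result. Returns all results.
def repeat[T](schedule: Schedule, until: T => Boolean)(operation: => T): List[T] =
  val results = List.newBuilder[T]
  var last = operation
  results += last
  val delays = schedule.iterator
  while !until(last) && delays.hasNext do
    Thread.sleep(delays.next().toMillis)
    last = operation
    results += last
  results.result()
```

In this sketch an exception thrown by the operation simply propagates, which leaves the question of combining repeat with retry open.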

Consider supporting Scala 2.13

Hey SoftwareMill,

Since the introduction of Project Loom, there has been a lot of interest in going back from monads to direct style code. Yet, most of the industry is still using Scala 2. Structured concurrency and scoped values will probably be GA in one of the next JVM versions, much sooner than the industry will move to Scala 3 (if at all).
Please consider adding Scala 2.13 support.

Thanks.

Scope finalizers don't run on runtime shutdown

Ox offers a nice way to manage resources in scopes:

supervised {
  val serverBinding = useInScope(NettySyncServer().addEndpoints(endpoints).start())(_.stop())
  //...
  never
}

The release clause (_.stop()) is run only if the scope fails due to an unhandled exception. We often deal with long-living resources which should get released on server shutdown, which typically means a TERM signal to the JVM, handled by a shutdown hook.
This is handled in Cats Effect by IOApp, where users can compose resources into a final effect IO[ExitCode]. In case of runtime shutdown, the main fiber will be canceled and resource finalizers will be called.
Maybe we can somehow achieve a similar effect in Ox? There is no notion of App, main fiber etc., but I'm leaving this issue as a remark that using resources in scopes has this particular limitation.
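
One possible direction, sketched below with plain JVM APIs only, is to run the main logic on a dedicated thread and register a shutdown hook that interrupts it and waits for it to finish. runWithShutdownHook is a hypothetical helper, not part of Ox, and the sketch assumes that the blocking calls inside the body respond to interruption, so that finally blocks (and, by extension, scope finalizers) get a chance to run.

```scala
// Hypothetical helper (not part of Ox): runs `body` on a new thread and
// installs a JVM shutdown hook that interrupts that thread and waits for it.
def runWithShutdownHook(body: => Unit): Unit =
  val task: Runnable = () => body
  val worker = new Thread(task, "main-logic")

  // On shutdown (e.g. SIGTERM): interrupt the worker, then block the hook
  // until the worker has finished running its cleanup code.
  val hookTask: Runnable = () => { worker.interrupt(); worker.join() }
  Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "shutdown-hook"))

  worker.start()
  worker.join()
```

The JVM waits for shutdown hooks to complete, so the join inside the hook is what keeps the process alive while the cleanup runs.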

Composable retry policies

The current retry mechanism allows providing a single policy for retrying the operation. Once this policy is "exhausted", i.e. when it's done retrying without success, it gives up.

We'd like to be able to compose multiple policies, so that more than one can be tried in the attempt to run the operation. For example, we might want to make a couple of immediate retries (with no delay) and, if still unsuccessful, retry indefinitely with exponential backoff.

The above could be written with a hypothetical fallbackTo combinator like:

val policy: RetryPolicy = RetryPolicy.immediate(3).fallbackTo(RetryPolicy.backoffForever(100.millis))
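
As a rough illustration of how such a combinator could behave, the sketch below models a policy as a lazy sequence of delays, so that composition is just concatenation. Policy, immediate, backoffForever and fallbackTo are hypothetical stand-ins here and do not reflect how RetryPolicy is implemented in Ox.

```scala
import scala.concurrent.duration.*

// Hypothetical model (not the Ox API): a policy is the lazy sequence of
// delays before each retry; running out of delays means "give up".
final case class Policy(delays: LazyList[FiniteDuration]):
  // Once this policy is exhausted, continue with the delays of `other`.
  def fallbackTo(other: Policy): Policy = Policy(delays ++ other.delays)

object Policy:
  def immediate(maxRetries: Int): Policy =
    Policy(LazyList.fill(maxRetries)(Duration.Zero))

  // Doubles the delay on every retry; a real implementation would cap it.
  def backoffForever(initial: FiniteDuration): Policy =
    Policy(LazyList.iterate(initial)(_ * 2L))
```

With this model, Policy.immediate(3).fallbackTo(Policy.backoffForever(100.millis)) yields three zero delays followed by 100 ms, 200 ms, 400 ms and so on.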

Add multi-policies for retries

The current retry mechanism only supports a single retry policy for a successful/erroneous result of a given operation, which can only be customized by providing a ResultPolicy that allows for re-defining success or failing fast on given errors. The limitation of this approach is that it doesn't support different retry strategies for different types of errors.

We'd like to be able to define a multi-policy, i.e. one that assigns different retry strategies to different errors, so that e.g. we retry immediately for some errors, but use exponential backoff for others.

A multi-policy could be thought of as a total function E => RetryPolicy, where E is the error type for the operation, e.g.

sealed trait Error

object Error:
  case class WithCode(code: Int) extends Error
  case object Other extends Error

val policy = MultiPolicy {
  case Error.WithCode(code) if code == 42 => RetryPolicy.immediate(3)
  case Error.WithCode(_) => RetryPolicy.delay(4, 1.second)
  case Error.Other => RetryPolicy.backoff(5, 100.millis)
}

One thing to consider is whether we want a notion of "failing fast" within the multi-policy, since it can already be handled by a ResultPolicy(isWorthRetrying = _ => false). Allowing it to be defined within a multi-policy as well, although possibly useful, might introduce ambiguity.
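
The "total function E => RetryPolicy" reading can be sketched in plain Scala as follows. SimplePolicy and retryWith are hypothetical and much simpler than the real retry API; in particular, sharing one retry counter across all policies is just one possible design choice, and failing fast is modelled as a policy with zero retries.

```scala
import scala.concurrent.duration.*

// Hypothetical, simplified policy: a retry budget and a fixed delay.
final case class SimplePolicy(maxRetries: Int, delay: FiniteDuration)

// Retries `operation`, choosing the policy based on the most recent error.
// The retry counter is shared across policies (an assumption of this sketch).
def retryWith[E, T](policyFor: E => SimplePolicy)(operation: => Either[E, T]): Either[E, T] =
  @scala.annotation.tailrec
  def loop(retriesSoFar: Int): Either[E, T] =
    operation match
      case Right(value) => Right(value)
      case Left(error) =>
        val policy = policyFor(error)
        if retriesSoFar >= policy.maxRetries then Left(error) // budget used up
        else
          Thread.sleep(policy.delay.toMillis)
          loop(retriesSoFar + 1)
  loop(0)
```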

Mocked time

Add delay and now functions, plus a test runtime with an emulated clock, so that code with delays can be tested without actually waiting.

This would be invaluable for testing asynchronous code. I'm not sure if this is even possible with the current Loom design, but it might be worth a try.

There is a similar feature in cats-effect, a separate effect evaluator with emulated time: https://typelevel.org/cats-effect/docs/core/test-runtime#mocking-time
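
A very small sketch of the idea, assuming the code under test takes its time source as a parameter: Clock, SystemClock and TestClock are hypothetical names, not part of Ox. This only covers sequential code; emulating time for several concurrently sleeping forks would need a real scheduler, which is the hard part of the request.

```scala
import scala.concurrent.duration.*

// Hypothetical abstraction (not part of Ox): code under test asks a Clock
// for the current time and for delays instead of calling Thread.sleep.
trait Clock:
  def now(): Long // current time in milliseconds
  def delay(d: FiniteDuration): Unit

object SystemClock extends Clock:
  def now(): Long = System.currentTimeMillis()
  def delay(d: FiniteDuration): Unit = Thread.sleep(d.toMillis)

// Test clock: a delay only advances a counter, so no real time passes.
final class TestClock(start: Long = 0L) extends Clock:
  private var current = start
  def now(): Long = synchronized(current)
  def delay(d: FiniteDuration): Unit = synchronized { current += d.toMillis }
```

A test can then pass a TestClock, call code that delays for minutes, and assert on now() immediately.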

Add retries

Retries

Background

We'd like to have a retry mechanism supporting:

  • various retry schedules:
    • direct retry - retrying up to a given number of times with no delay between subsequent retries,
    • delayed retry - retrying up to a given number of times with a fixed delay between subsequent retries,
    • retry with backoff - retrying up to a given number of times with an increasing delay between subsequent retries; this can optionally include a jitter, i.e. a random factor in the delay between subsequent attempts (see: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/)
  • various definitions of the logic to retry:
    • direct, i.e. f: => T,
    • Either, i.e. f: => Either[E, T]
    • Try, i.e. f: => Try[T]

Proposed API

Heavily inspired by https://github.com/softwaremill/retry

import ox.retry
import scala.concurrent.duration.*
import scala.util.Try

def foo: Int = ???
def bar: Either[String, Int] = ???
def baz: Try[Int] = ???

// various logic definitions
retry.directly(3)(foo)
retry.directly(3).either(bar)
retry.directly(3).tried(baz) // can't use "try"

// various retry schedules
retry.directly(3)(foo)
retry.delay(3, 100.millis)(foo)
retry.backoff(3, 100.millis)(foo) // optional jitter defaults to Jitter.None

// Either with non-default jitter
retry.backoff(3, 100.millis, Jitter.Full).either(bar) // other jitters are: Equal, Decorrelated

Questions/doubts

  • wouldn't it be better to include the retry schedules (directly/delay/backoff) as another parameter to retry, e.g.
    retry(delay(3, 100.millis), foo) // however, how would we handle Try/Either?
  • is there any way to distinguish between f: => T and f: => Try[T]/f: => Either[E, T] without using dedicated functions like tried/either?

Rate control primitives?

Hi! First of all, thank you for a great async library! I tried the examples and worked through the exercises in the documentation. It was a lot of fun!

My suggestion is to provide rate control primitives (e.g. a simple rate limit, or smarter algorithms such as leaky bucket / token bucket).

Sometimes I need rate control in production concurrent code (e.g. batch processing against an HTTP API).

Of course, we can implement simple control using par() and Source.tick(). But for production use, I think out-of-the-box rate control primitives are preferable.
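
For reference, a token bucket can be sketched in a few lines of plain Scala. This is only an illustration of the algorithm mentioned above, not an Ox primitive: TokenBucket and tryAcquire are hypothetical names, and the time source is injectable so that the behaviour can be checked without waiting.

```scala
// Hypothetical sketch of a token bucket (not an Ox primitive).
// `capacity` bounds the burst size; `refillPerSecond` is the sustained rate.
final class TokenBucket(
    capacity: Int,
    refillPerSecond: Double,
    nowNanos: () => Long = () => System.nanoTime()
):
  private var tokens: Double = capacity.toDouble
  private var lastRefill: Long = nowNanos()

  // Consumes a token and returns true if one is available; never blocks.
  def tryAcquire(): Boolean = synchronized {
    val now = nowNanos()
    val elapsedSeconds = (now - lastRefill) / 1e9
    // Add the tokens earned since the last call, up to the capacity.
    tokens = math.min(capacity.toDouble, tokens + elapsedSeconds * refillPerSecond)
    lastRefill = now
    if tokens >= 1.0 then
      tokens -= 1.0
      true
    else false
  }
```

A blocking variant would sleep until the next token is due instead of returning false.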

`tell` method in actor is not working with sleep

The following code does not work as expected (ox version: 0.0.24).

import ox.supervised
import ox.channels.*

class Stateful:
  private var counter: Int = 0

  def increment(delta: Int): Int =
    counter += delta
    Thread.sleep(100)
    println(counter)
    counter

supervised {
  val ref = Actor.create(new Stateful)
  ref.tell(_.increment(5))
}

expected results: prints 5
actual results: prints nothing

When Thread.sleep(100) line was removed, it worked as expected.

Integration with raise4s library

As I discussed with Adam on X, I opened this issue because I tried integrating my library raise4s with Ox. First, I tried to create a custom ErrorMode:

class RaiseErrorMode[E] extends ErrorMode[E, [T] =>> Raise[E] ?=> T] {

  private val evaluations: scala.collection.mutable.Map[Raise[E] ?=> Any, Either[E, Any]] =
    scala.collection.mutable.Map.empty

  override def isError[T](f: Raise[E] ?=> T): Boolean = {
    if (!evaluations.contains(f)) {
      val result: Either[E, T] = Raise.either(f)
      evaluations.put(f, result)
    }
    evaluations(f).isLeft
  }

  override def getError[T](f: Raise[E] ?=> T): E =
    evaluations(f) match
      case Left(error) => error
      case Right(_)    => throw new IllegalStateException("The raise execution is not an error")

  override def getT[T](f: Raise[E] ?=> T): T =
    evaluations(f) match
      case Right(value) => value.asInstanceOf[T]
      case Left(_)      => throw new IllegalStateException("The raise execution is an error")

  override def pure[T](t: T): Raise[E] ?=> T = t.succeed

  override def pureError[T](e: E): Raise[E] ?=> T = e.raise[T]
}

It works well with the par DSL. However, it doesn't respect the semantics of the race DSL.

val result: String raises Int =
    race(
      {
        sleep(200.millis)
        println("Lambda 1")
        "42"
      }, {
        sleep(100.millis)
        println("Lambda 2")
        Raise.raise(-1)
      }
    )

Raise.fold(
  block = result,
  recover = error => println(s"Error: $error"),
  transform = v => println(s"Transform $v")
)

The above code randomly executes the two lambdas. It does not take into consideration the real execution of the body of the lambda. The issue is in the actual implementation of the race method:

def race[E, F[_], T](em: ErrorMode[E, F])(fs: Seq[() => F[T]]): F[T] =
  unsupervised {
    val result = new ArrayBlockingQueue[Try[F[T]]](fs.size)
    fs.foreach(f => forkUnsupervised(result.put(Try(f()))))
// (rest omitted)

The application f() in the case of a lambda defined in the context Raise[E] ?=> A is the Raise[E] ?=> A lambda itself and not the result of its execution.
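
The effect described above can be reproduced without Ox or raise4s. In the sketch below, Capability stands in for Raise[E] and all names are hypothetical; it shows that a block typed as a context function is wrapped rather than executed, so calling the surrounding thunk does not run the block's body until the capability is supplied.

```scala
// Stand-in for a capability such as Raise[E]; all names are hypothetical.
final class Capability

type Contextual[T] = Capability ?=> T

object ContextFunctionDemo:
  var bodyRan = false

  // Because the expected result type is a context function, the compiler
  // wraps the block into one; nothing inside it is executed here.
  val thunk: () => Contextual[Int] = () => { bodyRan = true; 42 }

  def runIt(): Int =
    // Still only a context function value: the body has not run yet.
    val pending: Contextual[Int] = thunk()
    // Supplying the capability finally executes the body.
    pending(using Capability())
```

This is why a race that only measures how long f() takes ends up measuring nothing when F[T] is a context function type.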

So, to integrate the library raise4s with ox, I need to define a custom raceRaise method that uses the Either type:

def raceRaise[E, T](f1: T raises E, f2: T raises E): T raises E =
  raceR(EitherMode[E])(List(() => Raise.either(f1), () => Raise.either(f2))).bind()

Feel free to let me know if you need any more information.

Tasty exception after update

It seems the new artifacts published recently are compiled against a Scala 3.4 release candidate, which makes them not usable from Scala 3.3:

[error] error while loading resource$package,
[error] class file ox/resource$package.class is broken, reading aborted with class dotty.tools.tasty.UnpickleException
[error] TASTy signature has wrong version.
[error]  expected: {majorVersion: 28, minorVersion: 3}
[error]  found   : {majorVersion: 28, minorVersion: 4 [unstable release: 1]}
[error]
[error] This TASTy file was produced by an unstable release.
[error] To read this TASTy file, your tooling must be at the same version.
[error] The TASTy file was produced by Scala 3.4.0-RC1-bin-20230825-2616c8b-NIGHTLY-git-2616c8b.

Would it be possible to publish artifacts for Scala 3.3?

Naming things - unwrapping Eithers in boundary-break: .value or .?

Recent ox releases include a boundary-break implementation specialised to Eithers, which allows you to "unwrap" an Either within a specified boundary, for example:

def lookupUser(id1: Int): Either[String, User] = ???
def lookupOrganization(id2: Int): Either[String, Organization] = ???

val result: Either[String, Assignment] = either:
  val user = lookupUser(1).value
  val org = lookupOrganization(2).value
  Assignment(user, org)

Here, .value is used for the "unwrapping". However, one alternative would be to use the Rust-inspired .?. On the one hand, @lbialy argues that .value is clash-prone, along with @Ichoran who says .value suggests that it's a safe accessor.

On the other, symbolic operators historically didn't work out that well, plus, as @alexandru points out, it looks awkward and doesn't play well with fewer-braces.

What's your take? Vote with a 👍 on a comment with your pick, or propose your own!

scoped question

Hello,
I have a question about the scoped function, as I'm learning the library.

For example, in Java:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
  scope.fork(() -> { Thread.sleep(10_000); return "done"; }); // a long-running task
  scope.fork(() -> { throw new RuntimeException("boom"); });  // a task that fails
  scope.join();
  scope.throwIfFailed();
}

Here, when the second task throws an exception, the first task is cancelled.

In the Ox library, however, with the scoped function I need to join both forks myself.

How do I write the equivalent of the Java example? If one of the forks throws an exception, I need the other forks to be cancelled.

Thank you in advance.
Best regards,
Richie.

Add side-effecting callback to retry attempts

When the retry mechanism performs another attempt to run the operation, it might be useful to execute a side effect - like logging or updating metrics.

Some things to consider:

  • how much context should such a callback have (e.g. which attempt is this)?
  • should the callback be executed before or after the attempt? In the latter case it could include the result, which - particularly in case of error - might be useful for logging/metrics. Or perhaps both such callbacks should be available, so that we can hook into the retry lifecycle even more?
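
One way to picture the "after the attempt" variant is the sketch below, where the callback receives the 1-based attempt number together with the attempt's result. retryWithCallback is a hypothetical function written against the standard library only; it retries immediately and is not the Ox retry API.

```scala
import scala.util.{Try, Failure}

// Hypothetical sketch: `onAttempt` is called after every attempt with the
// attempt number (1-based) and its result. Assumes maxAttempts >= 1.
def retryWithCallback[T](maxAttempts: Int, onAttempt: (Int, Try[T]) => Unit)(operation: => T): Try[T] =
  @scala.annotation.tailrec
  def loop(attempt: Int): Try[T] =
    val result = Try(operation)
    onAttempt(attempt, result) // side effect: logging, metrics, ...
    result match
      case Failure(_) if attempt < maxAttempts => loop(attempt + 1)
      case other                               => other
  loop(1)
```

A "before the attempt" hook would be called at the top of loop instead, with only the attempt number available.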
