
swift-async-algorithms's Introduction

swift-async-algorithms

Swift Async Algorithms is an open-source package of asynchronous sequence and advanced algorithms that involve concurrency, along with their related types.

This package has three main goals:

  • First-class integration with async/await
  • Provide a home for time-based algorithms
  • Be cross-platform and open source

Motivation

AsyncAlgorithms is a package for algorithms that work with values over time. That includes those primarily about time, like debounce and throttle, but also algorithms about order like combineLatest and merge. Operations that work with multiple inputs (like zip does on Sequence) can be surprisingly complex to implement, with subtle behaviors and many edge cases to consider. A shared package can get these details correct, with extensive testing and documentation, for the benefit of all Swift apps.

The foundation for AsyncAlgorithms was laid in Swift 5.5 with AsyncSequence. Swift 5.5 also brought the ability to use a natural for/in loop with await to process the values of an AsyncSequence, along with Sequence-equivalent APIs like map and filter. Structured concurrency allows us to write code where intermediate state is simply a local variable, where try can be used directly on functions that throw, and where the logic of asynchronous code generally reads like that of synchronous code.

This package is the home for these APIs. Development and API design take place on GitHub and the Swift Forums.

Contents

Combining asynchronous sequences

  • chain(_:...): Concatenates two or more asynchronous sequences with the same element type.
  • combineLatest(_:...): Combines two or more asynchronous sequences into an asynchronous sequence producing a tuple of elements from those base asynchronous sequences that updates when any of the base sequences produce a value.
  • merge(_:...): Merges two or more asynchronous sequences into a single asynchronous sequence producing the elements of all of the underlying asynchronous sequences.
  • zip(_:...): Creates an asynchronous sequence of pairs built out of underlying asynchronous sequences.
  • joined(separator:): Concatenates the elements of an asynchronous sequence of asynchronous sequences, inserting the given separator between each element.
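
For a sense of how these compose, here is a minimal sketch (the feed contents are made up; merge and the async property are APIs listed on this page):

import AsyncAlgorithms

func printQuotes() async {
  let appleFeed = ["AAPL 173.2", "AAPL 173.5"].async    // `async` turns a synchronous sequence into an asynchronous one
  let nasdaqFeed = ["NASDAQ 13000", "NASDAQ 13050"].async

  // merge interleaves elements from both feeds in whatever order they arrive.
  for await quote in merge(appleFeed, nasdaqFeed) {
    print(quote)
  }
}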

Creating asynchronous sequences

  • async: Create an asynchronous sequence composed from a synchronous sequence.
  • AsyncChannel: An asynchronous sequence with back pressure sending semantics.
  • AsyncThrowingChannel: An asynchronous sequence with back pressure sending semantics that can emit failures.
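
For illustration, a rough sketch of the channel's back pressure (the words sent are arbitrary; finish() is awaited here to match the snippets later on this page):

import AsyncAlgorithms

func echoWords() async {
  let channel = AsyncChannel<String>()

  Task {
    for word in ["hello", "world"] {
      await channel.send(word)  // suspends until the consumer below is ready: this is the back pressure
    }
    await channel.finish()      // ends iteration for the consumer
  }

  for await word in channel {
    print(word)                 // prints "hello", then "world", then the loop ends
  }
}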

Performance optimized asynchronous iterators

  • AsyncBufferedByteIterator: A highly efficient iterator useful for iterating byte sequences derived from asynchronous read functions.
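
A sketch of how such an iterator is typically wired up; ReadableThing and its read(into:) method are hypothetical stand-ins for whatever actually produces the bytes:

import AsyncAlgorithms

// Hypothetical byte source, e.g. a file handle or socket wrapper.
protocol ReadableThing: Sendable {
  func read(into buffer: UnsafeMutableRawBufferPointer) async throws -> Int
}

struct AsyncBytes: AsyncSequence {
  typealias Element = UInt8
  let handle: ReadableThing

  func makeAsyncIterator() -> AsyncBufferedByteIterator {
    AsyncBufferedByteIterator(capacity: 16384) { buffer in
      // Fill `buffer` and return the number of bytes written; returning 0 ends iteration.
      try await handle.read(into: buffer)
    }
  }
}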

Other useful asynchronous sequences

  • removeDuplicates(): Omits repeated, consecutive duplicate elements.
  • interspersed(with:): Places a given value between each element of the asynchronous sequence.
  • chunked(by:)/chunks(ofCount:): Groups elements into chunks by group, projection, count, or signal.

Asynchronous Sequences that transact in time

  • debounce(for:clock:): Emits elements only after a quiescence period has elapsed on the base sequence.
  • throttle(for:clock:latest:): Limits the rate at which elements are emitted from the base sequence.
  • AsyncTimerSequence: Emits the current instant of a clock on a repeating interval.

Obtaining all values from an asynchronous sequence

  • Initializers on standard collection types (such as Array, Dictionary, and Set) for collecting the values of a finite asynchronous sequence into a synchronous collection.

Effects

Each algorithm has specific behavioral effects. For throwing, a given algorithm's sequence may throw, never throw, or rethrow the errors of its base sequences. Sendability is conditional for some asynchronous sequences, whereas others require all of their composed parts to be Sendable in order to satisfy Sendable themselves. The effects are listed here.

Adding Swift Async Algorithms as a Dependency

To use the AsyncAlgorithms library in a SwiftPM project, add the following line to the dependencies in your Package.swift file:

.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),

Include "AsyncAlgorithms" as a dependency for your executable target:

.target(name: "<target>", dependencies: [
    .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
]),

Finally, add import AsyncAlgorithms to your source code.
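
Putting the two snippets together, a minimal Package.swift might look like the following sketch (the package and target names are placeholders):

// swift-tools-version: 5.7
import PackageDescription

let package = Package(
  name: "MyApp",  // placeholder
  dependencies: [
    .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
  ],
  targets: [
    .executableTarget(
      name: "MyApp",  // placeholder
      dependencies: [
        .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
      ]
    ),
  ]
)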

Getting Started

⚠️ Please note that this package requires Xcode 14 on macOS hosts. Previous versions of Xcode do not contain the required Swift version.

Building/Testing Using Xcode on macOS

  1. In the swift-async-algorithms directory run swift build or swift test accordingly

Building/Testing on Linux

  1. Download the most recent development toolchain for your Linux distribution
  2. Decompress the archive to a path in which the swift executable is in the binary search path environment variable ($PATH)
  3. In the swift-async-algorithms directory run swift build or swift test accordingly

Source Stability

The Swift Async Algorithms package has a goal of being source stable as soon as possible; version numbers will follow Semantic Versioning. Source breaking changes to public API can only land in a new major version.

The public API of version 1.0 of the swift-async-algorithms package will consist of non-underscored declarations that are marked public in the AsyncAlgorithms module. Interfaces that aren't part of the public API may continue to change in any release, including patch releases.

Future minor versions of the package may introduce changes to these rules as needed.

We'd like this package to quickly embrace Swift language and toolchain improvements that are relevant to its mandate. Accordingly, from time to time, we expect that new versions of this package will require clients to upgrade to a more recent Swift toolchain release. Requiring a new Swift release will only require a minor version bump.

swift-async-algorithms's People

Contributors

elmetal, ezura, franzbusch, glessard, invalidname, jamieq, jefflewis, jordanekay, k-kohey, kateinoigakukun, kimgaeunnn, kingreza, kperryua, kristofferjohansson, ktan17, kyle-ye, leogdion, markiv, maxdesiatov, ole, parkera, phausler, rustle, s2mr, stzn, swiftty, tevelee, tiagomaial, tkremenek, twittemb


swift-async-algorithms's Issues

[Question] What is the point of having AsyncThrowingChannel parametrised with Failure?

Hi everyone,

I don't understand why AsyncThrowingChannel is parametrised with a Failure type. This generic type is never actually used.

The next() function can throw any error anyway.
The fail() function is available only when the Failure is Error, which removes the need for a generic Failure type IMO.

We could have:

public func fail(_ error: any Error) async { ... }

or better, compiler-wise:

public func fail<ErrorType: Error>(_ error: ErrorType) async { ... }

Am I missing something here?

Thanks.

`zip` is not Rx compatible (take 2)

Retry of:

func test() async throws {
    let initial = ContinuousClock.now

    let countUpStream = AsyncTimerSequence(interval: .seconds(1), clock: .continuous)
        .map { initial.duration(to: $0).components.seconds }
    let countUpStream2 = AsyncTimerSequence(interval: .seconds(3), clock: .continuous)
        .map { initial.duration(to: $0).components.seconds }

    let zipped = zip(countUpStream, countUpStream2)

    for try await value in zipped {
        let currentSec = initial.duration(to: .now).components.seconds
        print("===> zip: at \(currentSec) sec, \(value)")
    }
}

Result:

===> zip: at 3 sec, (1, 3)
===> zip: at 6 sec, (3, 6)
===> zip: at 9 sec, (6, 9)
===> zip: at 12 sec, (9, 12)
===> zip: at 15 sec, (12, 15)
...

To Be:

===> zip: at 3 sec, (1, 3)
===> zip: at 6 sec, (2, 6)
===> zip: at 9 sec, (3, 9)
===> zip: at 12 sec, (4, 12)
===> zip: at 15 sec, (5, 15)
...
--1-2-3-4-5-6-7-8-9-->
------3-----6-----9-->

zip
----(1,3)--(2,6)--(3,9)--->

Marble diagrams should support cancellation testing

Testing for cancellation is pretty important, we should offer some sort of mechanism to test for that deterministically.

The default ascii diagram symbol could be ; taking up 1 tick (similarly to how other events take that up) in the output sequence. The input sequence could then verify the receipt of that cancellation by the same symbol.

test_bufferingOldest_error fails with an unexpected set of values.

swift-async-algorithms/Tests/AsyncAlgorithmsTests/TestBuffer.swift:454: error: -[AsyncAlgorithmsTests.TestBuffer test_bufferingOldest_error] : specification violation got "5" after iteration terminated at tick 7
swift-async-algorithms/Tests/AsyncAlgorithmsTests/TestBuffer.swift:454: error: -[AsyncAlgorithmsTests.TestBuffer test_bufferingOldest_error] : expected failure but got "3" at tick 7
swift-async-algorithms/Tests/AsyncAlgorithmsTests/TestBuffer.swift:454: error: -[AsyncAlgorithmsTests.TestBuffer test_bufferingOldest_error] : unexpected "4" at tick 7
swift-async-algorithms/Tests/AsyncAlgorithmsTests/TestBuffer.swift:454: error: -[AsyncAlgorithmsTests.TestBuffer test_bufferingOldest_error] : unexpected finish at tick 7
swift-async-algorithms/Tests/AsyncAlgorithmsTests/TestBuffer.swift:451: error: -[AsyncAlgorithmsTests.TestBuffer test_bufferingOldest_error] : failed - Validation failure:
Expected:
X------[12^]
Actual:
X------[1234|]

It needs to be determined whether this is a valid failure or an invalid test.

[Help needed] Unable to use the Package in Xcode project

Hi!

I tried using the package on a personal project but because the public API is annotated
with @available(macOS 9999, iOS 9999, tvOS 9999, watchOS 9999, *)
the compiler doesn't build my project.

So I tried:

  • Using the latest Swift development toolchain in Xcode 13.4 and in Xcode 14 Beta
  • Passing -Xfrontend -disable-availability-checking to the compiler in the project build settings

But in the end, neither of these made the errors go away.

Is there some other way to work around the iOS availability error?
Thanks 🙏

Warnings when flag `-warn-concurrency` is enabled for `AsyncAlgorithms`

If I add -warn-concurrency and -enable-actor-data-race-checks as swiftSettings for the AsyncAlgorithms target, I get some warnings. This is on macOS Ventura (13.0 Beta (22A5331f)) using Xcode 14.0 beta 5 (14A5294e).

  .unsafeFlags([
    "-Xfrontend", "-warn-concurrency",
    "-Xfrontend", "-enable-actor-data-race-checks",
  ])

It feels like a serious, high-profile project such as this one ought to lead the way in terms of structured-concurrency correctness, so I would expect:

  1. These flags to be enabled
  2. The packages, or at least the public AsyncAlgorithms target, to have no concurrency warnings.

`Task.select` may hang indefinitely if the input sequence is empty

Task.select waits for the first task of a sequence to complete and then returns this task. If the sequence is empty, then this operation waits indefinitely:

await withUnsafeContinuation { continuation in
  for task in tasks {
    Task<Void, Never> {
      _ = await task.result
      let winner = state.withCriticalRegion { state -> Bool in
        defer { state.complete = true }
        return !state.complete
      }
      if winner {
        continuation.resume(returning: task)
      }
    }
    state.withCriticalRegion { state in
      state.add(task)
    }?.cancel()
  }
}

In this withUnsafeContinuation block, the for-in loop is a no-op, so continuation.resume() is never called, even though the API requires it to be called exactly once. I think that, for correctness, there should be a precondition() checking for a non-empty sequence.

I'm not sure how to do that properly, since Sequence is not sized (or at least we want to avoid the O(n) count getter). Maybe we should require a RandomAccessCollection, or, less radically, check whether TaskSelectState's tasks is empty after the for loop?
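
For illustration, one lightweight shape the check could take (the helper name is made up; inside Task.select the loop body would be the existing per-task child-task creation): track emptiness with a flag while the existing loop runs, so no extra pass over the Sequence is needed.

// Hypothetical helper: precondition that a Sequence yields at least one element,
// observed while the caller's own loop runs, so no O(n) `count` is required.
func forEachRequiringNonEmpty<S: Sequence>(_ sequence: S, _ body: (S.Element) -> Void) {
  var isEmpty = true
  for element in sequence {
    isEmpty = false
    body(element)
  }
  precondition(!isEmpty, "expected a non-empty sequence")
}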

Provide an option for chunking based on a signal to include empty chunks

For sequences that chunk based on a signal, the signal will be effectively ignored if no elements were received since the previous signal. Adding an option to pass an empty chunk on such a signal would be useful, particularly for use cases where you want to detect and report on the absence of something regularly happening.

An alternative approach would be to make this the default behaviour since there are other mechanisms that can be used to compose a sequence that only produces non-empty chunks.

`Task.select` naming

Task.select seems too general and doesn't really indicate that the first task to complete is chosen. I've seen (and have used myself) Task.race on the forums, which I think more aptly represents the underlying action.

Performance tests for merge only measure one active side

The test currently measures the performance of merge as follows:

  func test_merge2() async {
    await measureSequenceThroughput(output: 1) {
      merge($0, (0..<10).async)
    }
  }

Ideally we should measure cases where both inputs are continuously emitting values. This likely requires adding a second emitter to measureSequenceThroughput so that it can also be used for zip and other such combination APIs.

AsyncChannel naming is provisional and needs review

The naming of AsyncChannel was introduced as a placeholder. It deserves more consideration with respect to existing paradigms; the current name is inspired by Channel. This type needs a guide and some research/discussion on what a good name for it should be.

[AsyncChannel] Question: How is it possible to register twice the same Awaiting?

@phausler Hi. I guess this question is for you.

I'm working on a personal implementation of AsyncChannel for a very specific need. I was wondering why we should ensure that the Set<Awaiting> does not contain the same Awaiting (same generation) twice in the next() function.

How could this happen, since the generation is unique and established in a thread-safe way? And why is the Awaiting then removed and considered cancelled?

Thanks a lot for your answer.

if nexts.update(with: Awaiting(generation: generation, continuation: continuation)) != nil {
  nexts.remove(Awaiting(placeholder: generation))
  cancelled = true
}

Multi-consumption of an `AsyncSequence`

Hi, I have been reading the documentation for this repository and it left me wondering: is there a plan to introduce a feature that allows several tasks to consume the same AsyncSequence?

If this is not the right place to offer ideas, please let me know. I don't think this would fit in a proposal.

Context

I recently had to stream a file using URLSession, and two pieces of code were interested in the streamed values. Rather than starting the stream twice, which would not be efficient, I wanted to allow it to be consumed by several tasks. The data that had already been streamed would be sent to a new consumer when it starts iterating.

Basic implementation

I tried to implement such a solution (in a gist): it uses a reduce function on the already-emitted values and emits the result when a new task starts consuming the sequence. It's far from perfect, but I think it might help convey the idea.

Implementation

struct ReducedReplayAsyncStream<Element> {

    typealias Reduce = (_ partialResult: inout Element, _ nextResult: Element) -> Void

    private let storage: _Storage
    private var originalStream: AsyncStream<Element>

    init(
        bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
        initialResult: Element,
        reduce: @escaping Reduce,
        build: (AsyncStream<Element>.Continuation) -> Void
    ) {
        originalStream = AsyncStream(Element.self, bufferingPolicy: limit, build)
        storage = _Storage(stored: initialResult, reduce: reduce)
    }

    private func makeStream() -> AsyncStream<Element> {
        AsyncStream<Element> { continuation in
            Task {
                var isFirst = false
                if await !storage.didStart {
                    await storage.setDidStart(true)
                    isFirst = true
                    startConsumingOriginalStream()
                }

                if !isFirst {
                    await continuation.yield(storage.stored)
                }
                await storage.appendContinuation(continuation)
            }
        }
    }

    private func startConsumingOriginalStream () {
        Task {
            for await value in originalStream {
                await storage.updateWith(value: value)
            }
            await storage.continuations.forEach { $0.finish() }
        }
    }
}

extension ReducedReplayAsyncStream {

    private actor _Storage {
        private let reduce: ReducedReplayAsyncStream.Reduce

        var didStart = false
        var stored: Element
        var continuations: [AsyncStream<Element>.Continuation] = []

        init(stored: Element, reduce: @escaping Reduce) {
            self.stored = stored
            self.reduce = reduce
        }

        func updateWith(value: Element) {
            reduce(&stored, value)
            continuations.forEach { $0.yield(value) }
        }

        func setDidStart(_ value: Bool) {
            didStart = value
        }

        func appendContinuation(_ continuation: AsyncStream<Element>.Continuation) {
            continuations.append(continuation)
        }
    }
}

extension ReducedReplayAsyncStream: AsyncSequence {
    typealias AsyncIterator = AsyncStream<Element>.AsyncIterator

    func makeAsyncIterator() -> AsyncIterator {
        let stream = makeStream()
        return stream.makeAsyncIterator()
    }
}

Usage

var subscriptions: Set<AnyCancellable> = []
var continuation: AsyncStream<Int>.Continuation!

let replayStream = ReducedReplayAsyncStream<Int>(
    initialResult: 0,
    reduce: { partialResult, nextResult in partialResult = partialResult + nextResult },
    build: { continuation = $0 }
)

var counter = 0
Timer.publish(every: 0.4, on: .main, in: .default)
    .autoconnect()
    .sink { _ in
        if counter == 10 {
            continuation.finish()
        }
        continuation.yield(counter)
        counter += 1
    }
    .store(in: &subscriptions)

Task {
    for await value in replayStream {
        print("[A]", value)
    }
}

Task {
    try await Task.sleep(nanoseconds: 3_000_000_000)
    for await value in replayStream {
        print("[B]", value)
    }
}

Some considerations about efficiency can be found in the gist.

removeDuplicates naming is provisional and needs review

The name for removeDuplicates is fairly straightforward and has precedent in other frameworks, but it should be given greater consideration for how it fits into the Swift ecosystem. This type needs a guide and some research/discussion on what a good name for it should be.

Provide a higher arity version of merge(…)

I've got a use case where I have an unknown number of AsyncChannels that I want to merge into a single stream of events.

It'd be wonderful if merge(…) and friends supported more than an arity of 3 - ideally accepting a sequence?
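
As an interim workaround, this can be approximated today by funneling any number of channels through a single AsyncChannel with a task group. This is only a sketch: the fanIn(_:) name is made up, and finish() is awaited to match the API version used elsewhere on this page.

import AsyncAlgorithms

// Hypothetical fan-in: forwards every element from every source channel into one output channel.
func fanIn<Element: Sendable>(_ sources: [AsyncChannel<Element>]) -> AsyncChannel<Element> {
  let output = AsyncChannel<Element>()
  Task {
    await withTaskGroup(of: Void.self) { group in
      for source in sources {
        group.addTask {
          for await element in source {
            await output.send(element)
          }
        }
      }
    }
    // Every source has finished, so finish the merged channel as well.
    await output.finish()
  }
  return output
}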

Thanks!

Crash upon a second send on a channel

I am using an AsyncChannel between two tasks: one task is sending events and the second task is running a for await loop where the events are processed. Upon sending the second event I get a crash.

throttle should define the behavior of the first element

I find there are significant flaws in the current throttle implementation.

  1. First value is not emitted
  2. Value is not emitted at throttled-interval-end when latest = true

1. First value is not emitted

From test code:

  func test_rate_3() {
    validate {
      "abcdefghijk|"
      $0.inputs[0].throttle(for: .steps(3), clock: $0.clock)
      "--c--f--i--|"
    }
  }

AFAIK, this throttle behavior is not compatible with any existing Rx frameworks, e.g. Combine, RxSwift, ReactiveSwift.

The correct result should (at least) be:

  func test_rate_3() {
    validate {
      "abcdefghijk|"
      $0.inputs[0].throttle(for: .steps(3), clock: $0.clock)
      "a--d--g--j-|"
    }
  }

where the first a is emitted immediately, and throttling then starts for the subsequent intervals.

2. Value is not emitted at throttled-interval-end when latest = true

Based on existing frameworks' behavior, latest = true normally means the latest cached value will be emitted at the end of the throttle interval.

For example:

    func test() async throws {
        let now = ContinuousClock.now

        let countUpStream = AsyncTimerSequence(interval: .seconds(1), clock: .continuous)
            .map { now.duration(to: $0).components.seconds } // 1, 2, 3, ...

        let throttledStream = countUpStream.throttle(for: .seconds(2.1), latest: true)

        let task = Task {
            for try await value in countUpStream {
                print("===> count", value)
            }
        }

        for try await value in throttledStream {
            print("===> throttled", value)
        }

        _ = await task.result

        print("===> done")
    }

This test runs a timer that counts up from 1 second, with a throttle interval of 2.1 seconds.

When latest = true, the ideal result should print:

// Ideal result
===> count 1
===> count 2
===> throttled 2 // at 2.1 sec
===> count 3
===> count 4
===> throttled 4 // at 4.2 sec
===> count 5
===> count 6
===> throttled 6 // at 6.3 sec
===> count 7
===> count 8
===> throttled 8 // at 8.4 sec

But current impl results:

// Current result
===> count 1
===> count 2
===> count 3
===> throttled 3 // at 3 sec
===> count 4
===> count 5
===> count 6
===> throttled 6 // at 6 sec
===> count 7
===> count 8
===> count 9
===> throttled 9 // at 9 sec

Debounce not debouncing?

This seemingly simple expectation fails:

func testDebounce() async {
  let timer = AsyncTimerSequence.repeating(every: .seconds(1)) //  clock: .suspending
    .prefix(10)
    .debounce(for: .seconds(2))                                //  clock: .continuous

  var ticks = 0
  for await _ in timer {
    ticks += 1
  }
  XCTAssertEqual(ticks, 1)  // Only final tick should emit
}

❌ testDebounce(): XCTAssertEqual failed: ("9") is not equal to ("1")
Test Case '-[ClocksTests.ClocksTests testDebounce]' failed (10.207 seconds).

This is on the main branch using Xcode 14 beta 2's Swift.

The test took 10 seconds to run, but this makes it seem like it is completing too soon? I'd expect the debounce to make it take more like 12 seconds?

I also get different results depending on the clock. If I use a continuous clock for the timer (instead of the default suspending clock), I get the same failure:

func testDebounce() async {
  let timer = AsyncTimerSequence.repeating(every: .seconds(1), clock: .continuous)
    .prefix(10)
    .debounce(for: .seconds(2), clock: .continuous)

  var ticks = 0
  for await _ in timer {
    ticks += 1
  }
  XCTAssertEqual(ticks, 1)  // Only final tick should emit
}

❌ testDebounce(): XCTAssertEqual failed: ("9") is not equal to ("1")
Test Case '-[ClocksTests.ClocksTests testDebounce]' failed (0.017 seconds).

But I get this failure much, much, much more quickly (in 10 milliseconds instead of over 10 seconds 😲)...

Finally, with both clocks suspending, I get another result that seems wrong:

func testDebounce() async {
  let timer = AsyncTimerSequence.repeating(every: .seconds(1), clock: .suspending)
    .prefix(10)
    .debounce(for: .seconds(2), clock: .suspending)

  var ticks = 0
  for await _ in timer {
    ticks += 1
  }
  XCTAssertEqual(ticks, 1)  // Only final tick should emit
}

❌ testDebounce(): XCTAssertEqual failed: ("0") is not equal to ("1")
Test Case '-[ClocksTests.ClocksTests testDebounce]' failed (10.085 seconds).

This time it takes the full 10 seconds (again, shouldn't it be closer to 12?), and now I get no emissions at all.

[Zip] Reimplement to remove Sendable constraint and reduce Task creation

Hi,

This issue just tracks that I'm in the process of refactoring the zip operator to follow the work bootstrapped by @FranzBusch.

The goals are:

  • to remove the Sendable constraint on the base iterators
  • to optimise the number of created tasks

I will implement zip state machines whose role is to compute the next state (and the output) given a current state and an event (as in a Mealy state machine), and use those state machines in the zip async sequences.

I'll keep you posted.

Collecting elements of `AsyncSequence` into a regular `Sequence`

Hi!

I'm thinking about the inverse operation of the async property: a way to collect async elements into a synchronous sequence.

POC implementation (collecting into an Array):

extension AsyncSequence {
    public func collect() async rethrows -> [Element] {
        try await reduce(into: []) { $0.append($1) }
    }
}

It's probably a better idea to return a dedicated type that conforms to Sequence.

Or using the recently pitched primary associated types:

extension AsyncSequence {
    public func collect() async rethrows -> some Sequence<Element> {
        try await reduce(into: .init()) { $0.append($1) }
    }
}
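
For what it's worth, usage of the first variant would be straightforward (relying on the async property from this package; the base here doesn't throw, so no try is needed):

import AsyncAlgorithms

func example() async {
  let numbers = await (1...5).async.collect()
  print(numbers)  // [1, 2, 3, 4, 5]
}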

Do you think it's an equally important use-case?

Does `AsyncChannel` supports multiple producers/consumers?

I was playing with AsyncChannel and tried to implement a multiple-producers, multiple-consumers pattern, but stumbled on an issue: I wasn't able to terminate the async iteration of all consumers. When there is one consumer the solution is simple: just call AsyncChannel.finish() at the end of production to send a nil value to the only consumer, which then stops.

However, when there are multiple consumers, all of them should receive a nil value from the producers. To achieve this I tried to call .finish() as many times as there are consumers, but only one of them stopped.

Is this intended behavior, i.e. an AsyncChannel should only be used with one consumer, or is this a bug? Please see the code snippet below to reproduce it.

System:
MacBook Air early 2015
macOS v12.3.1
Xcode v13.3
Swift v5.6

PS: I just read in the documentation that .finish() returns immediately if the channel is already finished, so this means that calling it multiple times won't work for multiple consumers.

import Foundation
import AsyncAlgorithms

func producer(channel: AsyncChannel<Int>, id: Int) async {
    for _ in 0..<3 {
        await channel.send(id)
    }
}

/// Spawns `count` producers
func produce(channel: AsyncChannel<Int>, count: Int) async {
    await withTaskGroup(of: Void.self) { group in
        for id in 0..<count {
            group.addTask {
                await producer(channel: channel, id: id)
            }
        }
    }
    
    for _ in 0..<count {
        await channel.finish()
    }
}

func consumer(channel: AsyncChannel<Int>, id: Int) async {
    for await value in channel {
        try? await Task.sleep(nanoseconds: 1_000_000_000)
        print("Consumer \(id) consumed value produce by producer \(value)")
    }
    print("Consumer \(id) terminated")  // Only one consumer is stopped
}

/// Spawns `count` consumers
func consume(channel: AsyncChannel<Int>, count: Int) async {
    await withTaskGroup(of: Void.self) { group in
        for id in 0..<count {
            group.addTask {
                await consumer(channel: channel, id: id)
            }
        }
    }
}

@main
enum App {
    static func main() async throws {
        let channel = AsyncChannel<Int>()
        
        async let p: Void = produce(channel: channel, count: 4)
        async let c: Void = consume(channel: channel, count: 4)
        
        _ = await (p, c)
    }
}

`AsyncChannel` crash when cancelled with only one awaiting

Hi,

AsyncChannel can crash in the following context:

  • start an iteration in a Task
  • cancel the task while the AsyncChannel is being awaited
  • send a value in the channel

Here is a test that crashes:

func test_cancel_when_only_one_iterator_while_waiting() async throws {
  let channel = AsyncChannel<Int>()

  let task = Task {
    for await _ in channel {}
  }

  try await Task.sleep(nanoseconds: 1_000_000_000) // ensures the loop is waiting for a next value

  task.cancel() // sets the inner state of the channel to awaiting but with 0 nexts

  await channel.send(1) // tries to remove the first element of the awaiting state (which is empty) -> crash
}

linked to #143

Deadlock in Task.select when using clock's sleep

I'm using Swift dev snapshot 2022-03-02 with Xcode 13.3.

By making a small tweak to one of the Task.select tests I am able to produce a deadlock:

 let firstValue = await Task.select(Task {
+  try! await Task.sleep(until: .now + .seconds(1), clock: .continuous)
   return 1
 }, Task {
   try! await Task.sleep(until: .now + .seconds(2), clock: .continuous)
   return 2
 }).value

It's worth noting that it does not deadlock if you use Task.sleep without a clock:

let firstValue = await Task.select(Task {
  try! await Task.sleep(nanoseconds: NSEC_PER_SEC)
  return 1
}, Task {
  try! await Task.sleep(nanoseconds: 2*NSEC_PER_SEC)
  return 2
}).value

Is there something about clocks and sleeping that is causing this? I see the same behavior with suspending clocks too.

Subject equivalent?

We need a way to imperatively pipe events into an AsyncSequence, the same as Subjects from Combine or Kotlin's MutableState/SharedFlow do.

AsyncChannel feels like too low-level a primitive to be used for state tracking the way CurrentValueSubject was.

Deadlines for channels

This is (probably?) not possible to implement now, but it would be awesome if we could add deadlines to Channels like we did on Venice (backed by libdill). Like the test excerpt below:

    func testDoubleSendTimeout() throws {
        let channel = try Channel<Int>()

        let coroutine1 = try Coroutine {
            XCTAssertThrowsError(
                try channel.send(111, deadline: 50.milliseconds.fromNow()),
                error: VeniceError.deadlineReached
            )
        }

        let coroutine2 = try Coroutine {
            XCTAssertThrowsError(
                try channel.send(222, deadline: 50.milliseconds.fromNow()),
                error: VeniceError.deadlineReached
            )
        }

        try Coroutine.wakeUp(100.milliseconds.fromNow())

        let coroutine3 = try Coroutine {
            try channel.send(333, deadline: .never)
        }

        XCTAssertEqual(try channel.receive(deadline: .never), 333)

        coroutine1.cancel()
        coroutine2.cancel()
        coroutine3.cancel()
    }

If not possible right now, what would we need in place to make it happen?
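
One rough way to approximate this today is to race the send against a clock-based sleep in a throwing task group. This is only a sketch under the assumption that cancelling the task performing send(_:) abandons the send (see the cancellation discussions elsewhere on this page); DeadlineError and the helper itself are made up.

import AsyncAlgorithms

enum DeadlineError: Error { case deadlineReached }

// Hypothetical helper: send `element`, but give up once `deadline` passes.
func send<Element: Sendable>(
  _ element: Element,
  to channel: AsyncChannel<Element>,
  deadline: ContinuousClock.Instant
) async throws {
  try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask {
      await channel.send(element)
    }
    group.addTask {
      try await Task.sleep(until: deadline, clock: .continuous)
      throw DeadlineError.deadlineReached
    }
    // Whichever child finishes first wins; if the sleeper wins, its error propagates
    // and the group cancels the still-pending send on the way out.
    _ = try await group.next()
    group.cancelAll()
  }
}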

Should channel operations automatically check for cancelation?

libdill's implementation of structured concurrency fails channel operations when a coroutine (analogous to a Task in Swift) is canceled. I think it might be good practice to do the same with Swift's structured concurrency. This might be true not just for channels but for any other operation where it makes sense across this family of async libraries; in fact, that's what libdill does. It even fails on channel creation. I know there are concerns about "the proliferation of try and now await", but if we really want cooperative scheduling to work, we need to be sure developers are actually "cooperating". I'm guilty of this myself; most of the time I simply forget to use try Task.checkCancellation(). Maybe this is a broader discussion that could be moved to the Swift forums. Let me know what you think.

If not the first-party API, we should at least have overloads that do that for us (I'm not too happy about this approach, though).

This:

public final class AsyncChannel {
    ...
    
    func send(_ element: Element) async throws {
        try Task.checkCancellation()
        await send(element)
    }
    
    ...  
}

or this:

extension AsyncChannel {
    /// This overload checks for cancelation...
    @_disfavoredOverload
    func send(_ element: Element) async throws {
        try Task.checkCancellation()
        await send(element)
    }
}

A throttled `AsyncSequence` doesn't yield its parent sequence's final value

Not sure if this is expected behavior.

I make an AsyncStream called stream, and a throttled sequence from that stream throttledStream. I yield values into stream for a while, and eventually call finish() on its continuation. All the while in the background I am iterating through throttledStream - when finish() gets called on stream, my iteration through throttledStream finishes, but I don't get the final value that was sent to stream.

Sorry this is kind of a complicated way to explain the issue, but I will make a reproducible example project if this behavior sounds like it is indeed not intended!
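
For reference, here is a condensed sketch of the setup described above (values and timings are arbitrary; the comment marks where the reported behavior shows up):

import AsyncAlgorithms

func reproduce() async {
  var setup: AsyncStream<Int>.Continuation!
  let stream = AsyncStream<Int> { setup = $0 }
  let continuation: AsyncStream<Int>.Continuation = setup

  let throttledStream = stream.throttle(for: .milliseconds(100), clock: .continuous)

  Task {
    for value in 1...5 {
      continuation.yield(value)
      try? await Task.sleep(nanoseconds: 30_000_000)
    }
    continuation.finish()
  }

  for await value in throttledStream {
    print(value)  // as described above, the last value yielded before finish() never shows up here
  }
}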

Send and receive only channels

An important ability when passing channels around is to limit sending or receiving by handing out a send-only or a receive-only channel. Right now a receive-only view is already possible, since AsyncChannel is just an AsyncSequence. However, there's currently no mechanism to create a send-only version of a channel. The simplest solution, I think, is to create a wrapper type called SendingChannel or something similar, as sketched below. I think this is a good reason to rename AsyncChannel to just Channel (#47). Otherwise we would have SendingAsyncChannel, which is not too bad, but I really think the Async part doesn't help us much.
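
A minimal sketch of the wrapper idea (the SendOnlyChannel name is made up; finish() is awaited here to match the API version used elsewhere on this page):

import AsyncAlgorithms

// Hypothetical send-only facade: exposes sending and finishing, but not iteration.
struct SendOnlyChannel<Element: Sendable>: Sendable {
  private let base: AsyncChannel<Element>

  init(_ base: AsyncChannel<Element>) {
    self.base = base
  }

  func send(_ element: Element) async {
    await base.send(element)
  }

  func finish() async {
    await base.finish()
  }
}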

`zip` is not Rx compatible

    func test() async throws {
        let now = ContinuousClock.now

        let countUpStream = AsyncTimerSequence(interval: .seconds(0.1), clock: .continuous)
            .map { now.duration(to: $0).components.seconds }
        let countUpStream2 = AsyncTimerSequence(interval: .seconds(0.3), clock: .continuous)
            .map { now.duration(to: $0).components.seconds }

        let zipped = zip(countUpStream, countUpStream2)

        for try await value in zipped {
            print("===> zip", value)
        }
    }

Result:

===> zip (0, 0)
===> zip (0, 0)
===> zip (0, 0)
===> zip (0, 1)
===> zip (1, 1)
===> zip (1, 1)
===> zip (1, 2)
===> zip (2, 2)
===> zip (2, 2)
===> zip (2, 3)
===> zip (3, 3)
===> zip (3, 3)
===> zip (3, 3)
===> zip (3, 4)
===> zip (4, 4)
===> zip (4, 4)
===> zip (4, 5)
===> zip (5, 5)
===> zip (5, 5)
===> zip (5, 6)
===> zip (6, 6)
===> zip (6, 6)
===> zip (6, 6)
===> zip (6, 7)
===> zip (7, 7)
===> zip (7, 7)
===> zip (7, 8)

To Be:

===> zip (0, 0)
===> zip (1, 1)
===> zip (2, 2)
...

Iterating `combineLatest`, `merge`, `zip` and `debounce` when Task is already cancelled

Version 0.0.4 added a preconditionFailure() to combineLatest, merge, zip and debounce that prevents iteration if the task is already cancelled.

The following pattern reproduces the issue in each of the operators:

let t = Task {
  try? await Task.sleep(nanoseconds: 1_000_000_000)
  let c1 = AsyncChannel<Int>()
  let c2 = AsyncChannel<Int>()
  for await (v1, v2) in combineLatest(c1, c2) {
    print(v1, v2)
  }
}
t.cancel()

The failure occurs within each operators version of func cancelled():

  mutating func cancelled() -> CancelledAction? {
    switch self.state {
    case .initial:
      preconditionFailure("Internal inconsistency current state \(self.state) and received cancelled()")

Is this intentional? The workaround is a tedious manual check before any iteration...

if !Task.isCancelled {
  for await v1 in merge(c1, c2) {
    print(v1)
  }
}

[Merge] Improve conception

Hi @phausler, I see an attempt in #185 at avoiding the creation of too many tasks and at sharing iterators between them in the merge algorithm.

Although it might work well, the resulting code is pretty complex.

I feel we could leverage async channels as a way of merging elements from several tasks that consume the upstreams. It would simplify the code a lot. I have a working implementation on a branch that also handles rethrowing. I will open a PR tomorrow to suggest my implementation if you are willing to take a look.

thanks.

`Sequence`-based implementation of combining algorithms in addition to variadic ones

Existing implementations of combineLatest, zip, and merge are great when the total number of sequences is known at compile time (2 or 3, without variadic generics). In some scenarios these sequences have the same element type, but the total number of sequences is only known at run time. Existing implementations won't work here, and I think the library would benefit from implementations that take a Sequence (or some other container protocol) of AsyncSequences with the same element type to cover these use cases.

Archive failures when included in framework built with BUILD_LIBRARY_FOR_DISTRIBUTION=YES

My framework fails to archive successfully when this library is included and the library evolution flag BUILD_LIBRARY_FOR_DISTRIBUTION=YES is set.

Is this expected? Is it desired to have this library patched such that library evolution archives succeed? Specifically, I get the following errors:

swift-async-algorithms/Sources/AsyncAlgorithms/AsyncChunkedByGroupSequence.swift:108:10: error: 'let' property 'base' may not be initialized directly; use "self.init(...)" or "self = ..." instead
    self.base = base
swift-async-algorithms/Sources/AsyncAlgorithms/AsyncChunkedByGroupSequence.swift:109:10: error: 'let' property 'grouping' may not be initialized directly; use "self.init(...)" or "self = ..." instead
    self.grouping = grouping
swift-async-algorithms/Sources/AsyncAlgorithms/AsyncChunkedOnProjectionSequence.swift:89:10: error: 'let' property 'base' may not be initialized directly; use "self.init(...)" or "self = ..." instead
    self.base = base
swift-async-algorithms/Sources/AsyncAlgorithms/AsyncChunkedOnProjectionSequence.swift:90:10: error: 'let' property 'projection' may not be initialized directly; use "self.init(...)" or "self = ..." instead
    self.projection = projection
swift-async-algorithms/Sources/AsyncAlgorithms/AsyncChunksOfCountSequence.swift:74:10: error: 'let' property 'base' may not be initialized directly; use "self.init(...)" or "self = ..." instead
    self.base = base
swift-async-algorithms/Sources/AsyncAlgorithms/AsyncChunksOfCountSequence.swift:75:10: error: 'let' property 'count' may not be initialized directly; use "self.init(...)" or "self = ..." instead
    self.count = count
swift-async-algorithms/Sources/AsyncAlgorithms/AsyncInterspersedSequence.swift:38:10: error: 'let' property 'base' may not be initialized directly; use "self.init(...)" or "self = ..." instead
    self.base = base
swift-async-algorithms/Sources/AsyncAlgorithms/AsyncInterspersedSequence.swift:39:10: error: 'let' property 'separator' may not be initialized directly; use "self.init(...)" or "self = ..." instead
    self.separator = separator
swift-async-algorithms/Sources/AsyncAlgorithms/AsyncInterspersedSequence.swift:67:12: error: 'let' property 'separator' may not be initialized directly; use "self.init(...)" or "self = ..." instead
      self.separator = separator
swift-async-algorithms/Sources/AsyncAlgorithms/AsyncJoinedBySeparatorSequence.swift:133:10: error: 'let' property 'base' may not be initialized directly; use "self.init(...)" or "self = ..." instead
    self.base = base
swift-async-algorithms/Sources/AsyncAlgorithms/AsyncJoinedBySeparatorSequence.swift:134:10: error: 'let' property 'separator' may not be initialized directly; use "self.init(...)" or "self = ..." instead
    self.separator = separator
swift-async-algorithms/Sources/AsyncAlgorithms/AsyncRemoveDuplicatesSequence.swift:52:12: error: 'let' property 'predicate' may not be initialized directly; use "self.init(...)" or "self = ..." instead
      self.predicate = predicate
swift-async-algorithms/Sources/AsyncAlgorithms/AsyncRemoveDuplicatesSequence.swift:111:12: error: 'let' property 'predicate' may not be initialized directly; use "self.init(...)" or "self = ..." instead
      self.predicate = predicate

[AsyncChannel] Question: is task cancellation in sending operations too violent?

Hi @phausler

I was wondering if the task cancellation behavior when sending an element in an AsyncChannel is not a bit too violent?

This is the code when sending an element:

func _send(_ element: Element) async {
  await withTaskCancellationHandler {
    terminateAll()
  } operation: { ... }

The terminateAll() function resumes all the suspended operations (producers and consumers). What if the sending operations are performed in several Tasks? It means the async channel is finished for every one of them, which seems a bit violent, right?

Tests fail with the iOS simulator

The unit tests fail when being run on the iOS simulator. I think we already had this conversation: it might be related to the fact that the iOS simulator cannot spawn enough concurrent tasks because of its limited number of cores.

@phausler As you are planning a v1.0 soon, I was wondering if we should make sure the tests pass (if this is even possible) on every platform?

[Question `AsyncThrowingSequence`] Should failure resume and fail all producers/consumers

Hi,

We’ve recently merged a PR #152 where the finish operation instantly resumes all the pending producers and consumers (with a nil value), thus ending all the iterators.

I was wondering if we should have the same behaviour for failures. As it is a terminal event, we have the same kind of risk for infinite suspensions. I guess when sending a failure, all the pending operations should resume instantly. All the awaiting consumers should receive the failure and the future iterations should also resume instantly.

@phausler What do you think?

The fail(_:) function would then also not be async. That makes sense since, as a terminal event, no back pressure should be applied.

Sendable conformance on _AsyncBytesBuffer.Storage allows unsafe concurrent access

In AsyncBufferedByteIterator.swift, _AsyncBytesBuffer.Storage (a class) conforms to Sendable, allowing it to be passed across concurrency boundaries. Since concurrent access is allowed, classes conforming to Sendable are responsible for implementing internal synchronization, but Storage doesn't have any.

This means that it's possible to construct data races on the buffer of an _AsyncBytesBuffer.Storage (through copies of an AsyncBufferedByteIterator). (See this gist for an example.) Here's what happens:

  1. Multiple _AsyncBytesBuffer values can share a single Storage. If those instances are read from on separate threads, their readFunctions may be called concurrently with the same pointer argument. Since the job of the readFunction is to fill in the pointee bytes, there is a write/write data race on those bytes.

  2. Similarly, one copy may be reading from the Storage while another is calling its readFunction, producing a read/write data race on the bytes.

`Task.select` loser-tasks cancellation

The current TaskSelect.swift implementation does not seem to cancel the losing tasks.

For example:

final class TestTaskSelect: XCTestCase {
  func test_first() async throws {
    let firstValue = try await Task.select(Task { // Winner task
      return 1
    }, Task { // Loser task
      try await Task.sleep(until: .now + .seconds(2), clock: .continuous)

      fatalError("Should not reach here") // ADDED

      return 2
    }).value

    try await Task.sleep(/* wait for both completion & cancellation */) // ADDED

    XCTAssertEqual(firstValue, 1)
  }
}

(Modified from the original test code.)
When the first task completes, the second task should be automatically cancelled, so the fatalError should not be reached.

Is this the intended implementation, or is it considered a bug?

Tools for testing Swift concurrency

A couple of weeks ago I brought up on the forums that it's basically impossible to write reliable tests for concurrent code: https://forums.swift.org/t/reliably-testing-code-that-adopts-swift-concurrency/57304

This repo has a bunch of internal code that aids in testing isolated async code, e.g. its sequences, in predictable, reproducible, and fast ways. This includes custom clocks, executors, and more.

Tools like these would ideally be more widely available to users that want to test async code, and should be extracted from this repo. They still won't make it possible to test async code that is threaded between actors, but hopefully this issue is also on Apple's radar, and a solution is in the works.
