
async-kit's Introduction

Vapor

Documentation · Team Chat · MIT License · Continuous Integration · Code Coverage · Swift 5.7+ · Mastodon


Vapor is an HTTP web framework for Swift. It provides a beautifully expressive and easy-to-use foundation for your next website, API, or cloud project.

Take a look at some of the awesome stuff created with Vapor.

💧 Community

Join the welcoming community of fellow Vapor developers on Discord.

🚀 Contributing

To contribute a feature or idea to Vapor, create an issue explaining your idea or bring it up on Discord.

If you find a bug, please create an issue.

If you find a security vulnerability, please contact [email protected] as soon as possible.

💛 Sponsors

Support Vapor's development by becoming a sponsor.

Broken Hands Emerge Tools Jari Donut Dane MacStadium

💚 Backers

Support Vapor's development by becoming a backer.

Moritz Lang, Maarten Engels, Thomas Krajacic, Jesse Tipton, Steve Hume, Mikkel Ulstrup, Geoffrey Foster, Paul Schmiedmayer, Scott Robbins, Sven A. Schmidt, Spencer Curtis, Zach Rausnitz, Tim „Timinator“ Kretzschmar, Klaas, Andrew Edwards, +Li, Inc., Stijn Willems, Kyle Newsome, Via Aurelia Solutions, Jakub Kiermasz, Brian Drelling, Mattes Mohr, Jamie, Galen Rhodes, Litmaps, David Roman, Brian Strobach, Kishikawa Katsumi, Alex Sherbakov, Sidetrack, Greg Karpati, František Mikš, Jeremy Greenwood, Ray Fix, Mićo Miloloža, Alan, Jonas Sannewald, TapEnvy.us, LLC, Jawad, PARAIPAN SORIN, Kalyn Davis, YR Chen, Aarón Martínez Cuevas

async-kit's People

Contributors

0xtim, bennydebock, calebkleveter, gwynne, jaapwijnen, kemchenj, mahdibm, makleso6, mordil, mrlotu, mrmage, niazoff, rjhancock, sevki, tanner0101, vzsg


async-kit's Issues

[Proposal] `whenAllComplete` Method to Flatten Array of Futures Without Failure

I've seen this requested a few times in the Vapor Discord, and I've wanted it myself for a while (before I wrote a local implementation).

As it stands

Vapor's Core extensions flatten and syncFlatten take an array of Future<Void> and flatten them into a single Future<Void> that either succeeds when all futures in the array succeed, or fails as soon as one fails.

That is useful on its own, but sometimes you need to batch-run tasks (such as making multiple API requests based on an array of parameters) and then handle each result individually and independently.

My exact use case is synchronizing a database cache of Slack user data, where I need to make 100+ individual profile requests, but I only care about:

  1. Each individual request's response, so the appropriate data can be saved to the database
  2. Knowing when all of the requests have completed, even if some failed.

Proposal

  1. Introduce a whenAllComplete method to EventLoop that always resolves when all of the EventLoopFutures in a collection complete (failure or success).
    • The resolved value of the new EventLoopFuture will be [Result<T, Error>]
    • The order of the Result values will match the order of the input collection
  2. Provide an overload of whenAllComplete that ignores the results of the collection of futures and returns a new EventLoopFuture that resolves to Void (a notification future).
extension EventLoop {
    public func whenAllComplete<T>(_ futures: [EventLoopFuture<T>]) -> EventLoopFuture<[Result<T, Error>]>

    public func whenAllComplete<T>(_ futures: [EventLoopFuture<T>]) -> EventLoopFuture<Void>
}
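
A minimal sketch of how the first overload could be implemented on top of plain NIO promises is below; the hop(to:) call keeps all bookkeeping on a single thread. This is illustrative rather than a shipped API (newer SwiftNIO releases offer a similar static EventLoopFuture.whenAllComplete(_:on:)).

import NIO

extension EventLoop {
    public func whenAllComplete<T>(_ futures: [EventLoopFuture<T>]) -> EventLoopFuture<[Result<T, Error>]> {
        let promise = self.makePromise(of: [Result<T, Error>].self)
        guard !futures.isEmpty else {
            promise.succeed([])
            return promise.futureResult
        }

        var results: [Result<T, Error>?] = Array(repeating: nil, count: futures.count)
        var remaining = futures.count

        for (index, future) in futures.enumerated() {
            // Hop each future onto this loop so `results` and `remaining`
            // are only ever touched from one thread.
            future.hop(to: self).whenComplete { result in
                results[index] = result
                remaining -= 1
                if remaining == 0 {
                    promise.succeed(results.compactMap { $0 })
                }
            }
        }
        return promise.futureResult
    }
}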

thread-specific userInfo on event loop

Currently there is no way to store arbitrary data on an event loop. It has been recommended previously to use a ThreadSpecificVariable instead. While this works, the purpose of the code is arguably less clear. I think it could be nice to implement a userInfo (or similar) data storage on EventLoop backed by a ThreadSpecificVariable. If at some point NIO decides to add such storage to the event loop, this could be removed.

extension EventLoop {
    var userInfo: [AnyHashable: Any] { 
        get { ... }
        set { ... }
    }
}
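
A minimal sketch of what a ThreadSpecificVariable-backed implementation might look like; the UserInfoBox class and file-private storage are illustrative, not AsyncKit API, and the stored value is only meaningful when accessed from the loop's own thread.

import NIO

// Reference type required because ThreadSpecificVariable stores AnyObject values.
private final class UserInfoBox {
    var userInfo: [AnyHashable: Any] = [:]
}

private let userInfoStorage = ThreadSpecificVariable<UserInfoBox>()

extension EventLoop {
    public var userInfo: [AnyHashable: Any] {
        get { return userInfoStorage.currentValue?.userInfo ?? [:] }
        set {
            if let box = userInfoStorage.currentValue {
                box.userInfo = newValue
            } else {
                let box = UserInfoBox()
                box.userInfo = newValue
                userInfoStorage.currentValue = box
            }
        }
    }
}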

[Proposal] Future.nonExistant(or:)

In Core there is an extension to Future<OptionalType> adding an unwrap(or:) method that either returns the value if it's not nil or throws the error provided to the function.

This is really useful, but there is no method to do this the other way around. For example, if you're working with a database where a certain key is unique, you might not want to rely on the database to error, but catch it yourself beforehand with some code like this:

User.query(on: req).filter(\.email, .ilike, providedEmail).first().map { optionalUser in
    guard optionalUser == nil else {
        throw Abort(.badRequest, reason: "user with this email already exists")
    }
    // Continue ...
}

Proposal

Add a nonExistant(or:) function to Future<OptionalType> to allow for flows like this:

User.query(on: req)
    .filter(\.email, .ilike, providedEmail)
    .first()
    .nonExistant(or: Abort(.badRequest, reason: "user with this email already exists"))
    .map {
        // Continue ...
    }
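
A minimal sketch of the proposed method, assuming an OptionalType-style constraint like the one unwrap(or:) uses; the protocol spelled out here is illustrative rather than the one shipped in Core.

import NIO

// Illustrative protocol so the extension can constrain to optional values.
public protocol OptionalType {
    associatedtype WrappedType
    var wrapped: WrappedType? { get }
}

extension Optional: OptionalType {
    public var wrapped: Wrapped? { return self }
}

extension EventLoopFuture where Value: OptionalType {
    /// Succeeds with Void when the optional is nil, fails with `error` when a value exists.
    public func nonExistant(or error: @escaping @autoclosure () -> Error) -> EventLoopFuture<Void> {
        return self.flatMapThrowing { value in
            guard value.wrapped == nil else { throw error() }
        }
    }
}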

add iOS platform in Package.swift

I am using Vapor on iOS and need to add the iOS platform to Package.swift:

let package = Package(
    name: "async-kit",
    platforms: [
       .macOS(.v10_15),
       .iOS(.v11)
    ],
    products: [
        .library(name: "AsyncKit", targets: ["AsyncKit"]),
    ],
    dependencies: [
        .package(url: "https://github.com/apple/swift-nio.git", from: "2.10.0"),
        .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
    ],
    targets: [
        .target(name: "AsyncKit", dependencies: [
            .product(name: "Logging", package: "swift-log"),
            .product(name: "NIO", package: "swift-nio"),
        ]),
        .testTarget(name: "AsyncKitTests", dependencies: [
            .target(name: "AsyncKit")
        ]),
    ]
)

Optional Map

Add optionalMap and optionalFlatMap methods to EventLoopFuture where the callback is only fired if the optional value in the future is not nil:

let id: Future<Int?> = ...
id.optionalMap { id in
    type(of: id) // Int
}
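
A minimal sketch of optionalMap; optionalFlatMap would follow the same pattern using flatMap. The constraint spelling is illustrative, not a shipped signature.

import NIO

extension EventLoopFuture {
    public func optionalMap<Wrapped, NewValue>(
        _ transform: @escaping (Wrapped) -> NewValue
    ) -> EventLoopFuture<NewValue?> where Value == Wrapped? {
        return self.map { value in
            // Optional.map only invokes the transform when the value is non-nil.
            value.map(transform)
        }
    }
}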

NIOTestCase

Consider adding an XCTestCase subclass to make the following boilerplate easier:

import NIO
import XCTest

class MyTest: XCTestCase {
    private var group: EventLoopGroup!
    private var eventLoop: EventLoop {
        return self.group.next()
    }

    override func setUp() {
        self.group = MultiThreadedEventLoopGroup(numberOfThreads: ...)
    }

    override func tearDown() {
        XCTAssertNoThrow(try self.group.syncShutdownGracefully())
        self.group = nil
    }

    [...]
}

I think this could be possible with the #if canImport(XCTest), but I'm not sure. We should look into it.
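
A minimal sketch of what such a base class might look like; the name NIOTestCase and the single-thread group are illustrative.

import NIO
import XCTest

open class NIOTestCase: XCTestCase {
    public private(set) var group: EventLoopGroup!
    public var eventLoop: EventLoop { return self.group.next() }

    override open func setUp() {
        super.setUp()
        self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    }

    override open func tearDown() {
        // Shut the group down cleanly so tests fail loudly on leaked resources.
        XCTAssertNoThrow(try self.group.syncShutdownGracefully())
        self.group = nil
        super.tearDown()
    }
}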

Permanent connection deadlock on many concurrent requests

Describe the bug

We are seeing a failure where a large number of concurrent requests (relative to the size of the server) can permanently deadlock the event loop for any future requests coming into the server. The connection pool is exhausted for some requests as expected, and they are added to the waitlist as expected, but then time out even for a very simple query. The waitlist then appears never to be cleared/freed back to the pool, and any future requests will always time out.

The reproducer below contains a more detailed rundown of the cloud environment we were first able to identify the issue in.

To Reproduce

I have a small reproducer here with a minimal server and a single Fluent model that can be slammed with requests:
https://github.com/GNMoseke/PostgresNIODeadlockRecreator

Expected behavior

The pool should resume properly handling new requests once the timed-out requests have been cleared and the pool has capacity again.

Environment

  • Vapor Framework version: 4.67.4
  • OS version: ubuntu 20.05
    See reproducer for full cloud environment details

connection pool circular buffer

#20 added a generic connection pool to this package. An implementation detail of this connection pool is that waiters are served in FILO (last-in, first-out) order. This shouldn't cause any correctness problems, but ideally waiters would be served FIFO. NIO's CircularBuffer may help do this without any performance loss.
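
A minimal sketch of FIFO waiter handling built on NIO's CircularBuffer; the Waiter and WaiterQueue types and their integration with the pool are illustrative.

import NIO

struct Waiter<Connection> {
    let promise: EventLoopPromise<Connection>
}

struct WaiterQueue<Connection> {
    private var waiters = CircularBuffer<Waiter<Connection>>()

    // New waiters go to the back of the buffer...
    mutating func enqueue(_ waiter: Waiter<Connection>) {
        self.waiters.append(waiter)
    }

    // ...and are popped from the front, so the oldest waiter is served first (FIFO).
    mutating func dequeue() -> Waiter<Connection>? {
        return self.waiters.isEmpty ? nil : self.waiters.removeFirst()
    }
}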

Sync Flatten and Sync Reduce

Two methods that would come in very handy for handling futures that need to be run one after another (a sketch of the first follows the list):

  • [() -> EventLoopFuture<T>].syncFlatten(on: EventLoop) -> EventLoopFuture<[T]>:
    Flattens the array of EventLoopFutures if they all succeed, similar to [EventLoopFuture<T>].flatten(on:), except that each closure's future must succeed before the next closure is called, and the result of each future is added to the resulting array.

  • EventLoop.reduce(_: [() -> EventLoopFuture<T>], onSuccess: (T) throws -> ()) -> EventLoopFuture<Void>:
    Iterates over the lazy futures that are passed in. As each one succeeds, the onSuccess closure is called with the resulting value, and then the next lazy future in the array is called. When all of the calls succeed (or one fails), the result is returned from the method as an EventLoopFuture<Void>.
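
A minimal sketch of the sequential-flatten idea. The proposal spells this as a method on [() -> EventLoopFuture<T>]; for brevity the sketch below hangs it off EventLoop instead, and the reduce variant would chain closures the same way while feeding each value to onSuccess.

import NIO

extension EventLoop {
    public func syncFlatten<T>(_ factories: [() -> EventLoopFuture<T>]) -> EventLoopFuture<[T]> {
        // Chain the closures so each one is only invoked after the previous
        // future has succeeded, accumulating the results in order; the first
        // failure fails the whole chain.
        return factories.reduce(self.makeSucceededFuture([T]())) { chain, makeNext in
            chain.flatMap { results in
                makeNext().map { results + [$0] }
            }
        }
    }
}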

Future Collection Mapping

Create a .mapElements method for EventLoopFuture<Sequence> where the callback receives a single element in the sequence instead of the whole sequence:

let users: Future<[User]> = ...
users.mapElements { user in
    print(user.id)
}
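
A minimal sketch of mapElements, assuming the future's Value is a Sequence; the name and constraints mirror the proposal, not a shipped API.

import NIO

extension EventLoopFuture where Value: Sequence {
    public func mapElements<NewElement>(
        _ transform: @escaping (Value.Element) -> NewElement
    ) -> EventLoopFuture<[NewElement]> {
        return self.map { sequence in
            // The callback sees one element at a time; the result is the
            // array of transformed values.
            sequence.map(transform)
        }
    }
}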

Renaming the variadic maps to zip

Just a thought; it would be more in line with Rx:

public func map<A, B, Result>(
    to result: Result.Type,
    _ futureA: Future<A>,
    _ futureB: Future<B>,
    _ callback: @escaping (A, B) throws -> (Result)
) -> Future<Result> {
    return futureA.flatMap(to: Result.self) { a in
        return futureB.map(to: Result.self) { b in
            return try callback(a, b)
        }
    }
}

vs

public func zip<A, B, Result>(
    to result: Result.Type,
    _ futureA: Future<A>,
    _ futureB: Future<B>,
    _ callback: @escaping (A, B) throws -> (Result)
) -> Future<Result> {
    return futureA.flatMap(to: Result.self) { a in
        return futureB.map(to: Result.self) { b in
            return try callback(a, b)
        }
    }
}

linking xctest

Unfortunately I just hit the issue that I had been worrying about with #5:

[Screenshot from 2019-02-28 showing the XCTest linking failure]

Not sure why we weren't able to recreate this earlier, but unless we find a workaround, we may have to remove this helper.

testPerformance hangs in release mode

testPerformance never completes in release mode: a connection pool with a maximum of two connections is created but then three connections are requested and waited on:

let connA = try! pool.requestConnection().wait()
let connB = try! pool.requestConnection().wait()
let connC = try! pool.requestConnection().wait()

Either connA or connB needs to be released for the connC future to resolve.

Steps to reproduce

  1. run testPerformance in release mode

Expected behavior

testPerformance completes.

Actual behavior

testPerformance doesn't complete.

Environment

  • NIOKit version: 199a535
  • Linux or macOS? macOS

.andAllSync

Would something like this be useful to add?

extension EventLoopFuture {
    public static func andAllSync(
        _ futures: [() -> EventLoopFuture<Void>],
        eventLoop: EventLoop
    ) -> EventLoopFuture<Void> {
        ...
    }
}

The difference from .andAll is that each future is not started until the previous one completes. This is useful for things like database migrations, where you must not start the next task until the previous one has completed.
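
A minimal sketch under the proposed signature; each closure is only invoked once the previous future has succeeded, and the first failure short-circuits the rest.

import NIO

extension EventLoopFuture where Value == Void {
    public static func andAllSync(
        _ futures: [() -> EventLoopFuture<Void>],
        eventLoop: EventLoop
    ) -> EventLoopFuture<Void> {
        // Each closure is only called from the previous future's flatMap,
        // so work never starts before the prior step has succeeded.
        return futures.reduce(eventLoop.makeSucceededFuture(())) { chain, makeNext in
            chain.flatMap { _ in makeNext() }
        }
    }
}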

Detect connection pool deadlocks

Something like this will hang the client, and in the database log we get:

LOG: unexpected EOF on client connection with an open transaction

Client.find(id, on: req.db).flatMap { client in
    /// Validate we want to make updates
    req.db.transaction { db in
        // Do stuff
    }
}

I don't really want to create a transaction if I don't have to. In the case where I discovered this I'm doing multiple queries against multiple tables, and only under certain conditions do I then create a transaction and update multiple different tables.

Consider using @autoclosure for EventLoopFuture.transform with future

The current signature for the transform function on EventLoopFuture that takes a future is:

public func transform<T>(to future: EventLoopFuture<T>) -> EventLoopFuture<T>

And can be used as follows:

futureA.transform(to: makeFutureB())

To me, this reads as future b starting its work after future a has finished. However, this is not the case, since future b is created "synchronously" and therefore its work starts immediately. This could be prevented by changing the implementation of transform to:

public func transform<T>(to future: @escaping @autoclosure () -> EventLoopFuture<T>) -> EventLoopFuture<T> {
    return self.flatMap { _ in
        future()
    }
}

Are there any drawbacks I'm not seeing? Is this considered a breaking change?

generic ConnectionPool

Vapor's DatabaseKit package contains a connection pool class for managing database connections. I think it could be useful to make that class more generic and move it here. That would allow for other connections, like HTTP client connections, to easily take advantage of pooling.

Here's an idea of what this might entail:

public protocol ConnectionPoolItem {
    var isClosed: Bool { get }
}
public protocol ConnectionPoolSource {
    associatedtype Connection: ConnectionPoolItem
    var eventLoop: EventLoop { get }
    func makeConnection() -> EventLoopFuture<Connection>
}

public struct ConnectionPoolConfig {
    var maxConnections: Int
}

public final class ConnectionPool<Source> where Source: ConnectionPoolSource {
    init(config: ConnectionPoolConfig, source: Source) { ... }
    func requestConnection() -> EventLoopFuture<Source.Connection> { ... }
    func releaseConnection(_: Source.Connection) { ... }
}

extension ConnectionPool {
    func withConnection(_: (Source.Connection) -> EventLoopFuture<Void>) -> EventLoopFuture<Void> { ... }
}

Feature request: convert DatabaseConnectionPool into a protocol

I am currently using a custom database connection pool (https://gist.github.com/MrMage/8972472ee73ab7b7d506ffb1c2b6e42e). If the official DatabaseConnectionPool were implemented as a protocol, I could more easily swap the regular one out for my own implementation.

The only problem might be associated type requirements for that protocol (currently the class has a generic Database type argument), but I think that could be worked around.
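
A minimal, hypothetical sketch of what a protocol-shaped pool could look like; the name ConnectionPooling and its requirements are illustrative, not DatabaseKit API. A custom pool like the linked gist could then conform to it, with the Connection associated type standing in for the generic Database argument mentioned above.

import NIO

public protocol ConnectionPooling {
    associatedtype Connection

    // Core lifecycle hooks a custom pool would need to provide.
    func requestConnection() -> EventLoopFuture<Connection>
    func releaseConnection(_ connection: Connection)
    func withConnection<T>(_ work: @escaping (Connection) -> EventLoopFuture<T>) -> EventLoopFuture<T>
}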

pool: replacing closed connections

The connection pool currently deals with closed connections in its available list by replacing them with new connections when they are requested. This means that once a pool's active connection count increases, it will never decrease.

It would be better to check the available connections list for any other open connections before attempting to open a new replacement, as in the sketch below.
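
A minimal, self-contained sketch of the suggested behavior, reusing any open connection from the available list before creating a replacement; the PooledConnection protocol and AvailableConnections type are hypothetical, not the pool's real internals.

protocol PooledConnection {
    var isClosed: Bool { get }
}

struct AvailableConnections<Connection: PooledConnection> {
    var connections: [Connection] = []

    // Returns an open connection if one exists, silently discarding any closed
    // ones, so a replacement is only opened when nothing usable remains.
    mutating func popOpenConnection() -> Connection? {
        while let candidate = connections.popLast() {
            if !candidate.isClosed {
                return candidate
            }
        }
        return nil
    }
}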

tryFuture executes closure on caller's thread

It seems the closure passed to tryFuture is executed on the caller's thread.

According to its doc comment:

/// This method replaces this code:
///
/// ```swift
/// return something.eventLoop.future().flatMapThrowing {
/// ```
///
/// With this code:
///
/// ```swift
/// return something.eventLoop.tryFuture {
/// ```

It does replace that code, but the two are not identical. Look at the following code:

print(Thread.current.name) //Optional("")
try! el.future().flatMapThrowing {
    print(Thread.current.name) // Optional("NIO-ELT-0-#0")
}.wait()
try! el.tryFuture {
    print(Thread.current.name) // Optional("")
}.wait()
try! el.submit {
    print(Thread.current.name) // Optional("NIO-ELT-0-#0")
}.wait()

el.tryFuture's closure is executed on the caller's thread.
el.tryFuture and el.submit have similar signatures, but their execution threads differ, which can confuse users.
It will be a problem if users perform blocking work in tryFuture's closure, expecting it to be executed on the event loop's thread.

Hang in batch `Command`

This is a follow-up to a Discord discussion, FYI @0xTim!


We're seeing the following error in one of our Vapor batch jobs every once in a while (a few times per month, maybe once a week - hard to quantify precisely):

[ ERROR ] Connection request timed out. This might indicate a connection deadlock in your application. If you're running long running requests, consider increasing your connection timeout. [component: server, database-id: psql]

When this message occurs, the job HANGS - i.e. it locks up and does not terminate, preventing all further processing.

In normal operation the job spins up and typically processes for 3-6 seconds, running a number of queries against the Github APIs (we're processing batches of 25 Swift packages, updating Github metadata in the Swift Package Index).

I've done a little research and there seem to be a couple of cases where this can happen:

  1. too many db connections
  2. an error handling problem in networking code

I suspect it's 2). Last I checked, we're not exhausting our db connections, and our db utilization across our two envs is <15% and <10%.

I believe this issue first started happening after moving to async/await. This may be due to how we're launching async processes from a Command via our own AsyncCommand. (We previously used a semaphore-based implementation which also encountered hangs.)

Our logs show no unusual activity around the error, although we've not yet had it happen with DEBUG logging enabled.


We're also tracking this issue here: SwiftPackageIndex/SwiftPackageIndex-Server#2227, and I'll be adding more observations as they happen there.
