Git Product home page Git Product logo

rabbitmq-nio's Introduction

RabbitMQNIO

Platform macOS | Linux Swift 5.7

A Swift implementation of AMQP 0.9.1 protocol: decoder + encoder (AMQPProtocol) and non-blocking client (AMQPClient).

Heavy inspired by projects: amq-protocol (https://github.com/cloudamqp/amq-protocol.cr) and amqp-client (https://github.com/cloudamqp/amqp-client.cr).

Swift-NIO related code is based on other NIO projects like:

State of the project

!!! WARNING !!!
This project is in alpha stage and still under development - API can change in the near future before 1.0 release.
Please do extensive tests of Your use case before using it on production!
Nevertheless, current client operations are tested and appears to be stable so do not be afraid to use it.
Please report bugs or missing features.
!!! WARNING !!!

AMQPProtocol library currently should cover all of AMQP 0.9.1 specification.

AMQPClient library's architecture using NIO Channels is already done and all of AMQP operations (without WebSockets) should be supported. Current work is focused on testing, finding bugs, API stabilization and code refactoring / polishing (based on Swift Server Side Community feedback).

Basic usage

Create a connection and connect to the AMQP broker using connection string.

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)

var connection: AMQPConnection

do {
    connection = try await AMQPConnection.connect(use: eventLoopGroup.next(), from: .init(url: "amqp://guest:guest@localhost:5672/%2f"))

    print("Succesfully connected")
} catch {
    print("Error while connecting", error)
}

Create a connection and connect to the AMQP broker using configuration object.

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)

var connection: AMQPConnection

do {
    connection = try await AMQPConnection.connect(use: eventLoopGroup.next(), from: .init(connection: .plain, server: .init()))

    print("Succesfully connected")
} catch {
    print("Error while connecting", error)
}

Open a channel.

var channel: AMQPChannel
 
do {
    channel = try await connection.openChannel()

    print("Succesfully opened a channel")
} catch {
    print("Error while opening a channel", error)
}

Declare a queue.

do {
    try await channel.queueDeclare(name: "test", durable: false)

    print("Succesfully created queue")
} catch {
    print("Error while creating queue", error)
}

Publish a message to queue.

do {
    let deliveryTag = try await channel.basicPublish(
        from: ByteBuffer(string: "{}"),
        exchange: "",
        routingKey: "test"
    )

    print("Succesfully publish a message")
} catch {
    print("Error while publishing a message", error)
}

Consume a single message.

do {
    guard let msg = try await channel.basicGet(queue: "test") else {
        print("No message currently available")
        return
    }

    print("Succesfully consumed a message", msg)
} catch {
    print("Error while consuming a message", error)
}

Set a QOS limit to prevent memory overflow of consumer.

try await channel.basicQos(count: 1000)

Consume a multiple message as AsyncThrowingStream.

do {
    let consumer = try await channel.basicConsume(queue: "test")

    for try await msg in consumer {
        print("Succesfully consumed a message", msg)
        break
    }
} catch {
    print("Delivery failure", error)
}

Consumer will be automatically cancelled on deinitialization. Can be also manually cancelled.

try await channel.basicCancel(consumerTag: consumer.name)

Close a channel, connection.

do {
    try await channel.close()
    try await connection.close()

    print("Succesfully closed", msg)
} catch {
    print("Error while closing", error)
}

Connection recovery patterns.

Handling broker closing channel or connection disconnects. Connection to AMQP broker is sustained by answering to heartbeat messages, however on network problem or broker restart connection can be broken. Broker can also close channel or connection on bad command or other error. Currently RabbitMQNIO do not support any connection nor channel recovery / re-connect mechanism so clients have to handle it manually. After connection is interrupted all of channels created by it and connection itself must be re-created manually.

Example recovery patterns.

Checking channel and connection state (safe channel pattern - wrap standard connection and channel in a class and reuse channel before for ex. any produce operation).

@available(macOS 12.0, *)
class SimpleSafeConnection {
    private let eventLoop: EventLoop
    private let config: AMQPConnectionConfiguration

    private var channel: AMQPChannel?
    private var connection: AMQPConnection?

    init(eventLoop: EventLoop, config: AMQPConnectionConfiguration) {
        self.eventLoop = eventLoop
        self.config = config
    }
    
    func reuseChannel() async throws -> AMQPChannel {
        guard let channel = self.channel, channel.isOpen else {
            if self.connection == nil || self.connection!.isConnected {
                self.connection = try await AMQPConnection.connect(use: self.eventLoop, from: self.config)
            }

            self.channel = try await connection!.openChannel()
            return self.channel!
        }
        return channel
    }
}

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let connection = SimpleSafeConnection(eventLoop: eventLoopGroup.next(), config: .init(connection: .plain, server: .init()))

while(true) {
    let deliveryTag = try await connection.reuseChannel().basicPublish(
        from: ByteBuffer(string: "{}"),
        exchange: "",
        routingKey: "test"
    )
}

Handling channel and connection close errors (simple retry pattern - re-create channel or connection when errors occurs).

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var connection =  try await AMQPConnection.connect(use: eventLoopGroup.next(), from: .init(connection: .plain, server: .init()))
var channel = try await connection.openChannel()

for _ in 0..<3 {
    do {
        let deliveryTag = try await channel.basicPublish(
            from: ByteBuffer(string: "{}"),
            exchange: "",
            routingKey: "test"
        )
        break
    } catch AMQPConnectionError.channelClosed {
        do {
            channel = try await connection.openChannel()
        } catch AMQPConnectionError.connectionClosed {
            connection = try await AMQPConnection.connect(use: eventLoopGroup.next(), from: .init(connection: .plain, server: .init()))
            channel = try await connection.openChannel()
        }
    } catch {
        print("Unknown problem", error)
    }
}

Above recovery patterns can be mixed together.

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let connection = SimpleSafeConnection(eventLoop: eventLoopGroup.next(), config: .init(connection: .plain, server: .init()))
var channel = try await connection.reuseChannel()

for _ in 0..<3 {
    do {
        let deliveryTag = try await channel.basicPublish(
            from: ByteBuffer(string: "{}"),
            exchange: "",
            routingKey: "test"
        )
        break
    } catch AMQPConnectionError.channelClosed {
        channel = try await connection.reuseChannel()
    } catch {
        print("Unknown problem", error)
    }
}

rabbitmq-nio's People

Contributors

funcmike avatar hasnat avatar sliemeobn avatar stevapple avatar xtremekforever avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

rabbitmq-nio's Issues

Add descriptions to errors

AMQPConnectionError and ProtocolError should implement descriptions so users know what happened without going through the source code.

Also, there is the whole "public enum" problem of technically breaking API when adding a case.
Maybe we'll have @extensible enums before this hits 1.0 ; )

EventLoop aware APIs

Currently you have APIs that return EventLoopFutures. It is very hard for users to anticipate on which EL those EventLoopFutures will be running. Because of this users who are deep into NIO will need to .hop(to:) after each call. This is a burden that we shouldn't put on users...

Code example:
https://github.com/funcmike/rabbitmq-nio/blob/poc/Sources/AMQPClient/AMQPChannel.swift#L83-L94

Instead the function should take an optional EventLoop parameter. If an EventLoop is passed, the returned EventLoopFuture must execute on the provided EL. If none is passed the ELF should always run on the connections EL.

basicConsume listener (possible message lost)

Currently basicConsume listener is registered just after receiving ConsumeOk frame (with consumerTag) from broker.
It's theoretically possible to miss some messages before listener is added (when NIO channel is handled by different thread).

Build error on alpha release

I've tried to update my package dependency to point at the 0.1.0 release, but I'm now getting an error from Xcode: the target 'AMQPClient' in product 'AMQPClient' contains unsafe build flags which seems to be due to versioned packages not allowing unsafe flags (at least if this discussion is correct: https://forums.swift.org/t/question-about-contains-unsafe-build-flags-in-spm/47677/4 )

It works if I stay pointed at main

That is:

.package(url: "https://github.com/funcmike/rabbitmq-nio.git", branch: "main"),

instead of

.package(url: "https://github.com/funcmike/rabbitmq-nio.git", from: "0.1.0-alpha"),

Large payloads are not supported

Current implementation does not handle payloads larger than max frame size correctly.

for large payloads:
the sending part causes a frame_too_large error on rabbitmq
the receiving part does not collect multiple frames into one message (resulting in an incomplete payload)

Refactor Protocol Enums rawValue

From

   public enum Kind {
        case method
        case header
        case body
        case heartbeat

        public init?(rawValue: UInt8)
        {
            switch rawValue {
            case 1:
                self = .method
            case 2:
                self = .header
            case 3:
                self = .body
            case 8:
                self = .heartbeat
            default:
                return nil
            }
        }

        public var rawValue: UInt8 {
            switch self {
            case .method:
                return 1
            case .header:
                return 2
            case .body:
                return 3
            case .heartbeat:
                return 8
            }
        }
    }
}

To

public enum Kind: UInt8 {
        case method = 1
        case header = 2
        case body = 3
        case heartbeat = 8
}

Improve AsyncStream usage

Currently basicCancel() needs to be called explicitly when iteration has finished - it can be done automatically on consumer deinit.

for await msg in consumer {
    guard case .success(let delivery) = msg else {
        print("Delivery failure", msg)
        return
    }

    print("Succesfully consumed a message", delivery)
    break
}

try await channel.basicCancel(consumerTag: consumer.name) // users should not have to call this. this should be automatic once you leave the iteration

Refactor Channel Handling

New handler proposal

protocol AMPQChannelHandlerParent {
  func write(payload: Payload, promise: EventLoopPromise<Void>?)
}

final class AMPQFrameHandler {
  var channels: [ChannelID: AMPQChannelHandler<Self>]
  var context: ChannelHandlerContext!

  func handlerAdded(context: ChannelHandlerContext) {
    self.context = context
  }

  func handlerRemoved(context: ChannelHandlerContext) {
    self.context = nil
  }

  func openChannel() -> EventLoopFuture<AMPQChannelHandler<Self>> {
    let nextChannelID = ...
    // sendMessage to create channel
    // when done create AMPQChannelHandler with ChannelID
  }
}

extension AMPQFrameHandler: AMPQChannelHandlerParent {
  func write(frame: Frame, promise: EventLoopPromise<Void>?) {
    // write into the channel pipeline using self.context
  }
}

final class AMPQChannelHandler<Parent: AMPQChannelHandlerParent> {
  let parent: Parent
  let eventLoop: EventLoop

  init(parent: Parent, channelID: ChannelID, eventLoop: EventLoop) {
    self.parent = parent
  }

  func send(payload: Payload) -> EventLoopFuture<Response> {
    let frame = Frame(channelID: self.channelID, payload: Payload)
    self.parent.write(frame: frame)
  }

  func receive(payload: Payload) {

  }
}

Less boilerplate decoding of integers in bytebuffer

Add extension

https://github.com/vapor/postgres-nio/blob/08c0dc590f4e149857d99dc91be4da342444dece/Sources/PostgresNIO/Utilities/NIOUtils.swift#L5

From

  guard let classID = buffer.readInteger(as: UInt16.self) else {
      throw ProtocolError.decode(type: UInt16.self, context: self)
  }

  guard let kind = Kind(rawValue: classID) else {
      throw ProtocolError.unsupported(value: classID, context: self)
  }

To

  guard let classID = buffer.readInteger(as: Kind.self) else {
      throw ProtocolError.decode(type: Kind.self, context: self)
  }

Support distributed tracing

I added a tiny wrapper around this library to participate in distributed tracing, works like a charm with nodejs services (communicating with swift services through rabbitmq.)

Ideally, this can come out of the box. We'd either need some sort of config for it or maybe a middleware system?

Here is what I use:

struct AMQPHeaderPropagation: Extractor, Injector {
    typealias Carrier = Table?

    func extract(key: String, from carrier: Carrier) -> String? { carrier?[key]?.asString }

    func inject(_ value: String, forKey key: String, into carrier: inout Carrier) {
        carrier = carrier ?? Table()
        carrier![key] = .longString(value)
    }
}

private extension Field {
    var asString: String? {
        switch self {
        case let .longString(s): return s
        // NOTE: maybe support more data types
        default: return nil
        }
    }
}

extension ServiceContext {
    var asAqmpHeaders: Table? {
        var headers: Table?
        InstrumentationSystem.instrument.inject(self, into: &headers, using: AMQPHeaderPropagation())
        return headers
    }
}

extension AMQPResponse.Channel.Message.Delivery {
    var serviceContext: ServiceContext {
        var context = ServiceContext.topLevel
        InstrumentationSystem.instrument.extract(properties.headers, into: &context, using: AMQPHeaderPropagation())
        return context
    }
}

More robust handling of unexpected payloads

Currently unexpected payloads sent by the broker will result in either a crash (precondition) or undefined behavior, because it is basically ignored.

All preconditions that can actually occur should rather cause an error and take the channel down cleanly (eg: a valid frame from the broker that is unexpected).

On the other hand, valid broker response frames that arrive without a matching requests are ignored - or worse, just forwarded up the chain to a mismatching request - only to be then converted to an error (outside of the channel handling pipeline).

Ideally, the channel pipeline understands these mismatches and also cleanly takes the channel down with an error.

Mark types as `Sendable`

I highly recommend marking the types that can be accessed from different threads as Sendable. You only support Swift 5.7+ where Sendable works quite well.

This way you would have noticed that this lock is not needed:

private let lock = NIOLock()

structs that contain only sendable types are sendable as well. The sensibility is enforced through the copy semantics that structs have.

AMQPClient isConnect concurrency issue

There is no guarantee to run code below in correct order.

self.connection = connection
self.isConnect.store(false, ordering: .relaxed)

Less fragile design is to just use locks and switch to state enum (proposed by @fabianfett )

    enum State {
        case connecting
        case connected(AMQPConnection)
        ...
    }

Refactor Protocol Frame Enum to Struct with channelID property

From

    public typealias ChannelID = UInt16

    case method(ChannelID, Method)
    case header(ChannelID, Header)
    case body(ChannelID, body: ByteBuffer)
    case heartbeat(ChannelID)
}

To

public struct Frame: PayloadDecodable, PayloadEncodable {
    var channelID: ChannelID

    var payload: Payload

    enum Payload {
        case method(Method)
        case header(Header)
        case body(ByteBuffer)
        case heartbeat
    }
}

Closing the channel after cancelling an AMQPSequence (consume) sometimes throws an .invalidResponse error

I created a reproducer test here, sometimes it works, sometimes it fails - I added a loop around to force it

https://github.com/sliemeobn/rabbitmq-nio/blob/1a9b6901d1dd8381416172115423989c61fe7646/Tests/AMQPClientTests/AMQPChannelTest.swift#L322-L345

My theory is that there the cancellation and the channel-close responses get mixed up, at least I see that a Basic.cancel response ends up in here:

.flatMapThrowing { response in
guard case .channel(let channel) = response, case .closed = channel else {
throw AMQPConnectionError.invalidResponse(response)
}

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.