
aio-pika's Introduction

aio-pika


A wrapper around aiormq for asyncio and humans.

Check out the examples and the tutorial in the documentation.

If you are a newcomer to RabbitMQ, please start with the adapted official RabbitMQ tutorial.

Note

Since version 5.0.0 this library doesn't use pika as its AMQP connector. Versions below 5.0.0 contain or require pika's source code.

Note

Version 7.0.0 has breaking API changes; see CHANGELOG.md for migration hints.

Features

  • Completely asynchronous API.
  • Object oriented API.
  • Transparent auto-reconnects with complete state recovery with connect_robust (e.g. declared queues or exchanges, consuming state and bindings).
  • Python 3.7+ compatible.
  • For Python 3.5 users, aio-pika is available via aio-pika<7.
  • Transparent publisher confirms support.
  • Transactions support.
  • Complete type-hints coverage.

Installation

pip install aio-pika

Usage example

Simple consumer:

import asyncio
import aio_pika
import aio_pika.abc


async def main(loop):
    # Connecting with the given parameters is also possible.
    # aio_pika.connect_robust(host="host", login="login", password="password")
    # You can only choose one option to create a connection, url or kw-based params.
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    async with connection:
        queue_name = "test_queue"

        # Creating channel
        channel: aio_pika.abc.AbstractChannel = await connection.channel()

        # Declaring queue
        queue: aio_pika.abc.AbstractQueue = await channel.declare_queue(
            queue_name,
            auto_delete=True
        )

        async with queue.iterator() as queue_iter:
            # Cancel consuming after __aexit__
            async for message in queue_iter:
                async with message.process():
                    print(message.body)

                    if queue.name in message.body.decode():
                        break


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

Simple publisher:

import asyncio
import aio_pika
import aio_pika.abc


async def main(loop):
    # Explicit type annotation
    connection: aio_pika.RobustConnection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    routing_key = "test_queue"

    channel: aio_pika.abc.AbstractChannel = await connection.channel()

    await channel.default_exchange.publish(
        aio_pika.Message(
            body='Hello {}'.format(routing_key).encode()
        ),
        routing_key=routing_key
    )

    await connection.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

Get single message example:

import asyncio
from aio_pika import connect_robust, Message


async def main(loop):
    connection = await connect_robust(
        "amqp://guest:guest@127.0.0.1/",
        loop=loop
    )

    queue_name = "test_queue"
    routing_key = "test_queue"

    # Creating channel
    channel = await connection.channel()

    # Declaring exchange
    exchange = await channel.declare_exchange('direct', auto_delete=True)

    # Declaring queue
    queue = await channel.declare_queue(queue_name, auto_delete=True)

    # Binding queue
    await queue.bind(exchange, routing_key)

    await exchange.publish(
        Message(
            bytes('Hello', 'utf-8'),
            content_type='text/plain',
            headers={'foo': 'bar'}
        ),
        routing_key
    )

    # Receiving message
    incoming_message = await queue.get(timeout=5)

    # Confirm message
    await incoming_message.ack()

    await queue.unbind(exchange, routing_key)
    await queue.delete()
    await connection.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))

There are more examples and the RabbitMQ tutorial in the documentation.

See also

aiormq is a pure Python AMQP client library. It runs under the hood of aio-pika and can be used directly when you really want to work with the protocol at a low level. The following examples demonstrate the user API.

Simple consumer:

import asyncio
import aiormq

async def on_message(message):
    """
    on_message doesn't necessarily have to be defined as async.
    Here it is to show that it's possible.
    """
    print(f" [x] Received message {message!r}")
    print(f"Message body is: {message.body!r}")
    print("Before sleep!")
    await asyncio.sleep(5)   # Represents async I/O operations
    print("After sleep!")

async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue
    declare_ok = await channel.queue_declare('helo')
    consume_ok = await channel.basic_consume(
        declare_ok.queue, on_message, no_ack=True
    )

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()

Simple publisher:

import asyncio
from typing import Optional

import aiormq
from aiormq.abc import DeliveredMessage

MESSAGE: Optional[DeliveredMessage] = None

async def main():
    global MESSAGE
    body = b'Hello World!'

    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost//")

    # Creating a channel
    channel = await connection.channel()
    declare_ok = await channel.queue_declare("hello", auto_delete=True)

    # Sending the message
    await channel.basic_publish(body, routing_key='hello')
    print(f" [x] Sent {body}")

    MESSAGE = await channel.basic_get(declare_ok.queue)
    print(f" [x] Received message from {declare_ok.queue!r}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

assert MESSAGE is not None
assert MESSAGE.routing_key == "hello"
assert MESSAGE.body == b'Hello World!'

The patio and the patio-rabbitmq

PATIO is an acronym for Python Asynchronous Tasks for AsyncIO - an easily extensible library, for distributed task execution, like celery, only targeting asyncio as the main design approach.

patio-rabbitmq provides you with the ability to use RPC over RabbitMQ services with extremely simple implementation:

from patio import Registry, ThreadPoolExecutor
from patio_rabbitmq import RabbitMQBroker

rpc = Registry(project="patio-rabbitmq", auto_naming=False)

@rpc("sum")
def sum_args(*args):
    # Named sum_args so the body calls the builtin sum() rather than itself
    return sum(args)

async def main():
    async with ThreadPoolExecutor(rpc, max_workers=16) as executor:
        async with RabbitMQBroker(
            executor, amqp_url="amqp://guest:guest@localhost/",
        ) as broker:
            await broker.join()

And the caller side might be written like this:

import asyncio
from patio import NullExecutor, Registry
from patio_rabbitmq import RabbitMQBroker

async def main():
    async with NullExecutor(Registry(project="patio-rabbitmq")) as executor:
        async with RabbitMQBroker(
            executor, amqp_url="amqp://guest:guest@localhost/",
        ) as broker:
            print(await asyncio.gather(
                *[
                    broker.call("sum", i, i, timeout=1) for i in range(10)
                 ]
            ))

FastStream is a powerful and easy-to-use Python library for building asynchronous services that interact with event streams.

If you need no deep dive into RabbitMQ details, you can use more high-level FastStream interfaces:

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

@broker.subscriber("user")
async def user_created(user_id: int):
    assert isinstance(user_id, int)
    return f"user-{user_id}: created"

@app.after_startup
async def pub_smth():
    assert (
        await broker.publish(1, "user", rpc=True)
    ) ==  "user-1: created"

FastStream also validates messages with pydantic, generates an AsyncAPI spec for your project, and supports in-memory testing, RPC calls, and more.

In fact, it is a high-level wrapper on top of aio-pika, so you can use both of these libraries' advantages at the same time.

Socket.IO is a transport protocol that enables real-time bidirectional event-based communication between clients (typically, though not always, web browsers) and a server. This package provides Python implementations of both, each with standard and asyncio variants.

This package is also suitable for building messaging services over RabbitMQ via the aio-pika adapter:

import socketio
from aiohttp import web

sio = socketio.AsyncServer(client_manager=socketio.AsyncAioPikaManager())
app = web.Application()
sio.attach(app)

@sio.event
async def chat_message(sid, data):
    print("message ", data)

if __name__ == '__main__':
    web.run_app(app)

And a client is able to call chat_message the following way:

import asyncio
import socketio

sio = socketio.AsyncClient()

async def main():
    await sio.connect('http://localhost:8080')
    await sio.emit('chat_message', {'response': 'my response'})

if __name__ == '__main__':
    asyncio.run(main())

Taskiq is an asynchronous distributed task queue for Python. The project takes inspiration from big projects such as Celery and Dramatiq, but taskiq can send and run both sync and async functions.

The library also provides an aio-pika broker for running tasks.

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker()

@broker.task
async def test() -> None:
    print("nothing")

async def main():
    await broker.startup()
    await test.kiq()

With over 25 million downloads, Rasa Open Source is the most popular open source framework for building chat and voice-based AI assistants.

With Rasa, you can build contextual assistants on:

  • Facebook Messenger
  • Slack
  • Google Hangouts
  • Webex Teams
  • Microsoft Bot Framework
  • Rocket.Chat
  • Mattermost
  • Telegram
  • Twilio

Your own custom conversational channels or voice assistants as:

  • Alexa Skills
  • Google Home Actions

Rasa helps you build contextual assistants capable of having layered conversations with lots of back-and-forth. In order for a human to have a meaningful exchange with a contextual assistant, the assistant needs to be able to use context to build on things that were previously discussed; Rasa enables you to build assistants that can do this in a scalable way.

And it also uses aio-pika to interact with RabbitMQ deep inside!

Versioning

This software follows Semantic Versioning.

For contributors

Setting up development environment

Clone the project:

git clone https://github.com/mosquito/aio-pika.git
cd aio-pika

Create a new virtualenv for aio-pika:

python3 -m venv env
source env/bin/activate

Install all requirements for aio-pika:

pip install -e '.[develop]'

Running Tests

NOTE: In order to run the tests locally you need to run a RabbitMQ instance with default user/password (guest/guest) and port (5672).

The Makefile provides a command to run an appropriate RabbitMQ Docker image:

make rabbitmq

To test just run:

make test

Editing Documentation

To iterate quickly on the documentation live in your browser, try:

nox -s docs -- serve

Creating Pull Requests

Please feel free to create pull requests, but you should describe your use cases and add some examples.

Changes should follow a few simple rules:

  • When your changes break the public API, you must increase the major version.
  • When your changes are safe for the public API (e.g. an added argument with a default value), you must increase the minor version.
  • You have to add test cases (see tests/ folder)
  • You must add docstrings
  • Feel free to add yourself to the "thanks to" section

aio-pika's People

Contributors

akhoronko, altvod, blazewicz, chibby0ne, codecorrupt, cprieto, darsstar, decaz, dhontecillas, dxist, heckad, hellysmile, iselind, jkr78, jmccarrell, lancetnik, leenr, levrik, lfse-slafleur, mapeper, mosquito, olegt0rr, pixelneo, przemyslawfierek-silvair, smagafurov, taybin, tilsche, torarvid, tvallois, zware


aio-pika's Issues

'passive' param is not accepted by 'Channel.declare_exchange()'

Is there a reason why passive param is not supported while declaring an exchange? It should just be a matter of passing another param to PIKA method (http://pika.readthedocs.io/en/0.10.0/modules/channel.html#pika.channel.Channel.exchange_declare).

I have a case where I'd like to bail out if the exchange does not exist already instead of attempting to create one.

I'd be more than happy to contribute a patch that fixes this issue, but wanted to consult here first :).

Publish to nonexistent queue

Exchange.publish doesn't seem to handle a nonexistent queue: if a message is published to one, the request hangs forever. There should be a way to detect that the queue is absent and raise a corresponding exception.

AMQP Client connect_robust - How to recover gracefully?

I am running 20 consumers concurrently on my app, each one of them running

await queue.consume(consumer_func)

During the lifecycle of my app, I am encountering multiple exceptions like:
RuntimeError('Connection closed'), ConnectionClosed, ChannelClosed
I am trying to recover with a decorator and a refresh-connection method.

self.rmq_channel = await self.rmq_connection.channel()
  File "/Users/eldar/virtual_env/lib/python3.6/site-packages/aio_pika/connection.py", line 26, in wrap
    raise RuntimeError("Connection closed")
RuntimeError: Connection closed
  1. What is the best practice to recover lost/corrupted/blocked connections?

I'm attaching my client and decorator:

Client

class AsyncAMQP:
    def __init__(self, prefetch_count: int = 1):
        self.rmq_connection = None
        self.rmq_channel = None
        self.prefetch_count = prefetch_count

        self.is_connecting = False

    async def init_connetion(self):
        if not all([self.rmq_connection, self.rmq_channel]):
            self.is_connecting = True
            self.rmq_connection = await connect_robust(RABBIT_URL, ssl=True, heartbeat_interval=10)
            self.rmq_channel = await self.rmq_connection.channel()
            # This tells RabbitMQ not to give more than one message to a worker at a time
            await self.rmq_channel.set_qos(prefetch_count=self.prefetch_count or 1)

        self.is_connecting = False
        return self.rmq_connection, self.rmq_channel

    async def close(self):
        if self.rmq_channel:
            await self.rmq_channel.close()

        if self.rmq_connection and not self.rmq_connection.is_closed:
            await self.rmq_connection.close()

    @rabbit_recoverer
    async def post(self, queue_name: str, message: dict):
        rmq_connection, rmq_channel = await self.init_connetion()

        try:
            await rmq_channel.declare_queue(queue_name, durable=True)
            await rmq_channel.default_exchange.publish(Message(message,
                                                               delivery_mode=DeliveryMode.PERSISTENT),
                                                       routing_key=queue_name)
        finally:
            await self.close()

    async def refresh_connection(self):
        if not self.is_connecting:
            await self.close()
            self.rmq_channel = None
            self.rmq_connection = None
            self.rmq_connection, self.rmq_channel = await self.init_connetion()

Decorator

def rabbit_recoverer(func):
    @wraps(func)
    async def wrapped(self, *args, **kwargs):
        response = None
        refresh_conn = False
        try:
            response = await func(self, *args, **kwargs)
        except RuntimeError as e:
            if str(e) == 'Connection closed':
                refresh_conn = True
            else:
                raise
        except (ConnectionClosed, ChannelClosed, ConnectionError, TimeoutError) as e:
            refresh_conn = True
        finally:
            if refresh_conn:
                await self.refresh_connection()
                response = await func(self, *args, **kwargs)
        return response
    return wrapped

Thanks for the help, kudos on the great work!
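For comparison with the decorator above, which retries exactly once from its finally block, here is a minimal sketch of a bounded retry loop with a growing delay. It uses only the standard library; `retry`, `flaky_publish`, and the choice of exceptions are hypothetical names picked for illustration, not part of aio-pika, and this is not presented as the library's recommended recovery strategy.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry(
    operation: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    attempts: int = 3,
    delay: float = 0.5,
) -> T:
    """Await operation() up to `attempts` times, sleeping between failures."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(delay * attempt)  # linear backoff
    raise ValueError("attempts must be at least 1")


async def demo():
    calls = 0

    async def flaky_publish() -> str:
        # Hypothetical stand-in for a publish that fails twice, then succeeds
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("Connection closed")
        return "published"

    result = await retry(flaky_publish, delay=0)
    return result, calls
```

Because the number of attempts is bounded, a permanently broken connection eventually raises instead of retrying forever.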

basic_nack not implemented

Hi,

That's not a big issue since we can still use reject to perform almost the same operation, but although basic_nack is mentioned here in aio-pika's documentation, I don't see any way to perform a nack using aio-pika. Therefore, it's not possible to "reject" multiple messages at once. Moreover, it's semantically a bit different.

Thanks.

Changelog

Hi!
Would it be possible to provide a changelog between releases? It's quite hard to figure out what changed, or even whether there are breaking changes. Therefore, I'm not confident about updating aio-pika in production.

Context manager for channel

I think a context manager would be nice for a channel, especially in cases where multiple coroutines share a connection and need to handle the case where the task is cancelled

class Channel(...):
    ...

    @asyncio.coroutine
    def __aenter__(self) -> 'Channel':
        return self

    @asyncio.coroutine
    def __aexit__(self, exc_type, exc_val, exc_tb):
        yield from self.close()

Example usage, the queue should auto delete when the task is cancelled but that only happens when the channel closes.

import aio_pika
import asyncio


async def consume(conn):
    async with await conn.channel() as channel:
        exchange = await channel.declare_exchange('exchange')
        queue = await channel.declare_queue(auto_delete=True)
        await queue.bind(exchange, routing_key='key')

        async for msg in queue:  # Task is cancelled here
            async with msg.process():
                print(msg.body.decode())


async def main():
    conn = await aio_pika.connect_robust('amqp://...')
    consumer = asyncio.ensure_future(consume(conn))

    # Give the consumer five seconds, then cancel it if it is still running
    done, pending = await asyncio.wait([consumer], timeout=5.0)

    for task in pending:
        task.cancel()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

crash when delivery mode not specified in message

I started testing compatibility between this module and the asynqp Python AMQP module. aio-pika crashes with a TypeError when reading a message that doesn't specify delivery mode, in message.py line 39:
self.delivery_mode = DeliveryMode(int(delivery_mode)).value
It fails to convert NoneType to an int.
I am not positive whether the delivery mode field of a message is required, but it would be nice to have either cleaner reporting of the problem or defaulting of the value in aio-pika, to check whether it is None before trying to convert it to an int.
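The defaulting suggested above could look roughly like the following sketch. It is self-contained and does not reproduce aio-pika's actual message.py; `parse_delivery_mode` is a hypothetical helper, and falling back to non-persistent delivery when the field is missing is an assumption made for illustration. The values 1 and 2 are the delivery-mode values defined by AMQP 0-9-1.

```python
from enum import IntEnum
from typing import Optional, Union


class DeliveryMode(IntEnum):
    # delivery-mode values from AMQP 0-9-1
    NOT_PERSISTENT = 1
    PERSISTENT = 2


def parse_delivery_mode(
    raw: Optional[Union[int, str]],
    default: DeliveryMode = DeliveryMode.NOT_PERSISTENT,
) -> DeliveryMode:
    """Return a DeliveryMode, using `default` when the field is absent."""
    if raw is None:
        # The publisher did not set delivery-mode at all
        return default
    return DeliveryMode(int(raw))
```

A value outside the enum still raises ValueError, so a malformed message is reported rather than silently accepted.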

Log priorities

This code uses log.warning() in a few places where lower levels would be more appropriate because it is normal operation: deleting a queue or exchange or purging a queue. Those should be debug or info level; I would favor debug.

message.publish hangs forever sometimes

Hi!

Sometimes message.publish hangs forever although the message was successfully published. See here.

I reject messages at the end of a basic_consume callback. However, when I call reject(), the messages are sometimes not actually rejected and remain in the unacked state until I close the connection.

As soon as I add logging around reject(), it starts working again. Do you see any reason why this is happening, and how to work around it?

I've been digging into pika, and in BaseConnection there's the _handle_write function that is supposed to write to the socket. As you explained in #7, this might be non-blocking (although I don't see why, because it seems to write to the socket explicitly and expects a timeout; there must be a gotcha). Any chance that this is not flushed before the end of the task under some circumstances?

Thanks!
Gilb's

Consume messages like an async generator

Are there any plans to introduce an async generator consumer?

async for message in queue.consume(...):
    await do_something(message)

(just started looking into the library, so might miss something)
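One way such an interface can be layered over a callback-based consumer is to buffer deliveries in an asyncio.Queue and yield from it. The sketch below uses only the standard library; `iterate_messages` and `fake_subscribe` are hypothetical names, and the fake subscription stands in for a real broker, so it illustrates the pattern rather than aio-pika's implementation.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def iterate_messages(
    subscribe: Callable[[Callable[[bytes], None]], Awaitable[None]],
) -> AsyncIterator[bytes]:
    """Yield messages that a callback-style subscription delivers."""
    buffer: asyncio.Queue = asyncio.Queue()
    # The callback only enqueues; the consumer of the generator sets the pace
    await subscribe(buffer.put_nowait)
    while True:
        yield await buffer.get()


async def demo() -> list:
    async def fake_subscribe(callback):
        # Stand-in for a broker subscription: delivers three messages
        for body in (b"one", b"two", b"three"):
            callback(body)

    received = []
    async for body in iterate_messages(fake_subscribe):
        received.append(body)
        if len(received) == 3:
            break
    return received
```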

Typo in readme.rst?

An example with message publishing is preceded with "Sample receiver:".
Maybe "Sample publisher" was really meant?

Support x-delayed-message feature!

It will be extremely helpful if you can add x-delayed-message to ExchangeType Enum class.
Currently it's impossible to create such an exchange.

@unique
class ExchangeType(Enum):
    FANOUT = 'fanout'
    DIRECT = 'direct'
    TOPIC = 'topic'
    HEADERS = 'headers'
    X_DELAYED_MESSAGE = 'x-delayed-message'

Queue consume callback is not called

$ cat test.py

import asyncio

from aio_pika import Message, connect

async def on_message(message):
    print(message)

async def test(queue_name):
    connection = await connect('amqp://guest:guest@localhost:5672/')
    channel = await connection.channel()
    await channel.queue_delete(queue_name)
    queue = await channel.declare_queue(queue_name)
    queue.consume(on_message)

    msg = Message(b'Test')
    await channel.default_exchange.publish(msg, routing_key=queue_name)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test(queue_name='test_queue'))
$ pip list | grep pika
aio-pika (0.21.0)
pika (0.10.0)
$ python3.6 test.py
IncomingMessage:{'app_id': None,
 'body_size': 4,
...
 'user_id': None}
$ pip install aio-pika==1.0.0
...
Successfully installed aio-pika-1.0.0
$ pip list | grep pika
aio-pika (1.0.0)
pika (0.10.0)
$ python3.6 test.py
$ 

No messages are delivered to the consumer, but there is one message in the queue (I can see it through the management plugin). It works in v0.21.0; the problem appears from v1.0.0. The latest v1.3.2 has the same problem.

Error in Channel._channel_maker ?

First of all, thanks for making this library available. Having robust asynchronous AMQP clients is something exciting.

I may be wrong (I'm very new to asyncio, so this is all still a bit confusing to me), but there seems to be an error in the _channel_maker function at channel.py#L68

def _channel_maker(self):
        return self._connection._connection.channel

The first _connection refers to AsyncioConnection, but it does not have a _connection property itself. This means every time the connection is closed, and then re-established, I get this error thrown out:

Traceback (most recent call last):
 [...]
    return self._connection._connection.channel
AttributeError: 'NoneType' object has no attribute 'channel'

Editing the code to remove the second _connection seems to fix the issue for me.

Details:

  • python 3.6.2
  • aio-pika 1.4.2

How to handle ConnectionClose in consume scope?

Hi @mosquito,

I have a consumer for a specific queue (let's call the queue: queue_a).
In queue_a.consume(consume_callback), I ack/reject messages manually.

async def consume_callback(message: IncomingMessage):
   try:
        # do stuff
        # message.ack
   except:
        message.reject()

While calling message.reject() I sometimes get a pika.exceptions.ConnectionClosed exception, and I find it difficult to refresh the connection afterwards!

Any suggestions on how to refresh the connection from within consume_callback?

Use pika==0.11.0

Hello,

I am just curious: why is aio-pika restricted to pika versions below 0.11.0? Is there something that might break functionality?

Thank you.

RobustChannel does not reset its delivery-tag after reconnect.

I may be wrong here, I'm not 100% sure on how clients should handle this.

Given I have a RobustConnection, with a channel, and a non-durable, auto-delete queue, and one consumer. All flags are the default.

I can happily publish messages to the queue.

When I restart rabbit, causing the connection to disconnect, once the connection has recovered, publishing a message results in

2018-01-24 14:45:25 [info     ] Unknown delivery tag 1 for message confirmation "Basic.Ack" [aio_pika.channel]

Additionally, once an unknown delivery tag is found, consumers using that channel no longer appear to consume.

The issue resolves itself by adding self._delivery_tag = 0 to Channel.initialize.
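Publisher-confirm delivery tags are scoped to a channel and start again from 1 on a freshly opened channel, which is why a counter carried over from the old channel no longer matches the broker's acks. The following sketch shows the idea with a hypothetical `ConfirmTracker` class; it is a stdlib-only illustration, not aio-pika's Channel code.

```python
import asyncio
from typing import Dict


class ConfirmTracker:
    """Maps delivery tags to futures awaiting confirmation (hypothetical)."""

    def __init__(self) -> None:
        self._delivery_tag = 0
        self._pending: Dict[int, asyncio.Future] = {}

    def next_publish(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
        # Each publish on the channel takes the next tag
        self._delivery_tag += 1
        future = loop.create_future()
        self._pending[self._delivery_tag] = future
        return future

    def on_ack(self, delivery_tag: int) -> bool:
        future = self._pending.pop(delivery_tag, None)
        if future is None:
            return False  # the "unknown delivery tag" case
        if not future.done():
            future.set_result(True)
        return True

    def reset(self) -> None:
        # Call when the channel is re-opened: old confirmations will never come
        for future in self._pending.values():
            if not future.done():
                future.cancel()
        self._pending.clear()
        self._delivery_tag = 0


async def demo():
    loop = asyncio.get_running_loop()
    tracker = ConfirmTracker()
    tracker.next_publish(loop)        # tag 1
    tracker.next_publish(loop)        # tag 2, never confirmed
    tracker.on_ack(1)
    tracker.reset()                   # simulated reconnect
    confirmed = tracker.next_publish(loop)  # tag 1 again on the new channel
    return tracker.on_ack(1), confirmed.done()
```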

aiopika to aio-libs?

I suggest renaming the project to aiopika and (maybe) moving the repository to the https://github.com/aio-libs organization. It would be more pythonic relative to other asyncio libraries and would perhaps bring more users, testers, developers and contributors to the project.

@mosquito what do you think about it?

Race condition _on_delivery_confirmation

An InvalidStateError is raised on shutdown.

The connection is closed and all aio-pika internal futures are cancelled, but the library still tries to set a result on them:

SystemExit: 0
  File "run.py", line 52, in <module>
    loop.run_until_complete(task)
  File "asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "asyncio/base_events.py", line 1389, in _run_once
    event_list = self._selector.select(timeout)
  File "python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
  File "app/fetcher/parser.py", line 405, in <lambda>
    signal.signal(signal.SIGTERM, lambda *_: finish())
  File "app/fetcher/parser.py", line 403, in finish
    exit(0)
  File "python3.6/_sitebuiltins.py", line 26, in __call__
    raise SystemExit(code)

InvalidStateError: invalid state
  File "aio_pika/channel.py", line 171, in _on_delivery_confirmation
    future.set_result(True)

InvalidStateError: invalid state
  File "pika/callback.py", line 236, in process
    callback(*args, **keywords)
  File "aio_pika/channel.py", line 177, in _on_delivery_confirmation
    future.set_exception(e)
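asyncio raises InvalidStateError when set_result() or set_exception() is called on a future that is already done, and a cancelled future counts as done. A common guard is to check done() first, as in this stdlib-only sketch. The `resolve` helper is a hypothetical name; whether this is the right fix for the traceback above is an assumption.

```python
import asyncio


def resolve(future: asyncio.Future, result=None, exc=None) -> bool:
    """Set a result or exception unless the future is already finished."""
    if future.done():
        # Includes cancelled futures; setting a result here would raise
        # asyncio.InvalidStateError
        return False
    if exc is not None:
        future.set_exception(exc)
    else:
        future.set_result(result)
    return True


async def demo():
    loop = asyncio.get_running_loop()
    cancelled = loop.create_future()
    cancelled.cancel()            # what happens to pending futures on shutdown
    pending = loop.create_future()
    return resolve(cancelled, True), resolve(pending, True), pending.result()
```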

Bug found: routing_key does not work

In the publish example, the publisher publishes to an exchange with a routing key, and the subscriber binds a queue to the exchange with a routing key.
The routing key appears to have no effect, and an error occurs when the queue name differs from the routing key. I hope this can be fixed in the future.

Error "Future exception was never retrieved"

Let's assume that the RabbitMQ server is unavailable. How can such an error be suppressed? I'm already trying to catch ConnectionError.

import asyncio
from aio_pika import connect

async def main(loop):
    try:
        connection = await connect('amqp://guest:guest@rabbitmq', loop=loop)
    except ConnectionError:
        print('Error!')

loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()
$ python rabbitmq.py 
Error!
Future exception was never retrieved
future: <Future finished exception=ConnectionRefusedError(gaierror(-3, 'Temporary failure in name resolution'),)>
Traceback (most recent call last):
  File "rabbitmq.py", line 8, in main
    connection = await connect('amqp://guest:guest@rabbitmq', loop=loop)
  File "/tmp/venv/lib/python3.6/site-packages/aio_pika/connection.py", line 209, in connect
    yield from connection.connect()
  File "/tmp/venv/lib/python3.6/site-packages/aio_pika/connection.py", line 147, in connect
    yield from f
ConnectionRefusedError: [Errno -3] Temporary failure in name resolution
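asyncio logs "Future exception was never retrieved" when a future that holds an exception is garbage-collected without anyone having called result() or exception() on it. The sketch below shows one general way to mark such an exception as handled, using a done-callback. It is stdlib-only, `retrieve_exception` is a hypothetical name, and which internal future triggers the message in the report above is not established here.

```python
import asyncio


def retrieve_exception(future: asyncio.Future) -> None:
    """Done-callback that marks the future's exception as retrieved."""
    if not future.cancelled():
        # Calling exception() is what tells asyncio the error was seen
        future.exception()


async def demo() -> bool:
    loop = asyncio.get_running_loop()
    closing = loop.create_future()
    closing.add_done_callback(retrieve_exception)
    closing.set_exception(ConnectionError("server went away"))
    await asyncio.sleep(0)  # let the done-callback run
    return isinstance(closing.exception(), ConnectionError)
```

In real code the callback would usually log the exception rather than discard it.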

Submit payments to maintainers of the Python ecosystem

Since you insist on using bleeding edge components in production, and feel you have the right to harass maintainers when those bleeding edge components have problems, you should be paying the maintainers for their unpaid volunteer time. Contact maintainers to request their Patreon accounts (or other similar services) and send them at least 4 hours of their hourly rate per month.

Reference: pypa/setuptools#1043

API improvements

Interesting looking project! Any plans to port over the improvements you've made in the simple pika asyncio adapter in pull request 768, such as using a pika Credentials object in connect() and saving the basic consumer tag to support canceling it when shutting down? This future-based API is more pleasant to work with.

Support the amqps:// URL scheme

I noticed that while aio-pika accepts a URL in its connect functions, the scheme is ignored.

For example:

connection = await aio_pika.connect('amqps://localhost/')

What it does: connect to localhost:5672, without SSL.

What it should do: connect to localhost:5671, with SSL.

Pika already implements that logic in the URLParameters class, so it might be a good idea to delegate the URL parsing to it and ensure aio-pika's URLs are consistent with pika's.
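The expected behaviour described above (amqp defaults to port 5672 without SSL, amqps to port 5671 with SSL) can be sketched with the standard library's URL parser. `parse_amqp_url` and `Endpoint` are hypothetical names used for illustration only; this is neither pika's URLParameters nor aio-pika's own parsing.

```python
from typing import NamedTuple
from urllib.parse import urlsplit

# Default ports per scheme, as stated in the report above
DEFAULT_PORTS = {"amqp": 5672, "amqps": 5671}


class Endpoint(NamedTuple):
    host: str
    port: int
    ssl: bool


def parse_amqp_url(url: str) -> Endpoint:
    """Derive host, port and SSL flag from an amqp:// or amqps:// URL."""
    parts = urlsplit(url)
    if parts.scheme not in DEFAULT_PORTS:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    return Endpoint(
        host=parts.hostname or "localhost",
        # An explicit port in the URL wins over the scheme default
        port=parts.port or DEFAULT_PORTS[parts.scheme],
        ssl=parts.scheme == "amqps",
    )
```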

await queue.get() seems to get stuck when the queue is empty (not getting new elements)

What should be the correct behaviour in the following case:

async def consumer(queue_in):
    while True:
        msg = await queue_in.get()
        await do_stuff(msg)

What I observe is that the queue just gets stuck waiting for an element. But when an element is added to the queue, the get() doesn't "see" it, and keeps waiting: 2017-09-07 15:13:50,959 aio_pika.queue DEBUG Awaiting message from queue: <Queue(queue_in): auto_delete=False, durable=None, exclusive=False, arguments=None>

However after adding the timeout:

async def consumer(queue_in):
    while True:
        try:
            msg = await queue_in.get(timeout=3.0)
            await do_stuff(msg)
        except TimeoutError:
            await asyncio.sleep(1.5)

The get() times out (even when an element is added to the queue within the timeout), and in the next iteration it reads all the elements from the queue added in the meantime.

Am I doing something wrong?

How to deal with ConnectionError

Please document how ConnectionError should be handled. I have the following (simplified) setup:

import asyncio
import logging
from aio_pika import connect

logging.basicConfig(level=logging.DEBUG)

conn = None

async def rmq_connect():
    global conn
    loop = asyncio.get_event_loop()
    conn = await connect('amqp://guest:guest@127.0.0.1/', loop=loop)

async def close():
    global conn
    await conn.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(rmq_connect())

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.run_until_complete(close())
loop.close()

If the RabbitMQ server goes away, the following happens:

ERROR:pika.adapters.base_connection:Read empty data, calling disconnect
DEBUG:pika.heartbeat:Removing timeout for next heartbeat interval
WARNING:pika.adapters.base_connection:Socket closed when connection was open
DEBUG:pika.callback:Added: {'one_shot': True, 'arguments': None, 'only': None, 'callback': <bound method Connection._on_connection_start of <aio_pika.adapter.AsyncioConnection object at 0x10497e320>>, 'calls': 1}
WARNING:pika.connection:Disconnected from RabbitMQ at 127.0.0.1:5672 (0): Not specified
DEBUG:pika.callback:Processing 0:_on_connection_closed
DEBUG:pika.callback:Calling <function Connection.connect.<locals>._on_connection_lost at 0x103d946a8> for "0:_on_connection_closed"
ERROR:asyncio:Future exception was never retrieved
future: <Future finished exception=ConnectionError('Not specified', 0)>
ConnectionError: [Errno Not specified] 0

While the ERROR from pika.adapters.base_connection is OK, the one from asyncio clearly is not. I've tried wrapping try-except blocks around every possible place without much luck. I also tried ensure_future as described at https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed

I'm clearly missing something obvious here, but I couldn't find anything useful after hours of looking through the code, search engines, etc.
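For context, "Future exception was never retrieved" is asyncio's generic message for a future that finished with an exception nobody read. A try-except around your own awaits cannot catch it when the future is created and failed elsewhere. The sketch below shows the general mechanism only: the `closing` future is a hypothetical stand-in for whatever future the library fails internally, and `on_closed` is a made-up callback name. It does not show how to reach that future in aio-pika.

```python
import asyncio


def on_closed(future, seen):
    """Done-callback that reads the exception so asyncio won't log it."""
    if future.cancelled():
        return
    exc = future.exception()  # calling this marks the exception as retrieved
    if exc is not None:
        seen.append(exc)


async def demo():
    loop = asyncio.get_running_loop()
    # Hypothetical stand-in for a "connection closed" future.
    closing = loop.create_future()
    seen = []
    closing.add_done_callback(lambda f: on_closed(f, seen))

    # Simulate the server going away.
    closing.set_exception(ConnectionError("Not specified"))
    await asyncio.sleep(0)  # let the event loop run the callback
    return seen
```

A loop-wide alternative is loop.set_exception_handler(), which receives such unretrieved exceptions in its context dict.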

Client does not seem to be sending heartbeats to RabbitMQ

I'm trying to configure worker -> rabbitmq using aio-pika and I'd like to keep a long-lived connection open, but I'm running into disconnection issues if there is a lull in activity. I've noticed that setting heartbeat_interval changes how long RabbitMQ waits before deciding a connection is 'dead', but has no effect on the interval at which the client sends heartbeats. In fact, I'm not seeing any heartbeats being sent from the aio-pika client in tcpdump.

Is this a bug in aio-pika or in pika?

Exceptions thrown during reconnect

I am trying to create an AMQP connection that quietly tries to reconnect while the RabbitMQ server is down.

Here's a slightly modified version of the client example:

import asyncio
from aio_pika.robust_connection import RobustConnection

import aio_pika


def on_amqp_lost(connection):
    print('###### on_ampq_lost', connection)


async def ampq(current_loop: asyncio.AbstractEventLoop):
    connection = await aio_pika.connect_robust(
        "amqp://amqp_user:**********@ip-*********.eu-west-1.compute.internal:5672",
        reconnect_interval=10,
        loop=current_loop
    )  # type: RobustConnection

    connection.add_connection_lost_callback(on_amqp_lost)

    async with connection:

        channel = await connection.channel()

        queue = await channel.declare_queue("some_queue", durable=True)

        print('###### connected')

        async for message in queue:
            with message.process():
                print(message.body)


def on_loop_exception(current_loop, context):
    exc = context.get('exception')
    print('###### on_loop_exception %r ' % (exc,))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(ampq(loop))
    loop.set_exception_handler(on_loop_exception)
    loop.run_forever()

This connects fine:

###### connected
b'{"message": "hello"}'

After restarting the RabbitMQ server (which takes about 3 seconds), an Auto Reconnect Error is thrown, but the connection seems to recover:

###### on_ampq_lost amqp://amqp_user:********@ip-*********.eu-west-1.compute.internal:5672
###### on_loop_exception ConnectionError('Auto Reconnect Error',) 
b'{"message": "hello"}'

If I shut down the RabbitMQ server I get the last 3 exceptions shown below repeatedly:

###### on_ampq_lost amqp://amqp_user:********@ip-*********.eu-west-1.compute.internal:5672
###### on_loop_exception ConnectionError('Auto Reconnect Error',) 
###### on_loop_exception AttributeError("'NoneType' object has no attribute 'channel'",) 
###### on_loop_exception ConnectionError('Auto Reconnect Error',) 

Once the RabbitMQ server is up again the connection recovers.

How can I avoid these exceptions? In my use case there are other connections open on the same loop and I want to stop it once any of these disconnects.
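For the "stop once any of these disconnects" part, one general asyncio pattern is to run each connection's work as a task and wait with FIRST_EXCEPTION. This is a sketch under assumptions, not aio-pika behaviour: `healthy` and `broken` are hypothetical stand-ins for per-connection coroutines, and `run_until_first_failure` is a made-up helper name.

```python
import asyncio


async def run_until_first_failure(*coros):
    """Run all coroutines; when one raises, cancel the rest and return its error."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_EXCEPTION
    )
    for task in pending:
        task.cancel()
    # Wait for the cancellations to finish without raising here.
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        if not task.cancelled() and task.exception() is not None:
            return task.exception()
    return None


async def healthy():
    # Hypothetical stand-in for a connection that keeps working.
    await asyncio.sleep(10)


async def broken():
    # Hypothetical stand-in for a connection that is lost.
    await asyncio.sleep(0.01)
    raise ConnectionError("connection lost")


async def demo():
    return await run_until_first_failure(healthy(), broken())
```

This only helps if the disconnect surfaces as an exception in the coroutine itself. Errors that reach only the loop exception handler would need to be handled there.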

Error log for normal shutdown

Hi,

When a connection is closed normally, aio_pika invariably outputs an error log: aio_pika.channel - ERROR - Channel <pika.channel.Channel object at 0x7fdc76040c50> closed: 0. See:

log.error("Channel %r closed: %d - %s", channel, code, reason)

This should happen only when the connection is closed abnormally.

ACK is not awaitable

Hi there,

Looking at this section of the documentation, message.ack() is not awaited, which means that the ack will be blocking. Do you hope that it won't block long enough to be significant? What if RabbitMQ times out? Is that due to some implementation difficulty? (AFAICS, the Twisted consumer can't await the ack either.)

Thanks.

Sample code not working

I'm referring to the sample code in the project's README.

First, I had to change acyncio to asyncio. Next, I also had to import Message from aio_pika. But even then, I get this error:

Traceback (most recent call last):
  File "t.py", line 47, in <module>
    loop.run_until_complete(main(loop))
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 466, in run_until_complete
    return future.result()
  File "t.py", line 35, in main
    incoming_message = yield from queue.get(timeout=5)
  File "venv/lib/python3.6/site-packages/aio_pika/queue.py", line 145, in get
    no_ack=no_ack,
  File "venv/lib/python3.6/site-packages/aio_pika/message.py", line 150, in __init__
    self.consumer_tag = envelope.consumer_tag
AttributeError: 'GetOk' object has no attribute 'consumer_tag'

What is going on?

Thanks for aio-pika!

Publishing while consuming

The worker should get a new task from task_queue, publish a new message to task_queue2, and acknowledge the received task from task_queue.

Let's assume that the worker is not running. Now run new_task.py: 3 tasks are sent to task_queue. Now run worker.py: 3 tasks are received from task_queue and 3 tasks are sent to task_queue2, BUT 1 task is left unacknowledged in task_queue. Can you help me find the reason why? Maybe I am doing something wrong?

P.S.: The number of tasks in new_task.py (default is 3) can be changed: the higher this number is, the more unacknowledged tasks are left in task_queue after running the worker.

new_task.py

import asyncio
from aio_pika import connect, Message, DeliveryMode

async def main():
    connection = await connect("amqp://guest:guest@rabbit/")
    channel = await connection.channel()
    message_body = b"Hello World!"
    message = Message(message_body, delivery_mode=DeliveryMode.PERSISTENT)
    for _ in range(3):
        await channel.default_exchange.publish(message, routing_key='task_queue')
        print("Sent %r" % message)
    await connection.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

worker.py

import asyncio
from aio_pika import connect, IncomingMessage, Message, DeliveryMode

channel = None

async def on_message(message):
    with message.process():
        print("Received message %r" % message.body)
        new_message = Message(b'New message', delivery_mode=DeliveryMode.PERSISTENT)
        await channel.default_exchange.publish(new_message, routing_key='task_queue2')

async def main():
    global channel
    connection = await connect("amqp://guest:guest@rabbit/")
    channel = await connection.channel()
    queue = await channel.declare_queue('task_queue', durable=True)
    await channel.declare_queue('task_queue2', durable=True)
    queue.consume(on_message)

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()

`basic_cancel` is not implemented

Hi there,

As far as I can see in the code and in the documentation, basic_cancel is not implemented in aio-pika. Moreover, it's currently not possible to hack around this, since aio-pika's consume method doesn't return the consumer tag.

May I submit a PR for this?

Thanks,
Gilb's

Event loop is closed when exiting QueueIterator

I've been trying to use QueueIterator. I ran the simple producer and consumer example from the home page of the docs, and I'm getting this error:

Exception ignored in: <bound method QueueIterator.__del__ of <aio_pika.queue.QueueIterator object at 0x10d7da6a0>>
Traceback (most recent call last):
  File "/Users/bitrut/.virtualenvs/conradlib/lib/python3.6/site-packages/aio_pika/queue.py", line 362, in __del__
    self.close()
  File "/Users/bitrut/.virtualenvs/conradlib/lib/python3.6/site-packages/aio_pika/queue.py", line 359, in close
    return self.loop.create_task(self._close())
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 282, in create_task
    self._check_closed()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

I can't figure out what's wrong.

Expose message/consumer counts from declare_queue

Due to the Queue abstractions it's currently impossible to safely get the actual result of the queue.declare operation, along with the message_count and consumer_count.

A trivial solution would be to add message_count and consumer_count to Queue, and then set it here along with name. Would this be an acceptable approach for a PR?
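To make the proposal concrete, here is a minimal sketch of the idea. Everything in it is hypothetical: `DeclareOk` is a stand-in for the broker's queue.declare-ok reply (whose AMQP 0-9-1 fields are queue, message-count and consumer-count), and this simplified `Queue` class and its `apply_declare_result` method are not aio-pika's actual code.

```python
from collections import namedtuple

# Hypothetical stand-in for the broker's queue.declare-ok reply.
DeclareOk = namedtuple(
    "DeclareOk", ["queue", "message_count", "consumer_count"]
)


class Queue:
    """Hypothetical, simplified queue wrapper; not aio-pika's actual class."""

    def __init__(self, name):
        self.name = name
        self.message_count = None   # unknown until the queue is declared
        self.consumer_count = None

    def apply_declare_result(self, result):
        # Store the counts at the same point where the name is set.
        self.name = result.queue
        self.message_count = result.message_count
        self.consumer_count = result.consumer_count
        return self
```

The counts are a snapshot taken at declaration time, so callers would need to treat them as possibly stale.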
