Git Product home page Git Product logo

Comments (8)

apmorton avatar apmorton commented on June 2, 2024 1

(originally posted this on #847, but on further investigation I think this is the correct issue)

@ods I have bisected this issue to #802

Consider the following script:

import asyncio
import logging
import time

import aiokafka


async def consume_task(consumer):
    async for msg in consumer:
        print(msg)
        await consumer.commit()


async def lag_task(producer):
    while True:
        print("sleeping")
        await asyncio.sleep(10)
        print("inducing lag")
        time.sleep(40)
        print("sending message")
        await producer.send_and_wait(
            topic='some_topic',
            value=b'a message',
        )


async def main():
    async with (
        aiokafka.AIOKafkaProducer() as producer,
        aiokafka.AIOKafkaConsumer(
            'some_topic',
            group_id='some_group',
            enable_auto_commit=False,
        ) as consumer,
    ):
        await consumer.seek_to_end()

        task1 = asyncio.create_task(consume_task(consumer))
        task2 = asyncio.create_task(lag_task(producer))

        consumer._fetcher._fetch_task.add_done_callback(lambda t: print('fetch task done'))

        await asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED)
        print("something finished")


if __name__ == '__main__':
    # logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main())

Prior to #802 this will print:

sleeping
inducing lag
sending message
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 0)for group some_group.
sleeping
ConsumerRecord(topic='some_topic', partition=0, offset=10, timestamp=1715232041461, timestamp_type=0, key=None, value=b'a message', checksum=None, serialized_key_size=-1, serialized_value_size=9, headers=())

After #802 this will print:

sleeping
inducing lag
sending message
Failed fetch messages from 0: [Error 7] RequestTimedOutError
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 0)for group some_group.
sleeping
fetch task done

Notice the ConsumerRecord is not printed and fetch task done is.

If you turn on debug logging you will see that after the induced lag we stop getting:

DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition

No new messages will ever be received by consume_task after this point.

Reverting this single line from #802 restores the original behavior:

diff --git a/aiokafka/conn.py b/aiokafka/conn.py
index da27fd2..2ceb9ba 100644
--- a/aiokafka/conn.py
+++ b/aiokafka/conn.py
@@ -450,7 +450,7 @@ class AIOKafkaConnection:
             return self._writer.drain()
         fut = self._loop.create_future()
         self._requests.append((correlation_id, request.RESPONSE_TYPE, fut))
-        return wait_for(fut, self._request_timeout)
+        return asyncio.wait_for(fut, self._request_timeout)

     def _send_sasl_token(self, payload, expect_response=True):
         if self._writer is None:

from aiokafka.

AxTheB avatar AxTheB commented on June 2, 2024

When we replaced the fetch task with fresh one, it did not receive new messages (there were messages in assigned partition) and got finished too, without logging anything. I see only one way for this task to get to finished state, and that is by swallowing the CancelledError.
If the error got propagated, it would allow the app to restart and quickly resume consuming messages.

from aiokafka.

ods avatar ods commented on June 2, 2024

The only place where _fetch_task is cancelled is the Fetcher.close() method. Or do you see other options? Also, for this exception to be propagated, you have to await the task, which is also done in close() method only.

from aiokafka.

ods avatar ods commented on June 2, 2024

Hmm, there is other option to get CancelledError here:

                    for task in self._pending_tasks:
                        # Those tasks should have proper handling for
                        # cancellation
                        if not task.done():
                            task.cancel()
                        await task

from aiokafka.

pavelschon avatar pavelschon commented on June 2, 2024

Can you confirm this is a bug in aiokafka?

On our side, we had to implement a workaround

if consumer._fetcher._fetch_task.done():
   # restart app

from aiokafka.

ods avatar ods commented on June 2, 2024

Hi @apmorton,
Thank you for reproducing the problem. Right, in Python from 3.8.6 there was a bug in asyncio.wait_for() which was fixed in 3.12 by using the same approach, as used here. No surprise that a bug that doesn't handle some exception and another that swallows it may compensate each other.

from aiokafka.

ods avatar ods commented on June 2, 2024

Other similar places we also can have problems with:

from aiokafka.

ods avatar ods commented on June 2, 2024

Here is a snippet to demonstrate the problem with just suppressing CancelledError:

import asyncio
from time import time

async def task_with_cleanup():
    try:
        await asyncio.sleep(1000)
    finally:
        print("Cleanup", time() - started)
        await asyncio.sleep(2)

async def worker():
    task = asyncio.create_task(task_with_cleanup())
    await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("CancelledError is suppressed", time() - started)
    await asyncio.sleep(10)

async def main():
    await asyncio.wait_for(worker(), timeout=1)
    elapsed = time() - started
    assert elapsed < 2, elapsed

started = time()
asyncio.run(main())

from aiokafka.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.