Comments (8)
(originally posted this on #847, but on further investigation I think this is the correct issue)
@ods I have bisected this issue to #802
Consider the following script:
import asyncio
import logging
import time

import aiokafka


async def consume_task(consumer):
    async for msg in consumer:
        print(msg)
        await consumer.commit()


async def lag_task(producer):
    while True:
        print("sleeping")
        await asyncio.sleep(10)
        print("inducing lag")
        time.sleep(40)
        print("sending message")
        await producer.send_and_wait(
            topic='some_topic',
            value=b'a message',
        )


async def main():
    async with (
        aiokafka.AIOKafkaProducer() as producer,
        aiokafka.AIOKafkaConsumer(
            'some_topic',
            group_id='some_group',
            enable_auto_commit=False,
        ) as consumer,
    ):
        await consumer.seek_to_end()
        task1 = asyncio.create_task(consume_task(consumer))
        task2 = asyncio.create_task(lag_task(producer))
        consumer._fetcher._fetch_task.add_done_callback(lambda t: print('fetch task done'))
        await asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED)
        print("something finished")


if __name__ == '__main__':
    # logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main())
Prior to #802 this will print:
sleeping
inducing lag
sending message
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 0)for group some_group.
sleeping
ConsumerRecord(topic='some_topic', partition=0, offset=10, timestamp=1715232041461, timestamp_type=0, key=None, value=b'a message', checksum=None, serialized_key_size=-1, serialized_value_size=9, headers=())
After #802 this will print:
sleeping
inducing lag
sending message
Failed fetch messages from 0: [Error 7] RequestTimedOutError
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 0)for group some_group.
sleeping
fetch task done
Notice that the ConsumerRecord is not printed and "fetch task done" is.
If you turn on debug logging you will see that after the induced lag we stop getting:
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition
No new messages will ever be received by consume_task after this point.
Reverting this single line from #802 restores the original behavior:
diff --git a/aiokafka/conn.py b/aiokafka/conn.py
index da27fd2..2ceb9ba 100644
--- a/aiokafka/conn.py
+++ b/aiokafka/conn.py
@@ -450,7 +450,7 @@ class AIOKafkaConnection:
             return self._writer.drain()
         fut = self._loop.create_future()
         self._requests.append((correlation_id, request.RESPONSE_TYPE, fut))
-        return wait_for(fut, self._request_timeout)
+        return asyncio.wait_for(fut, self._request_timeout)
 
     def _send_sasl_token(self, payload, expect_response=True):
         if self._writer is None:
When we replaced the fetch task with a fresh one, it did not receive new messages (even though there were messages in the assigned partition) and it also finished, without logging anything. I see only one way for this task to reach the finished state: by swallowing the CancelledError.
If the error had been propagated, it would have allowed the app to restart and quickly resume consuming messages.
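To illustrate why a swallowed CancelledError leaves no trace, here is a minimal sketch that does not use aiokafka at all. The name routine_swallowing_cancel is hypothetical and only stands in for a background routine that catches the cancellation and returns.

```python
import asyncio


async def routine_swallowing_cancel():
    # Hypothetical stand-in for a background routine, not aiokafka code.
    try:
        await asyncio.sleep(1000)
    except asyncio.CancelledError:
        pass  # the cancellation is swallowed and the coroutine returns normally


async def main():
    task = asyncio.create_task(routine_swallowing_cancel())
    await asyncio.sleep(0)  # let the routine start and block in sleep()
    task.cancel()
    await task  # does not raise, because the routine returned normally
    return task.done(), task.cancelled(), task.exception()


result = asyncio.run(main())
print(result)
```

The task ends up done, not cancelled, and without an exception, so from the outside it looks like a normal return and nothing is logged.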
The only place where _fetch_task is cancelled is the Fetcher.close() method. Or do you see other options? Also, for this exception to be propagated, you have to await the task, which is also done only in the close() method.
Hmm, there is another way to get a CancelledError here:
for task in self._pending_tasks:
    # Those tasks should have proper handling for
    # cancellation
    if not task.done():
        task.cancel()
        await task
Can you confirm this is a bug in aiokafka?
On our side, we had to implement a workaround:
if consumer._fetcher._fetch_task.done():
    # restart app
    ...
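A slightly fuller sketch of such a watchdog, assuming the application wants to know how the task ended before restarting. The helper wait_until_done and the stand-in fake_fetch_routine are hypothetical names. With a real consumer, the task would be consumer._fetcher._fetch_task, which is a private attribute and may change between releases.

```python
import asyncio


async def wait_until_done(task: asyncio.Task) -> str:
    """Wait for `task` to end and describe how, without re-raising its error."""
    # asyncio.wait() returns once the task is done and never raises the
    # task's own exception or CancelledError.
    await asyncio.wait([task])
    if task.cancelled():
        return "cancelled"
    if task.exception() is not None:
        return f"failed: {task.exception()!r}"
    return "finished"


async def demo():
    # Hypothetical stand-in for the fetch task: a routine that exits silently.
    async def fake_fetch_routine():
        await asyncio.sleep(0.01)

    fetch_task = asyncio.create_task(fake_fetch_routine())
    return await wait_until_done(fetch_task)


outcome = asyncio.run(demo())
print(outcome)
```

In an application, the returned description could be logged before triggering the restart, which at least records that the fetch task stopped.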
Hi @apmorton,
Thank you for reproducing the problem. Right, starting with Python 3.8.6 there was a bug in asyncio.wait_for(), which was fixed in 3.12 using the same approach as the one used here. It is no surprise that one bug that fails to handle an exception and another that swallows it can compensate for each other.
Other similar places where we may have the same problem:
- https://github.com/aio-libs/aiokafka/blob/master/aiokafka/consumer/fetcher.py#L456-L457
- https://github.com/aio-libs/aiokafka/blob/master/aiokafka/consumer/group_coordinator.py#L761-L762
- https://github.com/aio-libs/aiokafka/blob/master/aiokafka/consumer/group_coordinator.py#L889-L890
- https://github.com/aio-libs/aiokafka/blob/master/aiokafka/producer/sender.py#L98-L99
Here is a snippet to demonstrate the problem with just suppressing CancelledError:
import asyncio
from time import time


async def task_with_cleanup():
    try:
        await asyncio.sleep(1000)
    finally:
        print("Cleanup", time() - started)
        await asyncio.sleep(2)


async def worker():
    task = asyncio.create_task(task_with_cleanup())
    await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("CancelledError is suppressed", time() - started)
    await asyncio.sleep(10)


async def main():
    await asyncio.wait_for(worker(), timeout=1)
    elapsed = time() - started
    assert elapsed < 2, elapsed


started = time()
asyncio.run(main())
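One possible way to avoid this, sketched under the assumption that the caller only needs to wait for the cancelled task to finish: wait with asyncio.wait(), which does not re-raise the child's CancelledError. Any CancelledError that surfaces there can then only be the caller's own cancellation, and it propagates as usual. The names cancel_and_wait, child and worker are hypothetical, and this is not necessarily how aiokafka should fix it.

```python
import asyncio


async def child():
    try:
        await asyncio.sleep(1000)
    finally:
        await asyncio.sleep(0.2)  # slow cleanup, as in the snippet above


async def cancel_and_wait(task):
    task.cancel()
    # asyncio.wait() does not raise the child's CancelledError, so there is
    # nothing to suppress. If this coroutine itself is cancelled while
    # waiting, that CancelledError propagates to the caller.
    await asyncio.wait([task])


async def worker(events):
    task = asyncio.create_task(child())
    await asyncio.sleep(0)
    await cancel_and_wait(task)
    await asyncio.sleep(10)
    events.append("worker finished")  # not reached when the worker is cancelled


async def main():
    events = []
    try:
        await asyncio.wait_for(worker(events), timeout=0.05)
    except asyncio.TimeoutError:
        events.append("timeout")
    return events


events = asyncio.run(main())
print(events)
```

The trade-off is that when the caller is cancelled, the child may still be running its cleanup, so something else (here, the shutdown of asyncio.run()) has to take care of it.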