Comments (9)
Are you by any chance using ack=0
?
from aiokafka.
@Drizzt1991 No we are using default acks=1
from aiokafka.
@Drizzt1991 What do you think about that problem? I could use retry_backoff_ms
to limit reconnecting so it is not instant. Should I go for it?
from aiokafka.
@Artimi Hi there, sorry for the long response.
It seems to me like this is not a bug in closing the connection, but rather in some logic part of the application.
Did you try some fix already and it helped?
from aiokafka.
@Artimi The deal here is that you got Connection at ... closed
. This is not an error you get on connection break (it would write Connection at ... broken
), but raised when you close the connection using conn.close()
.
The only places connections are closed are you're fix for KafkaTimeout, producer.stop()
, consumer.stop()
. And I'm still not sure how you even got that situation.
from aiokafka.
And could you confirm, that there's no Unexpected error in sender routine
logs.
from aiokafka.
Hey @Drizzt1991 thanks for the response. I again walk through the logs and found that there was CorrelationIdError
May 23rd 2017, 08:25:57.908 Unable to request metadata from node with id 0: CorrelationIdError: Correlation ids do not match: sent 18, recv 19
May 23rd 2017, 08:25:57.909 Unable to update metadata from [0]
May 23rd 2017, 08:25:57.912 Got error produce response: ConnectionError: Connection at kafka...:1111 closed
May 23rd 2017, 08:36:50.484 Error during sending message\
Traceback (most recent call last):\
File "/home/artimi/env/lib/python3.5/site-packages/messaging_client/mqclient/kafka_producer.py", line 72, in _run\
await self._producer.send(topic, payload)\
File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/producer.py", line 279, in send\
tp, key_bytes, value_bytes, self._request_timeout_ms / 1000)\
File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/message_accumulator.py", line 208, in add_message\
raise KafkaTimeoutError()\
kafka.errors.KafkaTimeoutError: KafkaTimeoutError
Second occurrence was similar:
May 17th 2017, 09:40:54.684 Unable to request metadata from node with id 0: ConnectionError: Connection at kafka...:1111 closed
May 17th 2017, 09:40:54.684 Got error produce response: CorrelationIdError: Correlation ids do not match: sent 50801, recv 50802
May 17th 2017, 09:40:54.685 Unable to update metadata from [0]
May 17th 2017, 09:41:32.498 Error during sending message\
Traceback (most recent call last):\
File "/home/artimi/env/lib/python3.5/site-packages/messaging_client/mqclient/kafka_producer.py", line 72, in _run\
await self._producer.send(topic, payload)\
File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/producer.py", line 279, in send\
tp, key_bytes, value_bytes, self._request_timeout_ms / 1000)\
File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/message_accumulator.py", line 208, in add_message\
raise KafkaTimeoutError()\
kafka.errors.KafkaTimeoutError: KafkaTimeoutError
As I'm looking at it now I think that there might be some help from my side. We are calling producer.client._metadata_update
directly at start of our producer to have current metadata (it is because we want to force kafka to create topic for producing if missing). However result of _metadata_update
was not checked and retried. So it could happen that we fail at getting new metadata and don't retry it. In producer we have default metadata_max_age_ms = 300000
so it can wait another 5 minute to get metadata and this might somehow affect the message accumulator. Maybe it will be just enough to try get metadata until I succeed at start. But I think that the connection should be reset in AIOKafkaProducer._send_produce_req
because it is retriable error.
from aiokafka.
@Drizzt1991 It seems that since we fixed our error with _metadata_update
we no longer have this problem. So it seems that problem was indeed failed _metadata_update
that was not retried and before it could be updated again it raised KafkaTimeoutError
and ended in that weird state. I will watch that but I want to believe it was solved. Should I close the issue?
from aiokafka.
@Artimi Hey, so I still could not find how you managed to get a CorrelationIdError
, but we made a lot of fixes to producer in v0.2.3. Please try it and re-open or open a new ticket if you have any other issues.
from aiokafka.
Related Issues (20)
- [QUESTION] How to check readiness of kafka to receive msgs from producer?
- seek_to_committed does not work when committed offset is 0
- Unexpected error during batch delivery HOT 1
- Can I use Azure Event Hubs with `aiokafka`? HOT 1
- 0.9.0 zstd codec depends on cramjam but missing in documentation HOT 1
- Let's put `aiokafka` under the `aio-libs` org on PyPI HOT 2
- Add create_acls function for kafka admin client
- AIOKafkaProducer failed to produce message with headers HOT 3
- I keep getting MessageSizeTooLargeError, error message gives size much bigger than actual message were given to producer. HOT 6
- Add delete_records to the admin client HOT 5
- Can't connect to kafka docker HOT 1
- asyncio.exceptions.CancelledError
- Regarding Kafka Connection
- invalid Type AioKafkaAdminClient create_partitions
- Proposal to Add Type Hints HOT 12
- Consumer stopped consuming, task Fetcher._fetch_task has finished HOT 8
- High Incoming request sum on Azure Event Hub
- [QUESTION] Unable connect to node with id: X: [Errno 111]: Connection refused
- admin client - failure to create topics (error code 41) HOT 3
- [QUESTION] How to get old messages from topics? HOT 4
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from aiokafka.