Comments (17)
@achille-roussel I would love to contribute a patch. Should we just check for io.EOF here and return nil?
from kafka-go.
If yes ^ this seems like a really small change, and I'd like to contribute a test too.
Here are the test failures on my fork:
# github.com/briansorahan/kafka-go_test
./compression_test.go:35:11: cannot assign int to r1 (type kafka.Message) in multiple assignment
./compression_test.go:35:21: not enough arguments in call to m.CompressionCodec.Encode
have ()
want ([]byte, []byte)
./compression_test.go:41:11: cannot assign int to r2 (type kafka.Message) in multiple assignment
./compression_test.go:41:22: not enough arguments in call to r1.CompressionCodec.Decode
have ()
want ([]byte, []byte)
./compression_test.go:127:24: not enough arguments in call to msg.CompressionCodec.Encode
have ()
want ([]byte, []byte)
./compression_test.go:132:26: m1.Value undefined (type int has no field or method Value)
./compression_test.go:134:14: m1.Decode undefined (type int has no field or method Decode)
FAIL github.com/briansorahan/kafka-go [build failed]
With a little guidance on how to fix them, I'd be happy to do that too.
This definitely sounds like it's breaking what the documentation says the behavior should be.
The fix doesn't seem too hard (checking for io.EOF). Would you be able to contribute a fix for this?
After getting past these errors ^ I discovered that some of the tests also need to have a Kafka broker running locally.
At least that's how I interpret this:
--- FAIL: TestDialer (0.00s)
--- FAIL: TestDialer/looking_up_partitions_returns_the_list_of_available_partitions_for_a_topic (0.01s)
reader_test.go:262: bad conn
dialer_test.go:55: dial tcp 127.0.0.1:9092: connect: connection refused
I would suggest putting a build tag on these, something like
// +build integration
so that users can run go test without a lot of fuss.
wdyt?
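A minimal sketch of what a tagged test file could look like. The test name and package name are hypothetical, and the broker address is simply the one from the failure output above. The `//go:build` line is the form used since Go 1.17, and the `// +build` line is the older equivalent.

```go
//go:build integration
// +build integration

// Hypothetical package name, for illustration only.
package kafka_test

import (
	"net"
	"testing"
)

// TestBrokerReachable is a hypothetical integration test. Because of the
// build constraint above, this file is compiled only when the "integration"
// tag is set, so a plain `go test` never tries to reach a broker.
func TestBrokerReachable(t *testing.T) {
	// Assumed address: the local broker the failing tests tried to dial.
	conn, err := net.Dial("tcp", "127.0.0.1:9092")
	if err != nil {
		t.Fatalf("dial broker: %v", err)
	}
	conn.Close()
}
```

With this in place, `go test ./...` would skip the file, and `go test -tags integration ./...` would include it.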
I would be careful about adding io.EOF checks, @achille-roussel. Keep in mind that Kafka proactively kills the connection on protocol errors, and this could be one of those.
There are 4 versions of create topics in the kafka protocol:
https://kafka.apache.org/protocol#The_Messages_CreateTopics
I'm not sure which we're using in kafka-go, but we'd need to rule that out.
@Pryz do we have an exact version of kafka that kafka-go works with for sure? We should put that in the readme.
@abraithwaite It's more about the version of the message format than the version of Kafka. kafka-go will work just fine with all Kafka versions as long as you use the v0 and v1 message formats. See: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
Couldn't we just document that the way to achieve idempotency is to check for TopicAlreadyExists?
i.e. Can we just get rid of the switch?
In fact, when I set up my little test I was expecting to see something like this ^
I too am running into this issue. In my case, I get the EOF error when the topic does not exist and it does not get created.
Not sure if my request is wrong or if the client needs an update to be in line with new Kafka api.
Using kafka_2.11-v1.1.1-rc1
@briansorahan I'm not sure how different it would be to return the "TopicAlreadyExists" error rather than checking for err == nil. Do you have an example of how this would help you?
@abraithwaite you're making a good point that EOF may mean that the topic wasn't created as well... but in that case any future attempt to read or write to the topic will fail.
@achille-roussel I'm suggesting that we avoid hiding TopicAlreadyExists from callers since they can just detect it and make their own decision. As long as we document which errors the method can return, and how to interpret each one, then it's up to the caller to implement their own policy.
I’m a bit concerned about breaking programs that depend on the current behavior, but at the same time it’s true that we have no way of detecting if a topic already existed right now, so it seems like this is something that needs fixing.
Would you be able to send a pull request to make the change?
I can submit a PR with that change, but I'd like to document io.EOF and what it means.
@briansorahan do you still want to make this happen? I think the idea of not making CreateTopics idempotent is probably an improvement.
Regarding io.EOF, it always seems unexpected to get this error on CreateTopics, so we may want to wrap it with a more descriptive error message (but I'm not sure under what conditions Kafka would close the connection in this case).
Honestly I've lost interest in this since confluent-kafka-go now supports creating topics
Echoing @abraithwaite, this sounds very much like a case of the broker not understanding the version of the request and closing the connection. It looks like we were sending v2 of the CreateTopicsRequest up until #81. Now we're sending v0, which will have wider support. I'd expect this to no longer be an issue.
Closing this issue. Please re-open if you see this behavior on Kafka >= 0.10.0!