kafka-go

Motivations

We rely on both Go and Kafka a lot at Segment. Unfortunately, the state of the Go client libraries for Kafka at the time of this writing was not ideal. The available options were:

  • sarama, which is by far the most popular but is quite difficult to work with. It is poorly documented, the API exposes low level concepts of the Kafka protocol, and it doesn't support recent Go features like contexts. It also passes all values as pointers which causes large numbers of dynamic memory allocations, more frequent garbage collections, and higher memory usage.

  • confluent-kafka-go is a cgo based wrapper around librdkafka, which means it introduces a dependency on a C library for all Go code that uses the package. It has much better documentation than sarama but still lacks support for Go contexts.

  • goka is a more recent Kafka client for Go which focuses on a specific usage pattern. It provides abstractions for using Kafka as a message passing bus between services rather than an ordered log of events, but this is not the typical use case of Kafka for us at Segment. The package also depends on sarama for all interactions with Kafka.

This is where kafka-go comes into play. It provides both low and high level APIs for interacting with Kafka, mirroring concepts and implementing interfaces of the Go standard library to make it easy to use and integrate with existing software.

Note:

In order to better align with our newly adopted Code of Conduct, the kafka-go project has renamed our default branch to main. For the full details of our Code Of Conduct see this document.

Kafka versions

kafka-go is currently tested with Kafka versions 0.10.1.0 to 2.7.1. While it should also be compatible with later versions, newer features available in the Kafka API may not yet be implemented in the client.

Go versions

kafka-go requires Go version 1.15 or later.
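
kafka-go is distributed as a Go module. Assuming your project uses Go modules, a typical way to add the dependency is:

go get github.com/segmentio/kafka-go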

Connection GoDoc

The Conn type is the core of the kafka-go package. It wraps around a raw network connection to expose a low-level API to a Kafka server.

Here are some examples showing typical use of a connection object:

// to produce messages
topic := "my-topic"
partition := 0

conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
    log.Fatal("failed to dial leader:", err)
}

conn.SetWriteDeadline(time.Now().Add(10*time.Second))
_, err = conn.WriteMessages(
    kafka.Message{Value: []byte("one!")},
    kafka.Message{Value: []byte("two!")},
    kafka.Message{Value: []byte("three!")},
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := conn.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

// to consume messages
topic := "my-topic"
partition := 0

conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
    log.Fatal("failed to dial leader:", err)
}

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

b := make([]byte, 10e3) // 10KB max per message
for {
    n, err := batch.Read(b)
    if err != nil {
        break
    }
    fmt.Println(string(b[:n]))
}

if err := batch.Close(); err != nil {
    log.Fatal("failed to close batch:", err)
}

if err := conn.Close(); err != nil {
    log.Fatal("failed to close connection:", err)
}

To Create Topics

By default, Kafka has auto.create.topics.enable='true' (KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='true' in the bitnami/kafka Docker image). If this value is set to 'true', topics will be created as a side effect of kafka.DialLeader, like so:

// to create topics when auto.create.topics.enable='true'
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
if err != nil {
    panic(err.Error())
}

If auto.create.topics.enable='false' then you will need to create topics explicitly like so:

// to create topics when auto.create.topics.enable='false'
topic := "my-topic"

conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()

controller, err := conn.Controller()
if err != nil {
    panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err.Error())
}
defer controllerConn.Close()


topicConfigs := []kafka.TopicConfig{
    {
        Topic:             topic,
        NumPartitions:     1,
        ReplicationFactor: 1,
    },
}

err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
    panic(err.Error())
}

To Connect To Leader Via a Non-leader Connection

// to connect to the kafka leader via an existing non-leader connection rather than using DialLeader
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
    panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err.Error())
}
defer connLeader.Close()

To list topics

conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()

partitions, err := conn.ReadPartitions()
if err != nil {
    panic(err.Error())
}

m := map[string]struct{}{}

for _, p := range partitions {
    m[p.Topic] = struct{}{}
}
for k := range m {
    fmt.Println(k)
}
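
ReadPartitions also accepts topic names, so the same connection can be used to inspect a single topic. The following is a minimal sketch reusing the conn from above and assuming a topic named "my-topic" exists:

partitions, err := conn.ReadPartitions("my-topic")
if err != nil {
    panic(err.Error())
}

// each Partition carries the topic name and the partition ID
for _, p := range partitions {
    fmt.Println(p.Topic, p.ID)
}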

Because it is low level, the Conn type turns out to be a great building block for higher level abstractions, like the Reader for example.

Reader GoDoc

A Reader is another concept exposed by the kafka-go package, which intends to make it simpler to implement the typical use case of consuming from a single topic-partition pair. A Reader also automatically handles reconnections and offset management, and exposes an API that supports asynchronous cancellations and timeouts using Go contexts.

Note that it is important to call Close() on a Reader when a process exits. The kafka server needs a graceful disconnect to stop it from continuing to attempt to send messages to the connected clients. The given example will not call Close() if the process is terminated with SIGINT (ctrl-c at the shell) or SIGTERM (as docker stop or a kubernetes restart does). This can result in a delay when a new reader on the same topic connects (e.g. new process started or new container running). Use a signal.Notify handler to close the reader on process shutdown.

// make a new reader that consumes from topic-A, partition 0, at offset 42
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092","localhost:9093", "localhost:9094"},
    Topic:     "topic-A",
    Partition: 0,
    MaxBytes:  10e6, // 10MB
})
r.SetOffset(42)

for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
        break
    }
    fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}

if err := r.Close(); err != nil {
    log.Fatal("failed to close reader:", err)
}
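
A minimal sketch of the signal handling recommended above, using the standard os, os/signal, and syscall packages to close the reader r from the previous example when the process receives SIGINT or SIGTERM:

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

go func() {
    <-sigs
    // closing the reader unblocks any pending ReadMessage call,
    // letting the consume loop exit cleanly before the process ends
    if err := r.Close(); err != nil {
        log.Fatal("failed to close reader:", err)
    }
    os.Exit(0)
}()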

Consumer Groups

kafka-go also supports Kafka consumer groups including broker managed offsets. To enable consumer groups, simply specify the GroupID in the ReaderConfig.

ReadMessage automatically commits offsets when using consumer groups.

// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    GroupID:   "consumer-group-id",
    Topic:     "topic-A",
    MaxBytes:  10e6, // 10MB
})

for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
        break
    }
    fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

if err := r.Close(); err != nil {
    log.Fatal("failed to close reader:", err)
}

There are a number of limitations when using consumer groups:

  • (*Reader).SetOffset will return an error when GroupID is set
  • (*Reader).Offset will always return -1 when GroupID is set
  • (*Reader).Lag will always return -1 when GroupID is set
  • (*Reader).ReadLag will return an error when GroupID is set
  • (*Reader).Stats will return a partition of -1 when GroupID is set

Explicit Commits

kafka-go also supports explicit commits. Instead of calling ReadMessage, call FetchMessage followed by CommitMessages.

ctx := context.Background()
for {
    m, err := r.FetchMessage(ctx)
    if err != nil {
        break
    }
    fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
    if err := r.CommitMessages(ctx, m); err != nil {
        log.Fatal("failed to commit messages:", err)
    }
}

When committing messages in consumer groups, the message with the highest offset for a given topic/partition determines the value of the committed offset for that partition. For example, if messages at offsets 1, 2, and 3 of a single partition were retrieved by calls to FetchMessage, calling CommitMessages with the message at offset 3 will also commit the messages at offsets 1 and 2 for that partition.
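
Below is a minimal sketch of committing a fetched batch in a single call, relying on the behaviour described above; it assumes a reader r configured with a GroupID, and the batch size of 100 is arbitrary:

ctx := context.Background()
batch := make([]kafka.Message, 0, 100)

for len(batch) < cap(batch) {
    m, err := r.FetchMessage(ctx) // blocks until a message is available
    if err != nil {
        break
    }
    // process the message here, then keep it for the batch commit
    batch = append(batch, m)
}

// a single call commits the highest fetched offset for each
// topic/partition represented in the slice
if err := r.CommitMessages(ctx, batch...); err != nil {
    log.Fatal("failed to commit messages:", err)
}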

Managing Commits

By default, CommitMessages will synchronously commit offsets to Kafka. For improved performance, you can instead periodically commit offsets to Kafka by setting CommitInterval on the ReaderConfig.

// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    MaxBytes:       10e6, // 10MB
    CommitInterval: time.Second, // flushes commits to Kafka every second
})

Writer GoDoc

To produce messages to Kafka, a program may use the low-level Conn API, but the package also provides a higher level Writer type which is more appropriate to use in most cases as it provides additional features:

  • Automatic retries and reconnections on errors.
  • Configurable distribution of messages across available partitions.
  • Synchronous or asynchronous writes of messages to Kafka.
  • Asynchronous cancellation using contexts.
  • Flushing of pending messages on close to support graceful shutdowns.
  • Creation of a missing topic before publishing a message. Note: this was the default behaviour up to version v0.4.30.

// make a writer that produces to topic-A, using the least-bytes distribution
w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:   "topic-A",
	Balancer: &kafka.LeastBytes{},
}

err := w.WriteMessages(context.Background(),
	kafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

Missing topic creation before publication

// Make a writer that publishes messages to topic-A.
// The topic will be created if it is missing.
w := &kafka.Writer{
    Addr:                   kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    Topic:                  "topic-A",
    AllowAutoTopicCreation: true,
}

messages := []kafka.Message{
    {
        Key:   []byte("Key-A"),
        Value: []byte("Hello World!"),
    },
    {
        Key:   []byte("Key-B"),
        Value: []byte("One!"),
    },
    {
        Key:   []byte("Key-C"),
        Value: []byte("Two!"),
    },
}

var err error
const retries = 3
for i := 0; i < retries; i++ {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    // attempt to create topic prior to publishing the message
    err = w.WriteMessages(ctx, messages...)
    if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
        time.Sleep(time.Millisecond * 250)
        continue
    }

    if err != nil {
        log.Fatalf("unexpected error %v", err)
    }
    break
}

if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

Writing to multiple topics

Normally, the WriterConfig.Topic is used to initialize a single-topic writer. By omitting that configuration, you can instead define the topic on a per-message basis by setting Message.Topic.

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    // NOTE: When Topic is not defined here, each Message must define it instead.
	Balancer: &kafka.LeastBytes{},
}

err := w.WriteMessages(context.Background(),
    // NOTE: Each Message has Topic defined, otherwise an error is returned.
	kafka.Message{
        Topic: "topic-A",
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
        Topic: "topic-B",
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
        Topic: "topic-C",
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

NOTE: These two patterns are mutually exclusive: if you set Writer.Topic, you must not also explicitly define Message.Topic on the messages you are writing. The opposite applies when you do not define a topic for the writer. The Writer will return an error if it detects this ambiguity.

Compatibility with other clients

Sarama

If you're switching from Sarama and need/want to use the same algorithm for message partitioning, you can either use the kafka.Hash balancer or the kafka.ReferenceHash balancer:

  • kafka.Hash = sarama.NewHashPartitioner
  • kafka.ReferenceHash = sarama.NewReferenceHashPartitioner

The kafka.Hash and kafka.ReferenceHash balancers would route messages to the same partitions that the two aforementioned Sarama partitioners would route them to.

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:    "topic-A",
	Balancer: &kafka.Hash{},
}

librdkafka and confluent-kafka-go

Use the kafka.CRC32Balancer balancer to get the same behaviour as librdkafka's default consistent_random partition strategy.

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:    "topic-A",
	Balancer: kafka.CRC32Balancer{},
}

Java

Use the kafka.Murmur2Balancer balancer to get the same behaviour as the canonical Java client's default partitioner. Note: the Java class allows you to directly specify the partition, which is not permitted here.

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:    "topic-A",
	Balancer: kafka.Murmur2Balancer{},
}

Compression

Compression can be enabled on the Writer by setting the Compression field:

w := &kafka.Writer{
	Addr:        kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:       "topic-A",
	Compression: kafka.Snappy,
}

The Reader will determine if the consumed messages are compressed by examining the message attributes. However, the package(s) for all expected codecs must be imported so that they get loaded correctly.

Note: in versions prior to 0.4, programs had to import compression packages to install codecs and support reading compressed messages from Kafka. This is no longer the case, and imports of the compression packages are now no-ops.

TLS Support

For a bare-bones Conn type, or in the Reader/Writer configs, you can specify a dialer option for TLS support. If the TLS field is nil, it will not connect with TLS. Note: connecting to a Kafka cluster with TLS enabled without configuring TLS on the Conn/Reader/Writer can manifest as opaque io.ErrUnexpectedEOF errors.

Connection

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},
}

conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")

Reader

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    Dialer:         dialer,
})

Writer

Direct Writer creation

w := kafka.Writer{
    Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    Topic:    "topic-A",
    Balancer: &kafka.Hash{},
    Transport: &kafka.Transport{
        TLS: &tls.Config{},
    },
}

Using kafka.NewWriter

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},
}

w := kafka.NewWriter(kafka.WriterConfig{
	Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
	Topic:   "topic-A",
	Balancer: &kafka.Hash{},
	Dialer:   dialer,
})

Note that kafka.NewWriter and kafka.WriterConfig are deprecated and will be removed in a future release.
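
As a rough migration sketch, the deprecated configuration above maps onto the Writer struct shown earlier: Brokers becomes Addr, and the Dialer's TLS setting moves onto a kafka.Transport:

// non-deprecated equivalent of the kafka.NewWriter example above
w := &kafka.Writer{
    Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    Topic:    "topic-A",
    Balancer: &kafka.Hash{},
    Transport: &kafka.Transport{
        TLS: &tls.Config{},
    },
}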

SASL Support

You can specify an option on the Dialer to use SASL authentication. The Dialer can be used directly to open a Conn or it can be passed to a Reader or Writer via their respective configs. If the SASLMechanism field is nil, it will not authenticate with SASL.

SASL Authentication Types

Plain

mechanism := plain.Mechanism{
    Username: "username",
    Password: "password",
}

SCRAM

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

Connection

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}

conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")

Reader

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092","localhost:9093", "localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    Dialer:         dialer,
})

Writer

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

// Transports are responsible for managing connection pools and other resources,
// it's generally best to create a few of these and share them across your
// application.
sharedTransport := &kafka.Transport{
    SASL: mechanism,
}

w := kafka.Writer{
	Addr:      kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:     "topic-A",
	Balancer:  &kafka.Hash{},
	Transport: sharedTransport,
}

Client

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

// Transports are responsible for managing connection pools and other resources,
// it's generally best to create a few of these and share them across your
// application.
sharedTransport := &kafka.Transport{
    SASL: mechanism,
}

client := &kafka.Client{
    Addr:      kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    Timeout:   10 * time.Second,
    Transport: sharedTransport,
}
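
A hedged usage sketch for the client above, fetching cluster metadata; it assumes the Metadata method and the request/response field names used here (Topics, Name, Partitions), which may differ between versions:

resp, err := client.Metadata(context.Background(), &kafka.MetadataRequest{
    Topics: []string{"topic-A"},
})
if err != nil {
    panic(err)
}

// print each returned topic and its partition count
for _, t := range resp.Topics {
    fmt.Println(t.Name, "partitions:", len(t.Partitions))
}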

Reading all messages within a time range

startTime := time.Now().Add(-time.Hour)
endTime := time.Now()
batchSize := int(10e6) // 10MB

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    Topic:     "my-topic1",
    Partition: 0,
    MaxBytes:  batchSize,
})

r.SetOffsetAt(context.Background(), startTime)

for {
    m, err := r.ReadMessage(context.Background())

    if err != nil {
        break
    }
    if m.Time.After(endTime) {
        break
    }
    // TODO: process message
    fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}

if err := r.Close(); err != nil {
    log.Fatal("failed to close reader:", err)
}

Logging

For visibility into the operations of the Reader/Writer types, configure a logger on creation.

Reader

func logf(msg string, a ...interface{}) {
	fmt.Printf(msg, a...)
	fmt.Println()
}

r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:     []string{"localhost:9092", "localhost:9093", "localhost:9094"},
	Topic:       "my-topic1",
	Partition:   0,
	Logger:      kafka.LoggerFunc(logf),
	ErrorLogger: kafka.LoggerFunc(logf),
})

Writer

func logf(msg string, a ...interface{}) {
	fmt.Printf(msg, a...)
	fmt.Println()
}

w := &kafka.Writer{
	Addr:        kafka.TCP("localhost:9092"),
	Topic:       "topic",
	Logger:      kafka.LoggerFunc(logf),
	ErrorLogger: kafka.LoggerFunc(logf),
}

Testing

Subtle behavior changes in later Kafka versions have caused some historical tests to break. If you are running against Kafka 2.3.1 or later, exporting the KAFKA_SKIP_NETTEST=1 environment variable will skip those tests.

Run Kafka locally in docker

docker-compose up -d

Run tests

KAFKA_VERSION=2.3.1 \
  KAFKA_SKIP_NETTEST=1 \
  go test -race ./...

(or) to clean up the cached test results and run tests:

go clean -cache && make test

kafka-go's People

Contributors

abuchanan-nr, achille-roussel, aultimus, bgranvea, bhavanki, dominicbarnes, dtjm, jnjackins, jonasrmichel, kenju, kikyomits, kishaningithub, live-wire, maxence-charriere, mhmtszr, nickuraltsev, nlsun, petedannemann, pryz, rhansen2, sagarkrkv, savaki, scarbo87, shanesaww, stevevls, thehydroimpulse, victordenisov, vmihailenco, vrischmann, yolken-segment

kafka-go's Issues

How stable is kafka-go?

I love the idea, design and documentation, looks like a software gem, but can it be used in production? Can you give brief description of some usage examples?

Also would be nice to have #103...

WriteMessages blocks indefinitely without returning error

I have an application that writes to a Kafka broker. It is configured with most of the default configuration in place like so:

config := kafka.WriterConfig{
	Brokers: brokerlist,
	Topic:   topic,
}

w := kafka.NewWriter(config)

I've observed w.WriteMessages(ctx, msgs) block without any apparent reason. I've been looking at the code to understand the underlying implementation. In what cases would this function block forever? If any maintainers can help diagnose this with me, it would be greatly appreciated.

Any timeline on when v1 will be out?

I am looking for a solution like kafka-go, but don't want to invest in it right now as there isn't a stable version of it available yet. Is there a timeline for v1?

what if I use both async and RequiredAcks for Writer?

WebsocketKafkaWriter = kafka.NewWriter(kafka.WriterConfig{
    ...

    RequiredAcks: -1,
    Async:        true,
})

Will a config like this make the Writer async (batch-sending the messages), while at the same time requiring acks from all replicas when the messages are sent?

Support for Gzip Compression

Related to #82

I have a Kafka stream that I need to read from that is compressed with gzip. I would prefer to continue using this package as its API is much nicer than the others and it doesn't require cgo. Would there be any interest in getting this added?

reader: exposing first and last offset constants

Our apps read auto-offset-reset from config (i.e. Consul), which comes in as latest or earliest, mimicking the official Kafka config.

I could create a connection object and do ReadFirstOffset() / ReadLastOffset() on it to later set this offset on a Reader, but it is simpler just to pass -2 or -1 to Reader.SetOffset() to communicate startWithLast / startWithFirst offset.

-1 and -2 are of course leaky and "bad magic" from the Kafka protocol, and it'd be great to use the same constants that Reader uses:

kafka-go/reader.go

Lines 15 to 18 in d5ac6b5

const (
firstOffset = -1
lastOffset = -2
)

Hence I thought it'd be useful to open them for external visibility. What do you think?

[savaki/kafka-go/consumer-groups] Discussion

@savaki

I'm opening this issue as communication thread on the consumer groups implementation that you're working on there https://github.com/savaki/kafka-go/tree/consumer-groups

First of all, thanks a lot for taking the time to work on this, it's a highly demanded feature that I'm sure a lot of people will benefit from.

Here is some feedback I had on your work:

  • All types that represent structured protocol messages (like DescribeGroupsRequestV1 for example) must be non-exported. One of the main goals of the kafka-go project is to hide away implementation details of the kafka protocol into high-level concepts that fit well into Go programs. We must aim for exposing the minimal public API so the package can focus on useful and programmer-friendly concepts. Take a look at how the other protocol messages are implemented for example.

  • The file naming that you used is not following the current scheme, aim for shorter names with no underscores (the way it was done for all other files). For example, api_describe_groups.go could likely be renamed describegroups.go to match the current style.

  • The naming scheme you used with the api prefix for method names like apiListGroups, apiLeaveGroup seems unnecessary. Simply naming those methods listGroups, leaveGroups would make the code shorter and more expressive.

  • It is more common in Go to declare duration constants as <value> * <unit> instead of the opposite. The reason for this is better readability, because 30 * time.Second kind of reads like natural English: "30 seconds". For example, defaultSessionTimeout = time.Second * 30 would be better written defaultSessionTimeout = 30 * time.Second.

  • This is a considerable amount of work, what do you think about splitting it into two steps? First adding all the protocol messages with tests, then adding the high-level APIs and the actual consumer group implementation. That way we can better focus on the changes being made, ease review, and improve the chance of producing quality software.

  • What do you think about supporting consumer groups in the Reader type instead of introducing a new public API (ConsumerGroup)? It seems like we could simply add a Group string property to the ReaderConfig to configure the reader as being part of a group instead of directly reading from a partition. This may be less flexible, for example we lose the ability to read from multiple topics, or from multiple static partitions, but as a first step I think we're better off focusing on reading from a single topic and keeping the API as simple as possible. This would have the benefit of reusing the existing public API and a bunch of the Reader implementation, making it easier for programs based on kafka-go to introduce support for consumer groups without having to create abstractions if they need to support both.

consumer group + get back to last not committed offset

"(*Reader).SetOffset will return an error when GroupID is set" is there any way to get back to old offset also with using consumer groups? I've described problem here: https://stackoverflow.com/questions/49360325/kafka-consumer-group-loses-not-commited-messages

In short: let's say we have 4 messages. The 3rd was not committed (some error); polling will fetch the 4th message, process it successfully, and commit the offset, which effectively means the 3rd message will never be read again. Is there any way to get back to the last uncommitted offset and try to process it again, even if that means pausing the consumer and retrying a couple of times? Is there a solution for that in kafka-go?

proposal: support for snappy

I've done some digging through the docs, and while I've had better luck with this library than a few others, one thing I've noticed it seems to be lacking is support for snappy.

Most of the Kafka clusters I work with are using snappy by default for compression, so the ability to decompress messages coming from the reader and compress outgoing messages from the writer directly through this library would be great.

I'm just getting started with Go, so I'm not sure I'd be the best person to implement it, but I know the C headers are pretty readily available, so it probably should not be terribly difficult...

Thoughts on this?

ReadFirstOffset in version [kafka_2.11-1.0.1], got incorrect offset

When testing explicit offset commits, I made a test case that does not commit the offset after calling FetchMessage. I expected that if the consumer is restarted, it would consume the uncommitted message again, but instead the consumer gets new messages whose offsets are past the last committed offset.

Document the working of QueueCapacity/BatchSize/BatchTimeout

I've been using the Writer with a deadlined Context and I'm unsure how to configure it to allow batching while not blocking for the whole deadline duration.

If I set BatchSize to 1, all is well, but the load on the brokers and the network traffic is considerable. It's greatly reduced by setting BatchSize to another number, like 2; however, WriteMessages then blocks for the duration of the deadline (300ms here).

[screenshot from 2018-04-19 12:56:30]

I believe my understanding of the documentation is wrong?

Unless the writer was configured to write messages asynchronously, the method blocks until all messages have been written, or until the maximum number of attempts was reached.

There's about 1.3k WriteMessages per second in this test, with BatchSize set to 2 and RequiredAcks set to 1.

Some doubts!

Sorry, I don't understand why passing by pointer would cause large memory allocations.
Passing values copies the memory but doesn't increase any reference count.
Passing pointers copies the reference without copying the underlying data.
Need help!

proposal: refresh consumer groups

As I understand it, the max age of an offset stored by Kafka is defined by the broker via offsets.retention.minutes. If this value is -1, then the consumer is allowed to specify the offset retention time. If you're working with your own brokers, then this is fine.

However, if you're working with a hosted kafka provider like confluent, heroku, or aiven, this isn't so good as you don't have control over broker configuration. The proposed solution is to have the consumer write its offsets, even when they haven't changed.

The question for me is what the frequency should be. Seems like there are two logical options:

  1. Reuse CommitInterval to specify the consumer should commit offsets that frequently regardless of whether new messages have been received. So every CommitInterval, all the known offsets are committed.

  2. Specify another parameter, CommitKeepAliveInterval, that specifies the interval for this task.

I lean towards the latter, creating a CommitKeepAliveInterval.

Internally, kafka uses a topic to keep track of offsets. So if we had W consumer groups over X topics with Y partitions publishing at 1s retained for Z days, the broker (when restarted) would have to play back roughly W * X * Y * Z * 86400 offsets before being ready. (Yes, the math is wrong, but you get the idea.)

How can I list topics?

Hello, good work with this library. My question is self-descriptive: I want to list all the available topics and their partitions, or all partitions -> topics, like Client.Topics() in the sarama package. Thank you.

WriteMessages should allow Kafka broker to route Messages to partitions

Currently, it appears that both Conn and Writer use client-side routing of messages to specific partitions. Can I suggest that either Writer or Conn have the option to allow the broker to do the routing?

Many users of Kafka (including us) use the key to ensure messages with the same key arrive on the same partition. This is critical for many data replication cases where ordering matters. While we could attempt to mirror the default kafka consistent hashing strategy, there are edge cases where the number of partitions is being changed where the client may route incorrectly because of lack of information.

My proposal is to add a special case Balancer type to indicate messages should be routed by the broker.

WriteMessages should return offset

The Produce API response returns the offset at which a batch of messages was written, but this is not information we currently expose to the program, even though it is very valuable.

I'm thinking of introducing a method with one of those signatures, and I'd like to get some feedback on which one you think may be the best approach:

func (*Conn) WriteMessagesAt(...Message) (nbytes int, offset int64, err error)
func (*Conn) WriteFull(...Message) (nbytes int, offset int64, time time.Time, throttle time.Duration, err error)
func (*Conn) Produce(req *ProduceRequest) (res *ProduceResponse, err error)

I want to avoid breaking backward compatibility even if we're pre-1.0.

Happy to hear your thoughts.

ConsumerGroup consuming only one partition. Also several consumers consuming the same partition

I tried to make the smallest example of the bug.

  1. Create a topic with 2 partitions:
    ./bin/kafka-topics.sh --zookeeper localhost:2181 --topic 2part --create --partitions 2 --replication-factor 1
  2. Publish consecutive messages to the topic
  3. Consumer:
package main

import (
	"context"
	"fmt"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		GroupID: "sample",
		Topic:   "2part",
	})

	for {
		m, err := reader.FetchMessage(context.Background())
		if err != nil {
			continue
		}
		time.Sleep(1 * time.Second)
		fmt.Printf("P/O: %v/%v; %v: %v\n", m.Partition, m.Offset, string(m.Key), string(m.Value))

		reader.CommitMessages(context.Background(), m)
	}
}

Run 1 consumer:

$ ./test-consumer
P/O: 0/0; bd1f:	    Message number 0
P/O: 0/1; bd1f:	    Message number 2
P/O: 0/2; bd1f:	    Message number 4
P/O: 0/3; bd1f:	    Message number 6
P/O: 0/4; bd1f:	    Message number 8
P/O: 0/5; bd1f: 	Message number 10

Since there is only one consumer, he should read all the messages, not just the messages from one partition.
Then, when I run a second consumer:

$ ./test-consumer
P/O: 1/0; bd1f:	Message number 1
P/O: 1/1; bd1f:	Message number 3
P/O: 1/2; bd1f:	Message number 5
P/O: 1/3; bd1f:	Message number 7
P/O: 1/4; bd1f:	Message number 9

Finally, I run a third consumer. Here are the logs (trimmed) of consumer 1, 2 and 3:

Consumer 1:
P/O: 0/26; bd1f:	Message number 52
P/O: 0/27; bd1f:	Message number 54
P/O: 1/12; bd1f:	Message number 25
P/O: 1/13; bd1f:	Message number 27
P/O: 1/14; bd1f:	Message number 29
P/O: 1/15; bd1f:	Message number 31
P/O: 1/16; bd1f:	Message number 33
P/O: 1/17; bd1f:	Message number 35
Consumer 2:
P/O: 1/12; bd1f:	Message number 25
P/O: 1/13; bd1f:	Message number 27
P/O: 1/14; bd1f:	Message number 29
P/O: 1/15; bd1f:	Message number 31
P/O: 1/16; bd1f:	Message number 33
P/O: 1/17; bd1f:	Message number 35
P/O: 1/18; bd1f:	Message number 37
...
P/O: 1/25; bd1f:	Message number 51
P/O: 1/26; bd1f:	Message number 53
P/O: 1/27; bd1f:	Message number 55
P/O: 1/28; bd1f:	Message number 57
P/O: 1/29; bd1f:	Message number 59
P/O: 1/30; bd1f:	Message number 61
P/O: 1/31; bd1f:	Message number 63
Consumer 3:
P/O: 0/26; bd1f:	Message number 52
P/O: 0/27; bd1f:	Message number 54
P/O: 0/28; bd1f:	Message number 56
P/O: 0/29; bd1f:	Message number 58
P/O: 0/30; bd1f:	Message number 60
P/O: 0/31; bd1f:	Message number 62

As you can see, consumer1 is getting some messages already processed by consumer 2 and 3.

Same test with kafka-console-consumer:
Run 1 consumer:

# Consumer 1
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 2part --group aaa
Message number 1
Message number 2
Message number 3
Message number 4
Message number 5
Message number 6
Message number 7
Message number 8
Message number 9

Starting a 2nd consumer:

# Consumer 2
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 2part --group aaa
Message number 11
Message number 13
Message number 15
Message number 17
Message number 19

Meanwhile, on consumer 1:

# Consumer 1
...
Message number 7
Message number 8
Message number 9
Message number 10
Message number 12
Message number 14
Message number 16
Message number 18

As you can see, when there was only one consumer, it consumed all the messages. When the second consumer appeared, each started consuming from one partition only.
If you stop consumer 2, consumer 1 starts reading from both partitions again.
If instead of stopping consumer 2 you add a third consumer, it may start consuming, but one of the other 2 stops.

go dep and kafka-go

For a project which uses kafka-go, dep init results in:

[[constraint]]
  branch = "master"
  name = "github.com/segmentio/kafka-go"

Whereas sarama:

[[constraint]]
  name = "github.com/Shopify/sarama"
  version = "1.16.0"

Do you have any intent to pin down a first version somehow?

-1 and -2 are used inconsistently to mean "first offset" or "last offset"

See:

 // ReadFirstOffset returns the first offset available on the connection.
 func (c *Conn) ReadFirstOffset() (int64, error) {
       return c.readOffset(-2)
 }

And:

const (
       firstOffset = -1
       lastOffset  = -2
)

And:

// SetOffset changes the offset from which the next batch of messages will be
// read.
//
// Setting the offset ot -1 means to seek to the first offset.
// Setting the offset to -2 means to seek to the last offset.

And sometimes -1 means something else entirely:

// Offset returns the current offset of the reader.
func (r *Reader) Offset() int64 {
	if r.useConsumerGroup() {
		return -1
	}

Extract the message timestamp from Kafka into Message.Time in Reader

The type kafka.Message has a field for Time.

type Message struct {
    // Topic is reads only and MUST NOT be set when writing messages
    Topic string

    // Partition is reads only and MUST NOT be set when writing messages
    Partition int
    Offset    int64
    Key       []byte
    Value     []byte

    // If not set at the creation, Time will be automatically set when
    // writing the message.
    Time time.Time
}

When using a Reader the time in the message itself is not extracted and Message.Time is set to zero.

message timestamp is 1970-01-01 00:00:00 +0000 UTC

README examples don't compile

this example:

// to consume messages
conn, _ := kafka.DialLeader(ctx, "tcp", "localhost:9092")

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(1e6) // fetch 1MB max

b := make([]byte, 10e3) // 10KB max per message
for {
    n, err := batch.Read(b)
    if err != nil {
        break
    }
    fmt.Println(string(b))
}

batch.Close()
conn.Close()

I get the following error:

not enough arguments in call to kafka.DialLeader
	have (context.Context, string, string)
	want (context.Context, string, string, string, int)

Test

Testing Slack notifications

proposal: WriterStats

Is there any particular reason there isn't an analog to the ReaderStats type for Writers (e.g., WriterStats)? Statistics tracking on the writer side would be helpful for applications that both produce and consume messages using this library.

allow Writer to publish to any topic

Now that Message supports Topic, it seems like there's no reason not to allow WriteMessages to publish to more than one topic. Until the next major release, we could leave the public api alone and allow topics passed directly on the message to override the default topic.

Thoughts?

Reader RetentionTime validation comparing against wrong int type

Hey,

I have a use-case where I need to set the Reader RetentionTime setting to a non-default value that is bigger than the default. However, the setting seems to be capped by the max value for int32, when in fact internally the value sent with the OffsetCommit request uses int64 for retention time (which is consistent with the spec).

This is limiting the effective maximum for RetentionTime to something between 24 and 25 days, because when using value time.Hour * 24 * 24 everything still works, but with RetentionTime: time.Hour*24*25 I get a panic saying "panic: RetentionTime out of bounds: 2160000000000000"

Can someone tell me if I've understood something wrong here, or is this actually an issue with NewReader(config ReaderConfig) validating the retention time option incorrectly?

Ps. Thanks a lot for this library, it's been a big help

race condition writing to closing (*reader).msgs

Managed to hit a scenario where the reader attempted to write to its internal msgs channel after it was closed. It seems like the simplest way to remove this race is to not close msgs: closing a channel doesn't free up its resources any sooner, and leaving it open doesn't cause a leak. From examining the code, it doesn't appear that anything is waiting for a close signal.

panic on writing on a high load situation

Hi,
while putting pressure on Kafka we hit a panic using kafka-go:

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x8 pc=0x522d32]

goroutine 2824 [running]:
[...]/vendor/github.com/segmentio/kafka-go.(*Conn).Close(0x0, 0x0, 0xc420c538b0)
	[...]/vendor/github.com/segmentio/kafka-go/conn.go:135 +0x22
[...]/vendor/github.com/segmentio/kafka-go.(*writer).run(0xc420c1d110)
	[...]/vendor/github.com/segmentio/kafka-go/writer.go:475 +0x6d0
created by [...]/vendor/github.com/segmentio/kafka-go.newWriter
	[...]/vendor/github.com/segmentio/kafka-go/writer.go:418 +0x143

it seems the problem is here:

if conn, err = w.write(conn, batch, resch); err != nil {
    conn.Close()
    conn = nil
}

because w.write can return nil and an error, and it tries to close the conn without checking whether it's nil.

consuming: "invalid memory address or nil pointer dereference"

I am able to read messages from one broker, but not another (same Kafka version 0.10.2). I am able to consume messages from all using Clojure/Java.

Here is what I am getting with segmentio/kafka-go:

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x13788d2]

goroutine 85 [running]:
github.com/segmentio/kafka-go.(*Conn).SetReadDeadline(0x0, 0xbe7644d72f0448ba, 0x26204df38, 0x16aef00, 0xbe7644d72f0448ba, 0x26204df38)
	/../go/src/github.com/segmentio/kafka-go/conn.go:168 +0x22
github.com/segmentio/kafka-go.(*reader).read(0xc4204e8000, 0x16793e0, 0xc420064300, 0x0, 0x0, 0x0, 0x1675460, 0x14c3c60)
	/../go/src/github.com/segmentio/kafka-go/reader.go:692 +0xcc
github.com/segmentio/kafka-go.(*reader).run(0xc4204e8000, 0x16793e0, 0xc420064300, 0x0, 0xc4204e4440)
	/../go/src/github.com/segmentio/kafka-go/reader.go:585 +0x311
created by github.com/segmentio/kafka-go.(*Reader).start
	/../go/src/github.com/segmentio/kafka-go/reader.go:476 +0x219

How do I go about debugging this? Again, the same consuming code with segmentio/kafka-go does work with one broker, but not with another. The only difference in config is the hostname.

Way to get batch messages and commit if the batch is successful

There is no way to implement the below flow

  1. Read a batch of messages from Kafka.
  2. Process all the messages.
  3. If all message processing is successful, commit the offset.
  4. If any message fails to process, do not commit the offset; read the same batch again, try to process the messages, and commit the offset if successful.

Writer: Dropping messages if the topic doesn't exist

Currently the Writer is unable to use kafka's auto topic creation feature because it first queries the partition list which doesn't trigger the topic to be created. This means any messages written using WriteMessages are dropped silently.

message format issue

I'm grabbing messages off our kafka stream and noticed an issue with the message content. After decompressing the message (snappy encoded) the data appears to have some leading junk. In hex:

0 0 0 0 2 c3 56 83 0 0 5 23 67 c 0 4 0 0 ff ff ff ff 0 0 5 15

Any ideas what this is? I thought it might be a message batch as documented here:

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-CommonRequestandResponseStructure

But there are not enough bytes for that.

How can I set auto-offset-reset when restarting the consumer

r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:  []string{"Ip:9092"},
	GroupID:  "go_test",
	Topic:    "test",
	MinBytes: 10e3, // 10KB
	MaxBytes: 10e6, // 10MB
	//CommitInterval: time.Second,
})

When I init the NewReader, how can I set auto-offset-reset to earliest on restart?

Big messages disappearing

Hello,
If I'm writing a "too big" message with

err = w.WriteMessages(context.Background(),
    kafka.Message{
        Key:   key,
        Value: value,
    },
)
if err != nil {
    log.Errorln("WriteMessage", err.Error())
}

I'm not getting any errors, kafka and zookeeper don't show anything in the logs, and the message simply doesn't appear with kafka-console-consumer.sh.

Any idea how to fix/troubleshoot that?

failed to find any partitions for topic

I have to post messages for different topics so my send function looks like this:

func sendToKafka(msg syslogparser.LogParts) {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  brokerFlags,
		Topic:    fmt.Sprintf("%s.logs", msg["tag"]),
		Balancer: &kafka.LeastBytes{},
	})

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key: []byte("msg"),
			Value: []byte(
				fmt.Sprintf("some content"),
			),
		},
	)
	CheckError(err)
	w.Close()
}

The first time, a new topic fails with failed to find any partitions for topic. The next time, it works just fine.

May be related to #53.

Unable to compress messages

This is likely user error but I'm not able to compress messages.

I've set them to use snappy with the exact code from README.md (have also tried lz4):

	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "topic-A",
		CompressionCodec: snappy.NewCompressionCodec(),
	})

However, when the messages reach Kafka, they are not compressed:

[screenshot from 2018-09-14 4:02:59 PM]

Code: https://gist.github.com/dkoston/aab79446dc411c9d125a412ea20802c4

Perhaps there's some broker configuration I need to do? I was under the impression that each message itself contains bits telling the broker what compression codec is used on its contents, and that it's unnecessary to specify a codec at the broker level if you do so at the producer/writer.

If I add some debug to the client, I see that the messages are running through transform() so I'd expect them to be compressed with lz4 and marked as such:

$ go run main.go
2018/09/14 15:41:29 Using snappy for message compression
2018/09/14 15:41:29 Writing 10 messages to topic-A
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 Reading 10 messages from topic-A as group1
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 16:03:36 |----------|--------------|---------------|
2018/09/14 16:03:36 | Messages | Uncompressed | Compressed    |
2018/09/14 16:03:36 |----------|--------------|---------------|
2018/09/14 16:03:36 |       10 |           10 |             0 |
2018/09/14 16:03:36 |----------|--------------|---------------|

I added:

diff --git a/message.go b/message.go
index 2d8b775..12ed4c2 100644
--- a/message.go
+++ b/message.go
@@ -5,6 +5,7 @@ import (
        "bytes"
        "fmt"
        "time"
+       "log"
 )

 // Message is a data structure representing kafka messages.
@@ -54,9 +55,12 @@ func (msg Message) message() message {

 func (msg Message) encode() (Message, error) {
        if msg.CompressionCodec == nil {
+               log.Printf("[kafka-go]: not compressing")
                return msg, nil
        }

+       log.Printf("[kafka-go]: compressing")
+
        var err error
        msg.Value, err = transform(msg.Value, msg.CompressionCodec.Encode)
        return msg, err
@@ -65,6 +69,7 @@ func (msg Message) encode() (Message, error) {
 func (msg Message) decode() (Message, error) {
        c := msg.message().Attributes & compressionCodecMask
        if c == CompressionNoneCode {
+               log.Printf("[kafka-go]: not compressed: %v", c)
                return msg, nil
        }

@@ -73,6 +78,8 @@ func (msg Message) decode() (Message, error) {
                return msg, fmt.Errorf("codec %d not imported.", msg.CompressionCodec)
        }

+       log.Printf("[kafka-go]: compressed")
+
        var err error
        msg.Value, err = transform(msg.Value, codec.Decode)
        return msg, err

Bug: CommitMessages errors when committing without groupID for partition consumers

Description:

setting up a reader to consume on a topic:

cfg := kafka.ReaderConfig{
				Brokers:   cfg.Kafka.Brokers,
				Topic:     partition.Topic,
				Partition: partition.ID,
				MaxBytes:  10e6,
			}

r := kafka.NewReader(cfg)

Then after fetching messages and then going to commit like so:

// procesing of message
if err := p.R.CommitMessages(ctx, window...); err != nil {
   log.Println(err) // always gets error here
}

What I get is the following error message: not available when GroupID is set

I did not set a GroupId in my cfg. Upon further inspection it seems that inside the CommitMessage() method, the first line of the body is the following:

if !r.useConsumerGroup() { // line 1215 of reader.go
	return errNotAvailable
}

When I remove the ! from the if it runs as desired. Is this expected behavior? Is the err message wrong or is it the if statement that is wrong?

proposal: allow offsets to be specified by time instead of just -1, -2

As of kafka 0.10, specifying offsets by time is now possible. Would be nice to add support for this.

Here's our use case. We have a number of services that provide real-time analytics. When a service boots, it reads content from Kafka and generates an in-memory view of today's stats. As new events arrive, the stats are updated. Currently, since there is no time-based offset, we read from the old offset and replay, ignoring all events that aren't from today. To support that, we've made the time retention of the topic short. Instead, we'd like a longer time retention and the ability to say "start from midnight".

My suggestion would be to add a SetOffsetTime to Reader.

Reader delay

The Reader example gives a ~5 second delay until the first output appears. Can it be configured somehow? The Sarama consumer prints "immediately".

proposal: expose ability to create topics

Currently, the only way to create a topic is to ensure kafka's auto.create.topics.enable is set to true (which it is by default) and then to publish a message on a non-existent topic (which will cause a panic).

This is workable, but kind of dirty feeling to me. If I want to ensure I don't pollute an existing topic with throwaway "create" messages, I need to first check if a topic exists before attempting to create it using this approach.

It would be much nicer if the internal createTopics mechanism were exposed to support this use case.

Q: What is the intended design for fast producers ?

Hello,
Disclaimer: I'm pretty new to both Go and Kafka.

While playing around with kafka-go, I noticed that WriteMessages() blocks until it has finished delivering messages. Is the expected usage to manage a set of goroutines to ensure fast throughput, or is there a method I missed in the API to do that?

Thanks in advance,

Consumer Group - Self Balanced Consumers

First, thanks for open sourcing this project. I couldn't agree more with your description in the opening readme. Thank you for bringing a more idiomatic Go solution to the Kafka/Go community!

My current project is highly dependent on Sarama Cluster because of its support for Consumer Groups with self balancing.

  • High availability of the Consumer group
  • Sharding of load during peak processing times.
  • Ensuring a single offset is read by one and only one consumer.

My simple experiments show that kafka-go supports Consumer Groups but falls back to the simple consumer (reader) by requiring the partition to be assigned by the client.

Is there any effort to support what Jay Kreps calls a High level Consumer inside of Kafka-go?

Your reader implementation with context support would be an awesome starting point. I would be happy to contribute to this effort.

Again thank you for your contribution. If I just can't read the code let me know!

optimization to share reader when reading multiple partitions from the same broker

Consider allowing (*reader) to read from multiple partitions simultaneously.

Consider the following use case: 3 brokers, 1 topic with 6 partitions. In this scenario at least 2 readers are reading from the same broker, albeit from separate partitions. It seems like this places unnecessary load on the brokers and could be resolved by allowing a reader to read from more than one partition.
