
Comments (25)

db7 avatar db7 commented on May 31, 2024

Glad to hear you guys are using goka in production.

What versions of sarama and sarama-cluster are you using? sarama-cluster changed its interface in the past and broke goka, so that could be happening again.

from goka.

MahlerFive avatar MahlerFive commented on May 31, 2024

We're using the latest on master branch for sarama-cluster. Commit is d98592677c0aa08d8aafb882d344fb461682b722.


MahlerFive avatar MahlerFive commented on May 31, 2024

@db7 is there a known good version of sarama-cluster that I could try to rule that out?


MahlerFive avatar MahlerFive commented on May 31, 2024

Also I'm using goka from commit d58a3143ca343a51a72459a6de8ad138c219c546, just before the Processor.Run(context) change was added.


SamiHiltunen avatar SamiHiltunen commented on May 31, 2024

We are currently using sarama-cluster 24016d206c730276dfb58f802999066f2f4bfeaa which should be good with the revision of Goka you are running. Could you try with the given revision of sarama-cluster and report back whether you still have the same issue?


MahlerFive avatar MahlerFive commented on May 31, 2024

Downgraded sarama-cluster to 24016d206c730276dfb58f802999066f2f4bfeaa as suggested, but still seeing the same behaviour.


MahlerFive avatar MahlerFive commented on May 31, 2024

I put some debug prints in the sarama-cluster code and found that the error happens during a rebalance. The error is:

kafka server: A rebalance for the group is in progress. Please re-join the group.

I think this may have to do with my running 3 processors that all use the same group name. One processor reads from topic A and outputs a rekeyed version to topic Aprime. The second processor reads from topic B and outputs a rekeyed version to topic Bprime. The final processor reads from Aprime and Bprime and persists to a group table. I wonder if because these all use the same group name they somehow conflict while rebalancing is happening? Not exactly sure how the rebalancing works.


db7 avatar db7 commented on May 31, 2024

That is a problem. When you call DefineGroup(groupName, ...) you are describing how the processor group groupName will behave and which topics instances of this group will consume. If your processors are not instances of the same group, they must have different names.

From what you described, it sounds like these are indeed different processor groups and should get unique names. Can you try renaming them?


MahlerFive avatar MahlerFive commented on May 31, 2024

I tried changing the group names for the 2 rekeyers, so now all 3 processors are using unique group names. Unfortunately I'm still seeing the same behaviour.


MahlerFive avatar MahlerFive commented on May 31, 2024

So, in addition to using separate group names, I also put a 1 minute delay between starting the processors so that the rebalancing can finish before starting the next. Still running into this issue. I'm not really sure what else to try or investigate.


db7 avatar db7 commented on May 31, 2024

The rebalancing matters for processor instances in the same group. From your description it seems that you have one running instance for each of these 3 processor groups.

What Kafka version are you running?


MahlerFive avatar MahlerFive commented on May 31, 2024

In sandbox I'm only running one instance, but in production I'm running two. I don't think the issue has to do with multiple instances running, since it happens in the sandbox environment sometimes too. I'm pretty sure the number of partitions is what increases the chances of it occurring. I will decrease the instances to 1 in production to see if that theory holds true.

I'm running Kafka 0.10.2.1.

Here is how I'm defining my groups.

g := goka.DefineGroup(goka.Group(group),
	goka.Input(goka.Stream(topicA), codec, rekeyer.Rekey),
	goka.Output(goka.Stream(topicAPrime), codec),
)

g := goka.DefineGroup(goka.Group(group),
	goka.Input(goka.Stream(topicB), codec, rekeyer.Rekey),
	goka.Output(goka.Stream(topicBPrime), codec),
)

g := goka.DefineGroup(goka.Group(group),
	goka.Input(goka.Stream(topicAPrime), &codec.ACodec{}, aHandler.HandleEvent),
	goka.Input(goka.Stream(topicBPrime), &codec.BCodec{}, bHandler.HandleEvent),
	goka.Persist(&codec.StatusCodec{}),
)


db7 avatar db7 commented on May 31, 2024

Ok, that looks normal (as long as the group variable is different for the 3 groups). We are also using Kafka 0.10.2. Most of our topics have 20 partitions.

Are you also using this Kafka cluster with other tools/applications at the moment, or is it set up only for this application?


MahlerFive avatar MahlerFive commented on May 31, 2024

Yes, the group variable is different for each one.

We have a lot of consumers/producers using Kafka (some in Go using Sarama/Sarama-cluster, and some in Scala). This is our first one using goka in production, though I've tested a simpler application with goka in sandbox before without issue. The simpler app just had a single processor with one input and a persisted group table.


MahlerFive avatar MahlerFive commented on May 31, 2024

An update on this issue. Apparently the combination of changing the group names for each processor AND adding a sleep between starting each processor did work. I thought it didn't work because the service was still dying, but that was due to a separate issue (out of memory).
I'm still not certain why a sleep was required between processors starting if they were using separate group names though.


db7 avatar db7 commented on May 31, 2024

It would be great if you'd let us know once you find it out... adding a sleep between starting each processor shouldn't be necessary even if they run in the same program. (Each processor creates its own sarama client as long as you don't overwrite the consumer builder option)


MahlerFive avatar MahlerFive commented on May 31, 2024

This bug has reappeared, even with the different group names and the sleeps between starting the processors. From looking at what's going on with the topics involved, both rekeyer processors (the ones with a single input / single output) seem to be starting up fine and rekeying events (producing to the output topics), but nothing is getting produced to the group table. The error seems to occur 30 seconds after the final processor starts.


MahlerFive avatar MahlerFive commented on May 31, 2024

Added some more debugging in sarama-cluster and revealed some things.

During the rebalance, joinGroup is called on the sarama-cluster consumer which then calls broker.joinGroup() which returns an i/o timeout error:

read tcp 192.168.254.154:60031->10.16.210.26:9092: i/o timeout

I'm not certain whether this is a goka issue, sarama-cluster issue, or something up with our kafka cluster. I'm hesitant to say it's our kafka cluster since we have tons of consumers and producers running on it with no issue. A lot of these consumers/producers use sarama + sarama-cluster as well with no issue.


db7 avatar db7 commented on May 31, 2024

To dig a bit further into the problem, you could start both rekeyer processors in one program and, once they are running, start another program that only consumes (and drops) the rekeyed messages using sarama-cluster alone, but with the same group name you used in the goka processor. Then you could observe the rebalancing notifications. I think that would help to see whether the problem also appears without goka.

An alternative to using sarama-cluster directly is to consume with goka's wrapper for sarama-cluster. Something like this should do it:

package main

import (
	"fmt"

	"github.com/lovoo/goka/kafka"
)

func main() {
	brokers := []string{"localhost:9092"}
	group := "test"
	inputStream := "some-topic"

	// Create the consumer for the given group.
	c, err := kafka.NewSaramaConsumer(brokers, group, kafka.NewConfig())
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// Subscribe to the input topic.
	if err = c.Subscribe(map[string]int64{inputStream: -1}); err != nil {
		panic(err)
	}

	// Print every event; on an assignment, add each assigned partition.
	for e := range c.Events() {
		switch e := e.(type) {
		case *kafka.Assignment:
			for p, o := range *e {
				fmt.Println("adding", p, "offset", o)
				c.AddGroupPartition(p)
			}
		default:
			fmt.Println(e)
		}
	}
}

Hope this helps.


db7 avatar db7 commented on May 31, 2024

I'll close this issue assuming it's done. Please reopen if you still have this issue.


activeshadow avatar activeshadow commented on May 31, 2024

@db7 we're now running Goka in production at my company and I'm starting to see a lot of these errors in my processors that leverage processor tables for persistence. As an example, I have a Kafka topic with 20 partitions that I'm consuming, and I'm using Kubernetes to deploy 20 replicas of a processor using the same processor group name. The processor table has 20 partitions as well.

What can I do to help get some additional insight into what's going on here so I can try to help fix it? Things seem to work OK if I only run one replica of a processor, but I'm needing to scale out processors to handle load.

It's also worth mentioning that this only seems to happen with some of my processors that are leveraging a processor table for persistence. Other processors, that are also scaled out to many replicas, seem to work just fine. The ones that don't seem to have a problem are ones that consume a table that's not written to much, while the ones that are having trouble seem to be ones that are consuming a table that's written to quite often. In the case of being written to often, I'm seeing this with both producers that write to the table themselves as well as producers who read from their processor table, but items are written to said processor table by other emitters.


db7 avatar db7 commented on May 31, 2024

@activeshadow, I lost track of your message. Is this still a problem? If yes, please reopen this ticket.


invernizzie avatar invernizzie commented on May 31, 2024

@db7 I'm seeing the issue @activeshadow describes, also in a group that writes to a table with high write load. The rest of my consumer groups are doing fine, running in the same go program.

I'm running goka v0.1.3-6-g55b1948 in production on a Kafka 0.1.1 cluster. I realize there's a newer version that modifies the consumer group code, do you think it would help to upgrade?


db7 avatar db7 commented on May 31, 2024

@frairon, maybe this issue should be reopened.
@invernizzie sorry, I am not following the project at the moment.


frairon avatar frairon commented on May 31, 2024

@invernizzie, since it's very hard to debug the old code to reproduce those errors, it would be very helpful if you could update and let us know if the problem still occurs. Just reopen the ticket in this case.
As we have seen that error in the old version too, but not in the new one, I'm quite positive that it'll be fixed for you as well.

