Comments (25)
Glad to hear you guys are using goka in production.
What version of sarama and sarama-cluster are you using? sarama-cluster changed their interface in the past and broke goka, so that could be happening again.
from goka.
We're using the latest on the master branch for sarama-cluster. The commit is d98592677c0aa08d8aafb882d344fb461682b722.
@db7 is there a known good version of sarama-cluster that I could try to rule that out?
Also, I'm using goka from commit d58a3143ca343a51a72459a6de8ad138c219c546, just before the Processor.Run(context) change was added.
We are currently using sarama-cluster 24016d206c730276dfb58f802999066f2f4bfeaa, which should be good with the revision of goka you are running. Could you try with the given revision of sarama-cluster and report back whether you still have the same issue?
Downgraded sarama-cluster to 24016d206c730276dfb58f802999066f2f4bfeaa as suggested, but still seeing the same behaviour.
I put some debug prints in the sarama-cluster code and found out the error happens during a rebalance. The error is:
kafka server: A rebalance for the group is in progress. Please re-join the group.
I think this may have to do with my running 3 processors that all use the same group name. One processor reads from topic A and outputs a rekeyed version to topic Aprime. The second processor reads from topic B and outputs a rekeyed version to topic Bprime. The final processor reads from Aprime and Bprime and persists to a group table. I wonder if because these all use the same group name they somehow conflict while rebalancing is happening? Not exactly sure how the rebalancing works.
That is a problem. When you call DefineGroup(groupName, ...), you are describing how the processor group groupName will behave and which topics instances of this group will consume. If your processors are not instances of the same group, they must have different names.
From what you described, it sounds like these are indeed different processor groups and should get unique names. Can you try renaming them?
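For illustration, a sketch of what that renaming could look like, three distinct group names instead of one shared one. The group names ("rekeyer-a", "rekeyer-b", "joiner") and the topic/codec/handler identifiers are hypothetical placeholders, not a prescription:

```go
// Sketch: each processor gets its own group name, so the three
// processors no longer compete inside one Kafka consumer group.
// All names below are hypothetical placeholders.
gA := goka.DefineGroup(goka.Group("rekeyer-a"),
	goka.Input(goka.Stream(topicA), codec, rekeyer.Rekey),
	goka.Output(goka.Stream(topicAPrime), codec),
)
gB := goka.DefineGroup(goka.Group("rekeyer-b"),
	goka.Input(goka.Stream(topicB), codec, rekeyer.Rekey),
	goka.Output(goka.Stream(topicBPrime), codec),
)
gJoin := goka.DefineGroup(goka.Group("joiner"),
	goka.Input(goka.Stream(topicAPrime), &codec.ACodec{}, aHandler.HandleEvent),
	goka.Input(goka.Stream(topicBPrime), &codec.BCodec{}, bHandler.HandleEvent),
	goka.Persist(&codec.StatusCodec{}),
)
_ = []interface{}{gA, gB, gJoin}
```

Each group graph then maps to its own consumer group in Kafka, so a rebalance in one group no longer disturbs the others.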
I tried changing the group names for the 2 rekeyers, so now all 3 processors are using unique group names. Unfortunately I'm still seeing the same behaviour.
So, in addition to using separate group names, I also put a 1 minute delay between starting the processors so that the rebalancing can finish before starting the next. Still running into this issue. I'm not really sure what else to try or investigate.
The rebalancing matters for processor instances in the same group. From your description it seems that you have one running instance for each of these 3 processor groups.
What Kafka version are you running?
In sandbox I'm only running one instance, but in production I'm running two. I don't think the issue has to do with multiple instances running, since it happens in the sandbox environment sometimes too. I'm pretty sure the number of partitions is what increases the chances of it occurring. I will decrease the instances to 1 in production to see if that theory holds true.
I'm running Kafka 0.10.2.1.
Here is how I'm defining my groups.
g := goka.DefineGroup(goka.Group(group),
	goka.Input(goka.Stream(topicA), codec, rekeyer.Rekey),
	goka.Output(goka.Stream(topicAPrime), codec),
)

g := goka.DefineGroup(goka.Group(group),
	goka.Input(goka.Stream(topicB), codec, rekeyer.Rekey),
	goka.Output(goka.Stream(topicBPrime), codec),
)

g := goka.DefineGroup(goka.Group(group),
	goka.Input(goka.Stream(topicAPrime), &codec.ACodec{}, aHandler.HandleEvent),
	goka.Input(goka.Stream(topicBPrime), &codec.BCodec{}, bHandler.HandleEvent),
	goka.Persist(&codec.StatusCodec{}),
)
Ok, that looks normal (as long as the group variable is different for the 3 groups). We are also using Kafka 0.10.2. Most of our topics have 20 partitions.
Are you also using this Kafka cluster with other tools/applications at the moment, or is this setup only for this application?
Yes, the group variable is different for each one.
We have a lot of consumers/producers using Kafka (some Go using Sarama/Sarama-cluster, and some in Scala), this is our first one using goka in production, though I've tested a simpler application with goka in sandbox before without issue. The simpler app just had a single processor with one input and persisted group table.
An update on this issue. Apparently the combination of changing the group names for each processor AND adding a sleep between starting each processor did work. I thought it didn't work because the service was still dying, but that was due to a separate issue (out of memory).
I'm still not certain why a sleep was required between processors starting if they were using separate group names though.
It would be great if you'd let us know once you find it out... adding a sleep between starting each processor shouldn't be necessary even if they run in the same program. (Each processor creates its own sarama client as long as you don't overwrite the consumer builder option)
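A minimal sketch of that "no sleeps needed" pattern, starting all processors concurrently from one program. This assumes the newer context-based Processor.Run API mentioned earlier in the thread; brokers and the graphs slice are placeholders, not from the original thread:

```go
// Sketch: start several goka processors concurrently, no delay between
// starts. Each processor builds its own sarama client internally, so
// they should not need to be staggered. "graphs" and "brokers" are
// hypothetical placeholders.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup
for _, g := range graphs {
	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		log.Fatal(err)
	}
	wg.Add(1)
	go func(p *goka.Processor) {
		defer wg.Done()
		if err := p.Run(ctx); err != nil {
			log.Println("processor stopped:", err)
		}
	}(p)
}
wg.Wait()
```

If staggered starts change the behaviour here, that would point at a rebalance/coordination issue rather than anything in the application wiring.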
This bug has reappeared, even with the different group names and sleeps between starting the processors. From looking at what's going on with the topics involved, both rekeyer processors (the ones with a single input / single output) seem to be starting up fine and rekeying events (producing to the output topics), but nothing is getting produced to the group table. It seems to happen 30 seconds after the final processor starts.
I added some more debugging in sarama-cluster and it revealed a few things. During the rebalance, joinGroup is called on the sarama-cluster consumer, which then calls broker.joinGroup(), which returns an i/o timeout error:
read tcp 192.168.254.154:60031->10.16.210.26:9092: i/o timeout
I'm not certain whether this is a goka issue, a sarama-cluster issue, or something up with our Kafka cluster. I'm hesitant to say it's our Kafka cluster, since we have tons of consumers and producers running on it with no issue, many of them also using sarama + sarama-cluster.
To dig a bit further into the problem, you could start both rekeyer processors in one program and, after they run, start another program that only consumes (and drops) the rekeyed messages using sarama-cluster directly, but with the same group name you used in the goka processor. Then you could observe the rebalancing notifications. I think that would help to see whether the problem also appears without goka.
An alternative to plain sarama-cluster is to consume directly with goka's wrapper for sarama-cluster. Something like this should do it:
package main

import (
	"fmt"

	"github.com/lovoo/goka/kafka"
)

func main() {
	brokers := []string{"localhost:9092"}
	group := "test"
	inputStream := "some-topic"

	c, err := kafka.NewSaramaConsumer(brokers, group, kafka.NewConfig())
	if err != nil {
		panic(err)
	}
	defer c.Close()

	if err = c.Subscribe(map[string]int64{inputStream: -1}); err != nil {
		panic(err)
	}

	for e := range c.Events() {
		switch e := e.(type) {
		case *kafka.Assignment:
			for p, o := range *e {
				fmt.Println("adding", p, "offset", o)
				c.AddGroupPartition(p)
			}
		default:
			fmt.Println(e)
		}
	}
}
Hope this helps.
I'll close this issue assuming it's done. Please reopen if you still have this issue.
@db7 we're now running Goka in production at my company and I'm starting to see a lot of these errors in my processors that leverage processor tables for persistence. As an example, I have a Kafka topic with 20 partitions that I'm consuming, and I'm using Kubernetes to deploy 20 replicas of a processor using the same processor group name. The processor table has 20 partitions as well.
What can I do to help get some additional insight into what's going on here so I can try to help fix it? Things seem to work OK if I only run one replica of a processor, but I'm needing to scale out processors to handle load.
It's also worth mentioning that this only seems to happen with some of my processors that leverage a processor table for persistence. Other processors, also scaled out to many replicas, seem to work just fine. The ones without problems consume a table that isn't written to much, while the ones having trouble consume a table that's written to quite often. In the high-write case, I'm seeing this both with processors that write to the table themselves and with processors that only read from their processor table while other emitters write to it.
@activeshadow, I lost track of your message. Is this still a problem? If yes, please reopen this ticket.
@db7 I'm seeing the issue @activeshadow describes, also in a group that writes to a table with high write load. The rest of my consumer groups are doing fine, running in the same go program.
I'm running goka v0.1.3-6-g55b1948 in production on a Kafka 0.1.1 cluster. I realize there's a newer version that modifies the consumer group code; do you think it would help to upgrade?
@frairon, maybe this issue should be reopened.
@invernizzie sorry, I am not following the project at the moment.
@invernizzie, since it's very hard to debug the old code to reproduce those errors, it would be very helpful if you could update and let us know if the problem still occurs. Just reopen the ticket in that case.
Since we have seen that error in the old version but not in the new one, I'm quite positive that it'll be fixed for you as well.