lovoo / goka
Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
License: BSD 3-Clause "New" or "Revised" License
The current implementation of goka tester has some limitations:
In the branch tester-refactoring I started writing a new tester implementation.
Among the new tester's features: when tester.Consume(..) has terminated, all processors and loops are finished consuming.
The interface will change slightly, assuming that the tester has not been used much yet:
Starting and running the tester has not changed.
tester.ValueForKey("somekey")
This was possible because there was only one group table. Now there can be multiple, so you have to specify the table topic too:
tester.TableValue("group-table", "somekey")
tester.SetValue("somekey", "value")
// becomes
tester.SetTableValue("group-table", "somekey", "value")
The old way to check emitted messages was:
tester.ExpectEmit("topic", "key", func(value []byte) {
	// decode the value and check what you need to check
})
// then to make sure nothing extra was emitted
tester.Finish(true)
Now we can do:
key, value, hasNext := tester.NextMessage("topic")
if !hasNext {
	// there was nothing more in this topic
} else {
	// check what you need to check with key and value
}
// similarly, to check that there were no more messages
tester.ExpectEmpty("topic")
The old ExpectEmit is still there to avoid breaking too many tests when upgrading, but IMHO it should be removed.
// EDIT: this is not possible anymore. The current Kafka interface defined by goka does not allow this kind of initialization without nasty hacks.
// tester.AddInitialState("group-table", "key", "value")
// then run the processor
// go proc.Run(ctx)
// then do the test, the processor will start with the table we initialized above
Instead, the table can be initialized with SetTableValue, like:
gkt := tester.New(t)
proc, err := goka.NewProcessor(...,
	goka.WithTester(gkt),
)
if err != nil {
	t.Fatal(err)
}
go proc.Run(context.Background())
gkt.SetTableValue(<table>, <key>, <value>)
Documentation and examples still need lots of polishing, but that's essentially what the new Tester would look like.
What do you think?
Still an issue: when we get an error from Kafka while recovering a view, the view does not shut down but simply blocks.
Right now Goka checks all topics in the graph for co-partitioning. I think it should not check lookup topics, since these are materialized completely in all instances of the application.
We have a use case where we want 10 partitions for the input topic while still being able to do lookups on topics with only 1 partition.
Do you agree, or am I missing some situation where it would not be possible?
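To make the proposal concrete, here is a minimal sketch of a co-partitioning check that skips lookup edges. Edge, EdgeType and checkCopartitioned are hypothetical stand-ins, not Goka's internal types; the sketch only assumes that each edge knows its topic, its kind and its partition count.

```go
package main

import "fmt"

// EdgeType is a hypothetical stand-in for the kind of a group-graph edge.
type EdgeType int

const (
	InputEdge EdgeType = iota
	JoinEdge
	LookupEdge
	PersistEdge
)

// Edge is a hypothetical description of one topic in a group graph.
type Edge struct {
	Topic      string
	Type       EdgeType
	Partitions int
}

// checkCopartitioned returns an error if the non-lookup edges disagree on
// their partition count. Lookup edges are skipped because every instance
// materializes those tables completely.
func checkCopartitioned(edges []Edge) error {
	expected := 0
	for _, e := range edges {
		if e.Type == LookupEdge {
			continue
		}
		if expected == 0 {
			expected = e.Partitions // first co-partitioned edge sets the reference
			continue
		}
		if e.Partitions != expected {
			return fmt.Errorf("topic %s has %d partitions, expected %d", e.Topic, e.Partitions, expected)
		}
	}
	return nil
}

func main() {
	edges := []Edge{
		{Topic: "input", Type: InputEdge, Partitions: 10},
		{Topic: "group-table", Type: PersistEdge, Partitions: 10},
		{Topic: "cities", Type: LookupEdge, Partitions: 1},
	}
	fmt.Println(checkCopartitioned(edges)) // prints <nil>
}
```

Joins and the group table still have to agree, since a partition of the input is processed together with the same partition of those topics.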
It would be great if Goka could natively provide primitives for windowed processing. It’s a pretty complex topic, but seems really needed in many stream processing workloads.
Kafka Streams handles that by providing timestamp extractors to identify the actual time of the message, and by augmenting the record keys stored in the state store with the window bucket.
Another approach that may help users implement their own windowing is letting goka.Context get and set arbitrary values in the storage. I really don’t like it conceptually, but it gives more freedom and is less complicated to implement.
I’m sure you were thinking about windowing in general. Do you have any clear idea on how it could be built into Goka?
Kafka is optimised around storing small messages. I'm currently using goka (very effectively) to aggregate a stream of transactions, however I'm concerned that the amount of data I store in my compacted topic will grow larger and larger if I start aggregating historical data too.
A working example:
I have a stream of transactions for users spending money.
I'm aggregating how much a user spends of all time.
I want to aggregate how much a user has spent per week.
I'm currently using a single compacted topic of userid -> user object (storing aggregated values).
To solve this problem, I think I need to split my dataset into smaller pieces and use compound keys to store and retrieve historical data (e.g. useridX.weekY).
Is there a way I can do this with goka? It looks to me like there is no way to change the key stored in the compacted topic from the event key that comes from a stream.
The only thing I can think is to emit from my first processor into a new stream, and then have another processor that aggregates this into a weekly compacted topic. Seems like a lot of overhead and I'm not sure if there's a simpler way.
When loading a view and Kafka returns the message
"response did not contain all the expected topic/partition blocks",
goka should tolerate it and just keep going.
According to the linked issue, this is normal behavior.
Awesome work!
But is goka production-ready? Is any company using it in a production environment?
Currently, nil messages in streams are just ignored. I think it would be better to pass them to the process callback and have a WithIgnoreNil() option for processors that should ignore them instead.
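A sketch of the proposed option using the functional-options pattern, assuming nil messages reach the callback by default. WithIgnoreNil, processorOptions and dispatch are hypothetical names here, not Goka's actual API.

```go
package main

import "fmt"

// processorOptions holds hypothetical processor settings.
type processorOptions struct {
	ignoreNil bool
}

// ProcessorOption follows the functional-options pattern.
type ProcessorOption func(*processorOptions)

// WithIgnoreNil (hypothetical) makes the processor drop nil messages
// instead of handing them to the callback.
func WithIgnoreNil() ProcessorOption {
	return func(o *processorOptions) { o.ignoreNil = true }
}

// newOptions applies all options on top of the defaults.
func newOptions(opts ...ProcessorOption) processorOptions {
	var o processorOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

// dispatch calls cb for the message unless it is nil and ignoreNil is set.
// It reports whether cb was called.
func (o processorOptions) dispatch(key string, value []byte, cb func(string, []byte)) bool {
	if value == nil && o.ignoreNil {
		return false
	}
	cb(key, value)
	return true
}

func main() {
	cb := func(key string, value []byte) { fmt.Printf("callback: %s=%v\n", key, value) }
	newOptions().dispatch("k", nil, cb)                // nil reaches the callback
	newOptions(WithIgnoreNil()).dispatch("k", nil, cb) // nil is dropped
}
```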
Hi,
I have a question regarding example 2, which populates a REST API so one can access the number of clicks for a given user ID as key.
How would I implement a list of users with their number of clicks (basically a View.all(), if that even makes sense)?
Or would this be something I have to manage in a processor function, like a slice of users with their IDs and clicks? Or is there a way to iterate over the keys? What would be the recommended way?
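Goka's View has an Iterator() method, so a View.all() could be built on top of it. Below is a minimal sketch, assuming an iterator with Next/Key/Value/Release methods like the local kvIterator interface. kvIterator, allEntries and sliceIterator are hypothetical names, and the in-memory iterator only exists to make the example runnable.

```go
package main

import "fmt"

// kvIterator is assumed to match the subset of the view iterator used here.
type kvIterator interface {
	Next() bool
	Key() string
	Value() (interface{}, error)
	Release()
}

// allEntries drains the iterator into a map of key -> decoded value.
// It loads the whole table into memory, so it only suits small tables.
func allEntries(it kvIterator) (map[string]interface{}, error) {
	defer it.Release()
	out := make(map[string]interface{})
	for it.Next() {
		v, err := it.Value()
		if err != nil {
			return nil, fmt.Errorf("decoding value for %q: %w", it.Key(), err)
		}
		out[it.Key()] = v
	}
	return out, nil
}

// sliceIterator is an in-memory stand-in for a real view iterator.
type sliceIterator struct {
	keys []string
	vals []interface{}
	pos  int // 1-based position of the current entry; 0 before the first Next
}

func (s *sliceIterator) Next() bool                  { s.pos++; return s.pos <= len(s.keys) }
func (s *sliceIterator) Key() string                 { return s.keys[s.pos-1] }
func (s *sliceIterator) Value() (interface{}, error) { return s.vals[s.pos-1], nil }
func (s *sliceIterator) Release()                    {}

func main() {
	it := &sliceIterator{keys: []string{"user-1", "user-2"}, vals: []interface{}{3, 5}}
	clicks, err := allEntries(it)
	fmt.Println(clicks, err) // map[user-1:3 user-2:5] <nil>
}
```

For large tables, streaming entries straight into the HTTP response would avoid holding the whole map in memory.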
Any plans to extend the docs or examples for more advanced scenarios covering Lookup, Join, Loopback, etc?
Thanks for your feedback in advance.
Hi, looking through the library, it looks like it doesn't support encrypted (SSL/TLS) traffic to Kafka:
https://docs.confluent.io/current/kafka/authentication_ssl.html
Would this be difficult to implement?
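Sarama, the client Goka builds on, supports TLS through the Net.TLS.Enable and Net.TLS.Config fields of sarama.Config, so the missing piece may mainly be a way to hand such a config to Goka. A minimal sketch of building the *tls.Config with only the standard library, assuming the brokers present a certificate signed by a CA available as PEM data. newTLSConfig is a hypothetical helper.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// newTLSConfig returns a client TLS config that trusts the CA certificates
// contained in caPEM.
func newTLSConfig(caPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid CA certificate found in PEM data")
	}
	return &tls.Config{RootCAs: pool, MinVersion: tls.VersionTLS12}, nil
}

func main() {
	// In practice caPEM would be read from the broker's CA file.
	_, err := newTLSConfig([]byte("not a certificate"))
	fmt.Println(err)
}
```

For mutual TLS, a client certificate loaded with tls.LoadX509KeyPair could be added to the Certificates field.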
I'm running into an issue where upon starting up some goka processors, I get an error that looks like this:
kafka error: expected rebalance OK but received rebalance error
It appears to be coming from here:
Line 102 in b494605
I've noticed it never happens when I run it locally, happens sometimes when I run it in our sandbox environment, and happens consistently in our production environment. I think this may be because these environments differ in the number of partitions per topic (I believe I'm using 2 locally, 8 in sandbox, and 32 in production).
Does anyone know what could be causing this or have some ideas of what I could check to investigate further?
Hi, how do you achieve consistency between views and processors across N nodes? Views can consume messages from the table topics at different rates.
In some cases, one wants to access the view contents even if connecting to Kafka is not possible or broken. We could add an option to views so that they don't close the databases when an error in the Kafka connection occurs. @sweigert
The docs say that Goka will read from the newest offset for streams (goka.Input), and the same thing appears in the log when the processor starts (something like map[0:-1]).
However, for a freshly created topic and a consumer group ID that never existed on this broker, my processor consumes all the messages from the beginning of the topic.
This is just to understand what the correct behavior is and to fix the docs if there's an error.
Hi,
Everything is in the title: I want to iterate over a view for a subset of keys, as with syndtr/goleveldb:
iter := db.NewIterator(util.BytesPrefix([]byte("foo-")), nil)
for iter.Next() {
	// Use key/value, e.g. iter.Key() and iter.Value().
	// ...
}
iter.Release()
err = iter.Error()
Can you give me some direction, please? I'm also keen to submit a PR for that if necessary.
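Recent goka versions expose View.IteratorWithRange(start, limit string) (verify it exists in your version). Assuming it does, a prefix scan reduces to computing the right range. A minimal sketch of that computation, mirroring goleveldb's util.BytesPrefix. prefixRange is a hypothetical helper.

```go
package main

import "fmt"

// prefixRange returns the half-open key range [start, limit) that contains
// exactly the keys beginning with prefix, computed like goleveldb's
// util.BytesPrefix. An empty limit means the prefix consists only of 0xff
// bytes, so there is no finite upper bound.
func prefixRange(prefix string) (start, limit string) {
	b := []byte(prefix)
	for i := len(b) - 1; i >= 0; i-- {
		if b[i] < 0xff {
			b[i]++ // smallest key greater than every key with this prefix
			return prefix, string(b[:i+1])
		}
	}
	return prefix, ""
}

func main() {
	start, limit := prefixRange("foo-")
	fmt.Printf("%q %q\n", start, limit) // "foo-" "foo."
}
```

The pair could then be passed to the range iterator. With an empty limit, iterate from start and stop at the first key that lacks the prefix.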
This is a really weird thing that started happening for no apparent reason.
We have one processor that has 1 input, 1 join and 6 lookup edges in the group graph.
When the processor starts, there's no error in the log and everything seems to be working fine. There are messages about the corresponding views being opened and so on.
But for some reason the consumer is not consuming any messages.
When we stop the processor (we handle SIGTERM and SIGINT gracefully and then call Stop() on the processor), it throws errors like: failing: error in view: Errors: view error opening partition ...: error removing partition .... : partition was not added.
It does not depend on how long I wait after the consumer is rebalanced. Actually, those topics are empty on my local machine, and the same thing happens there.
Real mystery for me.
Parts of the web component still rely on the processor having a state to display several metrics.
I create a view that reads from the group table of group test:
view, err := goka.NewView([]string{"localhost:9092"}, goka.GroupTable(goka.Group("test")), new(codec.String))
if err != nil {
	log.Fatalf("Error: %v", err) // Fatalf exits the program, so no return is needed
}
When I create a second view, I get an error (the two views are instantiated in different projects):
view2, err := goka.NewView([]string{"localhost:9092"}, goka.GroupTable(goka.Group("test")), new(codec.String))
if err != nil {
	log.Fatalf("Error: %v", err)
}
Error:
Error creating local storage for partition 0: error opening leveldb: resource temporarily unavailable
exit status 1
Can anyone suggest how to do that?
Thank you
We should review the way we are gathering metrics because they are currently introducing too much overhead. Main issues are:
I've noticed that a new consumer group does not consume earlier messages. It's nice that a consumer group retrieves the messages it missed while it was down, but this requires that the group was connected at an earlier time. Is it possible for a newly defined group to start at the first message in a topic?
Currently the Start() method of processors and views uses errgroup to create goroutines for each partition and passes a context.Context to them. The Stop() method simply cancels the context to stop all goroutines.
I think it would be nice to use the same mechanism to control multiple processors and views running in the same program. For that I'd propose to replace the Start() and Stop() methods with a Run(context.Context) method. The usage would be something like this:
// create the context and cancel function
ctx, cancel := context.WithCancel(context.Background())
// create error group
g, ctx := errgroup.WithContext(ctx)
// create processors and views
p, _ := goka.NewProcessor(brokers, graph)
...
// start processor and views passing the context
g.Go(func() error { return p.Run(ctx) })
// catch signals
go func() {
	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	cancel() // gracefully stop processors and views
}()
if err := g.Wait(); err != nil {
	log.Fatalln(err)
}
Perhaps we could still support Start() and Stop() via some wrapper, something like this:
p, err := goka.StartStopper(goka.NewProcessor(brokers, graph))
if err != nil {
	log.Fatalf("error creating processor: %v", err)
}
go func() {
	if err := p.Start(); err != nil {
		log.Fatalf("error running processor: %v", err)
	}
}()
wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
<-wait   // wait for SIGINT/SIGTERM
p.Stop() // gracefully stop processor
Any opinions?
Running a processor with a trailing "/" fails because the node cannot be found in ZooKeeper.
We probably need some double-slash elimination.
reproduce:
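A minimal sketch of the slash elimination, assuming the trailing slash comes from a configured ZooKeeper chroot or node path. zkNodePath is a hypothetical helper. It relies on path.Join, which drops empty elements and cleans the result.

```go
package main

import (
	"fmt"
	"path"
)

// zkNodePath joins a chroot and a node path and cleans the result, removing
// trailing and doubled slashes.
func zkNodePath(chroot, node string) string {
	return path.Join("/", chroot, node)
}

func main() {
	fmt.Println(zkNodePath("kafka/", "/brokers//ids/")) // /kafka/brokers/ids
}
```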
I'm looking around in the documentation and I can't spot anything about concurrency and goka.View. If I've initialized a goka.View, may I access it concurrently, or must I use either a mutex or a dedicated goka.View per goroutine?
Let's print errors when they happen. Otherwise the user has to wait until the processor has shut down completely to see them, and if the shutdown fails, the error is never shown, which can be annoying.
Right now, updates to the group table's storage are published to Kafka from goka.Context#setValueForKey.
It could be useful to embed this functionality directly into the storage. This way one could create custom storage implementations that are also backed by Kafka and use them for other purposes, not only for the group table.
This may be useful when you want some custom workload around storage but don't want to care about its safety on disk, and instead have it backed by Kafka.
Kafka Streams also does it this way. They basically have a wrapper around storage that adds changelog functionality to any storage, and they allow accessing storage from their Processor API at any point of the processing pipeline: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
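A minimal sketch of such a change-logging wrapper as a decorator. Storage and Changelog are hypothetical interfaces, not Goka's storage API. Changelog stands in for a producer writing to a compacted topic, and the in-memory types only make the example runnable.

```go
package main

import "fmt"

// Storage is a hypothetical minimal key-value store interface.
type Storage interface {
	Get(key string) ([]byte, error)
	Set(key string, value []byte) error
	Delete(key string) error
}

// Changelog is a hypothetical sink for updates, e.g. a compacted Kafka topic.
// A nil value stands for a tombstone.
type Changelog interface {
	Publish(key string, value []byte) error
}

// changeLogging decorates any Storage so that every write is also published.
// Reads go straight to the wrapped storage via the embedded interface.
type changeLogging struct {
	Storage
	log Changelog
}

func (c changeLogging) Set(key string, value []byte) error {
	if err := c.Storage.Set(key, value); err != nil {
		return err
	}
	return c.log.Publish(key, value)
}

func (c changeLogging) Delete(key string) error {
	if err := c.Storage.Delete(key); err != nil {
		return err
	}
	return c.log.Publish(key, nil)
}

// memStorage and memLog are in-memory stand-ins used for the example.
type memStorage map[string][]byte

func (m memStorage) Get(k string) ([]byte, error) { return m[k], nil }
func (m memStorage) Set(k string, v []byte) error { m[k] = v; return nil }
func (m memStorage) Delete(k string) error        { delete(m, k); return nil }

type memLog struct{ entries []string }

func (l *memLog) Publish(k string, v []byte) error {
	l.entries = append(l.entries, fmt.Sprintf("%s=%s", k, v))
	return nil
}

func main() {
	changes := &memLog{}
	s := changeLogging{Storage: memStorage{}, log: changes}
	s.Set("user-1", []byte("42"))
	s.Delete("user-1")
	fmt.Println(changes.entries)
}
```

The sketch writes locally first and publishes second. If publishing fails, the local store is briefly ahead of the changelog, so a real implementation has to decide how to surface or retry that error.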
On an empty Kafka cluster, running a processor fails because the co-partitioning check cannot detect a partition count with which to create the missing topics.
We should have a processor option defining the default partition count for that case.
Goka works with bsm/sarama-cluster up to at least version 2ae74ad075d4f088751b8c074f7e88c1020d7687. The latest version does not behave as expected and causes processors and views to crash.
This is rather a proposal for future versions, as it would be a breaking change.
The thing is that message keys in Kafka are bytes, so they can be not only simple strings but also complex structures encoded as bytes.
For example, we use Debezium (http://debezium.io) for Change Data Capture from MySQL databases, and Debezium uses the table's primary key as the Kafka message key. It is commonly encoded with Avro as a struct like {"id": 1}.
Right now, in order to deal with this, we need to pass a goka.UpdateCallback everywhere that converts the key string back to []byte, deserializes it into a struct, and then stores only the int value, converted to a string, as the table's key.
Using []byte for keys would simply be semantically clearer for many Kafka users, since that is how Kafka works internally.
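To show the conversion that currently has to be repeated in every UpdateCallback, here is a minimal sketch of decoding a struct-encoded key into a plain table key. To stay dependency-free it swaps JSON in for the Avro encoding mentioned above. tableKey is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
)

// tableKey decodes a struct-encoded message key such as {"id": 1} and returns
// the plain string used as the table key.
func tableKey(raw []byte) (string, error) {
	var k struct {
		ID *int64 `json:"id"` // pointer distinguishes a missing id from id 0
	}
	if err := json.Unmarshal(raw, &k); err != nil {
		return "", err
	}
	if k.ID == nil {
		return "", errors.New("key has no id field")
	}
	return strconv.FormatInt(*k.ID, 10), nil
}

func main() {
	key, err := tableKey([]byte(`{"id": 1}`))
	fmt.Println(key, err) // 1 <nil>
}
```

With []byte keys, this logic could live in a single key codec instead of in every callback.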
When running on SSDs, one may prefer using BadgerDB instead of LevelDB.
https://github.com/dgraph-io/badger
Error creating processor: topic group-1-table has 50 partitions instead of 400
exit status 1
I see that this configuration comes from Sarama. Is there any particular reason I could be facing this?
Since Kafka v0.11, messages can contain headers. It would be nice to be able to specify them when emitting messages and to receive them in our processors.
I'm interested in submitting a PR if that's something you'd like to consider adding to goka.
It seems that right now Goka does not allow using the same topic as both input and join in the same group.
We have a situation where we have a set of common lookups that we want to use to enrich messages from topic-a and topic-b. We don't want to define separate groups for topic-a and topic-b, because then these lookups would end up duplicated in different LevelDBs, which are per group.
So we put topic-a and topic-b in the same group. The thing is that we also want to join topic-a messages with topic-b, so we also added topic-b as a join edge in the group.
When we do this, the callback defined for topic-b is no longer called, and we can only use topic-b for joins.
What we want is to enrich topic-a messages with the common lookups and also with the latest topic-b message, and to enrich every topic-b message with the common lookups only.
Hope this explanation makes sense (if not, let me know).
What should happen if the same group uses the same topic as input and as join or lookup?
Oftentimes, processing a message requires firing another IO operation (like querying a remote database, executing a remote HTTP call, or similar). Normally these IO operations accept a context.Context for cancellation.
It would be great if Goka could propagate the context passed to the processor's Run method to the actual callback.
One way to do it in a backward-compatible way could be embedding the context.Context interface directly inside goka.Context, so that goka.Context satisfies context.Context. Then one could pass goka.Context directly to the IO function and/or create a child context.Context from it.
My only concern is that goka.Context would do even more things, and it's already doing a lot. But as a benefit, this solution would avoid confusion between context.Context and goka.Context, as the use of context.Context is becoming more widespread in the community.
The default local storage currently compacts writes with the snapshot package. The reasoning behind that was to reduce the disk traffic for some workloads. However, it seems the snapshot causes the memory usage to be way higher than expected.
The best would be to remove the write-compaction from the default storage, possibly leaving it as an optional wrapper to be passed via WithStorageBuilder().
I was playing around with the example code given at https://github.com/lovoo/goka/. I wanted to see what would happen with a bunch of separate instances running in parallel, each emitting a message once per second. Looking over the code, it appeared that what I needed to do was add a bit of code to make sure each instance had its own LevelDB path, and then add the bits to emit more than one message:
$ diff -u example.go.orig example.go
--- example.go.orig 2017-10-27 10:58:53.000000000 -0700
+++ example.go 2017-10-27 10:36:42.000000000 -0700
@@ -1,11 +1,13 @@
package main
import (
- "fmt"
+ "io/ioutil"
"log"
"os"
"os/signal"
+ "path/filepath"
"syscall"
+ "time"
"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
@@ -18,17 +20,20 @@
)
// emits a single message and leave
-func runEmitter() {
+func runEmitter(n int) {
emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
if err != nil {
log.Fatalf("error creating emitter: %v", err)
}
defer emitter.Finish()
- err = emitter.EmitSync("some-key", "some-value")
- if err != nil {
- log.Fatalf("error emitting message: %v", err)
+ for i := 0; i < n; i++ {
+ err = emitter.EmitSync("some-key", "some-value")
+ if err != nil {
+ log.Fatalf("error emitting message: %v", err)
+ }
+ log.Printf("emitted message #%d", i+1)
+ time.Sleep(time.Second)
}
- fmt.Println("message emitted")
}
// process messages until ctrl-c is pressed
@@ -56,7 +61,15 @@
goka.Persist(new(codec.Int64)),
)
- p, err := goka.NewProcessor(brokers, g)
+ tdir, err := ioutil.TempDir("", "")
+ if err != nil {
+ log.Fatalf("error creating temp db dir: %v", err)
+ }
+ defer os.RemoveAll(tdir)
+
+ sb := goka.WithStorageBuilder(goka.DefaultStorageBuilder(filepath.Join(tdir, "group")))
+
+ p, err := goka.NewProcessor(brokers, g, sb)
if err != nil {
log.Fatalf("error creating processor: %v", err)
}
@@ -73,6 +86,6 @@
}
func main() {
- runEmitter() // emits one message and stops
- runProcessor() // press ctrl-c to stop
+ go runEmitter(100) // emits 100 message and stops
+ runProcessor() // press ctrl-c to stop
}
What I ended up with was: https://play.golang.org/p/vFakftSgcs
Running 6 instances in parallel did what I'd have expected, I could see the example-group-state topic receive a flow of steadily incrementing values until it reached 600 (6 x 100).
When I pressed Ctrl-C to quit, five of them quit gracefully, but one of them gave me this:
2017/10/27 10:54:52 emitted message #98
2017/10/27 10:54:53 emitted message #99
2017/10/27 10:54:54 emitted message #100
2017/10/27 10:55:02 Processor: rebalancing: map[]
2017/10/27 10:55:02 Processor: rebalancing: map[]
^C2017/10/27 10:55:04 Processor: stopping
2017/10/27 10:55:04 Processor: wait for main goroutine
2017/10/27 10:55:04 Processor: stopped
2017/10/27 10:55:04 Processor: removing partitions
2017/10/27 10:55:04 Processor: shutdown complete
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x38 pc=0x12782b0]
goroutine 66 [running]:
github.com/lovoo/goka/kafka.(*producer).run(0xc4200caa20)
/Users/jimr/Library/gocode/src/github.com/lovoo/goka/kafka/producer.go:78 +0x2e0
created by github.com/lovoo/goka/kafka.NewProducer
/Users/jimr/Library/gocode/src/github.com/lovoo/goka/kafka/producer.go:39 +0x243
I'm new to goka and trying to figure out the best way to create a table based on a different key than the events I'm consuming.
For example, assume I'm consuming events that look like:
{
  "client_id": "abc123",
  "business_id": "def456",
  "timestamp": "2018-01-01 01:23:45"
}
The key on these events is the client_id. What I want to do is consume these events and create a table that tracks the latest timestamp by business_id, so my new key would be business_id.
There are two options I see:
Context.Lookup and Context.Emit or Context.Loopback (not sure which one to use) in order to specify the key based on business_id.
Which of these options is preferred, or is there a better option?
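A minimal sketch of the loopback variant. The assumed Goka wiring is that the input callback, keyed by client_id, calls ctx.Loopback(businessID, event), and the loopback callback, which then runs with business_id as the table key, stores the newer timestamp. Only the key and merge logic is shown. rekey and latest are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// event mirrors the JSON payload from the question.
type event struct {
	ClientID   string `json:"client_id"`
	BusinessID string `json:"business_id"`
	Timestamp  string `json:"timestamp"`
}

const tsLayout = "2006-01-02 15:04:05"

// rekey decodes an event and returns business_id as the new key. In a Goka
// input callback this key would be passed to ctx.Loopback(key, value).
func rekey(raw []byte) (string, event, error) {
	var ev event
	if err := json.Unmarshal(raw, &ev); err != nil {
		return "", event{}, err
	}
	if ev.BusinessID == "" {
		return "", event{}, errors.New("event without business_id")
	}
	return ev.BusinessID, ev, nil
}

// latest returns whichever timestamp is newer; stored is empty when the table
// has no value yet. The loopback callback would store the result.
func latest(stored, incoming string) (string, error) {
	in, err := time.Parse(tsLayout, incoming)
	if err != nil {
		return "", err
	}
	if stored == "" {
		return incoming, nil
	}
	cur, err := time.Parse(tsLayout, stored)
	if err != nil {
		return "", err
	}
	if in.After(cur) {
		return incoming, nil
	}
	return stored, nil
}

func main() {
	key, ev, err := rekey([]byte(`{"client_id":"abc123","business_id":"def456","timestamp":"2018-01-01 01:23:45"}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	ts, _ := latest("", ev.Timestamp)
	fmt.Println(key, ts) // def456 2018-01-01 01:23:45
}
```

Keeping the maximum instead of blindly overwriting makes the table tolerant of events that arrive out of order.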
Right now there is no way to pass a custom hasher for views (for example) when you configure Lookup edges. So if the topic you use for a view is not partitioned using Sarama's default hasher, the view will never find the correct value.
The hasher passed to the graph's processor is not propagated to the view (propagating it could be the simplest solution, although not always the right one, since you can use different hashers for the main input and the lookups).
I believe there should be a way to pass corresponding options to each edge of the graph. Using variadic parameters should not break compatibility.
Trying to access the group table without adding goka.Persist(..) to DefineGroup panics, which it shouldn't.
In the blog post announcing Goka (https://tech.lovoo.com/2017/05/23/goka/), the author mentions some uses of Goka at Lovoo and says there will be future posts about them.
There are no more posts about it yet, but I would like to understand the differences between these two joins mentioned in the post:
I think the second one is what I need for my use case, but it's not clear to me how to implement it.
I'll describe the use case:
We have a compacted topic of users and compacted topic of bookings. Users are partitioned by user id, and bookings are partitioned by booking id. We create a group table for basic user data and a counter for all the bookings per user.
We've tried several approaches so far:
The issue with this approach is that we could have duplicated bookings because of "at least once" semantics. But we need accurate data.
ctx.Join with this repartitioned bookings topic. The problem with this one is that we lose most of the bookings, since it's not guaranteed that we've processed all bookings for a given user before the message on the users topic is processed for that user.
It sounds like cross-joins/broadcast-joins is what we actually need, but I'm not sure.
In Kafka Streams semantics what we need I believe would be KTable-KTable join.
What would you recommend to solve this situation?
I've found that we often want to ensure that a certain part of some topic has been processed before starting joins with that topic.
For example we join stream A with table B. We know a point in time where topic B would be mostly read. It may not be completely recovered, because messages are constantly coming in. But we know that we need at least the portion before that point in time to be available for joins.
So the idea would be to wait until certain processing time of topic B before starting processing topic A.
I could assume that using processor’s stats something like this could be achieved, but I’m not sure.
I’ve tried sleeping in A’s callback until particular record of topic B is available, but found out that topic B is stalling when topic A sleeps.
Is this the only way to change the config Version for the Sarama producer?
hasher := goka.DefaultHasher()
partitioner := sarama.NewCustomHashPartitioner(hasher)
config := kafka.CreateDefaultSaramaConfig("goka", partitioner, metrics.DefaultRegistry)
config.Version = sarama.V1_0_0_0
producer, err := kafka.NewProducer(brokers, &config.Config, logger.NewLogger(log.WithField("app", "service")))
if err != nil {
	return nil, err
}
emitter, err := goka.NewEmitter(brokers, goka.Stream(topic), new(codec.Proto), goka.WithEmitterProducer(producer))
I couldn't find any example or documentation about how to use lookups (goka.Lookup, ctx.Lookup).
I'm not sure my use case is possible, but I would like to have something similar to Kafka Streams' GlobalKTable, which is the state of a topic across ALL its partitions. This is useful for a small set of lookup values that should not be co-partitioned with the main processor's topic.
For example, I have a topic called cities, which is a compacted topic with the ID as key and the name as value. Then I have a processor for the users topic, with the user's ID as key and the user's properties as value (the city ID is one of the properties).
For each user I want to look up the city name by the user's city_id, but I don't want to partition the users topic by city ID because it doesn't make sense. So the cities topic should be processed entirely to fill the lookup table.
Is it currently possible with Goka?
It seems like there's no real need to validate co-partitioning for topologies that have no join topics.
For example, I have a graph with only (multiple) inputs, and these inputs have different numbers of partitions, which is OK as there's no stateful logic around them. But Goka complains at startup that the topics don't have the same number of partitions.
I'm facing weird issue with views:
I have a topic with 10 partitions, where each message has a string key.
I create a view for this topic, wait until the view is recovered, then call Get for a key that I'm really sure exists, and I get a nil value and a nil error.
BUT, the weirdest thing is that when I create an iterator for that view, I see that there is actually a message for this key.
And even after iterating, when I try to access the same key, I continue seeing nil value and nil error.
Does anyone know what can I be missing?
Hi, I tried to follow the tips on the wiki about how to create tables with log compaction.
I'm using the method "Using TopicManager to create table topics", but when I check the topic-level configuration, my table doesn't have the setting "cleanup.policy=compact".
I saw that the tips in the wiki were last updated on the 25th of February; are they still valid?
Thank you
Edoardo
Is it possible to prevent a consumer from committing offset after it has received a record? In my callback function I would like to error out so that if the consumer fails, a future consumer will pick up the record it was on.
Right now the Recovered method on goka.View and other objects returns a bool. However, it would be helpful for most applications to be able to block until the view is ready.
What do you think about introducing an additional method, like Ready, that would return <-chan struct{} instead, so that the caller can block until there is something on the channel?
Behind the scenes, this method could poll the Recovered method in a goroutine with some default sleep interval until Recovered returns true.