Comments (7)
Let me try to understand your problem:
- topic-a and topic-b are streams (since they are used as input)
- you consume both in the same group
- you have a couple of joined tables (say, table-1 and table-2)
Now you additionally want to have the latest message of topic-b available when you process the next topic-a message. The first thing that comes to mind is to store the latest topic-b message in the group table. Is that OK in your scenario, or can't you extend the group table schema?
Another option is to store the latest topic-b message in another group table and join that group table in the original processor group.
I see the benefit of joining the stream as a table as you are suggesting: it saves you a topic and you do the log compaction in the processor. But the current implementation of Join() expects a table, not a stream. If you delete the leveldb of your processor and start it again, your joined topic-b won't have the latest message for keys that were last updated before now()-retention.ms, because Kafka will have deleted them. The local storage of goka is supposed to be only a local view of the actual tables that are stored in Kafka; using leveldb to compact a stream will break recovery.
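The recovery problem can be illustrated with a small simulation that does not use Kafka or goka at all. This is only a sketch under simplifying assumptions: the `record` type, the `retained` and `rebuild` helpers, and the timestamps are hypothetical, and retention is applied per record here, whereas a real broker deletes whole log segments.

```go
package main

import "fmt"

// record is a hypothetical stand-in for a message in a stream topic.
type record struct {
	Key   string
	Value string
	TS    int64 // append time in milliseconds
}

// retained returns the records that a time-based retention policy would
// still hold at time now (simplified to a per-record check).
func retained(log []record, now, retentionMs int64) []record {
	var out []record
	for _, r := range log {
		if r.TS >= now-retentionMs {
			out = append(out, r)
		}
	}
	return out
}

// rebuild replays records in order and keeps the last value per key,
// which is what compacting a stream into local storage amounts to.
func rebuild(log []record) map[string]string {
	table := make(map[string]string)
	for _, r := range log {
		table[r.Key] = r.Value
	}
	return table
}

func main() {
	log := []record{
		{"k1", "v1", 100}, // last update for k1 is older than the retention window
		{"k2", "v2", 900},
		{"k2", "v3", 950},
	}
	const now, retentionMs = 1000, 200

	before := rebuild(log)                            // local storage built while all records existed
	after := rebuild(retained(log, now, retentionMs)) // local storage rebuilt after deletion

	_, ok := after["k1"]
	fmt.Println("keys before wipe:", len(before))
	fmt.Println("keys after wipe: ", len(after))
	fmt.Println("k1 recovered:    ", ok)
}
```

A compacted table topic does not have this problem, since compaction keeps the latest record per key regardless of age.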
As a hack, one could disable the check of the edges. Some coding would be necessary too because the topic name is used to dispatch the message to the right goroutine (processing goroutines or loader goroutines). But to properly handle stream-stream joins, I think we'd need to deal with time of the streams and that is a bigger change.
Do you think that you can use a group table to store the latest message of topic-b, or are there concerns with that?
from goka.
The only concern I have about using a group table is that I want to avoid creating another topic and duplicating information that is already in the original topic. I think you understood the problem correctly.
I ended up creating a custom “collection” of multiple views for these common lookups, and creating separate groups for topic-a and topic-b. This custom lookup collection is passed to the callbacks of both topic-a and topic-b as a normal Go struct.
The only drawback is the need to manually wait until these views are recovered before starting the processors of topic-a and topic-b.
That’s tedious but seems to work.
from goka.
Well, it is only the latest message of each key that is stored in the group table, not the complete stream. We have similar solutions in our applications and we have many places where data gets duplicated (or enriched).
After thinking a bit more, IMO the best solution for your problem would be a single processor group, which keeps in its group table the latest message of topic-b. In this case you don't have to create an extra table (btw, goka can configure the group tables for you).
Regarding the collection of views: we have used that approach, but we tend to avoid it now. Recovery using lookup views is faster because the group table and the lookup tables can be recovered together. Since we have a great many Go programs running these processors and views, we ended up creating a library that manages the life cycle of views and processors in a program. The library guarantees that all views get recovered before the processors (take a look here).
from goka.
Thanks, will think more about it!
from goka.
Reopening the issue to resolve further questions :)
@db7 By doing what you suggest, I somehow need to make sure that topic-b is read up to the current HWM before I start processing topic-a. Otherwise I could process a topic-a message before any topic-b message was processed, so it's not in the group table yet. Am I right in assuming that?
I assume the code would be something like this for what you're recommending:
goka.DefineGroup("foo",
	goka.Input("topic-a", new(codec.String), func(ctx goka.Context, message interface{}) {
		// Process topic-a message and enrich with lookups.
		// ctx.Lookup takes the table and a key; the message key is assumed here.
		ctx.Lookup("lookup-1", ctx.Key())
		ctx.Lookup("lookup-2", ctx.Key())
		ctx.Lookup("lookup-3", ctx.Key())
		// Extract relevant information from the topic-b message with the same key.
		// We know beforehand that at least one message with the same key will be in topic-b
		// before the HWM at the moment the application starts.
		topicbValue := ctx.Value()
		...
	}),
	goka.Input("topic-b", new(codec.String), func(ctx goka.Context, message interface{}) {
		// Independent processing of topic-b message.
		// Save topic-b message in the group table so that the topic-a callback can use it.
		ctx.SetValue(message)
	}),
	goka.Lookup("lookup-1", new(codec.String)),
	goka.Lookup("lookup-2", new(codec.String)),
	goka.Lookup("lookup-3", new(codec.String)),
	goka.Persist(someGroupCodec),
)
from goka.
Yes, that is what I was thinking. But I didn't realise you had a requirement of synchronising topic-a with topic-b.
Note that even if you pass topic-b via another processor group (and group table) and try to join the table with topic-a, waiting for the HWM is not sufficient because you don't know when topic-b actually started sending messages.
Perhaps you could try to sync topic-a and topic-b using a timestamp in the events and buffering the messages of the faster topic in the group table. We don't do this often. We have a single processor where such a time-based join is needed. In our specific scenario, the messages arrive very rarely and our solution is simply to loop the too-early-arriving message (via another processor) until the other message has arrived.
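The buffering idea could look roughly like the following. This is a sketch in plain Go, not goka code: `event`, `joinState`, `onA`, and `onB` are hypothetical names, the per-key state stands in for a group table value, and the join rule is deliberately simple (a topic-a message waits until any topic-b message with the same key exists; timestamps are only used to ignore out-of-order topic-b messages).

```go
package main

import "fmt"

// event is a hypothetical message carrying an event timestamp.
type event struct {
	Key     string
	TS      int64 // event time, e.g. Unix milliseconds
	Payload string
}

// joinState is what the group table could hold per key.
type joinState struct {
	LatestB  *event  // newest topic-b message seen so far
	PendingA []event // topic-a messages that arrived before any topic-b message
}

// joined pairs a topic-a message with the topic-b message it was enriched with.
type joined struct{ A, B event }

// onA joins immediately if a topic-b message is stored, otherwise buffers a.
func (s *joinState) onA(a event) []joined {
	if s.LatestB == nil {
		s.PendingA = append(s.PendingA, a)
		return nil
	}
	return []joined{{A: a, B: *s.LatestB}}
}

// onB stores b unless it is older than the stored one, then flushes the buffer.
func (s *joinState) onB(b event) []joined {
	if s.LatestB != nil && b.TS < s.LatestB.TS {
		return nil // stale, out-of-order message; keep the newer one
	}
	s.LatestB = &b
	out := make([]joined, 0, len(s.PendingA))
	for _, a := range s.PendingA {
		out = append(out, joined{A: a, B: b})
	}
	s.PendingA = nil
	return out
}

func main() {
	s := &joinState{}

	// topic-a is "faster": its message arrives first and is buffered.
	fmt.Println(len(s.onA(event{Key: "k", TS: 1, Payload: "a1"})), "joined, pending:", len(s.PendingA))

	// The first topic-b message releases everything that was waiting.
	for _, j := range s.onB(event{Key: "k", TS: 2, Payload: "b1"}) {
		fmt.Println("joined", j.A.Payload, "with", j.B.Payload)
	}

	// Later topic-a messages join directly against the stored topic-b message.
	for _, j := range s.onA(event{Key: "k", TS: 3, Payload: "a2"}) {
		fmt.Println("joined", j.A.Payload, "with", j.B.Payload)
	}
}
```

A real time-based join would also need a policy for how long buffered messages may wait and when to drop them, which this sketch leaves out.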
from goka.
I ended up just duplicating the common lookups in two processors and using them separately. Synchronizing stuff is tricky, and doing it manually within a single group is not even possible. But that's OK.
from goka.