Comments (7)

db7 commented on May 30, 2024

Let me try to understand your problem:

  • topic-a and topic-b are streams (since they are used as input)
  • you consume both in the same group
  • you have a couple of joined tables (say, table-1 and table-2)

Now you additionally want to have the latest message of topic-b available when you process the next topic-a message. The first thing that comes to mind is to store the latest topic-b message in the group table. Is that OK in your scenario, or can't you extend the group table schema?

Another option is to store the latest topic-b in another group table and join that group table in the original processor group.

I see the benefit of joining the stream as a table, as you suggest: it saves you a topic, and the log compaction happens in the processor. But the current implementation of Join() expects a table, not a stream. If you delete the leveldb of your processor and start it again, your joined topic-b won't have the latest message for keys that were last updated before now()-retention.ms, because Kafka will have deleted them. The local storage of goka is supposed to be only a local view of the actual tables that are stored in Kafka; using leveldb to compact a stream will break recovery.

As a hack, one could disable the check of the edges. Some coding would be necessary too because the topic name is used to dispatch the message to the right goroutine (processing goroutines or loader goroutines). But to properly handle stream-stream joins, I think we'd need to deal with time of the streams and that is a bigger change.

Do you think you can use a group table to store the latest message of topic-b, or are there concerns with that?

from goka.

burdiyan commented on May 30, 2024

My only concern about using a group table is that I want to avoid creating another topic and duplicating information that is already in the original topic. I think you understood the problem correctly.

I ended up creating a custom “collection” of multiple views for these common lookups, and creating separate groups for topic-a and topic-b. This custom lookup collection is passed to the callbacks of both topic-a and topic-b as a normal Go struct.

The only drawback is the need to manually wait until these views are recovered before starting the processors of topic-a and topic-b.

That’s tedious but seems to work.

db7 commented on May 30, 2024

Well, it is only the latest message of each key that is stored in the group table, not the complete stream. We have similar solutions in our applications and we have many places where data gets duplicated (or enriched).

After thinking a bit more, IMO the best solution for your problem would be a single processor group, which keeps in its group table the latest message of topic-b. In this case you don't have to create an extra table (btw, goka can configure the group tables for you).

Regarding the collection of views: We have used that, but we tend to avoid it now. The recovery using lookup views is faster because the group table and the lookup tables can be recovered together. Since we have a lot of Go programs running these processors and views, we ended up creating a library that manages the life cycle of views and processors in a program. The library guarantees that all views get recovered before the processors (take a look here).

burdiyan commented on May 30, 2024

Thanks, will think more about it!

burdiyan commented on May 30, 2024

Reopening the issue to resolve further questions :)

@db7 If I do what you suggest, I somehow need to make sure that topic-b has been read up to its current high-water mark (HWM) before I start processing topic-a. Otherwise I could process a topic-a message before any topic-b message with the same key has been processed, so it would not be in the group table yet. Am I right in assuming that?

I assume the code would be something like this for what you're recommending:

goka.DefineGroup("foo",
    goka.Input("topic-a", new(codec.String), func(ctx goka.Context, message interface{}) {
        // Process topic-a message and enrich with lookups.
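        // ctx.Lookup takes the table and a key; the message key is used here as an example.
        ctx.Lookup("lookup-1", ctx.Key())
        ctx.Lookup("lookup-2", ctx.Key())
        ctx.Lookup("lookup-3", ctx.Key())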

        // Extract relevant information from the topic-b message with the same key.
        // We know beforehand, that at least one message with the same key would be in the topic-b
        // before HWM at the moment of starting the application.
        topicbValue := ctx.Value()
        ...  
    }),
    goka.Input("topic-b", new(codec.String), func(ctx goka.Context, message interface{}) {
        // Independent processing of topic-b message.

        // Save topic-b message in the group table so that topic-a callback can use it.
        ctx.SetValue(message)
    }),
    goka.Lookup("lookup-1", new(codec.String)),
    goka.Lookup("lookup-2", new(codec.String)),
    goka.Lookup("lookup-3", new(codec.String)),
    goka.Persist(someGroupCodec),
)

db7 commented on May 30, 2024

Yes, that is what I was thinking. But I didn't realise you had a requirement of synchronising topic-a with topic-b.

Note that even if you pass topic-b via another processor group (and group table) and try to join the table with topic-a, waiting for the HWM is not sufficient because you don't know when topic-b actually started sending messages.

Perhaps you could try to sync topic-a and topic-b using a timestamp in the events and buffering the messages of the faster topic in the group table. We don't do this often. We have a single processor where such a time join is needed. In our specific scenario, the messages arrive very rarely, and our solution is simply to loop the too-early-arriving message (via another processor) until the other message has arrived.

burdiyan commented on May 30, 2024

I ended up just duplicating the common lookups in two processors and using them separately. Synchronizing stuff is tricky, and doing it manually within a single group is not even possible. But that's OK.
