Comments (24)
@burdiyan that sounds easy to implement. As far as I can see, all these changes can be implemented on top of the existing edges. We just have to be careful about which edges can be combined. If Window is used, we should not use Persist, and also should not use Join or Lookup because those are not timed. Perhaps we should also not allow Loopback? So Window would considerably restrict what the graph can look like.
We were planning at some point to come up with an operators subpackage and I think windowing would fit there perfectly. The operators subpackage would be a library of predefined design patterns. It would provide GroupGraph builders with a predetermined structure. For example, if you want to map the values of a topic into another topic, currently you'd write something like this:
goka.DefineGroup(group,
	goka.Input(inTopic, inCodec, func(ctx goka.Context, m interface{}) {
		result := mapFunction(m)
		ctx.Emit(outTopic, ctx.Key(), result)
	}),
	goka.Output(outTopic, outCodec),
)
Instead of doing that by hand, the operators library would provide a GroupGraph builder like this:
operators.Map(inTopic, inCodec, outTopic, outCodec,
func(m interface{}) interface{} { return mapFunction(m) })
The same thing can be done for ReduceByKey and Filter and some other simple operators.
A Window operator could look like this:
operators.Window("count-clicks-5m", new(codec.Int), 5*time.Minute, 24*time.Hour,
	operators.WindowInput("add-clicks", new(codec.String),
		func(value interface{}, message interface{}) interface{} {
			var count int
			if v := value; v != nil {
				count = v.(int)
			}
			count++
			return count
		}),
	operators.WindowInput("sub-clicks", new(codec.String),
		func(value interface{}, message interface{}) interface{} {
			var count int
			if v := value; v != nil {
				count = v.(int)
			}
			count--
			return count
		}),
)
The interface{} types could be replaced by some specialized context or other types we find adequate.
The advantage of building such patterns on top of the edge primitives is that we have type safety when combining the edges and we can restrict/extend the callback interface, tailoring it to the use case. I would prefer that to putting Window at the same level as the edges. What do you think about that?
Also, in the operators package we could use a more fluent definition for such group graphs if desired, e.g.,
operators.Window(group, codec, time, time).
	AddInput(topic, codec, function).
	AddInput(...).
	Build()
from goka.
@andrewmunro currently the easiest approach I would recommend is to use a group table and create a record there that manages all the windowing.
For example you could have something like:
type WindowedRecord struct {
windows map[int64]MyAggregatedValue
}
Then on each incoming record from your input topic, you would retrieve WindowedRecord from the group table store (or initialize one if it's not there), then identify the window based on the epoch time of your incoming record, and put it under the right key of the underlying map. Then you would store the record back in the group table.
Let me know if the above makes sense to you; I'm not sure I'm explaining myself clearly enough.
IMO, something similar right now is the only way to implement windowing semantics without modifying Goka itself.
BTW, @andrewmunro could you describe a little bit more your use case for windowing semantics that you need in your application?
from goka.
Do you mean when a month is over and the customer object is not yet updated? You can check what the current month is when a service reads the data from the view. What we usually do is to add methods to the type stored in the group table. Something like this:
func (c *Customer) GetAmountThisMonth() float64 {
	if c.Timestamp.Month() != time.Now().Month() {
		return 0.0
	}
	return c.AmountTransactedThisMonth
}
from goka.
This would be awesome! We don't have use cases for that at the moment, but I'm happy to discuss it.
So goka.Context is an interface anyway. Whenever writing some value X, one could wrap that with some window bucket. The timestamp extractor could be either part of the codec or passed via option. One thing to consider is that automatically augmenting the state would make the table unreadable from views that are not aware of the augmentation.
Would you like to give it a try? :-)
from goka.
I'd like to give it a try, but first I would like the design to be super clear.
> So goka.Context is an interface anyway. Whenever writing some value X, one could wrap that with some window bucket.
The thing is that there seems to be no way to swap the context implementation; it's hardcoded in the processor here: https://sourcegraph.com/github.com/lovoo/goka/-/blob/processor.go#L643:9
What do you see as the best way for someone to create a custom context? Pass it as an option? Create something like a context builder, maybe?
> The timestamp extractor could be either part of the codec or passed via option.
I think both are valid solutions, but I like the option better.
> One thing to consider is that automatically augmenting the state would make the table unreadable from views that are not aware of the augmentation.
That's true. The thing is that in Kafka Streams nobody is supposed to use the state store changelog topics; they are only for restoring the state store. But Goka explicitly allows other components to use the group table's changelog topic, which is also totally valid I think.
from goka.
Nice!
I think it should be easy to wrap the callbacks in goka.Input(topic, codec, callback). One idea would be to create a subpackage, say timebucket, then
g := goka.DefineGroup(
	group,
	tb.Input(topic, codec, callback), // callback will get wrapped
	tb.Persist(otherCodec),           // otherCodec will get wrapped
)
And the wrapper would do something like this:
func Input(topic goka.Stream, codec goka.Codec, cb goka.ProcessCallback) goka.Edge {
	return goka.Input(topic, codec, func(ctx goka.Context, m interface{}) {
		tbContext := convertContext(ctx)
		cb(tbContext, m)
	})
}
A similar idea for persist. What do you think?
from goka.
I guess that was unclear. I'll write a longer example later today.
from goka.
Ok, so the idea was to create a package, e.g., timebucket. In this package we wrap the edges of the GroupGraph and redefine the ProcessCallback to use a more specialized callback. (Disclaimer: I haven't thought about the window functions, so this is very high level and I don't know what extra functions the context should have.)
The table values could be defined like this:
// bucket for the messages of one stream
type bucket struct {
	messages []interface{}
}
// wrapper around the user value. This is the actual value stored in the group table.
type bucketValue struct {
	// buckets for the streams that are buffered
	buckets map[goka.Stream]bucket
	// actual value of the user
	value interface{}
}
// codec of codecs: keeps codecs of buffered streams and table
type bucketCodec struct {
	stream map[goka.Stream]goka.Codec
	value  goka.Codec
}
func (bc *bucketCodec) Encode(value interface{}) (data []byte, err error) {
	// serialize a bucketValue using the codecs in bc
	return nil, nil
}
func (bc *bucketCodec) Decode(data []byte) (value interface{}, err error) {
	// deserialize a bucketValue using the codecs in bc
	return new(bucketValue), nil
}
The context would be some specialized context containing any function relevant to get messages from the buckets. To process input streams with timebuckets, the user should implement callbacks with this signature: func(ctx timebucket.Context, m interface{})
timebucket could provide 3 functions to create the group graph: Input, Persist and DefineGroup.
Input() creates an inputStream which implements the goka.Edge interface but also stores, for example, a retrieve function to get a timestamp from the events.
// goka.Edge for buffered streams
func Input(topic goka.Stream, codec goka.Codec, cb ProcessCallback, retrieve RetrieveTimeFunc) goka.Edge {
	return &inputStream{topic, codec, cb, retrieve}
}
type inputStream struct {
	topic    goka.Stream
	codec    goka.Codec
	cb       ProcessCallback // timebucket callback: func(ctx timebucket.Context, m interface{})
	retrieve RetrieveTimeFunc
}
func (i *inputStream) String() string    { return "..." }
func (i *inputStream) Topic() string     { return string(i.topic) }
func (i *inputStream) Codec() goka.Codec { return i.codec }
// Functions with this signature can retrieve a timestamp from a message
type RetrieveTimeFunc func(message interface{}) (time.Time, error)
The persistency edge would be something like this:
// goka.Edge for wrapped value
func Persist(codec goka.Codec, windowLength time.Duration) goka.Edge {
	// keyed fields, since the topic is not known at this point
	return &table{codec: codec, windowLength: windowLength}
}
type table struct {
	topic        goka.Stream
	codec        goka.Codec
	windowLength time.Duration
}
When defining a group graph, we can wrap the callbacks and codecs and provide a normal goka.GroupGraph to the goka.Processor.
If timebucket.DefineGroup only takes goka.Edge objects as arguments, it also allows passing normal goka.Input, goka.Table, etc., which is nice. We can internally do type switches to find the inputStream and table types defined above.
// timebucket.DefineGroup
func DefineGroup(group goka.Group, edges ...goka.Edge) *goka.GroupGraph {
	var (
		t = findPersist(edges)      // persistency edge
		i = findTimeBucketed(edges) // timebucketed inputStreams
		o = removeFrom(edges, t, i) // remaining edges
		e []goka.Edge               // result
	)
	// 1. replace codec of persistency edge
	c := &bucketCodec{
		stream: make(map[goka.Stream]goka.Codec),
		value:  t.Codec(),
	}
	for _, edge := range i {
		c.stream[goka.Stream(edge.Topic())] = edge.Codec()
	}
	e = append(e, goka.Persist(c))
	// 2. wrap callbacks in timebucketed inputStreams
	for _, in := range i {
		in := in // capture the loop variable for the closure (needed before Go 1.22)
		// callback with the timebucket context
		cb := func(ctx goka.Context, m interface{}) {
			// create timebucket Context
			tbctx := newContext(ctx)
			// save message in bucket if necessary;
			// messages without a retrievable timestamp are not buffered
			if ts, err := in.retrieve(m); err == nil && decide(ts, t.windowLength) {
				tbctx.bucket.Add(m)
			}
			// call user callback func(ctx timebucket.Context, m interface{})
			in.cb(tbctx, m)
			// if messages are in buffer and SetValue not called, call it now
			tbctx.persist()
		}
		e = append(e, goka.Input(goka.Stream(in.Topic()), in.Codec(), cb))
	}
	// 3. put everything together
	return goka.DefineGroup(group, append(e, o...)...)
}
I just don't know what timebucket.Context should look like since I don't know much about time joins.
There are definitely other ways to inject that into goka. Let me know if you have other ideas.
from goka.
Just read the thread, and from a high level it seems pretty reasonable, although conceptually there's something I don't like about the idea of duplicating core functions (goka.Stream, etc.) specifically for windowing.
I'll play a little bit more with Kafka Streams to see how it works internally, because so far I have only been reading the code without actually playing with it.
On the other hand, one idea that comes to mind and should be taken into account is a TTL for the records. Windows should have an expiry time (maybe optional) after which they go away. It may be something similar to Kafka's compaction mechanism.
Kafka Streams uses RocksDB as a state store, and RocksDB has TTL built in, although I don't know if they use it for windowing at all.
from goka.
We'd not be duplicating core functions. The functions I suggested should be seen as "builders" which wrap the original code with additional functionality. But other solutions are also possible.
And yes, it would be great to learn from your experience with Kafka Streams.
Just the TTL for records sounds complex in the context of Goka. Of course we can expire entries in the buckets when events arrive for the respective keys, but expiring entries in buckets that receive no message will be harder.
from goka.
I could see it being something along these lines (from a very high-level perspective):
- We could have a new goka.Edge that would create a group table. Same thing as goka.Persist but something like goka.Windowed. Only one of these two would be allowed in a group.
- goka.Windowed would accept the codec, window size and window retention period (TTL). The retention period can be longer than the window size to allow for late-arriving data.
- We could start using the message's .Timestamp parameter to define time buckets.
- The current implementation of goka.Context would have to understand when the group table is a windowed one, and would wrap .Value and .SetValue, modifying the record's key to some common pattern that identifies the key in a bucket. For example, Kafka Streams uses the following: [<original-key>@<bucket-start-ms>/<bucket-end-ms>].
- This way, a user would obtain the corresponding aggregate for each processed message, and the actual aggregate is defined by the user.
- There should be a cleanup process that would purge expired windows once per TTL duration. It can be a best-effort operation, so sometimes buckets that are already expired can still be in the state store for some time.
- Windows are based on the epoch time (round(current-timestamp-nano/window-size-nano) would identify the bucket start time).
Things to take into account:
- Users of the windowed group table's topics should be aware of the modified key pattern.
- Incoming records that correspond to already expired buckets should be discarded (or maybe let the user decide what to do with them).
A possible user experience could be something like this:
goka.DefineGroup("count-clicks-5m",
	goka.Input("clicks", new(codec.String), func(ctx goka.Context, message interface{}) {
		// This would grab the value from the state store
		// that corresponds to the time bucket of the incoming message
		// based on its Timestamp parameter.
		var count int
		if v := ctx.Value(); v != nil {
			count = v.(int)
		}
		count++
		ctx.SetValue(count)
	}),
	goka.Windowed(new(codec.Int), 5*time.Minute, 24*time.Hour),
)
This is very similar to how Kafka Streams works (without taking into account different window types and more advanced features).
What do you think?
from goka.
That sounds really nice! It is simpler than my proposal above and should also solve your issue with joining two streams, right? Do you have a suggestion for how to implement the cleanup process?
Sorry for the late response.
from goka.
@db7 In Kafka Streams, time is advanced only when new messages are coming.
So basically it could be something like this when we process a new message:
if windowExist(message):
    addToWindow(message)
else:
    createNewWindow(message)
purgeExpiredWindows(key)
from goka.
@db7 Is there any update on this? I'd love to add time-based windowing to my application but am unsure how to implement it myself :(
from goka.
@burdiyan That kind of makes sense. So here's an example.
Say I have an incoming stream of transactions from customers. I want to aggregate the amount of money a customer has spent in different time windows, such as this week, this month, this year (maybe too much data to store).
So my input model would look like this:
type Transaction struct {
	CustomerID string
	Amount     float64
}
And my group model looks like this:
type Customer struct {
	ID               string
	AmountTransacted float64
}
And finally, my aggregator looks like this:
g := goka.DefineGroup(
	group,
	goka.Input(topic, new(transaction.Codec), func(ctx goka.Context, msg interface{}) {
		t := msg.(*transaction.Transaction)
		// the first message for a key has no stored value yet
		c, ok := ctx.Value().(*customer.Customer)
		if !ok {
			c = new(customer.Customer)
		}
		c.AmountTransacted += t.Amount
		ctx.SetValue(c)
	}),
	goka.Persist(new(customer.Codec)),
)
Essentially what I want is to add an AmountTransactedThisMonth field to my Customer struct.
type Customer struct {
	ID                        string
	AmountTransacted          float64
	AmountTransactedThisMonth float64
}
from goka.
@andrewmunro you'll also need to keep track of the month you are currently considering using a timestamp, and then reset the monthly aggregation once the month is over. Something like this:
func(ctx goka.Context, msg interface{}) {
	t := msg.(*transaction.Transaction)
	// the first message for a key has no stored value yet
	c, ok := ctx.Value().(*customer.Customer)
	if !ok {
		c = new(customer.Customer)
	}
	c.AmountTransacted += t.Amount
	// reset amount if month is over
	if t.Timestamp.Month() != c.Timestamp.Month() {
		c.Timestamp = t.Timestamp
		c.AmountTransactedThisMonth = 0
	}
	c.AmountTransactedThisMonth += t.Amount
	ctx.SetValue(c)
})
Also note that you don't need to keep the customer ID in your state since it will be the same as the key.
from goka.
@db7 I thought about this, but won’t any views accessing a customer’s state potentially be incorrect until that customer makes another transaction?
from goka.
Hi @db7
We've been looking into using goka. For our use case we'll need windowing based on specific times, for example every 15 minutes at fixed intervals, and we need to make sure the window is emitted at fixed times even if no message was received.
I was thinking of implementing a Ticker to check the time and emit the window when needed, but after digging a little I noticed that if no message was received (e.g. if the consumer was restarted), the context won't have the message key and there will be no way to find the correct value (as we save the window in the context).
Any suggestions on how we can implement this kind of functionality? I would be happy to contribute this functionality but struggle to see a reasonable solution :/ Any help would be much appreciated.
from goka.
Hi @keisar I haven't been working on this for a while. I guess @frairon would be able to help you better.
If I understand your problem, every 15 minutes you need to create a message for every key of a table. The only idea I have at the moment is the following: in your main (not in ProcessCallback), you create a ticker that iterates over the table every 15 minutes and sends a message with an emitter. AFAIR, you can iterate directly on the processor. But it may be better to create another program with a view of the table and iterate on the view.
from goka.
Thank you @db7
I'll look into this; your approach sounds like a good one. I'm only concerned about how to make sure a window is not sent multiple times. For example, if I have multiple instances running the emitter, I'm guessing each emitter will see all the windows, so I'll need to find a way of dividing the data between these instances. Or maybe goka covers us and we don't need to take care of that?
from goka.
Hi @keisar,
yep, Diogo is basically right, there is no built-in windowing functionality in goka. But if you want to emit a value for every key, create a view and iterate over it. This gives you more of a snapshot semantics though, because all windows are emitted at the same time. It also probably won't scale very well, since it will always iterate over all keys even when sending only a few entries.
Also, if you had multiple instances of that view-based window emitter, every instance would emit every window.
I'm not sure I've really understood the use case. Could you give an example of input/output, how it's aggregated and what data should end up in the window eventually? Maybe there are other solutions too.
from goka.
Hi @frairon
Thanks for the reply, you confirmed some of my concerns, I was worried on how this will scale…
Here’s an example, hopefully it’ll clarify the use case a little:
We have two topics, one for appointments and another for appointments cancellation (key for both is customer id), we want to window the events coming in per customer for 15 minutes, remove cancelled appointments and create some counters per window.
It is important that each window is sent at the same time, for example a window can be from 11:13-11:28 so the window will be emitted at 11:28 even if no message arrived at that time.
from goka.
Hi @frairon
I'm attaching here a diagram showing what we thought of doing in order to achieve the windowing, both for feedback and to help others if they want to achieve the same (assuming you won't find any issues we missed :) )
Short legend for the diagram:
- Window tagger - adds a window field to each incoming message
- Window collector - collects all messages into a GroupTable under the appropriate window
- Window notifier - produces messages to a topic whenever a window should be created
- Window transmitter - when a window needs to be transmitted (based on a notification from the window notifier), it will grab the window, send it and clear it (we will have some logic here to deduplicate multiple notifications for the same window)
Hopefully with this approach the transmitter can still scale and the notifier doesn't do much (every 15 minutes sends small messages for each window) so it won't be an issue scale-wise.
from goka.
@keisar We've implemented similar windowing semantics in a goka processor by relying on a ticker and Processor.VisitAll. I've added more detail on our use case here: #392 (comment)
from goka.