Comments (13)
Ok, thanks for the suggestions. I think it would make sense to add the decoding of the header value in the codec. I could imagine using the headers as follows:
```go
// Header is one entry in the Kafka record headers.
type Header struct {
	Key   string
	Value interface{}
}

type Context interface {
	// Header returns the value for a header key if available, otherwise nil.
	Header(key string) interface{}
	// Emit asynchronously writes a message into a topic.
	Emit(topic Stream, key string, value interface{}, headers ...Header)
	...
}
```
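To illustrate the proposed `Header` lookup, here is a minimal, self-contained sketch; `headerContext` and `newHeaderContext` are made-up stand-ins for illustration, not part of the goka API:

```go
package main

import "fmt"

// Header is one entry in the Kafka record headers, as proposed above.
type Header struct {
	Key   string
	Value interface{}
}

// headerContext is a hypothetical stand-in implementing only the proposed
// Header(key) lookup from the Context interface.
type headerContext struct {
	headers map[string]interface{}
}

// newHeaderContext builds a context from a list of headers.
func newHeaderContext(hs ...Header) *headerContext {
	m := make(map[string]interface{}, len(hs))
	for _, h := range hs {
		m[h.Key] = h.Value
	}
	return &headerContext{headers: m}
}

// Header returns the value for a header key if available, otherwise nil.
func (c *headerContext) Header(key string) interface{} {
	return c.headers[key]
}

func main() {
	ctx := newHeaderContext(Header{Key: "trace-id", Value: "abc123"})
	fmt.Println(ctx.Header("trace-id")) // abc123
	fmt.Println(ctx.Header("missing"))  // <nil>
}
```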
The codec would be extended to look like this:
```go
type Codec interface {
	Encode(value interface{}) (data []byte, err error)
	Decode(data []byte) (value interface{}, err error)
	EncodeHeader(value interface{}) (data []byte, err error)
	DecodeHeader(data []byte) (value interface{}, err error)
}
```
To avoid having to implement `EncodeHeader` and `DecodeHeader` every time, we can provide some header codecs to be added to any codec:
```go
type MyCodec struct {
	codec.HeaderBytes
}

// only implement Encode and Decode
func (c *MyCodec) Encode(value interface{}) (data []byte, err error) { ... }
func (c *MyCodec) Decode(data []byte) (value interface{}, err error) { ... }
```
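For illustration, such a pass-through header codec could look roughly like this. This is a sketch under the naming proposed above (`HeaderBytes` is not a released goka type):

```go
package main

import "fmt"

// HeaderBytes is a hypothetical embeddable helper that treats header values
// as raw bytes, i.e. encoding and decoding are pass-through.
type HeaderBytes struct{}

// EncodeHeader expects the value to already be a byte slice and returns it
// unchanged.
func (HeaderBytes) EncodeHeader(value interface{}) ([]byte, error) {
	b, ok := value.([]byte)
	if !ok {
		return nil, fmt.Errorf("HeaderBytes: expected []byte, got %T", value)
	}
	return b, nil
}

// DecodeHeader returns the raw bytes unchanged.
func (HeaderBytes) DecodeHeader(data []byte) (interface{}, error) {
	return data, nil
}

func main() {
	var hb HeaderBytes
	enc, _ := hb.EncodeHeader([]byte("abc"))
	dec, _ := hb.DecodeHeader(enc)
	fmt.Printf("%s %s\n", enc, dec) // abc abc
}
```

Embedding this struct into a codec (as in `MyCodec` above) would satisfy the two header methods of the extended `Codec` interface without extra work.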
That would require adding the above `codec.HeaderBytes` to all codecs in existing code, even if one does not use headers. Does that sound ok?
from goka.
Also, a way to get all the headers (raw) could be useful, e.g.:

```go
type Context interface {
	// Header returns the value for a header key if available, otherwise nil.
	Header(key string) interface{}
	// Headers returns the raw headers.
	Headers() map[string][]byte
	...
}
```
Sure. I'd be happy to review your PR. Also, if you need help to drive through the code, let me know.
@asdine Same here, I'll be happy to give you a hand.
Sure ! I'll work on that this weekend 👍
We are starting to use goka and this would be a nice addition that would provide a way to trace. Any word on this? If you guys have an idea on how this should be implemented I could take a stab at it as well.
@annymsMthd sorry for the delayed response. PRs are very welcome.
Since we don't use message headers, perhaps you could quickly explain to me how you'd like to have them available in your processor callbacks, and then I can point you to the places where to add the pieces of code you'd need.
A processor callback looks like this: `func(ctx goka.Context, m interface{})`. The message payload is `m`. Other metadata are accessible via `ctx`:
```go
type Context interface {
	// Topic returns the topic of the input message.
	Topic() Stream
	// Key returns the key of the input message.
	Key() string
	// Partition returns the partition of the input message.
	Partition() int32
	// Offset returns the offset of the input message.
	Offset() int64
	// Timestamp returns the timestamp of the input message. If the timestamp is
	// invalid, a zero time will be returned.
	Timestamp() time.Time
	// Emit asynchronously writes a message into a topic.
	Emit(topic Stream, key string, value interface{})
	...
}
```
`ctx.Emit(topic, key, value)` is used to emit a message into a topic.
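For illustration, a minimal stand-in for such a callback (types are simplified: `Topic()` returns `string` here instead of goka's `Stream`, and `fakeContext` is made up, not a goka type):

```go
package main

import "fmt"

// Context is a simplified stand-in for goka.Context with only the methods
// used below.
type Context interface {
	Topic() string
	Key() string
	Offset() int64
}

// fakeContext returns fixed metadata, standing in for a real consumed message.
type fakeContext struct{}

func (fakeContext) Topic() string { return "user-clicks" }
func (fakeContext) Key() string   { return "user-1" }
func (fakeContext) Offset() int64 { return 42 }

// callback mirrors the func(ctx goka.Context, m interface{}) shape; it
// formats the metadata and payload instead of doing real processing.
func callback(ctx Context, m interface{}) string {
	return fmt.Sprintf("%s/%s@%d: %v", ctx.Topic(), ctx.Key(), ctx.Offset(), m)
}

func main() {
	fmt.Println(callback(fakeContext{}, "clicked")) // user-clicks/user-1@42: clicked
}
```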
So, how would you like to use the headers given the existing interface?
Hello @db7. good to hear from you on this subject.
As mentioned in the Kafka documentation, a Record can have headers. Headers are key-value pairs (string, []byte).
From what I can see, that means adding a new map for headers in Context, modifying Emit to add those headers during the emit if present, and adding a second Emit func that allows a fourth parameter: `Emit(topic, key, value, headers)`.
Having access to these headers in Context would also be helpful. I don't know if headers could be encoded using a codec, as is the case for the value.
Headers could be `map[string][]byte` or `map[string]interface{}`, depending.
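A rough sketch of that four-argument Emit shape with headers as `map[string][]byte`; `emitWithHeaders` and `message` are hypothetical and only build a struct here instead of producing to Kafka:

```go
package main

import "fmt"

// message captures what a headers-aware Emit would hand to the producer.
type message struct {
	topic, key string
	value      interface{}
	headers    map[string][]byte
}

// emitWithHeaders sketches the suggested Emit(topic, key, value, headers)
// signature; it only records the call rather than writing to a broker.
func emitWithHeaders(topic, key string, value interface{}, headers map[string][]byte) message {
	return message{topic: topic, key: key, value: value, headers: headers}
}

func main() {
	m := emitWithHeaders("clicks", "user-1", "payload",
		map[string][]byte{"trace-id": []byte("abc")})
	fmt.Println(m.topic, string(m.headers["trace-id"])) // clicks abc
}
```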
What do you think?
@j0hnsmith A complete map is a good idea, but do you really want to have that always as raw bytes? Wouldn't it be sufficient to use a header bytes codec (which does not do anything)?
Ahh, yeah, missed that bit. 👍
Hi guys,
happy new year. Can we take the time to finish this feature? I think it should not take long to implement. What is Lovoo's position on this project? Are you maintaining it? Do you have any roadmap?
Hi @jbpin,
very sorry again for the long delay.
In general, we are maintaining Goka primarily by fixing bugs or improving smaller features
we use in our services. That's why there is no official roadmap.
Unfortunately we don't have many resources to proactively work on
feature requests for which we don't have real use cases,
even if those features (like emitting headers in a message) definitely make sense.
Nevertheless we have interest in pushing the project and its features. I'm happy to
support work on specific problems, review pull requests and such.
Having support by active users of Goka would be helpful and very much appreciated.
Maybe we can create a roadmap in the wiki to start building a community, what do you guys think?
For this specific ticket: I think the suggestions made so far are a good starting point.
And again I'm happy to support anyone who wants to give it a try, because it can take a while until I find
the time to start.
Since this issue didn't have any activity for over a year, I'll close it as it doesn't seem to be too important. Feel free to create a new issue with concrete requirements or ideas.