goka's People

Contributors

adw1n, alecy, anoruxylene, c1728p9, chin8628, chrisfjones, db7, dependabot[bot], frairon, ghstahl, heltonmarx, heppu, j0hnsmith, jbpin, jomaresch, juandes, kavinsk1, kskitek, mavogel, mmreza79, mrahs, norbertklawikowski, qneyrat, roubanakhle, samihiltunen, spsmhaitjema, testwill, tinahhhhh, wwaldner-amtelco, zacharyfox

goka's Issues

Improve tester implementation

The current implementation of goka tester has some limitations:

  • only one processor can be tested
  • lookups and joins are not supported
  • state codec has to be registered manually

In branch tester-refactoring I started writing a new tester implementation.
The new tester features

  • multiple processor tests
  • lookups, joins and loops fully working
  • group graph detection, so every topic knows its codec
  • synchronous, so after tester.Consume(..) has terminated, all processors and loops are finished consuming.
  • improved message tracking to assert correct emits of processors

The interface will change slightly, assuming that the tester has not been used too much yet:

Starting and running the tester has not changed.

Changes

  • retrieving values from the processor's group table via the tester was previously done via
tester.ValueForKey("somekey")

This was possible because there was only one group table. Now we can have multiple, so you have to specify the topic too

tester.TableValue("group-table", "somekey")
  • Setting a value has changed analogously:
tester.SetValue("somekey", "value")
// becomes
tester.SetTableValue("group-table", "somekey", "value")
  • the ExpectEmit functionality was cumbersome because the tester didn't know a topic's codec:
tester.ExpectEmit("topic", "key", func(value []byte){
   // decode the value and check what you need to check
})

// then to make sure nothing extra was emitted
tester.Finish(true)

Now we can do

key, value, hasNext := tester.NextMessage("topic")
if !hasNext {
  // there was nothing more in this topic
} else {
  // check what you need to check with key and value
}
// similar to check there were no more messages
tester.ExpectEmpty("topic")

The old ExpectEmit is still there to avoid too many broken tests when upgrading, but imho it should be removed.

  • Initialize the processors with state. This was implemented poorly in the old tester; now you can simply do:
// #### EDIT: this is not possible anymore. The current kafka-interface defined by goka does not allow this kind of initialization without nasty hacks.
// tester.AddInitialState("group-table", "key", "value")
// then run the processor
// go proc.Run(ctx)
// then do the test, the processor will start with the table we initialized above
  • So initializing the tables in processors or views will be done using SetTableValue, like
gkt := tester.New(t)
proc := goka.NewProcessor(...,
  goka.WithTester(gkt),
)
go proc.Run(context.Background())
gkt.SetTableValue(<table>, <key>, <value>)

Documentation and examples still need lots of polishing, but that's essentially what the new Tester would look like.
What do you think?

Don't check copartitioning for lookup topics

Right now Goka checks copartitioning for all topics in the graph. I think it should not check this for lookup topics, since those are materialized completely in all instances of the application.

We have a use case where we want to have 10 partitions for the input topic, but still be able to do lookups from topics with only 1 partition.

Do you agree, or am I missing some situation where it would not be possible?

Proposal: Implement windowing semantics

It would be great if Goka could natively provide primitives for windowed processing. It’s a pretty complex topic, but seems really needed in many stream processing workloads.

Kafka Streams handles that by providing timestamp extractors to identify the actual time of a message, and by modifying the record keys stored in the state store, augmenting them with the window bucket.

Another approach that may help users implement their own windowing would be for goka.Context to allow getting and setting arbitrary keys in the storage. I really don’t like it conceptually, but it gives more freedom and is less complicated to implement.

I’m sure you were thinking about windowing in general. Do you have any clear idea on how it could be built into Goka?
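
For illustration, a key-augmentation approach similar to the Kafka Streams one could be sketched on top of goka's existing primitives by encoding the window bucket into the key used for loopback and the group table. The helper below is hypothetical (not goka API) and assumes the message timestamp is available from the payload:

// build a compound key of the original key plus a tumbling-window bucket
func windowKey(key string, ts time.Time, size time.Duration) string {
  return fmt.Sprintf("%s/%d", key, ts.Truncate(size).Unix())
}

// inside the input callback: repartition into the window bucket
//   ctx.Loopback(windowKey(ctx.Key(), msgTime, time.Hour), msg)
// inside the loop callback: aggregate per (key, window) via ctx.Value()/ctx.SetValue()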

Question: How to implement compound keys (segment my data)

Kafka is optimised around storing small messages. I'm currently using goka (very effectively) to aggregate a stream of transactions, however I'm concerned that the amount of data I store in my compacted topic will grow larger and larger if I start aggregating historical data too.

A working example:

I have a stream of transactions for users spending money.
I'm aggregating how much a user spends of all time.
I want to aggregate how much a user has spent per week.

I'm currently using a single compacted topic of userid -> user object (storing aggregated values).

To solve this problem, in my mind I need to split my dataset into smaller pieces and use compound keys to store and retrieve historical data (e.g. useridX.weekY).

Is there a way I can do this with goka? It looks to me like there is no way to change the key stored in the compacted topic from the event key that comes from a stream.

The only thing I can think of is to emit from my first processor into a new stream, and then have another processor that aggregates this into a weekly compacted topic. That seems like a lot of overhead and I'm not sure if there's a simpler way.
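
For what it's worth, a rough sketch of that two-processor approach might look like the following; the topic names, the Transaction type and its fields, and the codecs are hypothetical, and error handling is omitted:

// first processor: re-key transactions to "userid.week" and emit them
rekey := goka.DefineGroup("spend-rekey",
  goka.Input("transactions", new(txCodec), func(ctx goka.Context, msg interface{}) {
    tx := msg.(*Transaction)
    week := tx.Timestamp.Truncate(7 * 24 * time.Hour).Format("2006-01-02")
    ctx.Emit("transactions-by-week", ctx.Key()+"."+week, tx)
  }),
  goka.Output("transactions-by-week", new(txCodec)),
)

// second processor: aggregate per compound key into its compacted group table
weekly := goka.DefineGroup("weekly-spend",
  goka.Input("transactions-by-week", new(txCodec), func(ctx goka.Context, msg interface{}) {
    total, _ := ctx.Value().(int64)
    ctx.SetValue(total + msg.(*Transaction).Amount)
  }),
  goka.Persist(new(codec.Int64)),
)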

review errors to fail on

When loading a view and kafka gives the message
"response did not contain all the expected topic/partition blocks",
goka should tolerate the message and just keep going.
According to the linked issue, this is normal behavior.

IBM/sarama#1087

Consume nil messages should be an option

Currently, nil messages in streams are just ignored. I think it would be better to pass them to the process callback by default and add a WithIgnoreNil() option on the processor for ignoring them instead.

Organizing/modeling Views

Hi,

I have a question regarding example-2, which populates a REST API so one can access the number of clicks for a given user-id as key.
How would I implement a list of users with their number of clicks (basically a View.All(), if that even makes sense)?
Or would this be something I have to manage in a processor function, like a slice of users with their ids and clicks? Or is there a way to iterate over the keys? What would be the recommended way?

Any plans to extend the docs or examples for more advanced scenarios covering Lookup, Join, Loopback, etc?

Thanks for your feedback in advance.
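
As a point of reference, iterating over everything a view holds might look roughly like this, assuming View.Iterator() as exposed by goka (the exact Key/Value accessors may differ between versions):

it, err := view.Iterator()
if err != nil {
  log.Fatalf("error creating iterator: %v", err)
}
defer it.Release()
for it.Next() {
  val, err := it.Value()
  if err != nil {
    log.Printf("error reading value for key %s: %v", it.Key(), err)
    continue
  }
  fmt.Printf("user %s has clicks: %v\n", it.Key(), val)
}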

Getting error from Emitter

Hi guys. It's a simple question about how to get the error when using Emit and EmitSync. Looking at the promise, it seems the returned error is never handled:

goka/emitter.go

Lines 62 to 64 in b7b94ac

return e.producer.Emit(e.topic, key, data).Then(func(err error) {
e.wg.Done()
}), nil

goka/emitter.go

Lines 76 to 78 in b7b94ac

promise.Then(func(err error) {
close(done)
})
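
As a sketch, and assuming the promise allows chaining another callback via Then as in the lines quoted above, the asynchronous error could be picked up like this (for EmitSync the error is returned directly):

promise, err := emitter.Emit("some-key", "some-value")
if err != nil {
  log.Fatalf("error emitting message: %v", err)
}
promise.Then(func(err error) {
  if err != nil {
    log.Printf("async error while emitting: %v", err)
  }
})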

Processor failures: expected rebalance OK but received rebalance error

I'm running into an issue where upon starting up some goka processors, I get an error that looks like this:

kafka error: expected rebalance OK but received rebalance error

It appears to be coming from here:

case c.events <- &Error{fmt.Errorf("expected %s but received %s", expected, actual)}:

I've noticed it never happens when I run it locally, happens sometimes when I run it in our sandbox environment, and happens consistently in our production environment. I'm thinking this may be because these environments differ in the number of partitions per topic (I believe I'm using 2 locally, 8 in sandbox, and 32 in production).

Does anyone know what could be causing this or have some ideas of what I could check to investigate further?

Initial offset handling

The docs say that Goka will read from the newest offset for streams (goka.Input), and the same thing appears in the log when the processor starts (something like map[0:-1]).

However, for a freshly created topic and a consumer group ID that never existed on this broker, my processor consumes all the messages from the beginning of the topic.

This is just to understand what the correct behavior is, and to fix the docs if there's an error.

HowToX : Is it possible to iterate on View with a subset ?

Hi,
It's all in the title: I want to iterate on the view for a subset of keys, as with syndtr/goleveldb:

iter := db.NewIterator(util.BytesPrefix([]byte("foo-")), nil)
for iter.Next() {
   // Use key/value.
   ...
}
iter.Release()
err = iter.Error()

Can you give me some direction please? I'm also happy to put together a PR for that if necessary.
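
If your goka version exposes a ranged iterator on views (View.IteratorWithRange is an assumption here; check your version), a prefix-style scan might look like:

// iterate only keys in ["foo-", "foo."), i.e. everything with the "foo-" prefix
it, err := view.IteratorWithRange("foo-", "foo.")
if err != nil {
  log.Fatalf("error creating iterator: %v", err)
}
defer it.Release()
for it.Next() {
  val, _ := it.Value()
  fmt.Printf("%s -> %v\n", it.Key(), val)
}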

Failing view was not added

This is a really weird thing that started happening without any apparent reason.

We have one processor that has 1 input, 1 join and 6 lookup edges in the group graph.

When processor starts, there's no error in the log and everything seems to be working fine. There are messages for corresponding views being opened and so on.

But for some reason consumer is not consuming any messages.

When we stop a processor (we handle SIGTERM and SIGINT gracefully, and then execute Stop() on the processor), it throws errors like: failing: error in view: Errors: view error opening partition ...: error removing partition .... : partition was not added.

That does not depend on how long I wait after the consumer is rebalanced. Actually, those topics are empty on my local machine and the same thing is happening.

Real mystery for me.

How to instantiate two views that read the same table?

I create a view that reads from table test:

view, err := goka.NewView([]string{"localhost:9092"}, goka.GroupTable(goka.Group("test")), new(codec.String))
if err != nil {
  log.Fatalf("Error: %v", err)
  return
}

When I create the second view I get an error (the instances of the two views are in different projects):

view2, err := goka.NewView([]string{"localhost:9092"}, goka.GroupTable(goka.Group("test")), new(codec.String))
if err != nil {
  log.Fatalf("Error: %v", err)
  return
}

Error:

Error creating local storage for partition 0: error opening leveldb: resource temporarily unavailable
exit status 1

Can anyone suggest how to do that?

Thank you
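
One thing to check: if both view instances end up using the same local storage directory (the default path), the second one cannot acquire the LevelDB lock, which produces exactly this error. A sketch of giving one of the views its own directory, assuming the WithViewStorageBuilder option together with DefaultStorageBuilder (the path is just an example):

view2, err := goka.NewView([]string{"localhost:9092"},
  goka.GroupTable(goka.Group("test")),
  new(codec.String),
  goka.WithViewStorageBuilder(goka.DefaultStorageBuilder("/tmp/goka-view2")),
)
if err != nil {
  log.Fatalf("Error: %v", err)
}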

Reduce metrics overhead

We should review the way we are gathering metrics because they are currently introducing too much overhead. Main issues are:

  • we have GetOrRegister calls for every message
  • we update meters for every message
  • we don't have a unified way to gather metrics from all components (storage, kafka, and goka)

How to initialize consumer group at offset start.

I've noticed that a new consumer group does not consume earlier messages. It's nice that a consumer group will retrieve messages it missed while it was down, but this requires that the consumer group was connected at an earlier time. Is it possible that when a new group is defined, it starts at the first message in a topic?

Proposal: replace Start() and Stop() with Run(context.Context)

Currently the Start() methods of processors and views use errgroup to create goroutines for each partition and pass a context.Context to them. The Stop() method simply cancels the context to stop all goroutines.

I think it would be nice to use the same mechanism to control multiple processors and views running on the same program. For that I'd propose to replace the Start() and Stop() methods with a Run(context.Context) method. The usage would be something like this:

// create the context and cancel function
ctx, cancel := context.WithCancel(context.Background())

// create error group
g, ctx := errgroup.WithContext(ctx)

// create processors and views
p, _ := goka.NewProcessor(brokers, graph)
...

// start processor and views passing the context   
g.Go(func() error { return p.Run(ctx) })

// catch signals
go func() {
  wait := make(chan os.Signal, 1)
  signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
  <-wait   // wait for SIGINT/SIGTERM
  cancel() // gracefully stop processors and views
}()

if err := g.Wait(); err != nil {
  log.Fatalln(err)
}

Perhaps we could still support Start() and Stop() via some wrapper... something like this:

p, err := goka.StartStopper(goka.NewProcessor(brokers, graph))
if err != nil {
  log.Fatalf("error creating processor: %v", err)
}
go func() {
  if err = p.Start(); err != nil {
    log.Fatalf("error running processor: %v", err)
  }
}()

wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
<-wait   // wait for SIGINT/SIGTERM
p.Stop() // gracefully stop processor

Any opinions?

Question: Accessing a goka.View concurrently?

I'm looking around in the documentation and I can't spot anything that talks about concurrency and goka.View. If I've initialized a goka.View may I access it concurrently, or must I either use a mutex or dedicated goka.View per goroutine?

print error on failure right away

Let's print an error when it happens. Otherwise the user has to wait until the processor has shut down completely to see the errors. If that fails, the error is never shown, which can be annoying.

Proposal: Embed Kafka change logger in storage

Right now, updates to the group table's storage are published to Kafka from goka.Context#setValueForKey.

It could be useful to embed this functionality directly into the storage. This way one could create custom storage implementations that are also backed by Kafka and use them for other purposes, not only for working with the group table.

It may be useful when you want to have some custom workload around storage but don't want to worry about its safety on disk, and instead have it backed by Kafka.

Kafka Streams also does it this way. They basically have a wrapper around storage that adds changelogging functionality to any storage, and they allow accessing the storage from their Processor API at any point of the processing pipeline: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
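
A very rough sketch of that wrapper idea, assuming a simplified key/value interface (this is not goka's actual storage.Storage interface) and an emitter created for the table's changelog topic:

// hypothetical minimal storage interface, just for the sketch
type kvStorage interface {
  Set(key string, value []byte) error
}

// changelogStorage forwards writes to the wrapped storage and publishes
// them to Kafka so the state can be restored from the changelog topic.
type changelogStorage struct {
  inner   kvStorage
  emitter *goka.Emitter // e.g. created for the "<group>-table" topic with a bytes codec
}

func (s *changelogStorage) Set(key string, value []byte) error {
  if err := s.inner.Set(key, value); err != nil {
    return err
  }
  return s.emitter.EmitSync(key, value)
}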

Running processor without prior topics fails

On an empty Kafka, running a processor fails because the copartitioning check cannot detect a partition count with which to create the missing topics.

We should have a processor option defining the default partition count for that case.

goka does not work with the latest bsm/sarama-cluster

Goka works with bsm/sarama-cluster up to at least version 2ae74ad075d4f088751b8c074f7e88c1020d7687. The latest version does not behave as expected and causes processors and views to crash.

Proposal: Implement message keys as bytes instead of strings

This is rather a proposal for future versions, as it will be a breaking change.

The thing is that message keys in Kafka are bytes, so they can be not only simple strings but also complex structures encoded as bytes.

For example, we use Debezium (http://debezium.io) for Change Data Capture from MySQL databases, and Debezium uses the table's primary key as the message key for Kafka. It is commonly encoded with Avro as a struct like {"id": 1}.

Right now, in order to deal with this, we need to pass a goka.UpdateCallback everywhere that converts the key string back to []byte, deserializes it to a struct, and then stores only the int value converted to a string as the table's key.

Using []byte for keys would just be more clear semantically for many Kafka users, since that is how Kafka works internally.

Emit messages with headers

Since Kafka v0.11, messages can contain headers. It would be nice to be able to specify them when emitting messages and to receive them in our processors.
I'm interested in submitting a PR if that's something you'd like to consider adding to goka.

Question: Same topic as input and join

It seems like right now Goka does not allow using the same topic as both input and join in the same group.

We have a situation where we have a set of common lookups that we want to use to enrich messages from topic-a and topic-b. We don't want to define separate groups for topic-a and topic-b, because then these lookups would end up duplicated in different LevelDBs (the local storage is currently shared per group).

So we put topic-a and topic-b in the same group. The thing is that we also want to join topic-a messages with topic-b so we put topic-b also as a join edge in the group.

When we do this, the callback defined for topic-b is not processing anymore, and we are only able to use topic-b for joins.

What we want is to enrich topic-a messages with the common lookups and also with the latest topic-b message, and to enrich every topic-b message with the common lookups only.

Hope this explanation makes sense (if not, let me know).

What should happen if the same group uses the same topic as both input and join (or lookup)?

Proposal: Provide context.Context via goka.Context

Oftentimes during message processing another IO operation is needed (like querying a remote database, executing a remote HTTP call, or similar). Normally these IO operations would accept a context.Context for cancellation.

It would be great if Goka could propagate the context passed to the processor's Run method to the actual callback.

One way to do it in a backward-compatible manner would be to embed the context.Context interface inside goka.Context directly, so that goka.Context satisfies context.Context. Then one could pass goka.Context directly to the IO function and/or create a child context.Context off of it.

My only concern is that goka.Context would do even more things, and it's already doing a lot. But as a benefit of this solution we could avoid confusion between context.Context and goka.Context, as use of context.Context is becoming more widespread in the community.
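
To illustrate, if goka.Context embedded context.Context as proposed, a callback could hand it straight to IO calls (the URL below is just a placeholder):

func process(ctx goka.Context, msg interface{}) {
  req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://example.invalid/enrich", nil)
  if err != nil {
    ctx.Fail(err)
    return
  }
  // the request is cancelled when the context passed to Run is cancelled
  resp, err := http.DefaultClient.Do(req)
  if err != nil {
    ctx.Fail(err)
    return
  }
  defer resp.Body.Close()
  // ... use the response to enrich the message
}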

Remove snapshot from storage

The default local storage currently compacts writes with the snapshot package. The reasoning behind that was to reduce the disk traffic for some workloads. However, it seems the snapshot causes the memory usage to be way higher than expected.

The best option would be to remove the write-compaction from the default storage, possibly leaving it as an optional wrapper to be passed via WithStorageBuilder().

panic: runtime error: invalid memory address or nil pointer dereference

I was playing around with the example code given on https://github.com/lovoo/goka/. I wanted to see what would happen with a bunch of separate instances running in parallel, each emitting a message once per second. Looking over the code, it appeared that what I needed to do was add a bit of code to make sure each instance had its own LevelDB path, and then add the bits to emit more than one message:

$ diff -u example.go.orig example.go
--- example.go.orig	2017-10-27 10:58:53.000000000 -0700
+++ example.go	2017-10-27 10:36:42.000000000 -0700
@@ -1,11 +1,13 @@
 package main
 
 import (
-	"fmt"
+	"io/ioutil"
 	"log"
 	"os"
 	"os/signal"
+	"path/filepath"
 	"syscall"
+	"time"
 
 	"github.com/lovoo/goka"
 	"github.com/lovoo/goka/codec"
@@ -18,17 +20,20 @@
 )
 
 // emits a single message and leave
-func runEmitter() {
+func runEmitter(n int) {
 	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
 	if err != nil {
 		log.Fatalf("error creating emitter: %v", err)
 	}
 	defer emitter.Finish()
-	err = emitter.EmitSync("some-key", "some-value")
-	if err != nil {
-		log.Fatalf("error emitting message: %v", err)
+	for i := 0; i < n; i++ {
+		err = emitter.EmitSync("some-key", "some-value")
+		if err != nil {
+			log.Fatalf("error emitting message: %v", err)
+		}
+		log.Printf("emitted message #%d", i+1)
+		time.Sleep(time.Second)
 	}
-	fmt.Println("message emitted")
 }
 
 // process messages until ctrl-c is pressed
@@ -56,7 +61,15 @@
 		goka.Persist(new(codec.Int64)),
 	)
 
-	p, err := goka.NewProcessor(brokers, g)
+	tdir, err := ioutil.TempDir("", "")
+	if err != nil {
+		log.Fatalf("error creating temp db dir: %v", err)
+	}
+	defer os.RemoveAll(tdir)
+
+	sb := goka.WithStorageBuilder(goka.DefaultStorageBuilder(filepath.Join(tdir, "group")))
+
+	p, err := goka.NewProcessor(brokers, g, sb)
 	if err != nil {
 		log.Fatalf("error creating processor: %v", err)
 	}
@@ -73,6 +86,6 @@
 }
 
 func main() {
-	runEmitter()   // emits one message and stops
-	runProcessor() // press ctrl-c to stop
+	go runEmitter(100) // emits 100 message and stops
+	runProcessor()     // press ctrl-c to stop
 }

What I ended up with was: https://play.golang.org/p/vFakftSgcs

Running 6 instances in parallel did what I'd have expected, I could see the example-group-state topic receive a flow of steadily incrementing values until it reached 600 (6 x 100).

When I Ctrl-C to quit, five of them quit gracefully but one of them gave me this:

2017/10/27 10:54:52 emitted message #98
2017/10/27 10:54:53 emitted message #99
2017/10/27 10:54:54 emitted message #100
2017/10/27 10:55:02 Processor: rebalancing: map[]
2017/10/27 10:55:02 Processor: rebalancing: map[]
^C2017/10/27 10:55:04 Processor: stopping
2017/10/27 10:55:04 Processor: wait for main goroutine
2017/10/27 10:55:04 Processor: stopped
2017/10/27 10:55:04 Processor: removing partitions
2017/10/27 10:55:04 Processor: shutdown complete
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x38 pc=0x12782b0]

goroutine 66 [running]:
github.com/lovoo/goka/kafka.(*producer).run(0xc4200caa20)
	/Users/jimr/Library/gocode/src/github.com/lovoo/goka/kafka/producer.go:78 +0x2e0
created by github.com/lovoo/goka/kafka.NewProducer
	/Users/jimr/Library/gocode/src/github.com/lovoo/goka/kafka/producer.go:39 +0x243

How to create table based on a different key

I'm new to goka and trying to figure out the best way to create a table based on a different key than the events I'm consuming.

For example, assume I'm consuming events that look like:

{
  "client_id": "abc123",
  "business_id": "def456",
  "timestamp": "2018-01-01 01:23:45"
}

The key on these events is the client_id. What I want to do is consume these events and create a table that tracks the latest timestamp by business_id. So in that case, my new key would be business_id.

There are two options I see:

  1. Consume the original events and emit rekeyed versions of them to a new topic, then consume the new topic and update the table based on those.
  2. Consume the original events and use Context.Lookup and Context.Emit or Context.Loopback (not sure which one to use) in order to specify the key based on business_id.

Which of these options is preferred, or is there another better option?
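
For illustration, option 2 with a loopback might be sketched like this; the Event type, its fields, and the codecs are hypothetical. The loopback re-keys processing to business_id, so SetValue in the loop callback persists under that key:

g := goka.DefineGroup("latest-by-business",
  goka.Input("events", new(eventCodec), func(ctx goka.Context, msg interface{}) {
    ctx.Loopback(msg.(*Event).BusinessID, msg)
  }),
  goka.Loop(new(eventCodec), func(ctx goka.Context, msg interface{}) {
    ctx.SetValue(msg.(*Event).Timestamp) // ctx.Key() is now the business_id
  }),
  goka.Persist(new(codec.String)),
)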

Allow passing custom hasher for graph dependencies

Right now there is no way to pass a custom hasher for views (for example) when you configure Lookup edges. So, if the topic you use for a view is not partitioned using Sarama's default hasher, the view will never find the correct value.

The hasher passed to the processor of the graph is not propagated to the view (which would be the simplest solution, although not always the right one, since you can use different hashers for the main input and lookups).

I believe there should be a way to pass corresponding options to each edge of the graph. Using variadic parameters should not break the compatibility.

Questions about join semantics

In the blog post announcing Goka (https://tech.lovoo.com/2017/05/23/goka/) the author mentions some usages of Goka at Lovoo and says there'll be future posts about them.

There are no further posts yet, but I would like to understand the difference between these two joins mentioned in the post:

  • key-wise stream-table joins, e.g., joining user actions with user profiles;
  • cross-joins/broadcast-joins, e.g., joining user actions with a device table;

I think the second one is what I need for my use case, but it's not clear to me how to implement it.

I'll describe the use case:

We have a compacted topic of users and compacted topic of bookings. Users are partitioned by user id, and bookings are partitioned by booking id. We create a group table for basic user data and a counter for all the bookings per user.

We've tried several approaches so far:

  1. Loopback topic:
  • We create a group with the users topic and the bookings topic as inputs.
  • The users callback sets the value in the group table with basic user data.
  • The bookings callback sends booking data to a loopback topic with the user id as key, so that bookings and users are now partitioned by the same user id.
  • The loopback callback gets user data from the group state and increments a counter for bookings.

The issue with this approach is that we could have duplicated bookings because of "at least once" semantics. But we need accurate data.

  2. Joins:
  • We do the same as before, but instead of sending bookings to the loop topic, we emit them to a normal topic.
  • Then in the users callback we do ctx.Join with this repartitioned bookings topic.

The problem with this one is that we lose most of the bookings, since it's not guaranteed that we've processed all bookings for a given user before a message on the users topic is processed for that user.

It sounds like cross-joins/broadcast-joins is what we actually need, but I'm not sure.

In Kafka Streams semantics, what we need would, I believe, be a KTable-KTable join.

What would you recommend to solve this situation?

Proposal: Add joins with watermarks

I've found that we often want to ensure that a certain part of a topic has been processed before starting to do joins with that topic.

For example, we join stream A with table B. We know a point in time up to which topic B should mostly have been read. It may not be completely recovered, because messages are constantly coming in, but we know that we need at least the portion before that point in time to be available for joins.

So the idea would be to wait until a certain processing time of topic B before starting to process topic A.

I assume something like this could be achieved using the processor's stats, but I'm not sure.

I've tried sleeping in A's callback until a particular record of topic B is available, but found that topic B stalls while topic A sleeps.

Changing Sarama Version in Emitter

Is this the only way to change the config Version for the Sarama producer?

hasher := goka.DefaultHasher()
partitioner := sarama.NewCustomHashPartitioner(hasher)
config := kafka.CreateDefaultSaramaConfig("goka", partitioner, metrics.DefaultRegistry)
config.Version = sarama.V1_0_0_0
producer, err := kafka.NewProducer(brokers, &config.Config, logger.NewLogger(log.WithField("app", "service")))

if err != nil {
    return nil, err
}
emitter, err := goka.NewEmitter(brokers, goka.Stream(topic), new(codec.Proto), goka.WithEmitterProducer(producer))

Provide examples of lookups

I couldn't find any example or documentation about how to use lookups (goka.Lookup, ctx.Lookup).

I'm not sure my use case is possible, but I would like to have something similar to Kafka Streams' GlobalKTable, which is the state of a topic across ALL its partitions. It's useful for a small set of lookup values that should not be co-partitioned with the main processor's topic.

So, for example, I have a topic called cities, which is a compacted topic with ID as key and Name as value. And then I have a processor for the users topic with the user's ID as key and user properties as value (city id is one of the properties).

So for each user I want to look up a city name by the user's city_id, but I don't want to partition the users topic by city ID because that doesn't make sense. So the cities topic should be processed entirely to fill the lookup table.

Is it currently possible with Goka?
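
For reference, a lookup edge for this use case might be sketched as follows, assuming a compacted "cities" table topic and a hypothetical User type with a CityID field; lookup tables are materialized completely on every instance, so "cities" does not need to be co-partitioned with "users":

g := goka.DefineGroup("user-enricher",
  goka.Input("users", new(userCodec), func(ctx goka.Context, msg interface{}) {
    user := msg.(*User)
    if city := ctx.Lookup(goka.Table("cities"), user.CityID); city != nil {
      // enrich the user with the city name, e.g. city.(string)
    }
    ctx.SetValue(user)
  }),
  goka.Lookup(goka.Table("cities"), new(codec.String)),
  goka.Persist(new(userCodec)),
)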

Proposal: Don't validate co-partitioning if there's no joins

It seems like there's no real need to validate co-partitioning for topologies that have no join topics.

For example, I have a graph with only inputs (multiple), and these inputs have different numbers of partitions, which is OK as there's no stateful logic around them. But Goka complains at startup that the topics don't have the same number of partitions.

Issues getting value from view

I'm facing a weird issue with views:

I have a topic with 10 partitions, where each message has a string key.

I create a view for this topic, wait until the view is recovered, then do a Get for a key that I'm really sure exists, and I get a nil value and a nil error.

BUT, the weirdest thing is that when I create an iterator for that view, I see that there is actually a message for this key.

And even after iterating, when I try to access the same key, I continue seeing nil value and nil error.

Does anyone know what I could be missing?

Question: How to create tables with log compaction

Hi, I'm trying to follow the tips on the wiki about how to create tables with log compaction.
I'm using the method "Using TopicManager to create table topics", but when I check the topic-level configuration, my table doesn't have the config "cleanup.policy=compact".
I saw that the tips in the wiki were last updated on the 25th of February; are they still valid?
Thank you
Edoardo

Prevent offset commit in case of panic

Is it possible to prevent a consumer from committing the offset after it has received a record? In my callback function I would like to error out so that if the consumer fails, a future consumer will pick up the record it was on.

Add blocking recovered method

Right now the Recovered method on goka.View and other objects returns a bool. For most applications, though, it's helpful to be able to block until the object is ready.

What do you think about introducing an additional method, like Ready, that would return a <-chan struct{} instead, so that the caller can block until there is something on the channel?

Behind the scenes, this method could poll the Recovered method internally in a goroutine with some default sleep value, until Recovered returns true.
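
A small helper along those lines can already be written outside goka today (this is not part of goka's API), polling Recovered until it flips:

// ready returns a channel that is closed once the view reports Recovered().
func ready(v *goka.View) <-chan struct{} {
  ch := make(chan struct{})
  go func() {
    for !v.Recovered() {
      time.Sleep(100 * time.Millisecond)
    }
    close(ch)
  }()
  return ch
}

// usage: <-ready(view) // blocks until the view is recovered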
