Comments (11)
Hey guys, what do you think of this change in the interface? Would it make the library easier to use? @SamiHiltunen @j0hnsmith @frairon @burdiyan @andrewmunro
I'd like to either create a PR for this or close the ticket.
(cc: @edganiukov we are now using the errgroup and context libraries to do most of the goroutine management. It took a long time, but it's now implemented.)
from goka.
@burdiyan if I am not mistaken, you can have such logic with context as well - using either different contexts for each processor or child-parent context pattern (child context will be canceled after parent).
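The child-parent pattern mentioned above can be sketched as follows. This is a minimal illustration using only the standard library, not goka code: `run` and `stopInOrder` are hypothetical names, and `run` merely stands in for a processor's blocking `Run(ctx)` method. B runs on a context derived from A's context, so B can be stopped on its own first, while canceling A's context would still stop both.

```go
package main

import (
	"context"
	"fmt"
)

// run is a hypothetical stand-in for a processor's Run method: it blocks
// until ctx is canceled, then reports its name on the stopped channel.
func run(ctx context.Context, name string, stopped chan<- string) {
	<-ctx.Done()
	stopped <- name
}

// stopInOrder starts A on a parent context and B on a child of it, then
// stops B first and A second. It returns the observed stop order.
func stopInOrder() []string {
	ctxA, cancelA := context.WithCancel(context.Background())
	defer cancelA()
	// B's context is derived from A's, so canceling ctxA also cancels ctxB.
	ctxB, cancelB := context.WithCancel(ctxA)
	defer cancelB()

	stopped := make(chan string)
	go run(ctxA, "A", stopped)
	go run(ctxB, "B", stopped)

	var order []string
	cancelB()                        // stops B only; ctxA stays alive
	order = append(order, <-stopped) // wait until B has actually returned
	cancelA()                        // now stop A
	order = append(order, <-stopped)
	return order
}

func main() {
	fmt.Println(stopInOrder()) // prints [B A]
}
```

Note that canceling only the parent context stops both goroutines at roughly the same time, with no ordering guarantee. The order comes from canceling the child explicitly and waiting for it before canceling the parent.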
from goka.
I support this. It would simplify the lifecycle management.
from goka.
I like the idea of using a single context to stop multiple processors.
I actually have some time this week, I'll take a look maybe tomorrow if there's a general 👍
from goka.
I like it. It looks a lot cleaner than p1.stop(), p2.stop(), etc. It also means there's less danger of forgetting to stop one of your emitters/processors/views gracefully.
I wonder if you could even abstract this further for beginners using the framework, maybe that patterns module you were talking about elsewhere... 🤔
from goka.
I actually used to wrap the Start and Stop methods exactly like this using context.Context. But I found that when you want to control the order in which multiple processors stop, you run into a problem.
So if you have processors A and B, and you need to stop B before A, a single context doesn't give you that control.
I guess that's why the standard library has the io.Closer interface, which many things like http.Server implement by providing a Close method.
It's also handy that defer statements are executed in LIFO order, so you close things in the reverse order you "opened" them.
So, I personally would not implement Run with context, unless what I described above doesn't make sense to anybody :)
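The LIFO behavior of defer combined with io.Closer can be illustrated with a small standard-library sketch. The `closer` type and `openAndClose` function are hypothetical and exist only to record the order in which Close is called; they are not part of goka.

```go
package main

import (
	"fmt"
	"io"
)

// closer is a hypothetical resource that records its name when closed.
type closer struct {
	name   string
	closed *[]string
}

// Close implements io.Closer.
func (c closer) Close() error {
	*c.closed = append(*c.closed, c.name)
	return nil
}

// openAndClose "opens" A and then B and relies on defer to close them in
// reverse order. It returns the order in which Close was called.
func openAndClose() []string {
	var closed []string
	func() {
		var a io.Closer = closer{"A", &closed}
		defer a.Close()
		var b io.Closer = closer{"B", &closed}
		defer b.Close() // deferred last, so it runs first
	}()
	return closed
}

func main() {
	fmt.Println(openAndClose()) // prints [B A]
}
```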
from goka.
@burdiyan that is an interesting point! AFAIK, we don't have that issue, but I can imagine having it at some point. So either we require the user to apply child contexts or we provide a StartStopper wrapper as described in the issue. Perhaps I just need a better name for it. I will think of something and propose a PR.
Thanks for the quick feedback from everybody! That's awesome.
from goka.
PR #127 replaces Start()/Stop() with Run(context). If you have some spare time, I'd be happy to have reviews.
My feeling is that for simple examples, things get more complicated because one has to create a context and a cancel function. But I like the result in examples/3-messaging.
The next step to close this issue would be to come up with a simple wrapper that still provides the Start()/Stop() pattern (or something similar) and does not require a context and cancel function.
from goka.
PR #128 introduces Runset objects to start and stop sets of processors/views together. A Runset internally creates a context and uses error groups to start goroutines.
Processors and views are "runnables" because they implement Run(context.Context) error with #127. A new function goka.Start() can start multiple runnables together, returning a Runset. For example, rs := goka.Start(proc1, proc2, view1, view2, proc3).
rs provides a Stop() method to stop all runnables together. If one of the runnables returns, all others are stopped. There are two other methods in Runset:
- Wait() blocks until all runnables have terminated and returns a multi-error (if any).
- Done() can be used to signal that the runset is going to stop.
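To make the behavior described above concrete, here is a rough sketch of how such a runset could work. It is not the code from PR #128: it uses only the standard library (sync.WaitGroup and errors.Join, which needs Go 1.20 or later) instead of error groups, and the `blocker` and `failer` types are hypothetical runnables added for the demonstration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Runnable is anything with a blocking Run method.
type Runnable interface {
	Run(ctx context.Context) error
}

// Runset is a hypothetical sketch, not the type from PR #128.
type Runset struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	mu     sync.Mutex
	errs   []error
}

// Start runs every runnable in its own goroutine under one shared context.
func Start(rs ...Runnable) *Runset {
	ctx, cancel := context.WithCancel(context.Background())
	set := &Runset{ctx: ctx, cancel: cancel}
	for _, r := range rs {
		set.wg.Add(1)
		go func(r Runnable) {
			defer set.wg.Done()
			defer cancel() // one runnable returning stops all the others
			if err := r.Run(ctx); err != nil {
				set.mu.Lock()
				set.errs = append(set.errs, err)
				set.mu.Unlock()
			}
		}(r)
	}
	return set
}

// Stop cancels the shared context, asking all runnables to return.
func (s *Runset) Stop() { s.cancel() }

// Done returns a channel that is closed once the runset is stopping.
func (s *Runset) Done() <-chan struct{} { return s.ctx.Done() }

// Wait blocks until all runnables have returned and joins their errors.
func (s *Runset) Wait() error {
	s.wg.Wait()
	s.mu.Lock()
	defer s.mu.Unlock()
	return errors.Join(s.errs...)
}

// blocker is a hypothetical runnable that runs until its context is canceled.
type blocker struct{}

func (blocker) Run(ctx context.Context) error { <-ctx.Done(); return nil }

// failer is a hypothetical runnable that fails immediately.
type failer struct{}

func (failer) Run(ctx context.Context) error { return errors.New("boom") }

func main() {
	set := Start(blocker{}, failer{})
	<-set.Done()            // failer returned, so the set is stopping
	fmt.Println(set.Wait()) // prints boom
}
```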
Here is a complete example of how to use runsets (other variants are possible):
// create processors and views (they are runnables because they have a Run(ctx) method)
p, _ := goka.NewProcessor(brokers, DefineGroup(group, Input(topic, codec, cb)))
v, _ := goka.NewView(brokers, topic2, codec)

// start runnables, creating a Runset
rs := goka.Start(p, v)

// wait for bad things to happen
wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
select {
case <-rs.Done(): // wait for one of the runnables to return
case <-wait: // wait for SIGHUP/SIGINT/SIGTERM
	rs.Stop() // gracefully stop runnables
}

// wait for all runnables to return and collect error messages
if err := rs.Wait(); err != nil {
	log.Fatalln(err)
}
If the order of stopping the runnables is relevant, one can create multiple runsets (e.g., one for each processor/view) and stop them accordingly.
@burdiyan do you think that this would be helpful and sufficient for your use cases? Do you have suggestions on how to improve it?
from goka.
I normally do signal trapping as part of top level errgroup, like this:
g, ctx := errgroup.WithContext(context.Background())

// Trap signals as part of the top-level errgroup.
g.Go(func() error {
	done := make(chan os.Signal, 1)
	signal.Notify(done, syscall.SIGTERM, syscall.SIGINT)
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-done:
		signal.Stop(done)
		return errors.New("signal received")
	}
})
g.Go(func() error {
	// Start monitoring HTTP server.
	return nil // placeholder
})
g.Go(func() error {
	// Start goka processor.
	return nil // placeholder
})
g.Go(func() error {
	// Start another goka processor.
	return nil // placeholder
})
// Separate goroutine for shutdown logic.
g.Go(func() error {
	<-ctx.Done()
	// Stop processor 1.
	// Stop processor 2.
	// Shutdown the HTTP server.
	// Do other cleanups.
	return nil // placeholder
})
g.Wait() // plus error handling for errgroup.
This way the program ends cleanly if one of the "actors" in the errgroup returns an error or a signal is trapped.
I'm concerned about hiding the context in goka.Start, because you normally need the context if you need more flexibility or have other goroutines to handle.
I'd be fine with just having Run(context.Context) in processors and views and handling multiple processors manually.
The ordering problem for shutdown could be handled with derived context as discussed elsewhere.
from goka.
The change has been implemented and merged.
The helper wrapper seems to be overkill, so I closed the PR for now.
Thanks for the feedback from everybody.
from goka.