Git Product home page Git Product logo

gookit / event Goto Github PK

View Code? Open in Web Editor NEW
478.0 18.0 57.0 144 KB

📢 Lightweight event manager and dispatcher implements by Go. Go实现的轻量级的事件管理、调度程序库, 支持设置监听器的优先级, 支持使用通配符来进行一组事件的监听

Home Page: https://pgk.go.dev/github.com/gookit/event

License: MIT License

Go 100.00%
event-management event-dispatcher event-listener gookit multiple-listeners eventbus events

event's Introduction

Event

GitHub go.mod Go version GoDoc Actions Status Coverage Status Go Report Card

Lightweight event management, dispatch tool library implemented by Go

  • Support for custom definition event objects
  • Support for adding multiple listeners to an event
  • Support setting the priority of the event listener, the higher the priority, the first to trigger
  • Support for a set of event listeners based on the event name prefix PREFIX.*.
    • ModeSimple(default) - app.* event listen, trigger app.run app.end, Both will fire the app.* listener
  • New match mode: ModePath
    • * Only match a segment of characters that are not ., allowing for finer monitoring and matching
    • ** matches any number of characters and can only be used at the beginning or end
  • Support for using the wildcard * to listen for triggers for all events
  • Support async trigger event by go channel consumers. use Async(), FireAsync()
  • Complete unit testing, unit coverage > 95%

中文说明请看 README.zh-CN

GoDoc

Install

go get github.com/gookit/event

Main method

  • On/Listen(name string, listener Listener, priority ...int) Register event listener
  • Subscribe/AddSubscriber(sbr Subscriber) Subscribe to support registration of multiple event listeners
  • Trigger/Fire(name string, params M) (error, Event) Trigger event by name and params
  • MustTrigger/MustFire(name string, params M) Event Trigger event, there will be panic if there is an error
  • FireEvent(e Event) (err error) Trigger an event based on a given event instance
  • FireBatch(es ...interface{}) (ers []error) Trigger multiple events at once
  • Async/FireC(name string, params M) Push event to chan, asynchronous consumption processing
  • FireAsync(e Event) Push event to chan, asynchronous consumption processing
  • AsyncFire(e Event) Async fire event by 'go' keywords

Quick start

package main

import (
	"fmt"
	
	"github.com/gookit/event"
)

func main() {
	// Register event listener
	event.On("evt1", event.ListenerFunc(func(e event.Event) error {
		fmt.Printf("handle event: %s\n", e.Name())
		return nil
	}), event.Normal)

	// Register multiple listeners
	event.On("evt1", event.ListenerFunc(func(e event.Event) error {
		fmt.Printf("handle event: %s\n", e.Name())
		return nil
	}), event.High)

	// ... ...

	// Trigger event
	// Note: The second listener has a higher priority, so it will be executed first.
	event.MustFire("evt1", event.M{"arg0": "val0", "arg1": "val1"})
}

Note: The second listener has a higher priority, so it will be executed first.

Using the wildcard

Match mode ModePath

Register event listener and name end with wildcard *:

func main() {
	dbListener1 := event.ListenerFunc(func(e event.Event) error {
		fmt.Printf("handle event: %s\n", e.Name())
		return nil
	})

	event.On("app.db.*", dbListener1, event.Normal)
}

Trigger events on other logic:

func doCreate() {
	// do something ...
	// Trigger event
	event.MustFire("app.db.create", event.M{"arg0": "val0", "arg1": "val1"})
}

func doUpdate() {
	// do something ...
	// Trigger event
	event.MustFire("app.db.update", event.M{"arg0": "val0"})
}

Like the above, triggering the app.db.create app.db.update event will trigger the execution of the dbListener1 listener.

Match mode ModePath

ModePath It is a new pattern of v1.1.0, and the wildcard * matching logic has been adjusted:

  • * Only match a segment of characters that are not ., allowing for finer monitoring and matching
  • ** matches any number of characters and can only be used at the beginning or end
em := event.NewManager("test", event.UsePathMode)

// register listener
em.On("app.**", appListener)
em.On("app.db.*", dbListener)
em.On("app.*.create", createListener)
em.On("app.*.update", updateListener)

// ... ...

// fire event
// TIP: will trigger appListener, dbListener, createListener
em.Fire("app.db.create", event.M{"arg0": "val0", "arg1": "val1"})

Async fire events

Use chan fire events

You can use the Async/FireC/FireAsync method to trigger events, and the events will be written to chan for asynchronous consumption. You can use CloseWait() to close the chan and wait for all events to be consumed.

Added option configuration:

  • ChannelSize Set buffer size for chan
  • ConsumerNum Set how many coroutines to start to consume events
func main() {
	// Note: close event chan on program exit
	defer event.CloseWait()
	// defer event.Close()
	
    // register event listener
    event.On("app.evt1", event.ListenerFunc(func(e event.Event) error {
        fmt.Printf("handle event: %s\n", e.Name())
        return nil
    }), event.Normal)
    
    event.On("app.evt1", event.ListenerFunc(func(e event.Event) error {
        fmt.Printf("handle event: %s\n", e.Name())
        return nil
    }), event.High)
    
    // ... ...
    
    // Asynchronous consumption of events
    event.FireC("app.evt1", event.M{"arg0": "val0", "arg1": "val1"})
}

Note: The event chan should be closed when the program exits. You can use the following method:

  • event.Close() Close chan and no longer accept new events
  • event.CloseWait() Close chan and wait for all event processing to complete

Write event listeners

Using anonymous functions

You can use anonymous function for quick write an event lister.

package mypgk

import (
	"fmt"

	"github.com/gookit/event"
)

var fnHandler = func(e event.Event) error {
	fmt.Printf("handle event: %s\n", e.Name())
	return nil
}

func Run() {
	// register
	event.On("evt1", event.ListenerFunc(fnHandler), event.High)
}

Using the structure method

You can use struct write an event lister, and it should implementation interface event.Listener.

interface:

// Listener interface
type Listener interface {
	Handle(e Event) error
}

example:

Implementation interface event.Listener

package mypgk

import "github.com/gookit/event"

type MyListener struct {
	// userData string
}

func (l *MyListener) Handle(e event.Event) error {
	e.Set("result", "OK")
	return nil
}

Register multiple event listeners

Can implementation interface event.Subscriber for register multiple event listeners at once.

interface:

// Subscriber event subscriber interface.
// you can register multi event listeners in a struct func.
type Subscriber interface {
	// SubscribedEvents register event listeners
	// key: is event name
	// value: can be Listener or ListenerItem interface
	SubscribedEvents() map[string]interface{}
}

Example

Implementation interface event.Subscriber

package mypgk

import (
	"fmt"

	"github.com/gookit/event"
)

type MySubscriber struct {
	// ooo
}

func (s *MySubscriber) SubscribedEvents() map[string]interface{} {
	return map[string]interface{}{
		"e1": event.ListenerFunc(s.e1Handler),
		"e2": event.ListenerItem{
			Priority: event.AboveNormal,
			Listener: event.ListenerFunc(func(e Event) error {
				return fmt.Errorf("an error")
			}),
		},
		"e3": &MyListener{},
	}
}

func (s *MySubscriber) e1Handler(e event.Event) error {
	e.Set("e1-key", "val1")
	return nil
}

Write custom events

If you want to customize the event object or define some fixed event information in advance, you can implement the event.Event interface.

interface:

// Event interface
type Event interface {
	Name() string
	// Target() interface{}
	Get(key string) interface{}
	Add(key string, val interface{})
	Set(key string, val interface{})
	Data() map[string]interface{}
	SetData(M) Event
	Abort(bool)
	IsAborted() bool
}

examples:

package mypgk

import "github.com/gookit/event"

type MyEvent struct {
	event.BasicEvent
	customData string
}

func (e *MyEvent) CustomData() string {
	return e.customData
}

Usage:

e := &MyEvent{customData: "hello"}
e.SetName("e1")
event.AddEvent(e)

// add listener
event.On("e1", event.ListenerFunc(func(e event.Event) error {
	fmt.Printf("custom Data: %s\n", e.(*MyEvent).CustomData())
	return nil
}))

// trigger
event.Fire("e1", nil)
// OR
// event.FireEvent(e)

Note: is used to add pre-defined public event information, which is added in the initialization phase, so it is not locked. Event dynamically created in business can be directly triggered by FireEvent()

Gookit packages

  • gookit/ini Go config management, use INI files
  • gookit/rux Simple and fast request router for golang HTTP
  • gookit/gcli build CLI application, tool library, running CLI commands
  • gookit/slog Lightweight, extensible, configurable logging library written in Go
  • gookit/event Lightweight event manager and dispatcher implements by Go
  • gookit/cache Generic cache use and cache manager for golang. support File, Memory, Redis, Memcached.
  • gookit/config Go config management. support JSON, YAML, TOML, INI, HCL, ENV and Flags
  • gookit/color A command-line color library with true color support, universal API methods and Windows support
  • gookit/filter Provide filtering, sanitizing, and conversion of golang data
  • gookit/validate Use for data validation and filtering. support Map, Struct, Form data
  • gookit/goutil Some utils for the Go: string, array/slice, map, format, cli, env, filesystem, test and more
  • More, please see https://github.com/gookit

LICENSE

MIT

event's People

Contributors

dependabot-preview[bot] avatar dependabot[bot] avatar inhere avatar kanpachi888 avatar lowitea avatar purevirtual avatar relicoftesla avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

event's Issues

Any more examples on how to use some of the capabilities?

Hello,

I am developing a new type of database that will also allow for event triggers when things are added, modified, deleted, etc. as well as a number of other functionality as this will be a type of Graph database.

The use of triggers should substantially the capabilities of the database for the user and so far, your "event" repo seems like it is the best one that I have come across for this purpose.

It would be nice to see a number of different examples showing various capabilities of the library.

Do you have more examples that I could see?
Thanks

event.on("*", xxxx) not work

Event2FurcasTicketCreate              = "kapal.furcas.ticket.create"

event.On("*", &kapal.DefaultNotify)                           // not work
event.On("kapal.furcas.ticket*", &kapal.DefaultNotify         // not work
event.On(kapal.Event2FurcasTicketReset, &kapal.DefaultNotify) // work

异步多线程消费

System (please complete the following information):

  • OS: linux
  • GO Version: 1.20
  • Pkg Version: 1.1.1

Describe the bug

  1. 设置 o.ConsumerNum = 10 多个消费者,与预期不一致(感觉还是只有一个routine在执行)

To Reproduce

// preapre: 此时设置 ConsumerNum = 10, 每个任务耗时1s, 触发100个任务
// expected: 10s左右执行完所有任务
// actual: 执行了 100s左右
func TestChanEvent2(t *testing.T) {
	var em = event.NewManager("default", func(o *event.Options) {
		o.ConsumerNum = 10
	})
	defer em.CloseWait()

	var listener event.ListenerFunc = func(e event.Event) error {
		time.Sleep(1 * time.Second)
		return nil
	}

	em.On("app.evt1", listener, event.Normal)

	for i := 0; i < 100; i++ {
		em.FireAsync(event.New("app.evt1", event.M{"arg0": "val2"}))
	}

	fmt.Println("publish event finished!")
}

Screenshots

image

Additional context

  1. 是有什么特殊设置嘛(打开姿势不对? @RelicOfTesla @inhere 可以帮忙看一下嘛,谢谢~
  2. go 可以多个消费者并发读取 chan啊
    // 这个例子就是 10s 左右完成所有任务
    func TestConcurrentRead(t *testing.T) {
        var wg sync.WaitGroup
        defer wg.Wait()
        ch := make(chan int, 500)
    
        // 1. 启动多个 goroutine 读取数据
        routine(&wg, 10, ch)
    
        // 2. 向 chan 内写数据
        for i := 0; i < 100; i++ {
    	    ch <- i
        }
    
        // 3. 关闭 channel
        close(ch)
    }
    
    func routine(wg *sync.WaitGroup, count int, ch chan int) {
        for i := 0; i < count; i++ {
    	    wg.Add(1)
    	    go func(id int) {
    		    defer wg.Done()
    		    for {
    			    data, ok := <-ch
    			    if !ok {
    				    fmt.Printf("Goroutine %d: Channel closed\n", id)
    				    return
    			    }
    
    			    time.Sleep(1 * time.Second)
    			    fmt.Printf("Goroutine %d: Received %d\n", id, data)
    		    }
    	    }(i)
        }
    }

Possible to add Event Timer Triggers?

Hello,

I am liking your Event library more and more as I think that it really has good potential.

One question is as to how well it scales?

and another question is if it could be extended to allow for setting up of Timed Event Triggers that would fire:

  1. Fire on a particular Time or Date?
  2. Fire periodically as to say every 30 seconds, or minutes, etc. (maybe event shorter or longer time intervals)?
  3. Other ways that times might be used?

Then functions could be activated based upon timed events.

This would be a very useful addition.
Thanks

How to register a subscriber ?

System (please complete the following information):

linux

Describe the bug

I don't understand how to register my subscriber.
I have follow the documentation but my listener registered in the subscriber are not called.
I suppose that i have to "init" the subscriber ? Or is it automatic ?

Maybe it's just that i don't understand how to use it.

Thanks in advance.

I'm here again,V1.1.1 has goroutine leak.please see this eg.

System (please complete the following information):

  • OS: linux
  • GO Version: 1.20
  • Pkg Version: 1.1.1

Describe the bug

goroutine leak!!!

To Reproduce

leak

func concurrentReq() {
	var cnt int32 = 0
	event.On("evt1", event.ListenerFunc(func(e event.Event) error {
		//ZapLog.Debug(fmt.Sprintf("handle event start: %s,data:%v", e.Name(), e.Data()))
		time.Sleep(10 * time.Second)
		//ZapLog.Debug(fmt.Sprintf("handle event end: %s,data:%v", e.Name(), e.Data()))
		atomic.AddInt32(&cnt, 1)
		return nil
	}), event.Normal)

	for i := 0; i < 10000; i++ {
		go func(num int) {
			err, _ := event.Fire("evt1", event.M{"task": num})
			if err != nil {
				ZapLog.Debug(fmt.Sprintf("num:%d,event err: %v", err, num))
			}
		}(i)
	}
	time.Sleep(20 * time.Second)

	ZapLog.Debug(fmt.Sprintf("handle event cnt: %d,goroutes:%d", cnt, runtime.NumGoroutine()))
}

the result is:2023-09-01T11:36:44.616+0800 DEBUG library/event.go:12 handle event cnt: 2,goroutes:9999

no leak

func concurrentReq() {
	var cnt int32 = 0
	event.On("evt1", event.ListenerFunc(func(e event.Event) error {
		//ZapLog.Debug(fmt.Sprintf("handle event start: %s,data:%v", e.Name(), e.Data()))
		//time.Sleep(10 * time.Second)
		//ZapLog.Debug(fmt.Sprintf("handle event end: %s,data:%v", e.Name(), e.Data()))
		atomic.AddInt32(&cnt, 1)
		return nil
	}), event.Normal)

	for i := 0; i < 10000; i++ {
		go func(num int) {
			err, _ := event.Fire("evt1", event.M{"task": num})
			if err != nil {
				ZapLog.Debug(fmt.Sprintf("num:%d,event err: %v", err, num))
			}
		}(i)
	}
	time.Sleep(20 * time.Second)

	ZapLog.Debug(fmt.Sprintf("handle event cnt: %d,goroutes:%d", cnt, runtime.NumGoroutine()))
}

the result is:2023-09-01T11:36:11.441+0800 DEBUG library/event.go:12 handle event cnt: 10000,goroutes:1

异步事件,不执行

event.Async("new.member", event.M{"memberId": member.ID, "superiorId": superiorId})

我这有 投递了一个异步事件, 经常发现 这个事件执行了几次 后面都没 执行了,
换成同步方法
event.Fire("new.member", event.M{"memberId": member.ID, "superiorId": superiorId})

fire 就都能成功

见图片,并发下数据会有问题

func (em *Manager) Fire(name string, params M) (err error, e Event) {
	name = goodName(name)

	// NOTICE: must check the '*' global listeners
	if false == em.HasListeners(name) && false == em.HasListeners(Wildcard) {
		// has group listeners. "app.*" "app.db.*"
		// eg: "app.db.run" will trigger listeners on the "app.db.*"
		pos := strings.LastIndexByte(name, '.')
		if pos < 0 || pos == len(name)-1 {
			return // not found listeners.
		}

		groupName := name[:pos+1] + Wildcard // "app.db.*"
		if false == em.HasListeners(groupName) {
			return // not found listeners.
		}
	}

	// call listeners use defined Event
	if e, ok := em.events[name]; ok {
		if params != nil {
			e.SetData(params)
		}

		err = em.FireEvent(e)
		return err, e
	}

	// create a basic event instance
	e = em.newBasicEvent(name, params)
	// call listeners handle event
	err = em.FireEvent(e)
	return
}

这个函数的如下逻辑

if e, ok := em.events[name]; ok {
		if params != nil {
			e.SetData(params)
		}

		err = em.FireEvent(e)
		return err, e
	}

在并发下会有数据互相覆盖的情况

支持rocketmq Kafka之类的驱动吗

System (please complete the following information):

  • OS: linux [e.g. linux, macOS]
  • GO Version: 1.13 [e.g. 1.13]
  • Pkg Version: 1.1.1 [e.g. 1.1.1]

Describe the bug

A clear and concise description of what the bug is.

To Reproduce

// go code

Expected behavior

A clear and concise description of what you expected to happen.

Screenshots

If applicable, add screenshots to help explain your problem.

Additional context

Add any other context about the problem here.

RemoveListener not support closure function

func makeFn(a int) event.ListenerFunc {
	return func(e event.Event) error {
		fmt.Println(a)
		return nil
	}
}
func main() {
	evBus := event.NewManager("")
	f1 := makeFn(11)
	evBus.On("evt1", f1)
	f2 := makeFn(22)
	evBus.On("evt1", f2)
	evBus.RemoveListener("evt1", f1) // DON'T REMOVE ALL !!!
	evBus.MustFire("evt1", event.M{"arg0": "val0", "arg1": "val1"})
}

functionality Wildcard

Thanks a lot for the library. Let me ask a question.
Are there any plans to implement the functionality Wildcard event name ex. "eve.some.*.*" and "eve.some.*.run"?

使用自定义事件,并发会有问题

for {
		time.Sleep(1 * time.Second)
		go fire()
		go fire()
		go fire()
}
func fire() {
	//mu.Lock()
	defer func() {
		//mu.Unlock()
		if panicErr := recover(); panicErr != nil {
			fmt.Printf("xxxxx %s", panicErr)
			//logrus.Errorf("[Recovery from panic]", zap.Any("panicErr:", panicErr), zap.String("stack", string(debug.Stack())))
		}
	}()
	d := &listener.DisposeFile{
		DevName: "aaa",
		DevIp:   "xczxc",
	}
	e := &listener.MyEvent{DisposeFile: d}
	e.SetName("e3")
	event.AddEvent(e)
	event.Fire(e.Name(), nil)
}
type Manager struct {
	sync.Mutex
	// EnableLock enable lock on fire event.
	EnableLock bool
	// name of the manager
	name string
	// pool sync.Pool
	// is a sample for new BasicEvent
	sample *BasicEvent
	// storage user custom Event instance. you can pre-define some Event instances.
	events map[string]Event
	// storage all event name and ListenerQueue map
	listeners map[string]*ListenerQueue
	// storage all event names by listened
	listenedNames map[string]int
}

代码使用events map[string]Event 存放事件,存在并发写入map的问题

Channels on Fire func

The library is very good.

But I am interested in working under high load.

Have you thought about adding channel support to event.Fire()? That would be able to handle load peaks?

建议 Event Name 检查出错后返回 error,而不是直接 panic

目前库中,检查 Event Name 不符合规则后会直接 panic,逻辑隐蔽,处理粗暴,很容易被坑。

建议改成返回 error

相关代码见 util.go:

// goodName check event name is valid.
func goodName(name string, isReg bool) string {
	name = strings.TrimSpace(name)
	if name == "" {
		panic("event: the event name cannot be empty")
	}

	// on add listener
	if isReg {
		if name == AllNode || name == Wildcard {
			return Wildcard
		}
		if strings.HasPrefix(name, AllNode) {
			return name
		}
	}

	if !goodNameReg.MatchString(name) {
		panic(`event: name is invalid, must match regex:` + goodNameReg.String())
	}
	return name
}

event.on("*", xxxx) not work

Event2FurcasTicketCreate              = "kapal.furcas.ticket.create"

event.On("*", &kapal.DefaultNotify)                           // not work
event.On("kapal.furcas.ticket*", &kapal.DefaultNotify)         // not work
event.On(kapal.Event2FurcasTicketReset, &kapal.DefaultNotify) // work

这是最小可复现代码

type test_notify struct {}

var is_Run = false

func (notify *test_notify) Handle(e event.Event) error {
	is_Run = true
	return nil
}

func TestTicketCreate_Handle_1(t *testing.T) {
	TestNotify := test_notify{}
	event.On("*", &TestNotify)
	err, _ := event.Fire("test_notify", event.M{})
	assert.Nil(t, err)
	assert.NotEqual(t, is_Run, true)

	event.On("test_notify", &TestNotify)
	err, _ = event.Fire("test_notify", event.M{})
	assert.Nil(t, err)
	assert.Equal(t, is_Run, true)
}

AwaitFire leak, chan not close

func (em *Manager) AwaitFire(e Event) (err error) {
	ch := make(chan error)

	go func(e Event) {
		err := em.FireEvent(e)
		ch <- err
	}(e)

	err = <-ch
                      // MUST    close(ch)
	return
}

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.