jetcache-go's Introduction

Translate to: 简体中文

Introduction

jetcache-go is a general-purpose cache access framework based on go-redis/cache. It implements the core features of the Java version of JetCache, including:

  • ✅ Flexible combination of two-level caching: You can use memory, Redis, or your own custom storage method.
  • ✅ The Once interface adopts the singleflight pattern, which is highly concurrent and thread-safe.
  • ✅ By default, MsgPack is used for encoding and decoding values; sonic and native json are optional.
  • ✅ The default local cache implementation includes TinyLFU and FreeCache.
  • ✅ The default distributed cache implementation is based on go-redis/v8, and you can also customize your own implementation.
  • ✅ You can customize the errNotFound error and use placeholders to prevent cache penetration by caching empty results.
  • ✅ Supports asynchronous refreshing of distributed caches.
  • ✅ Metrics collection: By default, it prints statistical metrics (QPM, Hit, Miss, Query, QueryFail) through logs.
  • ✅ Automatic degradation of distributed cache query failures.
  • ✅ The MGet interface supports the Load function. In a distributed caching scenario, the Pipeline mode is used to improve performance. (v1.1.0+)
  • ✅ Invalidate local caches (in all Go processes) after updates (v1.1.1+)

Installation

To start using the latest version of jetcache-go, you can import the library into your project:

go get github.com/mgtv-tech/jetcache-go

Getting started

Basic Usage

package cache_test

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"

	"github.com/mgtv-tech/jetcache-go"
	"github.com/mgtv-tech/jetcache-go/local"
	"github.com/mgtv-tech/jetcache-go/remote"
)

var errRecordNotFound = errors.New("mock gorm.errRecordNotFound")

type object struct {
	Str string
	Num int
}

func mockDBGetObject(id int) (*object, error) {
	if id > 100 {
		return nil, errRecordNotFound
	}
	return &object{Str: "mystring", Num: 42}, nil
}

func mockDBMGetObject(ids []int) (map[int]*object, error) {
	ret := make(map[int]*object)
	for _, id := range ids {
		if id == 3 {
			continue
		}
		ret[id] = &object{Str: "mystring", Num: id}
	}
	return ret, nil
}

func Example_basicUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound))

	ctx := context.TODO()
	key := "mykey:1"
	obj, _ := mockDBGetObject(1)
	if err := mycache.Set(ctx, key, cache.Value(obj), cache.TTL(time.Hour)); err != nil {
		panic(err)
	}

	var wanted object
	if err := mycache.Get(ctx, key, &wanted); err == nil {
		fmt.Println(wanted)
	}
	// Output: {mystring 42}

	mycache.Close()
}

func Example_advancedUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRefreshDuration(time.Minute))

	ctx := context.TODO()
	key := "mykey:1"
	obj := new(object)
	if err := mycache.Once(ctx, key, cache.Value(obj), cache.TTL(time.Hour), cache.Refresh(true),
		cache.Do(func(ctx context.Context) (any, error) {
			return mockDBGetObject(1)
		})); err != nil {
		panic(err)
	}
	fmt.Println(obj)
	// Output: &{mystring 42}

	mycache.Close()
}

func Example_mGetUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRemoteExpiry(time.Minute),
	)
	cacheT := cache.NewT[int, *object](mycache)

	ctx := context.TODO()
	key := "mget"
	ids := []int{1, 2, 3}

	ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
		return mockDBMGetObject(ids)
	})

	var b bytes.Buffer
	for _, id := range ids {
		b.WriteString(fmt.Sprintf("%v", ret[id]))
	}
	fmt.Println(b.String())
	// Output: &{mystring 1}&{mystring 2}<nil>

	cacheT.Close()
}

func Example_syncLocalUsage() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"localhost": ":6379",
		},
	})

	sourceID := "12345678" // Unique identifier for this cache instance
	channelName := "syncLocalChannel"
	pubSub := ring.Subscribe(context.Background(), channelName)

	mycache := cache.New(cache.WithName("any"),
		cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
		cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
		cache.WithErrNotFound(errRecordNotFound),
		cache.WithRemoteExpiry(time.Minute),
		cache.WithSourceId(sourceID),
		cache.WithSyncLocal(true),
		cache.WithEventHandler(func(event *cache.Event) {
			// Broadcast local cache invalidation for the received keys
			bs, _ := json.Marshal(event)
			ring.Publish(context.Background(), channelName, string(bs))
		}),
	)
	obj, _ := mockDBGetObject(1)
	if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
		panic(err)
	}

	go func() {
		for {
			msg := <-pubSub.Channel()
			var event *cache.Event
			if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
				panic(err)
			}
			fmt.Println(event.Keys)

			// Invalidate local cache for received keys (except own events)
			if event.SourceID != sourceID {
				for _, key := range event.Keys {
					mycache.DeleteFromLocalCache(key)
				}
			}
		}
	}()

	// Output: [mykey]
	mycache.Close()
	time.Sleep(time.Second)
}

Configure settings

// Options are used to store cache options.
Options struct {
    name                       string             // Cache name, used for log identification and metric reporting
    remote                     remote.Remote      // Remote is distributed cache, such as Redis.
    local                      local.Local        // Local is memory cache, such as FreeCache.
    codec                      string             // Value encoding and decoding method. Default is msgpack.Name (MsgPack). You can also customize it.
    errNotFound                error              // Error to return on cache miss. Used to prevent cache penetration.
    remoteExpiry               time.Duration      // Remote cache TTL. Default is 1 hour.
    notFoundExpiry             time.Duration      // Duration for placeholder cache when there is a cache miss. Default is 1 minute.
    offset                     time.Duration      // Expiration time jitter factor for cache misses.
    refreshDuration            time.Duration      // Interval for asynchronous cache refresh. Default is 0 (refresh is disabled).
    stopRefreshAfterLastAccess time.Duration      // Duration for cache to stop refreshing after no access. Default is refreshDuration + 1 second.
    refreshConcurrency         int                // Maximum number of concurrent cache refreshes. Default is 4.
    statsDisabled              bool               // Flag to disable cache statistics.
    statsHandler               stats.Handler      // Metrics statsHandler collector.
    sourceID                   string             // Unique identifier for cache instance.
    syncLocal                  bool               // Enable events for syncing local cache (only for "Both" cache type).
    eventChBufSize             int                // Buffer size for event channel (default: 100).
    eventHandler               func(event *Event) // Function to handle local cache invalidation events.
}

Cache metrics collection and statistics

You can implement the stats.Handler interface and register it with the Cache component to customize metric collection, for example to report to Prometheus; a sketch of a custom handler follows the sample output. A default implementation that logs the statistical metrics is provided, as shown below:

2023/09/11 16:42:30.695294 statslogger.go:178: [INFO] jetcache-go stats last 1m0s.
cache       |         qpm|   hit_ratio|         hit|        miss|       query|  query_fail
------------+------------+------------+------------+------------+------------+------------
bench       |   216440123|     100.00%|   216439867|         256|         256|           0|
bench_local |   216440123|     100.00%|   216434970|        5153|           -|           -|
bench_remote|        5153|      95.03%|        4897|         256|           -|           -|
------------+------------+------------+------------+------------+------------+------------
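
If you want to push these metrics to a monitoring system instead of the log, implement stats.Handler yourself and pass it in when creating the cache. The sketch below is only illustrative: the counter method names and the cache.WithStatsHandler option are assumptions inferred from the statistics shown above, not verified signatures; check the stats package for the real interface.

import (
    "github.com/mgtv-tech/jetcache-go"
    "github.com/mgtv-tech/jetcache-go/stats"
)

// promHandler is a hypothetical adapter that forwards cache statistics to your
// metrics system (for example, Prometheus counters). The method names below
// mirror the columns in the log table and are assumptions, not the verified
// stats.Handler signature.
type promHandler struct{}

func (h *promHandler) IncrHit()                { /* increment "hit" counter */ }
func (h *promHandler) IncrMiss()               { /* increment "miss" counter */ }
func (h *promHandler) IncrLocalHit()           { /* increment "local hit" counter */ }
func (h *promHandler) IncrLocalMiss()          { /* increment "local miss" counter */ }
func (h *promHandler) IncrRemoteHit()          { /* increment "remote hit" counter */ }
func (h *promHandler) IncrRemoteMiss()         { /* increment "remote miss" counter */ }
func (h *promHandler) IncrQuery()              { /* increment "query" counter */ }
func (h *promHandler) IncrQueryFail(err error) { /* increment "query fail" counter */ }

var _ stats.Handler = (*promHandler)(nil) // compile-time check against the real interface

mycache := cache.New(cache.WithName("any"),
    // ...
    cache.WithStatsHandler(&promHandler{}))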

Custom Logger

import "github.com/mgtv-tech/jetcache-go/logger"

// Set your Logger
logger.SetDefaultLogger(l logger.Logger)

Custom Encoding and Decoding

import (
    "github.com/mgtv-tech/jetcache-go"
    "github.com/mgtv-tech/jetcache-go/encoding"
)

// Register your codec
encoding.RegisterCodec(codec Codec)

// Set your codec name
mycache := cache.New("any",
    cache.WithRemote(...),
    cache.WithCodec(yourCodecName string))
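
As a rough illustration of what a custom codec could look like, here is a minimal sketch that stores values as plain JSON. It assumes encoding.Codec follows the common Marshal/Unmarshal/Name shape; verify the actual interface in the encoding package before relying on it.

import (
    "encoding/json"

    "github.com/mgtv-tech/jetcache-go/encoding"
)

// jsonCodec is a hypothetical codec. The three methods below assume
// encoding.Codec is a Marshal/Unmarshal/Name style interface; adjust them to
// the real definition if it differs.
type jsonCodec struct{}

func (jsonCodec) Marshal(v any) ([]byte, error)      { return json.Marshal(v) }
func (jsonCodec) Unmarshal(data []byte, v any) error { return json.Unmarshal(data, v) }
func (jsonCodec) Name() string                       { return "myjson" }

func init() {
    encoding.RegisterCodec(jsonCodec{})
}

// Select it by its registered name:
// mycache := cache.New(cache.WithName("any"),
//     cache.WithRemote(...),
//     cache.WithCodec("myjson"))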

Usage Scenarios

Automatic Cache Refresh

jetcache-go provides automatic cache refresh to prevent cache avalanche and database overload when caches expire. It is suitable for scenarios with a small number of keys, low freshness requirements, and a high loading cost. The code below configures a refresh every minute, and stops refreshing after the key has not been accessed for 1 hour. If the cache is Redis, or Redis is the last level of a multi-level cache, cache loading is globally unique: only one server refreshes a given key at a time regardless of how many servers there are, which reduces the load on the backend.

mycache := cache.New(cache.WithName("any"),
    // ...
    // cache.WithRefreshDuration sets the asynchronous refresh interval
    cache.WithRefreshDuration(time.Minute),
    // cache.WithStopRefreshAfterLastAccess sets how long a key can go unaccessed before its refresh task is cancelled
    cache.WithStopRefreshAfterLastAccess(time.Hour))

// `Once` interface starts automatic refresh by `cache.Refresh(true)`
err := mycache.Once(ctx, key, cache.Value(obj), cache.Refresh(true), cache.Do(func(ctx context.Context) (any, error) {
    return mockDBGetObject(1)
}))

MGet Batch Query

MGet uses Go generics and the Load function to provide a user-friendly way to batch-query the entities for a list of IDs through the multi-level cache. If the cache is Redis, or Redis is the last level of a multi-level cache, a Pipeline is used for the read and write operations to improve performance. Note that for abnormal scenarios (I/O errors, serialization errors, and so on), the design philosophy is to return a partial (lossy) result wherever possible rather than fail the whole call, while still preventing cache penetration.

mycache := cache.New(cache.WithName("any"),
    // ...
    cache.WithRemoteExpiry(time.Minute),
)
cacheT := cache.NewT[int, *object](mycache)

ctx := context.TODO()
key := "mykey"
ids := []int{1, 2, 3}

ret := cacheT.MGet(ctx, key, ids, func(ctx context.Context, ids []int) (map[int]*object, error) {
    return mockDBMGetObject(ids)
})

Codec Selection

jetcache-go ships with three codec (serialization/deserialization) implementations by default: sonic, MsgPack, and native json.

Selection Guide:

  • For high-performance encoding and decoding: If the local cache hit rate is extremely high but deserializing byte slices back into objects consumes a lot of CPU, choose sonic.
  • For a balance of performance and storage space: Choose MsgPack, which uses MsgPack encoding and decoding; content larger than 64 bytes is compressed with snappy.

Tip: Remember to import the necessary packages as needed to register the codec.

 _ "github.com/mgtv-tech/jetcache-go/encoding/sonic"


jetcache-go's Issues

getSetItemBytesOnce duplicate statistics bug

The local-cache statistics logic is duplicated: getSetItemBytesOnce checks the local cache and increments the hit counters, and getBytes, called from the singleflight callback, checks the local cache and increments the same counters again, so a single call can be counted twice.

func (c *jetCache) getSetItemBytesOnce(item *item) (b []byte, cached bool, err error) {
	if c.local != nil {
		b, ok := c.local.Get(item.key)
		if ok {
			c.statsHandler.IncrHit()
			c.statsHandler.IncrLocalHit()
			if bytes.Compare(b, notFoundPlaceholder) == 0 {
				return nil, true, c.errNotFound
			}
			return b, true, nil
		}
	}

	v, err, _ := c.group.Do(item.key, func() (any, error) {
		b, err := c.getBytes(item.Context(), item.key, item.skipLocal)
		if err == nil {
			cached = true
			return b, nil
		}
		// ....
	})
	// ....
}

func (c *jetCache) getBytes(ctx context.Context, key string, skipLocal bool) ([]byte, error) {
	if !skipLocal && c.local != nil {
		b, ok := c.local.Get(key)
		if ok {
			c.statsHandler.IncrHit()
			c.statsHandler.IncrLocalHit()
			if bytes.Compare(b, notFoundPlaceholder) == 0 {
				return nil, c.errNotFound
			}
			return b, nil
		}
		c.statsHandler.IncrLocalMiss()
	}
	// ....
}

getSetItemBytesOnce does not use skipLocal bug

As the excerpt below shows, getSetItemBytesOnce reads the local cache without checking item.skipLocal, whereas getBytes honors the flag.

func (c *jetCache) getSetItemBytesOnce(item *item) (b []byte, cached bool, err error) {
	if c.local != nil {
		b, ok := c.local.Get(item.key)
		if ok {
			c.statsHandler.IncrHit()
			c.statsHandler.IncrLocalHit()
			if bytes.Compare(b, notFoundPlaceholder) == 0 {
				return nil, true, c.errNotFound
			}
			return b, true, nil
		}
	}
	// ....
}

Feature: Invalidate local caches (in all Go processes) after updates

Motivation

This feature improves cache consistency for the cache.Both mode (local cache + distributed cache). The implementation approach is as follows:

  • Expose registration of a cache-change event handler so that applications can receive cache change events.
  • The registrant broadcasts the event type, cache key, cache instance name, and cache instance unique ID to other business nodes, for example through an MQ.
  • Upon receiving the broadcast message, nodes invalidate the corresponding local cache keys to achieve cache consistency.
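
A minimal sketch of this flow is shown below. It mirrors the Redis pub/sub example in the Getting started section, but with a placeholder message queue standing in for the broker; mqPublish and mqSubscribe are hypothetical helpers, not part of jetcache-go.

// Hypothetical MQ helpers (stand-ins for your broker client, e.g. Kafka or RocketMQ).
func mqPublish(topic string, payload []byte)          { /* ... */ }
func mqSubscribe(topic string, handle func(b []byte)) { /* ... */ }

sourceID := "node-1" // unique per process
mycache := cache.New(cache.WithName("any"),
    // ...
    cache.WithSourceId(sourceID),
    cache.WithSyncLocal(true),
    cache.WithEventHandler(func(event *cache.Event) {
        // Broadcast the event type, keys, cache name and source ID to other nodes.
        bs, _ := json.Marshal(event)
        mqPublish("cache-invalidate", bs)
    }))

// On every node: invalidate local keys for events coming from other instances.
mqSubscribe("cache-invalidate", func(b []byte) {
    var event cache.Event
    if err := json.Unmarshal(b, &event); err != nil {
        return
    }
    if event.SourceID != sourceID {
        for _, key := range event.Keys {
            mycache.DeleteFromLocalCache(key)
        }
    }
})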
