Git Product home page Git Product logo

Comments (9)

hardy1970 avatar hardy1970 commented on June 12, 2024

package main

import (
"github.com/shima-park/agollo"
"fmt"
"os"
)

// cluster lists the values that main passes, one per loop iteration, to agollo.Cluster.
var cluster = []string {"config.properties","init.properties"}

// main initializes the package-level agollo client for each entry of cluster,
// starts long polling and prints every error and update it receives.
//
// NOTE(review): the empty select{} below sits inside the for loop, so the
// first iteration blocks forever and later entries of cluster are never reached.
func main() {
for _,i :=range cluster{
fmt.Println(i)

// Initialize agollo from the default app.properties in the root directory
err := agollo.InitWithDefaultConfigFile(
	agollo.WithLogger(agollo.NewLogger(agollo.LoggerWriter(os.Stdout))), // print log output
	agollo.Cluster(i),
	agollo.DefaultNamespace("1.1.1.1"),
	agollo.PreloadNamespaces("1.1.1.1"),                          // namespaces to preload; when started from a config file they are appended to the ones in app.properties
	agollo.AutoFetchOnCacheMiss(),                                       // on a cache miss, fetch the value from apollo's cached config API
	agollo.FailTolerantOnBackupExists(),                                 // if apollo is unreachable and a .agollo backup exists in the configured directory, read the backup instead
)
if err != nil {
	panic(err)
}

/*
	Initialize from an explicitly given config file path:
	agollo.InitWithConfigFile(configFilePath string, opts ....Option)

	Initialize agollo from plain arguments, convenient when wrapping the library:
	agollo.Init(
		"localhost:8080",
		"AppTest",
			opts...,
	)
*/

/*
// Get the value of key=Name from the default config (cluster=default, namespace=application)
fmt.Println("Name:", agollo.Get("Name"))

// Same lookup for another key, falling back to a default value
fmt.Println("YourConfigKey:", agollo.Get("YourConfigKey", agollo.WithDefault("YourDefaultValue")))

// Look the key up in an explicit namespace, falling back to a default value
fmt.Println("YourConfigKey2:", agollo.Get("YourConfigKey2", agollo.WithDefault("YourDefaultValue"), agollo.WithNamespace("YourNamespace")))

// Get every configuration item of a namespace
fmt.Println("Configuration of the namespace:", agollo.GetNameSpace("application"))

// TEST.Namespace1 is a namespace that was not preloaded.
// If agollo was initialized with the agollo.AutoFetchOnCacheMiss() option,
// a lookup in a non-preloaded namespace fetches the config from apollo's cached API;
// without that option it returns an empty value or the supplied default.
fmt.Println(agollo.Get("Name", agollo.WithDefault("foo"), agollo.WithNamespace("TEST.Namespace1")))

*/
// To watch and sync server-side config changes, start apollo long polling.
// It returns a channel of the errors that occur meanwhile; handle them as needed.
errorCh := agollo.Start()

// Listen for apollo config change events.
// Each event carries the namespace, its config before and after the change, and any error.
watchCh := agollo.Watch()

go func() {
	for {
		select {
		case err := <-errorCh:
			fmt.Println("Error:", err)
		case update := <-watchCh:
			fmt.Println("Apollo Update:", update)
		} 
	}
}()

// Block forever so the process keeps running while the goroutine prints events.
select {}

}
}

这样会一直阻塞

from agollo.

shima-park avatar shima-park commented on June 12, 2024

(1)一直阻塞
示例代码最后有一段空的select{},为了在监听的时候去修改apollo配置,查看输出效果,阻塞不让进程退出。
当然,直接去掉,并将go func(){}中的代码拿出来也可达到这个效果。
实际你自己的业务代码中可以去除。

(2)配置监听多个cluster
你需要通过agollo.New(configServerURL, appID string, opts ...Option),
分别为每个集群创建一个agollo接口对象。你这里循环调用agollo.Init,
实际会将内部存储的Cluster字段修改成数组最后一个Cluster的值。


// Create one independent agollo client per cluster instead of re-initializing
// the package-level client, whose Cluster option would be overwritten on each
// iteration.
for _, cluster := range appConfig.Clusters {
	newAgollo, err := agollo.New(
		appConfig.ConfigServerUrl,
		appConfig.AppId,
		agollo.Cluster(cluster), // bind this client to the cluster of the current iteration
		// your other agollo options...
	)
	if err != nil {
		panic(err)
	}

	val := newAgollo.GetNameSpace(appConfig.NamespaceName)
	_ = val // your business code goes here; the blank assignment only keeps the snippet compilable
}

如果你要监听各个集群的修改,最好自己再封装一下,把为各个集群新建的多个agollo接口对象的
Start()、Watch()事件启动/监听方法组合起来。

// Cluster bundles several agollo clients so they can be started together.
type Cluster struct{
    // ctx is meant to signal the goroutines spawned by Start to stop.
    ctx context.Context
    // agollos maps a cluster name to the agollo client created for it
    // (Start ranges over it and uses the key as the cluster name in errors).
    agollos map[string]agollo.Agollo
}

// Start starts long polling on every per-cluster client and merges their
// errors into a single channel.
//
// Every forwarded error is wrapped with the name of the cluster it came from.
// A forwarding goroutine exits when c.ctx is cancelled or when its client's
// error channel is closed. The returned channel is never closed, so callers
// should stop reading from it once the context is done.
func (c *Cluster) Start() chan error {
	errorCh := make(chan error)
	for cluster, clusterAgollo := range c.agollos {
		// Call Start exactly once per client. The original evaluated it as
		// part of the select on every loop iteration.
		pollErrs := clusterAgollo.Start()

		// Pass the cluster name as an argument so each goroutine keeps its
		// own copy instead of sharing the loop variable.
		go func(cluster string) {
			for {
				select {
				case <-c.ctx.Done():
					return
				case err, ok := <-pollErrs:
					if !ok {
						return
					}
					// Do not block forever on a reader that has gone away.
					select {
					case errorCh <- fmt.Errorf("Cluster:%s Error:%v", cluster, err):
					case <-c.ctx.Done():
						return
					}
				}
			}
		}(cluster)
	}
	return errorCh
}

watch类似

from agollo.

hardy1970 avatar hardy1970 commented on June 12, 2024

请教,golang初学~~~~~~~~
请教我监听多个cluster,实现每间隔10s 检查一次临时文件(配置中心获取的配置)与业务程序的配置文件md5 是否一样,若不一样实现覆盖重启动作。 如下我在select --case -- default 中加入了default, default 中实现文件校验动作。。。 但是加入之后只实现每10s 检查一次,当配置发生变化获取不到监听事件~ ~~ 感谢!

`package main

import (
"github.com/astaxie/beego/config"
"github.com/shima-park/agollo"
"strings"
"fmt"
"os"
"io"
"log"
"os/exec"
"io/ioutil"
"crypto/md5"
"time"
"reflect"
)

const (
// Layout passed to time.Format (yyyyMMddHHmmss); used as the suffix of backup file names
timeFormat = "20060102150405"
// Pause between two rounds of the local-file check in main's polling loop
timeSleep = time.Second * 10
)

var (
// appConfig holds the settings filled in by initConfig; it is nil until initConfig assigns it.
appConfig *Config
)

// Config holds the settings that initConfig reads from the ini file.
//
// The struct tags must be wrapped in backticks; without them the field
// declarations do not parse.
type Config struct {
	ConfigServerUrl  string   `json:"config_server_url"`  // apollo config server address handed to agollo
	AppId            string   `json:"app_id"`             // apollo application id; also names the backup sub-directory
	Clusters         []string `json:"clusters"`           // cluster names, split from a comma-separated value
	NamespaceName    string   `json:"namespace_name"`     // namespace whose keys are dumped to disk
	ConfigDir        string   `json:"config_dir"`         // directory holding the business program's config files
	ConfigBackDir    string   `json:"config_back_dir"`    // root directory for timestamped backups
	TmpDir           string   `json:"tmp_dir"`            // directory for the files fetched from the config center
	ServerRestartCmd string   `json:"server_restart_cmd"` // shell command run after a config file was replaced
	TimeSleep        int      `json:"time_sleep"`         // value of apollo::timeSleep from the ini file (10 if unreadable)
}

// initConfig parses confFile with the given adapter type and stores the
// "apollo::" settings in the package-level appConfig.
//
// Empty values fall back to built-in defaults. The function returns early,
// leaving the remaining fields unset, when the file cannot be parsed, when no
// cluster is configured, or when the restart command is empty.
func initConfig(confType, confFile string) {
	conf, err := config.NewConfig(confType, confFile)
	if err != nil {
		fmt.Println("config failed, err : \n", err)
		return
	}

	// Publish the still-empty struct right away: the early returns below
	// leave appConfig partially filled rather than nil.
	cfg := &Config{}
	appConfig = cfg

	// stringOr reads key and substitutes fallback when the value is empty.
	stringOr := func(key, fallback string) string {
		if value := conf.String(key); value != "" {
			return value
		}
		return fallback
	}

	cfg.ConfigServerUrl = stringOr("apollo::configServerUrl", "127.0.0.1:8080")
	cfg.AppId = stringOr("apollo::appId", "SimpleApp")

	rawClusters := conf.String("apollo::cluster")
	if rawClusters == "" {
		log.Println("get App config error \n")
		return
	}
	cfg.Clusters = strings.Split(rawClusters, ",")
	log.Printf("This App Cluster cfgs: %s\n", cfg.Clusters)

	cfg.NamespaceName = stringOr("apollo::namespaceName", "127.0.0.1")
	cfg.ConfigDir = stringOr("apollo::configDir", ".")
	cfg.ConfigBackDir = stringOr("apollo::configBackDir", "/home/bak")
	cfg.TmpDir = stringOr("apollo::tmpDir", "/home/tmp")

	if seconds, intErr := conf.Int("apollo::timeSleep"); intErr == nil {
		cfg.TimeSleep = seconds
	} else {
		cfg.TimeSleep = 10
	}

	cfg.ServerRestartCmd = conf.String("apollo::serverRestartCmd")
	if cfg.ServerRestartCmd == "" {
		log.Println("The server restart command is null")
		return
	}

	log.Println("All fileConfig init success \n")
}

// writeNamespaceInfo re-initializes the package-level agollo client for the
// given cluster and dumps every key of appConfig.NamespaceName as "key=value"
// lines into <TmpDir>/<cluster>, replacing any previous content.
//
// It panics when agollo cannot be initialized or the file cannot be opened,
// written or closed.
func writeNamespaceInfo(cluster string) {
	err := agollo.Init(
		appConfig.ConfigServerUrl,
		appConfig.AppId,
		agollo.Cluster(cluster),
		//agollo.PreloadNamespaces(appConfig.NamespaceName),
		agollo.AutoFetchOnCacheMiss(),
		agollo.FailTolerantOnBackupExists(),
		agollo.WithLogger(agollo.NewLogger(agollo.LoggerWriter(os.Stdout))),
	)
	if err != nil {
		panic(err)
	}

	tmpFile := fmt.Sprintf("%s/%s", appConfig.TmpDir, cluster)

	// One OpenFile with O_CREATE|O_TRUNC replaces the previous
	// Open / Create / OpenFile sequence, which leaked up to two handles per
	// call. 0666 is the mode os.Create used for a newly created file.
	out, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666)
	if err != nil {
		panic(err)
	}

	res := agollo.GetNameSpace(appConfig.NamespaceName)
	for k, v := range res {
		if _, err := fmt.Fprintf(out, "%s=%s\n", k, v); err != nil {
			// The write error is the one worth reporting; the close result
			// is deliberately ignored on this path.
			_ = out.Close()
			panic(err)
		}
	}
	// A failed Close can mean the data never reached the disk.
	if err := out.Close(); err != nil {
		panic(err)
	}
	log.Printf("[%s] write Success\n", tmpFile)
}

// checkExists reports whether path is usable as a directory, creating it
// (together with any missing parents) when it cannot be found.
//
// It returns false, after logging the reason, when path exists but is not a
// directory or when the directory cannot be created.
func checkExists(path string) bool {
	info, err := os.Stat(path)
	if err == nil {
		// Callers use path as a directory, so a regular file does not count.
		if !info.IsDir() {
			log.Printf("%s exists but is not a directory \n", path)
			return false
		}
		return true
	}

	// Stat failed, so try to create the directory. The former os.IsExist
	// check was dropped: Stat does not fail with an "already exists" error.
	if err := os.MkdirAll(path, os.ModePerm); err != nil {
		log.Printf("%s mkdir failed \n", path)
		return false
	}
	return true
}

// copyFile copies the content of srcName into dstName, creating the
// destination or truncating it if it already exists.
//
// Note the argument order: destination first, source second. It returns the
// number of bytes copied and the first error from opening, copying, or
// closing the destination.
func copyFile(dstName, srcName string) (written int64, err error) {
	src, err := os.Open(srcName)
	if err != nil {
		return 0, err
	}
	defer src.Close()

	dst, err := os.Create(dstName)
	if err != nil {
		return 0, err
	}
	// A failed Close on a written file can mean lost data, so report it
	// unless an earlier error is already being returned.
	defer func() {
		if closeErr := dst.Close(); err == nil {
			err = closeErr
		}
	}()

	return io.Copy(dst, src)
}

// updateLocalFile backs up <ConfigDir>/<configFile> into
// <ConfigBackDir>/<AppId>/<configFile>.<timestamp> and then overwrites it with
// <TmpDir>/<configFile>.
//
// Each step is logged. If the backup fails the live file is left untouched.
func updateLocalFile(configFile string) {
	stamp := time.Now().Format(timeFormat)

	// Make sure the config directory and the per-app backup directory exist.
	if checkExists(appConfig.ConfigDir) {
		log.Printf("ConfigDir is exists, [%s] \n", appConfig.ConfigDir)
	}
	backupDir := fmt.Sprintf("%s/%s", appConfig.ConfigBackDir, appConfig.AppId)
	if checkExists(backupDir) {
		log.Printf("AppBackDir is exists, [%s] \n", backupDir)
	}

	// Step 1: keep a timestamped copy of the file currently in use.
	liveFile := fmt.Sprintf("%s/%s", appConfig.ConfigDir, configFile)
	backupFile := fmt.Sprintf("%s/%s.%s", backupDir, configFile, stamp)
	if _, err := copyFile(backupFile, liveFile); err != nil {
		log.Printf("%s backup failed ,err : %s \n", configFile, err)
		return
	}
	log.Printf("[%s] backup success,from [%s] to [%s] \n", configFile, liveFile, backupFile)

	// Step 2: replace the live file with the freshly fetched one.
	fetchedFile := fmt.Sprintf("%s/%s", appConfig.TmpDir, configFile)
	if _, err := copyFile(liveFile, fetchedFile); err != nil {
		log.Printf("%s update failed \n", configFile)
		return
	}
	log.Printf("%s update success \n", configFile)
}

// serverRestart runs cmdString through "/bin/bash -c", waits for it to finish
// and logs its standard output.
//
// Failures are logged, not returned. cmdString is handed to the shell as is,
// so it must come from a trusted source such as the local config file.
func serverRestart(cmdString string) {
	cmd := exec.Command("/bin/bash", "-c", cmdString)
	log.Printf("The server restart command is :%s ", cmdString)

	// Create the pipe that carries the command's standard output.
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Printf("Error:can not obtain stdout pipe for command: %s\n", err)
		return
	}
	// Launch the command.
	if err := cmd.Start(); err != nil {
		log.Println("Error:The command is err,", err)
		return
	}

	// Read all output, then always reap the child. Returning on a read error
	// without calling Wait would leave the process unreaped.
	output, readErr := ioutil.ReadAll(stdout)
	waitErr := cmd.Wait()

	if readErr != nil {
		log.Println("ReadAll Stdout:", readErr.Error())
		return
	}
	if waitErr != nil {
		log.Println("Wait:", waitErr.Error())
		return
	}
	log.Printf("Stdout:\n\n%s", output)
}

// md5FileCheck reports whether srcFile and dstFile have the same MD5 digest.
//
// It returns false, after logging the error, when either file cannot be
// opened or read. MD5 serves only as a cheap change detector here, not as a
// security measure.
func md5FileCheck(srcFile, dstFile string) bool {
	srcH, err := fileMD5(srcFile)
	if err != nil {
		log.Println(err)
		return false
	}
	log.Printf("srcFile: %s, md5: %s", srcFile, srcH)

	dstH, err := fileMD5(dstFile)
	if err != nil {
		log.Println(err)
		return false
	}
	log.Printf("dstFile: %s, md5: %s", dstFile, dstH)

	// The trailing "\n" was removed from the two messages below: Println
	// already appends a newline and the vet printf check rejects the extra one.
	if srcH != dstH {
		log.Println("The srcFile do not same as dstFile")
		return false
	}
	log.Println("The srcFile same as dstFile")
	return true
}

// fileMD5 returns the hex-encoded MD5 digest of the file at path and closes
// the file before returning.
func fileMD5(path string) (string, error) {
	f, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer f.Close()

	h := md5.New()
	if _, err := io.Copy(h, f); err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", h.Sum(nil)), nil
}

// init configures the standard logger (prefix, timestamp and caller
// file:line) and logs a start-up banner.
func init() {
	const flags = log.LstdFlags | log.Lshortfile
	log.SetFlags(flags)
	log.SetPrefix("[goApollo] ")
	log.Println("App starting ......")
}

// main loads goApollo.ini and, for every configured cluster, initializes the
// package-level agollo client, dumps the namespace to <TmpDir>/<cluster> and
// starts a goroutine that reacts to agollo errors/updates and otherwise
// compares the dumped file with the business config file.
//
// NOTE(review): agollo.Init/Start/Watch are package-level calls repeated for
// every cluster; presumably they all act on one shared client — confirm
// against the agollo documentation.
// NOTE(review): fount is never closed (its defer is commented out), and the
// deferred file.Close() only runs when main returns, which the final
// select {} prevents.
func main() {
initConfig("ini", "goApollo.ini")
for _, cluster := range appConfig.Clusters {

	err := agollo.Init(
		appConfig.ConfigServerUrl,
		appConfig.AppId,
		agollo.Cluster(cluster),
		agollo.PreloadNamespaces(appConfig.NamespaceName),
		agollo.AutoFetchOnCacheMiss(),
		agollo.FailTolerantOnBackupExists(),
		//agollo.WithLogger(agollo.NewLogger(agollo.LoggerWriter(os.Stdout))),
	)
	if err != nil {
		panic(err)
	}

	if checkExists(appConfig.TmpDir) == true {
		log.Printf("TmpDir is exists, [%s] \n", appConfig.TmpDir)
	}
	// Make sure the temporary file exists before it is truncated below
	tmpFile := fmt.Sprintf("%s/%s", appConfig.TmpDir, cluster)
	file, err := os.Open(tmpFile)
	if err != nil && os.IsNotExist(err) {
		file, _ = os.Create(tmpFile)
		defer file.Close()
	}

	fount, err := os.OpenFile(tmpFile, os.O_TRUNC|os.O_WRONLY, 0644)
	if err != nil {
		panic(err)
	}
	//defer fount.Close()
	res := agollo.GetNameSpace(appConfig.NamespaceName)

	// Write every key of the namespace as a key=value line
	for k, v := range res {
		data := fmt.Sprintf("%s=%s\n", k, v)
		//fmt.Println(data)
		if _, err := fount.WriteString(data); err != nil {
			panic(err)
		}
	}
	log.Printf("[%s] write success \n", tmpFile)

	//updateLocalFile(cluster)

	errorCh := agollo.Start()
	watchCh := agollo.Watch()
	//

	go func() {
		for {

			// Because of the default case this select never blocks: when
			// neither channel is ready, the default branch runs its checks
			// and then sleeps for timeSleep before the channels are looked
			// at again.
			select {

			case err := <-errorCh:
				log.Println("Error:", err)
			case update := <-watchCh:
				log.Println("Apollo Update:", update)
				fmt.Println(reflect.TypeOf(update))
				// Any update rewrites and redeploys the files of all clusters
				for _, cluster := range appConfig.Clusters {
					log.Printf("%s have updated \n", cluster)
					writeNamespaceInfo(cluster)
					updateLocalFile(cluster)

				}
				serverRestart(appConfig.ServerRestartCmd)

				//default:
				//	fmt.Println(time.Now(),"-------default--------")
				//	time.Sleep(time.Second*10)


			default:
				//
				//fmt.Println(appConfig.TimeSleep, reflect.TypeOf(appConfig.TimeSleep), "----------")

				for _, cluster := range appConfig.Clusters {
                                             // File fetched from the config center
					currentTmpFile := fmt.Sprintf("%s/%s", appConfig.TmpDir, cluster)
					//fmt.Println(currentTmpFile)
                                           // Config file of the business program
					serverConfigFile := fmt.Sprintf("%s/%s", appConfig.ConfigDir, cluster)
					md5Res := md5FileCheck(currentTmpFile, serverConfigFile)
					if md5Res == false {
						copyFile(serverConfigFile, currentTmpFile)
						serverRestart(appConfig.ServerRestartCmd)
					}

				}

				time.Sleep(timeSleep)



			}

		}
	}()

}

// Block forever; the goroutines above do all the work.
select {}

}
`

from agollo.

shima-park avatar shima-park commented on June 12, 2024
  1. 参考以下示例代码。测试了下是可以的,对应配置改下就好。

!!! star
!!! star
!!! star

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/shima-park/agollo"
)

var (
	ConfigServerUrl = ""
	AppId           = ""
	Namespace       = "TEST.Namespace1"
)

// main creates one agollo client per cluster, prints the preloaded namespace,
// and starts a watcher goroutine per client that prints errors and updates.
//
// On SIGTERM, SIGINT or SIGQUIT it cancels the shared context and waits until
// every watcher has quit. Previously main returned right after cancel(), so
// the watchers could be killed before they ran their quit path.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	clusters := []string{"default", "cluster2", "cluster3"}
	// One buffered slot per watcher, so reporting its exit never blocks.
	stopped := make(chan struct{}, len(clusters))

	for _, cluster := range clusters {
		// The key point: build a separate client with agollo.New for every
		// cluster instead of re-initializing the package-level one.
		newAgo, err := agollo.New(
			ConfigServerUrl,
			AppId,
			agollo.Cluster(cluster),
			agollo.PreloadNamespaces(Namespace),
			agollo.AutoFetchOnCacheMiss(),
			agollo.FailTolerantOnBackupExists(),
			//agollo.WithLogger(agollo.NewLogger(agollo.LoggerWriter(os.Stdout))),
		)
		if err != nil {
			panic(err)
		}

		// Read the configuration of the chosen namespace.
		res := newAgo.GetNameSpace(Namespace)
		for k, v := range res {
			fmt.Printf("%s=%s\n", k, v)
		}

		// Likewise, start polling and obtain the event channels from the
		// newly created client, not from the package-level functions.
		errorCh := newAgo.Start()
		watchCh := newAgo.Watch()

		go func(cluster string) {
			defer func() { stopped <- struct{}{} }()
			for {
				select {
				case <-ctx.Done():
					fmt.Println(cluster, "watch quit...")
					return
				case err := <-errorCh:
					fmt.Println("Error:", err)
				case update := <-watchCh:
					// This is the place to overwrite the business config
					// file: a change made in apollo arrives here as an event.
					for k, v := range update.NewValue {
						fmt.Printf("%s=%s\n", k, v)
					}

					// cluster           the cluster this watcher belongs to
					// update.Namespace  the namespace that changed
					// update.OldValue   previous items, map[string]interface{}
					// update.NewValue   new items, same type as above
					// update.Error      error attached to the event, if any
					fmt.Printf("Apollo cluster(%s) namespace(%s) old_value:(%v) new_value:(%v) error:(%v)\n",
						cluster, update.Namespace, update.OldValue, update.NewValue, update.Error)
				}
			}
		}(cluster)
	}

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
	fmt.Println("Listening for signals")

	// Wait for a termination signal, then stop the watchers.
	<-ch
	fmt.Println("Signal received, wait on context cancel...")
	cancel()
	for range clusters {
		<-stopped
	}
}

from agollo.

hardy1970 avatar hardy1970 commented on June 12, 2024

你好,可能是我的意思没有表述清楚。
我原本意思是我去掉default那一段代码是可以实现客户端接收服务端监听变化的【符合预期】。 然后在此基础上添加一个功能:每间隔10s 检查一次,业务代码的配置文件(serverConfigFile )和从配置中心获取的文件(写在一个临时文件currentTmpFile )是否一致,否则执行覆盖动作~~~
但是我添加了一个default ,sleep 10s 检测的逻辑之后,程序始终走的default~

大神,我原本意思类似 间隔10s 打印一个hello world ,在哪个地方比较合适~~~
因为客户端只能从服务端获取配置,只有当服务端配置发生变化时才会触发更新;加这个定时检查是为了防止有人在服务器上手动修改了配置文件、而配置中心没有更新所产生的差异。

from agollo.

shima-park avatar shima-park commented on June 12, 2024

for + select
select中的case都未触发的时候会一直走default逻辑块。
没有default时,一直阻塞等待case中的事件触发。

按你的意思,参考代码

// main runs three kinds of goroutines and waits for all of them:
//   - one watcher per cluster that handles agollo errors and update events,
//   - one ticker loop that every 10 seconds compares the file fetched from
//     the config center with the business config file and redeploys it when
//     they differ,
//   - one signal handler that cancels the shared context on
//     SIGTERM/SIGINT/SIGQUIT so that every goroutine exits.
func main() {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for _, cluster := range appConfig.Clusters {
		// One client per cluster; add further agollo options as needed.
		newAgo, err := agollo.New(
			appConfig.ConfigServerUrl,
			appConfig.AppId,
			agollo.Cluster(cluster),
			agollo.PreloadNamespaces(appConfig.NamespaceName),
			agollo.AutoFetchOnCacheMiss(),
			agollo.FailTolerantOnBackupExists(),
		)
		if err != nil {
			panic(err)
		}
		errorCh := newAgo.Start()
		watchCh := newAgo.Watch()

		wg.Add(1)
		go func(cluster string) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case err := <-errorCh:
					// Error handling goes here.
					fmt.Println("Error:", err)
				case update := <-watchCh:
					// Handle the config update event here, e.g. rewrite the
					// local file for this cluster.
					fmt.Printf("Apollo cluster(%s) namespace(%s) updated\n", cluster, update.Namespace)
				}
			}
		}(cluster)
	}

	wg.Add(1)
	go func() {
		defer wg.Done()

		ticker := time.NewTicker(time.Second * 10)
		// time.Ticker has no Close method; Stop releases the ticker.
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C: // run the check below every 10 seconds
				for _, cluster := range appConfig.Clusters {
					// File fetched from the config center.
					currentTmpFile := fmt.Sprintf("%s/%s", appConfig.TmpDir, cluster)
					// Config file of the business program.
					serverConfigFile := fmt.Sprintf("%s/%s", appConfig.ConfigDir, cluster)
					if md5FileCheck(currentTmpFile, serverConfigFile) {
						continue
					}
					// Restart only when the file was really replaced.
					if _, err := copyFile(serverConfigFile, currentTmpFile); err != nil {
						fmt.Println("Error:", err)
						continue
					}
					serverRestart(appConfig.ServerRestartCmd)
				}
			}
		}
	}()

	// On a termination signal, tell every goroutine to quit.
	wg.Add(1)
	go func() {
		defer wg.Done()
		ch := make(chan os.Signal, 1)
		signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
		defer signal.Stop(ch)
		fmt.Println("Listening for signals")
		select {
		case <-ctx.Done():
			return
		case <-ch:
			fmt.Println("Signal received, wait on context cancel...")
			cancel()
		}
	}()

	// Wait until all goroutines have exited.
	wg.Wait()
}

from agollo.

hardy1970 avatar hardy1970 commented on June 12, 2024

感谢大神,还得学习go func select的知识~

from agollo.

shima-park avatar shima-park commented on June 12, 2024

不要叫大神了,担当不起。我只是个写bug的-_-。没有别的问题这个issue我就关了。

from agollo.

hardy1970 avatar hardy1970 commented on June 12, 2024

没问题,感谢感谢

from agollo.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.