Comments (9)
package main
import (
"github.com/shima-park/agollo"
"fmt"
"os"
)
var cluster = []string {"config.properties","init.properties"}
func main() {
for _,i :=range cluster{
fmt.Println(i)
// 通过默认根目录下的app.properties初始化agollo
err := agollo.InitWithDefaultConfigFile(
agollo.WithLogger(agollo.NewLogger(agollo.LoggerWriter(os.Stdout))), // 打印日志信息
agollo.Cluster(i),
agollo.DefaultNamespace("1.1.1.1"),
agollo.PreloadNamespaces("1.1.1.1"), // 预先加载的namespace列表,如果是通过配置启动,会在app.properties配置的基础上追加
agollo.AutoFetchOnCacheMiss(), // 在配置未找到时,去apollo的带缓存的获取配置接口,获取配置
agollo.FailTolerantOnBackupExists(), // 在连接apollo失败时,如果在配置的目录下存在.agollo备份配置,会读取备份在服务器无法连接的情况下
)
if err != nil {
panic(err)
}
/*
通过指定配置文件地址的方式初始化
agollo.InitWithConfigFile(configFilePath string, opts ....Option)
参数形式初始化agollo的方式,适合二次封装
agollo.Init(
"localhost:8080",
"AppTest",
opts...,
)
*/
/*
// 获取默认配置中cluster=default namespace=application key=Name的值
fmt.Println("Name:", agollo.Get("Name"))
// 获取默认配置中cluster=default namespace=application key=Name的值,提供默认值返回
fmt.Println("YourConfigKey:", agollo.Get("YourConfigKey", agollo.WithDefault("YourDefaultValue")))
// 获取默认配置中cluster=default namespace=Test.Namespace key=Name的值,提供默认值返回
fmt.Println("YourConfigKey2:", agollo.Get("YourConfigKey2", agollo.WithDefault("YourDefaultValue"), agollo.WithNamespace("YourNamespace")))
// 获取namespace下的所有配置项
fmt.Println("Configuration of the namespace:", agollo.GetNameSpace("application"))
// TEST.Namespace1是非预加载的namespace
// agollo初始化是带上agollo.AutoFetchOnCacheMiss()可选项的话
// 陪到非预加载的namespace,会去apollo缓存接口获取配置
// 未配置的话会返回空或者传入的默认值选项
fmt.Println(agollo.Get("Name", agollo.WithDefault("foo"), agollo.WithNamespace("TEST.Namespace1")))
*/
// 如果想监听并同步服务器配置变化,启动apollo长轮训
// 返回一个期间发生错误的error channel,按照需要去处理
errorCh := agollo.Start()
// 监听apollo配置更改事件
// 返回namespace和其变化前后的配置,以及可能出现的error
watchCh := agollo.Watch()
go func() {
for {
select {
case err := <-errorCh:
fmt.Println("Error:", err)
case update := <-watchCh:
fmt.Println("Apollo Update:", update)
}
}
}()
select {}
}
}
这样会一致阻塞
from agollo.
(1)一致阻塞
示例代码最后有一段空的select{},为了在监听的时候去修改apollo配置,查看输出效果,阻塞不让进程退出。
当然,直接去掉,并将go func(){}中的代码拿出来也可达到这个效果。
实际你自己的业务代码中可以去除。
(2)配置监听多个cluster
你需要通过agollo.New(configServerURL, appID string, opts ...Option),
分别为每个集群创建一个agollo接口对象。你这里循环调用agollo.Init,
实际会将内部存储的Cluster字段修改成数组最后一个Cluster的值。
for _, cluster := range appConfig.Clusters {
newAgollo, err := agollo.New(
appConfig.ConfigServerUrl,
appConfig.AppId,
// your agollo options...
)
if err != nil {
panic(err)
}
val := newAgollo.GetNameSpace(appConfig.NamespaceName)
// your business code
}
如果你要监听各个集群的修改,最好自己再封装一下,为集群新建的多个agollo接口对象的
agollo.Start(),agollo.Watch()事件启动/监听方法组合起来。
type Cluster struct{
ctx context.Context
agollos map[string]agollo.Agollo
}
func (c *Cluster) Start() chan error {
errorCh := make(chan error)
for cluster, clusterAgollo := range c.agollos{
go func(){
for{
select{
case <-ctx.Done():
case err <- clusterAgollo.Start():
errorCh <- fmt.Errorf("Cluster:%s Error:%v", cluster, err)
}
}
}()
}
return errorCh
}
watch类似
from agollo.
请教,golang初学~~~~~~~~
请教我监听多个cluster,实现每间隔10s 检查一次临时文件(配置中心获取的配置)与业务程序的配置文件md5 是否一样,若不一样实现覆盖重启动作。 如下我在select --case -- default 中加入了default, default 中实现文件校验动作。。。 但是加入之后只实现每10s 检查一次,当配置发生变化获取不到监听事件~ ~~ 感谢!
`package main
import (
"github.com/astaxie/beego/config"
"github.com/shima-park/agollo"
"strings"
"fmt"
"os"
"io"
"log"
"os/exec"
"io/ioutil"
"crypto/md5"
"time"
"reflect"
)
const (
// 时间格式化
timeFormat = "20060102150405"
timeSleep = time.Second * 10
)
var (
appConfig *Config
)
type Config struct {
ConfigServerUrl string json:"config_server_url"
AppId string json:"app_id"
Clusters []string json:"clusters"
NamespaceName string json:"namespace_name"
ConfigDir string json:"config_dir"
ConfigBackDir string json:"config_back_dir"
TmpDir string json:"tmp_dir"
ServerRestartCmd string json:"server_restart_cmd"
TimeSleep int json:"time_sleep"
}
func initConfig(confType, confFile string) {
conf, err := config.NewConfig(confType, confFile)
if err != nil {
fmt.Println("config failed, err : \n", err)
return
}
appConfig = &Config{}
appConfig.ConfigServerUrl = conf.String("apollo::configServerUrl")
if len(appConfig.ConfigServerUrl) == 0 {
appConfig.ConfigServerUrl = "127.0.0.1:8080"
}
appConfig.AppId = conf.String("apollo::appId")
if len(appConfig.AppId) == 0 {
appConfig.AppId = "SimpleApp"
}
clusters := conf.String("apollo::cluster")
if len(clusters) == 0 {
log.Println("get App config error \n")
return
}
appConfig.Clusters = strings.Split(clusters, ",")
log.Printf("This App Cluster cfgs: %s\n", appConfig.Clusters)
appConfig.NamespaceName = conf.String("apollo::namespaceName")
if len(appConfig.NamespaceName) == 0 {
appConfig.NamespaceName = "127.0.0.1"
}
appConfig.ConfigDir = conf.String("apollo::configDir")
if len(appConfig.ConfigDir) == 0 {
appConfig.ConfigDir = "."
}
appConfig.ConfigBackDir = conf.String("apollo::configBackDir")
if len(appConfig.ConfigBackDir) == 0 {
appConfig.ConfigBackDir = "/home/bak"
}
appConfig.TmpDir = conf.String("apollo::tmpDir")
if len(appConfig.TmpDir) == 0 {
appConfig.TmpDir = "/home/tmp"
}
appConfig.TimeSleep, err = conf.Int("apollo::timeSleep")
if err != nil {
appConfig.TimeSleep = 10
}
appConfig.ServerRestartCmd = conf.String("apollo::serverRestartCmd")
if len(appConfig.ServerRestartCmd) == 0 {
log.Println("The server restart command is null")
return
}
log.Println("All fileConfig init success \n")
}
func writeNamespaceInfo(cluster string) {
err := agollo.Init(
appConfig.ConfigServerUrl,
appConfig.AppId,
agollo.Cluster(cluster),
//agollo.PreloadNamespaces(appConfig.NamespaceName),
agollo.AutoFetchOnCacheMiss(),
agollo.FailTolerantOnBackupExists(),
agollo.WithLogger(agollo.NewLogger(agollo.LoggerWriter(os.Stdout))),
)
if err != nil {
panic(err)
}
tmpFile := fmt.Sprintf("%s/%s", appConfig.TmpDir, cluster)
file, err := os.Open(tmpFile)
if err != nil && os.IsNotExist(err) {
file, _ = os.Create(tmpFile)
defer file.Close()
}
fount, err := os.OpenFile(tmpFile, os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
panic(err)
}
//defer fount.Close()
res := agollo.GetNameSpace(appConfig.NamespaceName)
for k, v := range res {
data := fmt.Sprintf("%s=%s\n", k, v)
//fmt.Println(data)
if _, err := fount.WriteString(data); err != nil {
panic(err)
}
}
log.Printf("[%s] write Success\n", tmpFile)
}
func checkExists(path string) bool {
_, err := os.Stat(path)
if err != nil {
if os.IsExist(err) {
return true
}
if err := os.MkdirAll(path, os.ModePerm); err != nil {
log.Printf("%s mkdir failed \n", path)
//panic(err)
return false
}
return true
}
return true
}
func copyFile(dstName, srcName string) (written int64, err error) {
// 目标 、源文件
src, err := os.Open(srcName)
if err != nil {
return
}
defer src.Close()
dst, err := os.Create(dstName)
if err != nil {
return
}
defer dst.Close()
return io.Copy(dst, src)
}
func updateLocalFile(configFile string) {
nowTime := time.Now().Format(timeFormat)
// checkDir
if checkExists(appConfig.ConfigDir) == true {
//fmt.Printf("ConfigDir is exists, [%s] \n",appConfig.ConfigDir)
log.Printf("ConfigDir is exists, [%s] \n", appConfig.ConfigDir)
}
appBackupDir := fmt.Sprintf("%s/%s", appConfig.ConfigBackDir, appConfig.AppId)
if checkExists(appBackupDir) == true {
log.Printf("AppBackDir is exists, [%s] \n", appBackupDir)
}
// backup
oldFile := fmt.Sprintf("%s/%s", appConfig.ConfigDir, configFile)
bakFile := fmt.Sprintf("%s/%s.%s", appBackupDir, configFile, nowTime)
_, err := copyFile(bakFile, oldFile)
if err != nil {
log.Printf("%s backup failed ,err : %s \n", configFile, err)
return
}
log.Printf("[%s] backup success,from [%s] to [%s] \n", configFile, oldFile, bakFile)
// cover
tmpFile := fmt.Sprintf("%s/%s", appConfig.TmpDir, configFile)
_, err = copyFile(oldFile, tmpFile)
if err != nil {
log.Printf("%s update failed \n", configFile)
return
}
log.Printf("%s update success \n", configFile)
}
func serverRestart(cmdString string) {
cmd := exec.Command("/bin/bash", "-c", cmdString)
log.Printf("The server restart command is :%s ", cmdString)
//创建获取命令输出管道
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Error:can not obtain stdout pipe for command: %s\n", err)
return
}
//执行命令
if err := cmd.Start(); err != nil {
log.Println("Error:The command is err,", err)
return
}
//读取所有输出
bytes, err := ioutil.ReadAll(stdout)
if err != nil {
log.Println("ReadAll Stdout:", err.Error())
return
}
if err := cmd.Wait(); err != nil {
log.Println("Wait:", err.Error())
return
}
log.Printf("Stdout:\n\n%s", bytes)
}
func md5FileCheck(srcFile, dstFile string) bool {
// 校验两个文件md5
// 原文件md5
src, err := os.Open(srcFile)
if err != nil {
log.Println(err)
return false
}
srcMd5h := md5.New()
io.Copy(srcMd5h, src)
srcH := fmt.Sprintf("%x", srcMd5h.Sum([]byte(""))) //md5
log.Printf("srcFile: %s, md5: %s", srcFile, srcH)
// 新文件md5
dst, err := os.Open(dstFile)
if err != nil {
log.Println(err)
return false
}
dstMd5h := md5.New()
io.Copy(dstMd5h, dst)
dstH := fmt.Sprintf("%x", dstMd5h.Sum([]byte(""))) //md5
log.Printf("dstFile: %s, md5: %s", dstFile, dstH)
if srcH == dstH {
log.Println("The srcFile same as dstFile \n")
return true
} else {
log.Println("The srcFile do not same as dstFile \n")
return false
}
}
func init() {
log.SetPrefix("[goApollo] ")
log.SetFlags(log.LstdFlags | log.Lshortfile)
log.Println("App starting ......")
}
func main() {
initConfig("ini", "goApollo.ini")
for _, cluster := range appConfig.Clusters {
err := agollo.Init(
appConfig.ConfigServerUrl,
appConfig.AppId,
agollo.Cluster(cluster),
agollo.PreloadNamespaces(appConfig.NamespaceName),
agollo.AutoFetchOnCacheMiss(),
agollo.FailTolerantOnBackupExists(),
//agollo.WithLogger(agollo.NewLogger(agollo.LoggerWriter(os.Stdout))),
)
if err != nil {
panic(err)
}
if checkExists(appConfig.TmpDir) == true {
log.Printf("TmpDir is exists, [%s] \n", appConfig.TmpDir)
}
// 判断临时文件
tmpFile := fmt.Sprintf("%s/%s", appConfig.TmpDir, cluster)
file, err := os.Open(tmpFile)
if err != nil && os.IsNotExist(err) {
file, _ = os.Create(tmpFile)
defer file.Close()
}
fount, err := os.OpenFile(tmpFile, os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
panic(err)
}
//defer fount.Close()
res := agollo.GetNameSpace(appConfig.NamespaceName)
for k, v := range res {
data := fmt.Sprintf("%s=%s\n", k, v)
//fmt.Println(data)
if _, err := fount.WriteString(data); err != nil {
panic(err)
}
}
log.Printf("[%s] write success \n", tmpFile)
//updateLocalFile(cluster)
errorCh := agollo.Start()
watchCh := agollo.Watch()
//
go func() {
for {
select {
case err := <-errorCh:
log.Println("Error:", err)
case update := <-watchCh:
log.Println("Apollo Update:", update)
fmt.Println(reflect.TypeOf(update))
for _, cluster := range appConfig.Clusters {
log.Printf("%s have updated \n", cluster)
writeNamespaceInfo(cluster)
updateLocalFile(cluster)
}
serverRestart(appConfig.ServerRestartCmd)
//default:
// fmt.Println(time.Now(),"-------default--------")
// time.Sleep(time.Second*10)
default:
//
//fmt.Println(appConfig.TimeSleep, reflect.TypeOf(appConfig.TimeSleep), "----------")
for _, cluster := range appConfig.Clusters {
// 从配置中心获取的
currentTmpFile := fmt.Sprintf("%s/%s", appConfig.TmpDir, cluster)
//fmt.Println(currentTmpFile)
// 业务程序的配置
serverConfigFile := fmt.Sprintf("%s/%s", appConfig.ConfigDir, cluster)
md5Res := md5FileCheck(currentTmpFile, serverConfigFile)
if md5Res == false {
copyFile(serverConfigFile, currentTmpFile)
serverRestart(appConfig.ServerRestartCmd)
}
}
time.Sleep(timeSleep)
}
}
}()
}
select {}
}
`
from agollo.
- 参考以下示例代码。测试了下是可以的,对应配置改下就好。
!!! star
!!! star
!!! star
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"github.com/shima-park/agollo"
)
var (
ConfigServerUrl = ""
AppId = ""
Namespace = "TEST.Namespace1"
)
func main() {
var (
ctx, cancel = context.WithCancel(context.Background())
)
for _, cluster := range []string{"default", "cluster2", "cluster3"} {
// !你的问题是在这里没用有agollo.New
newAgo, err := agollo.New(
ConfigServerUrl,
AppId,
agollo.Cluster(cluster),
agollo.PreloadNamespaces(Namespace),
agollo.AutoFetchOnCacheMiss(),
agollo.FailTolerantOnBackupExists(),
//agollo.WithLogger(agollo.NewLogger(agollo.LoggerWriter(os.Stdout))),
)
if err != nil {
panic(err)
}
// 读取指定命名空间的配置信息
res := newAgo.GetNameSpace(Namespace)
for k, v := range res {
fmt.Printf("%s=%s\n", k, v)
}
// !以及这里,使用新建的agollo对象开启监听和获取事件通道
errorCh := newAgo.Start()
watchCh := newAgo.Watch()
go func(cluster string) {
for {
select {
case <-ctx.Done():
fmt.Println(cluster, "watch quit...")
return
case err := <-errorCh:
fmt.Println("Error:", err)
case update := <-watchCh:
// 在这里处理你的业务配置文件覆盖动作就可以了,
// apollo修改后的时间会在这里触发到事件。
for k, v := range update.NewValue {
fmt.Printf("%s=%s\n", k, v)
}
// cluster 集群
// update.Namespace 命名控件
// update.OldValue 旧的配置项字典,数据类型map[string]interface{}
// update.NewValue 新的配置项字典,数据类型同上
// update.Error 是否发生错误
fmt.Printf("Apollo cluster(%s) namespace(%s) old_value:(%v) new_value:(%v) error:(%v)\n",
cluster, update.Namespace, update.OldValue, update.NewValue, update.Error)
}
}
}(cluster)
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
fmt.Println("Listening for signals")
select {
// wait on kill signal
case <-ch:
// wait on context cancel
fmt.Println("Signal received, wait on context cancel...")
cancel()
}
}
from agollo.
你好,可能是我的意思没有表述清楚。
我原本意思是我去掉default那一段代码是可以实现客户端接收服务端监听变化的【符合预期】。 然后在此基础上添加一个功能:每间隔10s 检查一次,业务代码的配置文件(serverConfigFile )和从配置中心获取的文件(写在一个临时文件currentTmpFile )是否一致,否则执行覆盖动作~~~
但是我添加了一个default ,sleep 10s 检测的逻辑之后,程序始终走的default~
大神,我原本意思类似 间隔10s 打印一个hello world ,在哪个地方比较合适~~~
因为客户端只能从服务端获取配置,当服务端变化时次才可以触发,但是防止有时候人为在服务器上修改配置文件,而配置中心没有更新 产生的差异。
from agollo.
for + select
select中的case都未触发的时候会一直走default逻辑块。
没有default时,一直阻塞等待case中的事件触发。
按你的意思,参考代码
func main(){
var (
wg sync.WaitGroup
ctx, cancel = context.WithCancel(context.Background())
)
for _, cluster := range appConfig.Clusters {
// ...
wg.Add(1)
go func(cluster string) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case err := <-errorCh:
// 错误处理...
case update := <-watchCh:
// 处理配置文件更新事件...
}
}
}(cluster)
}
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(time.Second * 10)
defer ticker.Close()
for {
select {
case <-ctx.Done():
return
case <-ticker.C: // 每10秒触发下面检查逻辑
for _, cluster := range appConfig.Clusters {
// 从配置中心获取的
currentTmpFile := fmt.Sprintf("%s/%s", appConfig.TmpDir, cluster)
//fmt.Println(currentTmpFile)
// 业务程序的配置
serverConfigFile := fmt.Sprintf("%s/%s", appConfig.ConfigDir, cluster)
md5Res := md5FileCheck(currentTmpFile, serverConfigFile)
if md5Res == false {
copyFile(serverConfigFile, currentTmpFile)
serverRestart(appConfig.ServerRestartCmd)
}
}
}
}
}()
// 收到进程退出信号,通知所有goroutine退出
wg.Add(1)
go func() {
defer wg.Done()
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
fmt.Println("Listening for signals")
select {
case <-ctx.Done():
return
// wait on kill signal
case <-ch:
// wait on context cancel
fmt.Println("Signal received, wait on context cancel...")
cancel()
}
}()
// 等待goroutine退出
wg.Wait()
}
from agollo.
感谢大神,还得学习go func select的知识~
from agollo.
不要叫大神了,担当不起。我只是个写bug的-_-。没有别的问题这个iisue我就关了。
from agollo.
没问题,感谢感谢
from agollo.
Related Issues (20)
- agollo v1.2.8, 启用从备份文件加载时, 在调用GetNameSpace(key)正常时, 会将备份文件的其他namespace的数据删除 HOT 1
- 能否为ApolloClient提供自定义签名入口 HOT 8
- 我不想要备份文件, 可以去掉么, 它无论如何都会创建一个备份文件我关不掉 HOT 6
- go mod tidy 报错 HOT 4
- 此客户端是如何支持多环境配置的? HOT 1
- can't get value by Env? HOT 2
- 没有贡献值不开心啊 HOT 4
- apollo中使用yaml格式的配置文件,viper读取为空 HOT 3
- 与viper 一起用时候怎么切换或者说选择apollo的cluster呢 HOT 4
- 为什么app.properties文件下储存的是json格式而不是property格式? HOT 1
- 为什么除了properties类型的数据能拉到,其他Json,Yaml的都拉不出来呢? HOT 1
- 用golang 1.18构建后拉不到配置,1.17正常。 HOT 5
- 可否加一个关闭备份选项
- 使用viper时,是否能暴露一个配置变更的方法 HOT 2
- agollo.Start()和viper同用时,配置变更后的读取操作会存在并发问题 HOT 1
- viper说明文档 好像不全 HOT 1
- viper 支持 secretKey HOT 1
- 签名计算bug HOT 3
- 启动后提示函数time.UnixMilli()错误
- 依赖的github.com/bketelsen/crypt 版本太低,存在 CWE-285/CWE-770/CWE-125/CWE-863/CWE-79安全漏洞
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from agollo.