Comments (2)
能有可以复现的例子吗?
from dtm.
具体的示例如下,有两个文件,一个是test_dtm.go,代码如下:
package main
import (
"crypto/md5"
"fmt"
"github.com/bwmarrin/snowflake"
"github.com/dtm-labs/client/dtmcli"
"github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2"
"github.com/google/uuid"
"log"
"strings"
)
// CryptToMD5 hashes v1 with MD5, appends the 16-byte digest to v2 and returns
// the hex encoding of the combined bytes (v2 first, then the digest).
// uppercase selects "%02X" instead of "%02x" for every byte.
//
// Note: hex.EncodeToString over the same bytes would be an alternative way to
// produce the lowercase form.
func CryptToMD5(v1 []byte, v2 []byte, uppercase bool) string {
	hasher := md5.New()
	hasher.Write(v1)
	// Sum appends the digest to v2, so v2's bytes are part of the output.
	combined := hasher.Sum(v2)

	var out strings.Builder
	out.Grow(2 * len(combined))
	for i := range combined {
		if uppercase {
			fmt.Fprintf(&out, "%02X", combined[i])
			continue
		}
		fmt.Fprintf(&out, "%02x", combined[i])
	}
	return out.String()
}
// NewGid returns the string form of a freshly generated UUID; TransXa uses it
// as the global transaction id.
func NewGid() string {
	return uuid.New().String()
}
// TransXa runs one XA global transaction against the DTM server at serverUrl.
// It calls the "/trans-out-xa" branch and then the "/trans-in-xa" branch under
// businessUrl. If the global transaction returns an error the process exits
// through log.Fatal; otherwise a success line is logged.
func TransXa(serverUrl, businessUrl string) {
	gid := NewGid()
	outPayload := &gin.H{
		"transOutUserId": 1002,
		"amount":         100,
	}
	inPayload := &gin.H{
		"transInUserId": 1001,
		"amount":        100,
	}

	// The trans-in branch is only called when the trans-out branch call
	// returned no error.
	runBranches := func(xa *dtmcli.Xa) (*resty.Response, error) {
		if resp, err := xa.CallBranch(outPayload, businessUrl+"/trans-out-xa"); err != nil {
			return resp, err
		}
		return xa.CallBranch(inPayload, businessUrl+"/trans-in-xa")
	}

	if err := dtmcli.XaGlobalTransaction(serverUrl, gid, runBranches); err != nil {
		log.Fatal(err)
	}
	log.Printf("transaction:%s xa success", gid)
}
// main starts a single XA transfer using fixed DTM server and business
// service addresses.
func main() {
	const (
		dtmServer    = "http://192.168.0.101:31980/api/dtmsvr"
		businessBase = "http://192.168.0.101:8686/api/v1/business"
	)
	TransXa(dtmServer, businessBase)
}
另一个文件是test_dtm_http_xa.go,代码如下:
package main
import (
"database/sql"
"fmt"
"github.com/dtm-labs/client/dtmcli"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/go-playground/locales/en"
"github.com/go-playground/locales/zh"
ut "github.com/go-playground/universal-translator"
"github.com/go-playground/validator/v10"
enTranslations "github.com/go-playground/validator/v10/translations/en"
zhTranslations "github.com/go-playground/validator/v10/translations/zh"
jsoniter "github.com/json-iterator/go"
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/pkg/errors"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"log"
"net/http"
"net/url"
"time"
)
// TransInXaRequest is the request body bound by the trans-in XA handler.
type TransInXaRequest struct {
	// TransInUserId is the id of the account being transferred into.
	TransInUserId int `json:"transInUserId" binding:"required" desc:"转入账号id"`
	// Amount is the amount to transfer in.
	Amount float64 `json:"amount" binding:"required" desc:"转入金额"`
}
// TransOutXaRequest is the request body bound by the trans-out XA handler.
type TransOutXaRequest struct {
	// TransOutUserId is the id of the account being transferred out of.
	TransOutUserId int `json:"transOutUserId" binding:"required" desc:"转出账号id"`
	// Amount is the amount to transfer out.
	Amount float64 `json:"amount" binding:"required" desc:"转出金额"`
}
// initValidator registers the default validation messages for locale on gin's
// binding validator and returns the matching translator.
//
// It fails with "init validator failed" when gin's validator engine is not a
// *validator.Validate or when the universal translator has no entry for
// locale. "zh" registers the Chinese messages; anything else that reaches the
// registration step registers the English ones.
func initValidator(locale string) (ut.Translator, error) {
	engine, isValidate := binding.Validator.Engine().(*validator.Validate)
	if !isValidate {
		return nil, errors.New("init validator failed")
	}

	english := en.New()
	universal := ut.New(english, zh.New(), english)
	translator, found := universal.GetTranslator(locale)
	if !found {
		return nil, errors.New("init validator failed")
	}

	register := enTranslations.RegisterDefaultTranslations
	if locale == "zh" {
		register = zhTranslations.RegisterDefaultTranslations
	}
	return translator, register(engine, translator)
}
// ValidateError reports a request binding failure to the client.
//
// It returns false and writes nothing when err is nil. For any non-nil err it
// writes a JSON body with code -1 (HTTP status 200) and returns true, so the
// caller should stop handling the request.
//
// Validation errors are rendered with the "zh" translator. If the translator
// cannot be initialised, or the translated messages cannot be marshalled, the
// untranslated validator message is sent instead; the failure is never
// dropped.
func ValidateError(ctx *gin.Context, err error) bool {
	if err == nil {
		return false
	}

	message := "read request failed:" + err.Error()
	if errs, ok := err.(validator.ValidationErrors); ok {
		// Default to the raw validator text; upgrade to the translated form
		// only when every step succeeds.
		message = "validate request failed:" + errs.Error()
		if translator, initErr := initValidator("zh"); initErr == nil {
			if errMsg, marshalErr := jsoniter.Marshal(errs.Translate(translator)); marshalErr == nil {
				message = "validate request failed:" + string(errMsg)
			}
		}
	}

	ctx.JSON(http.StatusOK, gin.H{
		"code":    -1,
		"message": message,
	})
	return true
}
// ResponseFailed writes a failure body (code -1, HTTP status 200). The message
// is suffixed with "!" when err is nil, otherwise with "," followed by the
// error text.
func ResponseFailed(ctx *gin.Context, message string, err error) {
	suffix := "!"
	if err != nil {
		suffix = "," + err.Error()
	}
	ctx.JSON(http.StatusOK, gin.H{
		"code":    -1,
		"message": message + suffix,
	})
}
// ResponseSuccess writes a success body (code 0, HTTP status 200) whose
// message is message followed by "!". When data is supplied, only its first
// element is sent, under the "data" key.
func ResponseSuccess(ctx *gin.Context, message string, data ...interface{}) {
	payload := gin.H{
		"code":    0,
		"message": message + "!",
	}
	if len(data) != 0 {
		payload["data"] = data[0]
	}
	ctx.JSON(http.StatusOK, payload)
}
// ResponseWithStatusCode writes a JSON body with the given HTTP status.
// For any status other than 200 the body's "code" is replaced by statusCode;
// for 200 the supplied code is used. When data is supplied, only its first
// element is sent, under the "data" key.
func ResponseWithStatusCode(ctx *gin.Context, statusCode int, code int, message string, data ...interface{}) {
	bodyCode := code
	if statusCode != 200 {
		bodyCode = statusCode
	}
	payload := gin.H{
		"code":    bodyCode,
		"message": message,
	}
	if len(data) != 0 {
		payload["data"] = data[0]
	}
	ctx.JSON(statusCode, payload)
}
// DBConf holds the database connection settings and the options for the gorm
// debug log.
type DBConf struct {
	// User is the database user name.
	User string `yaml:"user" desc:"用户名"`
	// Password is the database user's password.
	Password string `yaml:"password" desc:"用户密码"`
	// Host is the database host name.
	Host string `yaml:"host" desc:"主机名"`
	// Port is the database port.
	Port int `yaml:"port" desc:"主机端口"`
	// Database is the database (schema) name.
	Database string `yaml:"database" desc:"数据库名"`
	// Dialect is the database type, e.g. "mysql" or "postgres".
	Dialect string `yaml:"dialect" desc:"数据库类型"`
	// DBSource is the data source string.
	DBSource string `yaml:"db_source" desc:"数据库源"`
	// DBDebug enables gorm's debug SQL output.
	DBDebug bool `yaml:"db_debug" desc:"是否输出gorm数据库调试语句"`
	// MaxAge is the maximum log retention time.
	MaxAge int `yaml:"max_age" desc:"日志最大保留时间"`
	// RotateTimeLevel selects the log rotation granularity:
	// 0 custom interval, 1 daily, 2 hourly, 3 every minute.
	RotateTimeLevel int `yaml:"rotate_time_level" desc:"日志分片时间等级 0 自定义时间分片 1 日分片 2 1小时分片 3 1分钟分片"`
	// RotateTime is the custom rotation interval, in minutes.
	RotateTime int `yaml:"rotate_time" desc:"自定义时间分片时长 单位为:min"`
}
// Values accepted by DBConf.RotateTimeLevel.
const (
	// RotateByTimestamp rotates on a custom interval.
	RotateByTimestamp = 0
	// RotateByDate rotates daily.
	RotateByDate = 1
	// RotateByHour rotates every hour.
	RotateByHour = 2
	// RotateByMinute rotates every minute.
	RotateByMinute = 3
)
// Dao carries the gorm handle that the account update methods execute on.
type Dao struct {
	// DB is the gorm database handle used by TransInXa and TransOutXa.
	DB *gorm.DB
}
// NewDBFromRawDB wraps an existing *sql.DB in a *gorm.DB for the dialect named
// in dbConf ("mysql" or "postgres").
//
// When dbConf.DBDebug is set, gorm statements are logged to a rotating file
// under ../logs named "<database>-db-debug.log" and the returned handle is in
// gorm debug mode.
//
// Every failure is returned as an error with a nil *gorm.DB: a log writer that
// cannot be created, an unknown RotateTimeLevel, an unsupported dialect, or a
// failing gorm.Open.
func NewDBFromRawDB(db *sql.DB, dbConf DBConf) (*gorm.DB, error) {
	if !dbConf.DBDebug {
		return openGormDB(db, dbConf.Dialect)
	}

	logWriter, err := newDebugLogWriter(dbConf)
	if err != nil {
		return nil, err
	}
	dbLogger := logger.New(log.New(logWriter, "\r\n", log.LstdFlags), logger.Config{
		SlowThreshold: time.Second,
		LogLevel:      logger.Silent,
		Colorful:      true,
	})

	newDb, err := openGormDB(db, dbConf.Dialect, &gorm.Config{Logger: dbLogger})
	if err != nil {
		return nil, err
	}
	// Debug is only reached with a non-nil handle, so it cannot panic on a
	// failed open.
	return newDb.Debug(), nil
}

// newDebugLogWriter builds the rotating writer for the gorm debug log
// according to dbConf.RotateTimeLevel.
func newDebugLogWriter(dbConf DBConf) (*rotatelogs.RotateLogs, error) {
	linkPath := "../logs/" + fmt.Sprintf("%s-db-debug.log", dbConf.Database)
	options := []rotatelogs.Option{
		rotatelogs.WithLinkName(linkPath),
		rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge) * time.Hour),
	}

	var suffix string
	switch dbConf.RotateTimeLevel {
	case RotateByTimestamp:
		suffix = ".%Y%m%d%H%M"
		options = append(options, rotatelogs.WithRotationTime(time.Duration(dbConf.RotateTime)*time.Minute))
	case RotateByDate:
		// No explicit rotation time: the library default applies.
		suffix = ".%Y%m%d"
	case RotateByHour:
		suffix = ".%Y%m%d%H"
		options = append(options, rotatelogs.WithRotationTime(time.Hour))
	case RotateByMinute:
		suffix = ".%Y%m%d%H%M"
		options = append(options, rotatelogs.WithRotationTime(time.Minute))
	default:
		return nil, fmt.Errorf("unsupported rotate_time_level %d", dbConf.RotateTimeLevel)
	}
	return rotatelogs.New(linkPath+suffix, options...)
}

// openGormDB opens a gorm handle over db for the given dialect, rejecting
// dialects other than "mysql" and "postgres".
func openGormDB(db *sql.DB, dialect string, opts ...gorm.Option) (*gorm.DB, error) {
	var dialector gorm.Dialector
	switch dialect {
	case "mysql":
		dialector = mysql.New(mysql.Config{Conn: db})
	case "postgres":
		dialector = postgres.New(postgres.Config{Conn: db})
	default:
		return nil, fmt.Errorf("unsupported database dialect %q", dialect)
	}

	gormDB, err := gorm.Open(dialector, opts...)
	if err != nil {
		return nil, err
	}
	return gormDB, nil
}
// XaLocalTransaction runs f as an XA branch through dtmcli.XaLocalTransaction.
//
// qs carries the branch's query-string parameters (the handlers pass the
// request URL query). f receives a fresh Dao whose DB is a gorm handle built
// over the *sql.DB that dtmcli passes to the callback; the receiver's own DB
// is not used.
//
// The returned error is whatever dtmcli.XaLocalTransaction returns. If the
// gorm handle cannot be created, f is not called and that error is returned
// from the callback instead.
//
// NOTE(review): the connection settings, including the password, are
// hard-coded for this reproduction; real code should load them from
// configuration.
func (dao *Dao) XaLocalTransaction(qs url.Values, f func(dao *Dao) error) (err error) {
	dbConf := DBConf{
		User:            "root",
		Password:        "CodeMan2022080^2*1",
		Host:            "127.0.0.1",
		Port:            3306,
		Database:        "test",
		Dialect:         "mysql",
		DBSource:        "root:CodeMan2022080^2*1@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=True&loc=Local",
		DBDebug:         true,
		RotateTime:      1,
		RotateTimeLevel: 1,
	}
	dtmConf := dtmcli.DBConf{
		Driver:   dbConf.Dialect,
		Host:     dbConf.Host,
		Port:     int64(dbConf.Port),
		User:     dbConf.User,
		Password: dbConf.Password,
		Db:       dbConf.Database,
	}

	return dtmcli.XaLocalTransaction(qs, dtmConf, func(db *sql.DB, xa *dtmcli.Xa) error {
		gormDB, openErr := NewDBFromRawDB(db, dbConf)
		if openErr != nil {
			return fmt.Errorf("open gorm db for xa branch: %w", openErr)
		}
		if gormDB == nil {
			// Never hand f a Dao whose DB would panic on first use.
			return fmt.Errorf("open gorm db for xa branch: no handle for dialect %q", dbConf.Dialect)
		}
		return f(&Dao{DB: gormDB})
	})
}
// TransInXa adds amount to the balance of the user_account row whose user_id
// is userId and returns the error reported by the statement, if any.
func (dao *Dao) TransInXa(userId int, amount float64) error {
	return dao.DB.Exec("update user_account set balance = balance + ? where user_id = ?", amount, userId).Error
}
// TransOutXa subtracts amount from the balance of the user_account row whose
// user_id is userId and returns the error reported by the statement, if any.
func (dao *Dao) TransOutXa(userId int, amount float64) error {
	return dao.DB.Exec("update user_account set balance = balance - ? where user_id = ?", amount, userId).Error
}
// transInXa handles the trans-in XA branch request: it binds and validates the
// body, runs the balance increase inside an XA local transaction, and answers
// with HTTP 409 on failure or a success body otherwise.
func transInXa(ctx *gin.Context) {
	var req TransInXaRequest
	if ValidateError(ctx, ctx.ShouldBind(&req)) {
		return
	}

	// Any database failure is reported to the client with a fixed message.
	credit := func(txDao *Dao) error {
		if updateErr := txDao.TransInXa(req.TransInUserId, req.Amount); updateErr != nil {
			return errors.New("数据库更新出错!")
		}
		return nil
	}

	var dao Dao
	if err := dao.XaLocalTransaction(ctx.Request.URL.Query(), credit); err != nil {
		ResponseWithStatusCode(ctx, 409, 409, "trans in xa failed:"+err.Error())
		return
	}
	ResponseSuccess(ctx, "trans in xa success")
}
// transOutXa handles the trans-out XA branch request: it binds and validates
// the body, runs the balance decrease inside an XA local transaction, and
// answers with HTTP 409 on failure or a success body otherwise.
func transOutXa(ctx *gin.Context) {
	var req TransOutXaRequest
	if ValidateError(ctx, ctx.ShouldBind(&req)) {
		return
	}

	// Any database failure is reported to the client with a fixed message.
	debit := func(txDao *Dao) error {
		if updateErr := txDao.TransOutXa(req.TransOutUserId, req.Amount); updateErr != nil {
			return errors.New("数据库更新出错!")
		}
		return nil
	}

	var dao Dao
	if err := dao.XaLocalTransaction(ctx.Request.URL.Query(), debit); err != nil {
		ResponseWithStatusCode(ctx, 409, 409, "trans out xa failed:"+err.Error())
		return
	}
	ResponseSuccess(ctx, "trans out xa success")
}
// main registers the two XA branch endpoints and serves them on port 8686.
// If the server cannot start or stops with an error, the process exits through
// log.Fatal instead of returning silently.
func main() {
	router := gin.Default()
	router.POST("/api/v1/business/trans-in-xa", transInXa)
	router.POST("/api/v1/business/trans-out-xa", transOutXa)
	if err := router.Run(":8686"); err != nil {
		log.Fatal(err)
	}
}
运行环境:golang版本1.21,mysql版本8.0.32
先运行test_dtm_http_xa.go,再运行test_dtm.go,就会出现dtm显示xa事务提交成功,但是实际没有成功提交xa事务的情况。
from dtm.
Related Issues (20)
- Azure MySQL SSL-mode requird
- Expect the Server to support SqlServer storage
- Lua not supported? HOT 1
- mod\github.com\dtm-labs\[email protected]\dtmimp\vars.go:48:22: undefined: dtmdriver.GetHTTPDriver HOT 2
- 并发情况下 偶尔会出现死锁 HOT 2
- 基于mysql做存储时间长了会不会影响性能? HOT 2
- 为什么二阶消息回查不直接由DTM HOT 2
- k8s部署时,如何配置MicroService的EndPoint? HOT 1
- 性能测试和文档内描述的差距较大 HOT 1
- tcc模式下,Cancel阶段怎么拿到Try或者Confirm阶段生成的数据 HOT 1
- java客户端的子事务屏蔽实现不完整 HOT 1
- Can't go mod tidy HOT 3
- nacos HOT 8
- dtmcli-java的api问题 HOT 4
- docker compose 怎么配置文件读取配置文件 HOT 3
- dtmimp.DBExec需要返回数据ID HOT 1
- C# SAGA模式下,接口请求超时,会一直重试
- 怎么关闭浏览器的admin页面访问
- 微服务架构鉴权问题
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Something interesting about the web. A new door to the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Something interesting about visualization and data art.
-
Game
Something interesting about games; make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from dtm.