Git Product home page Git Product logo

Comments (2)

yedf2 avatar yedf2 commented on May 27, 2024

能有可以复现的例子吗?

from dtm.

bodhi-code avatar bodhi-code commented on May 27, 2024

具体的示例如下,有两个文件,一个是test_dtm.go,代码如下:

package main

import (
	"crypto/md5"
	"fmt"
	"github.com/bwmarrin/snowflake"
	"github.com/dtm-labs/client/dtmcli"
	"github.com/gin-gonic/gin"
	"github.com/go-resty/resty/v2"
	"github.com/google/uuid"
	"log"
	"strings"
)

// CryptToMD5 hashes v1 with MD5 and returns the hex encoding of v2 followed
// by the 16-byte digest (hash.Hash.Sum appends the digest to the slice it is
// given, so v2 is emitted verbatim in front of the hash, not mixed into it).
//
// When uppercase is true the hex digits A-F are upper case; otherwise they
// are lower case. Every byte is rendered as exactly two hex digits.
func CryptToMD5(v1 []byte, v2 []byte, uppercase bool) string {
	hasher := md5.New()
	hasher.Write(v1)
	digest := hasher.Sum(v2)

	// %x on a byte slice yields two lower-case hex digits per byte.
	encoded := fmt.Sprintf("%x", digest)
	if uppercase {
		return strings.ToUpper(encoded)
	}
	return encoded
}

// NewGid returns a freshly generated UUID in its string form, used as the
// global transaction id.
func NewGid() string {
	return uuid.New().String()
}

// TransXa runs one XA global transaction against the dtm server at serverUrl.
// It calls the trans-out branch and then the trans-in branch under
// businessUrl, both with fixed demo payloads. Any error from the global
// transaction terminates the process via log.Fatal; on success the
// transaction id is logged.
func TransXa(serverUrl, businessUrl string) {
	gid := NewGid()

	outPayload := &gin.H{
		"transOutUserId": 1002,
		"amount":         100,
	}
	inPayload := &gin.H{
		"transInUserId": 1001,
		"amount":        100,
	}

	// The trans-out branch runs first; trans-in is only attempted when it succeeds.
	branches := func(xa *dtmcli.Xa) (*resty.Response, error) {
		if resp, err := xa.CallBranch(outPayload, businessUrl+"/trans-out-xa"); err != nil {
			return resp, err
		}
		return xa.CallBranch(inPayload, businessUrl+"/trans-in-xa")
	}

	if err := dtmcli.XaGlobalTransaction(serverUrl, gid, branches); err != nil {
		log.Fatal(err)
	}
	log.Printf("transaction:%s xa success", gid)
}

// main starts a single demo XA transaction against fixed dtm and business
// service addresses.
func main() {
	const (
		dtmServer   = "http://192.168.0.101:31980/api/dtmsvr"
		businessAPI = "http://192.168.0.101:8686/api/v1/business"
	)
	TransXa(dtmServer, businessAPI)
}

一个文件是test_dtm_http_xa.go,代码如下:

package main

import (
	"database/sql"
	"fmt"
	"github.com/dtm-labs/client/dtmcli"
	"github.com/gin-gonic/gin"
	"github.com/gin-gonic/gin/binding"
	"github.com/go-playground/locales/en"
	"github.com/go-playground/locales/zh"
	ut "github.com/go-playground/universal-translator"
	"github.com/go-playground/validator/v10"
	enTranslations "github.com/go-playground/validator/v10/translations/en"
	zhTranslations "github.com/go-playground/validator/v10/translations/zh"
	jsoniter "github.com/json-iterator/go"
	rotatelogs "github.com/lestrrat-go/file-rotatelogs"
	"github.com/pkg/errors"
	"gorm.io/driver/mysql"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
	"log"
	"net/http"
	"net/url"
	"time"
)

// TransInXaRequest is the request payload bound by the trans-in-xa handler.
type TransInXaRequest struct {
	TransInUserId int     `json:"transInUserId" binding:"required" desc:"转入账号id"` // ID of the account that receives the transfer
	Amount        float64 `json:"amount" binding:"required" desc:"转入金额"`          // amount transferred in
}

// TransOutXaRequest is the request payload bound by the trans-out-xa handler.
type TransOutXaRequest struct {
	TransOutUserId int     `json:"transOutUserId" binding:"required" desc:"转出账号id"` // ID of the account the transfer is taken from
	Amount         float64 `json:"amount" binding:"required" desc:"转出金额"`           // amount transferred out
}

// initValidator looks up the translator for locale ("zh" or "en") and
// registers the matching default validation messages on gin's validator
// engine. Any locale other than "zh" gets the English messages registered.
//
// It returns an error when gin's validator engine is not a
// *validator.Validate, when no translator exists for locale, or when
// registering the translations fails.
func initValidator(locale string) (ut.Translator, error) {
	engine, isValidate := binding.Validator.Engine().(*validator.Validate)
	if !isValidate {
		return nil, errors.New("init validator failed")
	}

	chinese := zh.New()
	english := en.New()
	// English is both the fallback and one of the supported locales.
	universal := ut.New(english, chinese, english)

	translator, found := universal.GetTranslator(locale)
	if !found {
		return nil, errors.New("init validator failed")
	}

	switch locale {
	case "zh":
		return translator, zhTranslations.RegisterDefaultTranslations(engine, translator)
	default:
		return translator, enTranslations.RegisterDefaultTranslations(engine, translator)
	}
}

// ValidateError reports whether err (the result of binding a request) is
// non-nil and, if so, writes a JSON failure response with code -1.
//
// For validator.ValidationErrors the message carries the field errors
// translated to Chinese. If the translator cannot be initialised, or the
// translated errors cannot be marshalled, the untranslated validation error
// text is used instead. Any other error is reported as a read failure.
//
// It returns false only when err is nil; callers must stop handling the
// request when it returns true, because a response has already been written.
func ValidateError(ctx *gin.Context, err error) bool {
	if err == nil {
		return false
	}

	message := "read request failed:" + err.Error()
	if errs, ok := err.(validator.ValidationErrors); ok {
		detail := errs.Error()
		// Translate only when the translator is usable. The previous code had
		// this check inverted: it returned false (no error) when the
		// translator initialised correctly and used a nil translator otherwise.
		if translator, initErr := initValidator("zh"); initErr == nil {
			if translated, marshalErr := jsoniter.Marshal(errs.Translate(translator)); marshalErr == nil {
				detail = string(translated)
			}
		}
		message = "validate request failed:" + detail
	}

	ctx.JSON(http.StatusOK, gin.H{
		"code":    -1,
		"message": message,
	})
	return true
}

// ResponseFailed writes an HTTP 200 JSON response with code -1. The message
// is message followed by "!" when err is nil, or by "," and the error text
// otherwise.
func ResponseFailed(ctx *gin.Context, message string, err error) {
	suffix := "!"
	if err != nil {
		suffix = "," + err.Error()
	}
	ctx.JSON(http.StatusOK, gin.H{
		"code":    -1,
		"message": message + suffix,
	})
}

// ResponseSuccess writes an HTTP 200 JSON response with code 0 and message
// followed by "!". When at least one data value is supplied, the first one is
// included under "data"; further values are ignored.
func ResponseSuccess(ctx *gin.Context, message string, data ...interface{}) {
	payload := gin.H{
		"code":    0,
		"message": message + "!",
	}
	if len(data) > 0 {
		payload["data"] = data[0]
	}
	ctx.JSON(http.StatusOK, payload)
}

// ResponseWithStatusCode writes a JSON response using statusCode as the HTTP
// status. For any status other than 200 the body's "code" is overridden with
// statusCode; otherwise the supplied code is used. When at least one data
// value is supplied, the first one is included under "data".
func ResponseWithStatusCode(ctx *gin.Context, statusCode int, code int, message string, data ...interface{}) {
	bodyCode := code
	if statusCode != 200 {
		bodyCode = statusCode
	}
	payload := gin.H{
		"code":    bodyCode,
		"message": message,
	}
	if len(data) > 0 {
		payload["data"] = data[0]
	}
	ctx.JSON(statusCode, payload)
}

// DBConf holds database connection settings and the options for the gorm
// debug log file.
//
// Field summary (translated from the desc tags):
//   - User, Password: database credentials.
//   - Host, Port: database host name and port.
//   - Database: database name; also used as the debug log file name prefix.
//   - Dialect: database type; NewDBFromRawDB handles "mysql" and "postgres".
//   - DBSource: database source string.
//   - DBDebug: whether gorm debug statements are logged.
//   - MaxAge: maximum log retention; NewDBFromRawDB multiplies it by time.Hour.
//   - RotateTimeLevel: log rotation granularity: 0 custom interval, 1 daily,
//     2 hourly, 3 every minute (see the RotateBy* constants).
//   - RotateTime: custom rotation interval in minutes, used when
//     RotateTimeLevel is 0.
type DBConf struct {
	User            string `yaml:"user" desc:"用户名"`
	Password        string `yaml:"password" desc:"用户密码"`
	Host            string `yaml:"host" desc:"主机名"`
	Port            int    `yaml:"port" desc:"主机端口"`
	Database        string `yaml:"database" desc:"数据库名"`
	Dialect         string `yaml:"dialect" desc:"数据库类型"`
	DBSource        string `yaml:"db_source" desc:"数据库源"`
	DBDebug         bool   `yaml:"db_debug" desc:"是否输出gorm数据库调试语句"`
	MaxAge          int    `yaml:"max_age" desc:"日志最大保留时间"`
	RotateTimeLevel int    `yaml:"rotate_time_level" desc:"日志分片时间等级 0 自定义时间分片 1 日分片 2 1小时分片 3 1分钟分片"`
	RotateTime      int    `yaml:"rotate_time" desc:"自定义时间分片时长 单位为:min"`
}

// Log rotation levels accepted in DBConf.RotateTimeLevel. The numeric values
// are part of the configuration format and are therefore spelled out.
const (
	// RotateByTimestamp rotates on a custom interval (DBConf.RotateTime minutes).
	RotateByTimestamp = 0
	// RotateByDate rotates once per day.
	RotateByDate = 1
	// RotateByHour rotates once per hour.
	RotateByHour = 2
	// RotateByMinute rotates once per minute.
	RotateByMinute = 3
)

// Dao is the data-access object for the account transfer statements; its
// methods execute SQL through DB.
type Dao struct {
	DB *gorm.DB // gorm handle the SQL statements are executed on
}

// NewDBFromRawDB wraps an existing *sql.DB in a *gorm.DB for the dialect
// named in dbConf.Dialect ("mysql" or "postgres").
//
// When dbConf.DBDebug is set, gorm is given a logger that writes to a
// rotating file under ../logs/ and the returned handle is switched to debug
// mode.
//
// It returns an error, and a nil handle, when the dialect or rotation level
// is not supported, when the log writer cannot be created, or when gorm.Open
// fails. Earlier versions dropped gorm.Open errors in debug mode because of a
// shadowed err variable, and then dereferenced the nil handle.
func NewDBFromRawDB(db *sql.DB, dbConf DBConf) (*gorm.DB, error) {
	var dialector gorm.Dialector
	switch dbConf.Dialect {
	case "mysql":
		dialector = mysql.New(mysql.Config{Conn: db})
	case "postgres":
		dialector = postgres.New(postgres.Config{Conn: db})
	default:
		return nil, fmt.Errorf("unsupported database dialect %q", dbConf.Dialect)
	}

	// Without debugging gorm is opened with no options, exactly as before.
	var opts []gorm.Option
	if dbConf.DBDebug {
		writer, err := newDebugLogWriter(dbConf)
		if err != nil {
			return nil, err
		}
		dbLogger := logger.New(log.New(writer, "\r\n", log.LstdFlags), logger.Config{
			SlowThreshold: time.Second,
			LogLevel:      logger.Silent,
			Colorful:      true,
		})
		opts = append(opts, &gorm.Config{Logger: dbLogger})
	}

	newDb, err := gorm.Open(dialector, opts...)
	if err != nil {
		return nil, fmt.Errorf("opening gorm %s connection: %w", dbConf.Dialect, err)
	}
	if dbConf.DBDebug {
		newDb = newDb.Debug()
	}
	return newDb, nil
}

// newDebugLogWriter creates the rotating log file writer for gorm debug
// output according to dbConf.RotateTimeLevel.
func newDebugLogWriter(dbConf DBConf) (*rotatelogs.RotateLogs, error) {
	loggerFileName := fmt.Sprintf("%s-db-debug.log", dbConf.Database)
	maxAge := time.Duration(dbConf.MaxAge) * time.Hour

	switch dbConf.RotateTimeLevel {
	case RotateByTimestamp:
		return rotatelogs.New(
			"../logs/"+loggerFileName+".%Y%m%d%H%M",
			rotatelogs.WithLinkName("../logs/"+loggerFileName),
			rotatelogs.WithMaxAge(maxAge),
			rotatelogs.WithRotationTime(time.Duration(dbConf.RotateTime)*time.Minute))
	case RotateByDate:
		// No rotation time is set here, so the library default applies.
		return rotatelogs.New(
			"../logs/"+loggerFileName+".%Y%m%d",
			rotatelogs.WithLinkName("../logs/"+loggerFileName),
			rotatelogs.WithMaxAge(maxAge))
	case RotateByHour:
		return rotatelogs.New(
			"../logs/"+loggerFileName+".%Y%m%d%H",
			rotatelogs.WithLinkName("../logs/"+loggerFileName),
			rotatelogs.WithMaxAge(maxAge),
			rotatelogs.WithRotationTime(time.Hour))
	case RotateByMinute:
		return rotatelogs.New(
			"../logs/"+loggerFileName+".%Y%m%d%H%M",
			rotatelogs.WithLinkName("../logs/"+loggerFileName),
			rotatelogs.WithMaxAge(maxAge),
			rotatelogs.WithRotationTime(time.Minute))
	default:
		// Previously this left a nil writer that was handed to the logger.
		return nil, fmt.Errorf("unsupported log rotate time level %d", dbConf.RotateTimeLevel)
	}
}

// XaLocalTransaction runs f as the local branch of a dtm XA transaction.
// qs carries the query parameters of the incoming branch request and is
// passed through to dtmcli.XaLocalTransaction. f receives a fresh Dao whose
// DB is a gorm handle built on the connection dtmcli provides.
//
// The returned error is whatever dtmcli.XaLocalTransaction returns; that
// includes a failure to build the gorm handle, in which case f is not called.
// (Previously that failure was never checked and f ran with an unusable DB.)
//
// NOTE(review): the connection settings, including the password, are
// hard-coded for this demo; move them to configuration before real use.
func (dao *Dao) XaLocalTransaction(qs url.Values, f func(dao *Dao) error) (err error) {
	dbConf := DBConf{
		User:            "root",
		Password:        "CodeMan2022080^2*1",
		Host:            "127.0.0.1",
		Port:            3306,
		Database:        "test",
		Dialect:         "mysql",
		DBSource:        "root:CodeMan2022080^2*1@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=True&loc=Local",
		DBDebug:         true,
		RotateTime:      1,
		RotateTimeLevel: RotateByDate,
	}
	xaConf := dtmcli.DBConf{
		Driver:   dbConf.Dialect,
		Host:     dbConf.Host,
		Port:     int64(dbConf.Port),
		User:     dbConf.User,
		Password: dbConf.Password,
		Db:       dbConf.Database,
	}
	return dtmcli.XaLocalTransaction(qs, xaConf, func(db *sql.DB, xa *dtmcli.Xa) error {
		gormDB, openErr := NewDBFromRawDB(db, dbConf)
		if openErr != nil {
			// Fail the branch instead of running f against a missing DB.
			return openErr
		}
		return f(&Dao{DB: gormDB})
	})
}

// TransInXa adds amount to the balance of the user_account row whose user_id
// is userId and returns the error reported by the statement, if any.
func (dao *Dao) TransInXa(userId int, amount float64) error {
	return dao.DB.Exec("update user_account set balance = balance + ? where user_id = ?", amount, userId).Error
}

// TransOutXa subtracts amount from the balance of the user_account row whose
// user_id is userId and returns the error reported by the statement, if any.
func (dao *Dao) TransOutXa(userId int, amount float64) error {
	return dao.DB.Exec("update user_account set balance = balance - ? where user_id = ?", amount, userId).Error
}

// transInXa handles POST trans-in-xa: it binds and validates the request,
// then credits the account inside an XA local transaction. A failure is
// answered with HTTP 409; success with the standard success payload.
func transInXa(ctx *gin.Context) {
	var req TransInXaRequest
	if ValidateError(ctx, ctx.ShouldBind(&req)) {
		return
	}

	// The underlying database error is replaced by a generic message.
	credit := func(dao *Dao) error {
		if updateErr := dao.TransInXa(req.TransInUserId, req.Amount); updateErr != nil {
			return errors.New("数据库更新出错!")
		}
		return nil
	}

	var root Dao
	if txErr := root.XaLocalTransaction(ctx.Request.URL.Query(), credit); txErr != nil {
		ResponseWithStatusCode(ctx, 409, 409, "trans in xa failed:"+txErr.Error())
		return
	}
	ResponseSuccess(ctx, "trans in xa success")
}

// transOutXa handles POST trans-out-xa: it binds and validates the request,
// then debits the account inside an XA local transaction. A failure is
// answered with HTTP 409; success with the standard success payload.
func transOutXa(ctx *gin.Context) {
	var req TransOutXaRequest
	if ValidateError(ctx, ctx.ShouldBind(&req)) {
		return
	}

	// The underlying database error is replaced by a generic message.
	debit := func(dao *Dao) error {
		if updateErr := dao.TransOutXa(req.TransOutUserId, req.Amount); updateErr != nil {
			return errors.New("数据库更新出错!")
		}
		return nil
	}

	var root Dao
	if txErr := root.XaLocalTransaction(ctx.Request.URL.Query(), debit); txErr != nil {
		ResponseWithStatusCode(ctx, 409, 409, "trans out xa failed:"+txErr.Error())
		return
	}
	ResponseSuccess(ctx, "trans out xa success")
}

// main registers the two XA branch endpoints and serves them on port 8686.
// If the server cannot start or stops with an error, the error is logged and
// the process exits with a non-zero status instead of ending silently.
func main() {
	router := gin.Default()
	router.POST("/api/v1/business/trans-in-xa", transInXa)
	router.POST("/api/v1/business/trans-out-xa", transOutXa)
	if err := router.Run(":8686"); err != nil {
		log.Fatal(err)
	}
}

运行环境:golang版本1.21,mysql版本8.0.32
先运行test_dtm_http_xa.go,再运行test_dtm.go,就会出现dtm显示xa事务提交成功,但是实际没有成功提交xa事务的情况。

from dtm.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Something interesting about the web. A new door to the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Something interesting about games, to make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.