Git Product home page Git Product logo

delay-queue's Introduction

delay-queue

Go Report Card Downloads license Release

基于Redis实现的延迟队列, 参考有赞延迟队列设计实现

应用场景

  • 订单超过30分钟未支付,自动关闭
  • 订单完成后, 如果用户一直未评价, 5天后自动好评
  • 会员到期前15天, 到期前3天分别发送短信提醒

支付宝异步通知实现

支付宝异步通知时间间隔是如何实现的(通知的间隔频率一般是:2m,10m,10m,1h,2h,6h,15h)  

订单支付成功后, 生成通知任务, 放入消息队列中.
任务内容包含Array{0,0,2m,10m,10m,1h,2h,6h,15h}和通知到第几次N(这里N=1, 即第1次).
消费者从队列中取出任务, 根据N取得对应的时间间隔为0, 立即发送通知.

第1次通知失败, N += 1 => 2
从Array中取得间隔时间为2m, 添加一个延迟时间为2m的任务到延迟队列, 任务内容仍包含Array和N

第2次通知失败, N += 1 => 3, 取出对应的间隔时间10m, 添加一个任务到延迟队列, 同上
......
第7次通知失败, N += 1 => 8, 取出对应的间隔时间15h, 添加一个任务到延迟队列, 同上
第8次通知失败, N += 1 => 9, 取不到间隔时间, 结束通知

实现原理

利用Redis的有序集合,member为JobId, score为任务执行的时间戳,
每秒扫描一次集合,取出执行时间小于等于当前时间的任务.

依赖

  • Redis

下载

releases

源码安装

  • go语言版本1.7+
  • go get -d github.com/ouqiang/delay-queue
  • go build

运行

./delay-queue -c delay-queue.conf

HTTP Server监听0.0.0.0:9277, Redis连接地址127.0.0.1:6379, 数据库编号1

客户端

PHP

HTTP接口

  • 请求方法 POST
  • 请求Body及返回值均为json

返回值

{
  "code": 0,
  "message": "添加成功",
  "data": null
}
参数名 类型 含义 备注
code int 状态码 0: 成功 非0: 失败
message string 状态描述信息
data object, null 附加信息

添加任务

URL地址 /push

{
  "topic": "order",
  "id": "15702398321",
  "delay": 3600,
  "ttr": 120,
  "body": "{\"uid\": 10829378,\"created\": 1498657365 }"
}
参数名 类型 含义 备注
topic string Job类型
id string Job唯一标识 需确保JobID唯一
delay int Job需要延迟的时间, 单位:秒
ttr int Job执行超时时间, 单位:秒
body string Job的内容,供消费者做具体的业务处理,如果是json格式需转义

轮询队列获取任务

服务端会Hold住连接, 直到队列中有任务或180秒后超时返回,
任务执行完成后需调用finish接口删除任务, 否则任务会重复投递, 消费端需能处理同一任务的多次投递

URL地址 /pop

{
  "topic": "order"
}
参数名 类型 含义 备注
topic string Job类型

队列中有任务返回值

{
  "code": 0,
  "message": "操作成功",
  "data": {
    "id": "15702398321",
    "body": "{\"uid\": 10829378,\"created\": 1498657365 }"
  }
}

队列为空返回值

{
  "code": 0,
  "message": "操作成功",
  "data": null
}

删除任务

URL地址 /delete

{
  "id": "15702398321"
}
参数名 类型 含义 备注
id string Job唯一标识

完成任务

URL地址 /finish

{
  "id": "15702398321"
}
参数名 类型 含义 备注
id string Job唯一标识

查询任务

URL地址 /get

{
  "id": "15702398321"
}
参数名 类型 含义 备注
id string Job唯一标识

返回值

{
    "code": 0,
    "message": "操作成功",
    "data": {
        "topic": "order",
        "id": "15702398321",
        "delay": 1506787453,
        "ttr": 60,
        "body": "{\"uid\": 10829378,\"created\": 1498657365 }"
    
    }
}
参数名 类型 含义 备注
topic string Job类型
id string Job唯一标识
delay int Job延迟执行的时间戳
ttr int Job执行超时时间, 单位:秒
body string Job内容,供消费者做具体的业务处理

Job不存在返回值

{
  "code": 0,
  "message": "操作成功",
  "data": null
}

delay-queue's People

Contributors

bolls avatar ouqiang avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

delay-queue's Issues

建议

delay_queue.go 添加一个job.Id查询Job的 和 添加一个查询队列数量的 两个接口方便管理和查询

代理的逻辑有问题

	// 再次确认元信息中delay是否小于等于当前时间
	if job.Delay > t.Unix() {  // 这里的逻辑有问题。
		// 重新计算delay时间并放入bucket中
		pushToBucket(<-bucketNameChan, job.Delay, bucketItem.jobId)

		// 从bucket中删除之前的bucket
		removeFromBucket(bucketName, bucketItem.jobId)

		continue
	}

在 delay_queue.go 中,有如上的代码片段。if job.Delay > t.Unix() 这个写的不对。
应该是:if job.Delay < t.Unix()

因为,如果延迟时间大于 当前时间了,应该立马投递到 ready queue 里面,供消费端消费了?
不知道理解的是否正确?

多客户端问题

如果开启连个客户端(php),我发现一次消息延迟执行,会被执行两次。两个客户端各自执行了一次。

请问如何解决高可用问题?

采用定时器的方式,通过计时器的值与需要多久过期的差值进行比较来判断,但考虑到如果是一台主机或云服务器,若突然发生宕机或项目发布新版本等某些意外操作,觉得会造成时间上的偏差。希望大佬在README里面提下当前设计的不足和未来架构的方向。不胜感激

msgpack用起来挺香,但和go-codec比起来怎么样呢?

支持分布式吗?

这个延迟队列支持分布式嘛?redis 肯定是分布式的啦。但如果 delay-queue 是多实例部署的话,就有可能出现重复的呀

支持 redis 集群嘛

我看了代码。感觉是不支持的。是不是在 redis 的配置文件改改,就就可以支持 redis 集群了呢?

id需要全局唯一,不同topic无法隔离

push进队列时传了topic和id,建议所有接口都要传topic,保证每个topic下的id唯一性即可
这样这个延时队列可以同时服务多个业务线,不会互相影响

pushToReadyQueue()与removeFromBucket()需要考虑事务

在Push、Pop和tickHandler会使用<-bucketNameChan分配bucket
而在tickHandler中需要先pushToReadyQueue(),再removeFromBucket(),这个过程中如果Job被Pop(),并且pop后pushToBucket()取到的bucketName与原来相同,这时候时序会出问题,pop向bucket中写入新的timestamp,但随后被remove,导致bucket中的Job丢失

几点疑问

您好:
以下有几点疑问:

  1. 同一topic下,若多个job delay相同,loop时是否有冲突
  2. 业务方jobid必须要保证唯一,若有多个业务方,服务端是否要保证全局jobid的唯一性?(结合redis)
  3. 依靠客户端不断轮训获取延迟job,是否采用pub/sub方式更好?
    期待您的回复!

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.