欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

golang实现redis的延时消息队列功能示例

程序员文章站 2023-02-17 16:44:07
前言 在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现。本次主要采用了re...

前言

在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现。本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo。

提前准备 安装redis, redis-go

因为用的是macos, 直接

$ brew install redis
$ go get github.com/garyburd/redigo/redis

又因为比较懒,生成任务的唯一id时,直接采用了bson中的objectid,所以:

$ go get gopkg.in/mgo.v2/bson

唯一id不是必须有,但如果之后有实际应用需要携带,便于查找相应任务。

生产者

通过一个for循环生成10w个任务, 每一个任务有不同的时间

func producer() {
 count := 0
 //生成100000个任务
 for count < 100000 {
 count++
 dealtime := int64(rand.intn(5)) + time.now().unix()
 uuid := bson.newobjectid().hex()
 redis.client.addjob(&job.jobmessage{
 id: uuid,
 dealtime: dealtime,
 }, + int64(dealtime))
 }
}

其中addjob函数在另一个包中, 将上一个函数中随机生成的时间作为需要处理的时间戳.

// 添加任务
func (client *redisclient) addjob(msg *job.jobmessage, dealtime int64) {
 conn := client.get()
 defer conn.close()

 key := "job_message_queue"
 conn.do("zadd", key, dealtime, util.jsonencode(msg))
}

消费者

消费者处理流程分为两个步骤:

  • 获取小于等于当前时间戳的任务
  • 通过删除当前任务来判断谁获得了当前任务

因为在获取小于等于当前时间戳的任务时,可能有多个go routine同时读到了当前任务,而只有一个任务可以来处理当前任务。因此我们需要通过一个方案来判断究竟由谁来处理这个任务(当然如果只有一个消费者可以读到就直接处理):这个时候可以通过redis的删除操作来获取,因为删除指定value时只有成功的操作才会返回不为0,所以我们可以认为删除当前队列成功的那个go routine拿到了当前的任务。

下面是代码:

// 消费者
func consumer() {
 // 启动10个go routine一起去拿
 count := 0
 for count < 10 {
 go func() {
 for {
 jobs := redis.client.getjob()
 if len(jobs) <= 0 {
  time.sleep(time.second * 1)
  continue
 }
 currentjob := jobs[0]
 // 如果当前抢redis队列成功,
 if redis.client.deljob(currentjob) > 0 {
  var jobmessage job.jobmessage
  util.jsondecode(currentjob, &jobmessage) //自定义的json解析函数
  handlemessage(&jobmessage)
 }

 }

 }()
 count++
 }
}

// 处理任务用函数
func handlemessage(msg *job.jobmessage) {
 fmt.printf("deal job: %s, require time: %d \n", msg.id, msg.dealtime)
 go func() {
 countchan <- true
 }()
}

redis部分的代码,获取任务和删除任务

// 获取任务
func (client *redisclient) getjob() []string {
 conn := client.get()
 defer conn.close()

 key := "job_message_queue"
 timenow := time.now().unix()
 ret, err := redis.strings(conn.do("zrangebyscore", key, 0, timenow, "limit", 0, 1))
 if err != nil {
 panic(err)
 }
 return ret
}

// 删除当前任务, 用来判断是否抢到了当前任务
func (client *redisclient) deljob(value string) int {
 conn := client.get()
 defer conn.close()

 key := "job_message_queue"
 ret, err := redis.int(conn.do("zrem", key, value))
 if err != nil {
 panic(err)
 }
 return ret
}

代码大抵如此。最后跑起来之后,大概每3-4秒钟能够处理掉1w个任务,速度上确实是...

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。