欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用Golang的singleflight防止缓存击穿

程序员文章站 2022-06-27 20:54:42
背景 在使用缓存时,容易发生缓存击穿。 缓存击穿:一个存在的key,在缓存过期的瞬间,同时有大量的请求过来,造成所有请求都去读dB,这些请求都会击穿到DB,造成瞬时DB请求量大、压力骤增。 singleflight 介绍 singleflight类的使用方法就新建一个singleflight.Gro ......

背景

在使用缓存时,容易发生缓存击穿。

缓存击穿:一个存在的key,在缓存过期的瞬间,同时有大量的请求过来,造成所有请求都去读db,这些请求都会击穿到db,造成瞬时db请求量大、压力骤增。

singleflight

介绍

import "golang.org/x/sync/singleflight"
singleflight类的使用方法就新建一个singleflight.group,使用其方法do或者dochan来包装方法,被包装的方法在对于同一个key,只会有一个协程执行,其他协程等待那个协程执行结束后,拿到同样的结果。

  • group结构体
    代表一类工作,同一个group中,同样的key同时只能被执行一次。
  • do方法
    func (g *group) do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
    key:同一个key,同时只有一个协程执行。
    fn:被包装的函数。
    v:返回值,即执行的结果。其他等待的协程都会拿到。
    shared:表示是否有其他协程得到了这个结果v。
  • dochan方法
    func (g *group) dochan(key string, fn func() (interface{}, error)) <-chan result
    与do方法一样,只是返回的是一个channel,执行结果会发送到channel中,其他等待的协程都可以从channel中拿到结果。

ref:

示例

  • 使用do方法来模拟,解决缓存击穿的问题
func main() {
   var singlesetcache singleflight.group

   getandsetcache:=func (requestid int,cachekey string) (string, error) {
      log.printf("request %v start to get and set cache...",requestid)
      value,_, _ :=singlesetcache.do(cachekey, func() (ret interface{}, err error) {//do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读db
         log.printf("request %v is setting cache...",requestid)
         time.sleep(3*time.second)
         log.printf("request %v set cache success!",requestid)
         return "value",nil
      })
      return value.(string),nil
   }

   cachekey:="cachekey"
   for i:=1;i<10;i++{//模拟多个协程同时请求
      go func(requestid int) {
         value,_:=getandsetcache(requestid,cachekey)
         log.printf("request %v get value: %v",requestid,value)
      }(i)
   }
   time.sleep(20*time.second)
}

输出:

2020/04/12 18:18:40 request 4 start to get and set cache...
2020/04/12 18:18:40 request 4 is setting cache...
2020/04/12 18:18:40 request 2 start to get and set cache...
2020/04/12 18:18:40 request 7 start to get and set cache...
2020/04/12 18:18:40 request 5 start to get and set cache...
2020/04/12 18:18:40 request 1 start to get and set cache...
2020/04/12 18:18:40 request 6 start to get and set cache...
2020/04/12 18:18:40 request 3 start to get and set cache...
2020/04/12 18:18:40 request 8 start to get and set cache...
2020/04/12 18:18:40 request 9 start to get and set cache...
2020/04/12 18:18:43 request 4 set cache success!
2020/04/12 18:18:43 request 4 get value: value
2020/04/12 18:18:43 request 9 get value: value
2020/04/12 18:18:43 request 6 get value: value
2020/04/12 18:18:43 request 3 get value: value
2020/04/12 18:18:43 request 8 get value: value
2020/04/12 18:18:43 request 1 get value: value
2020/04/12 18:18:43 request 5 get value: value
2020/04/12 18:18:43 request 2 get value: value
2020/04/12 18:18:43 request 7 get value: value

可以看到确实只有一个协程执行了被包装的函数,并且其他协程都拿到了结果。

源码分析

看一下这个do方法是怎么实现的。
首先看一下group的结构:

type group struct {
   mu sync.mutex      
   m  map[string]*call //保存key对应的函数执行过程和结果的变量。
}

group的结构非常简单,一个锁来保证并发安全,另一个map用来保存key对应的函数执行过程和结果的变量。
看下call的结构:

type call struct {
   wg sync.waitgroup //用waitgroup实现只有一个协程执行函数
   val interface{} //函数执行结果
   err error
   forgotten bool
   dups  int //含义是duplications,即同时执行同一个key的协程数量
   chans []chan<- result
}

看下do方法

func (g *group) do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
   g.mu.lock()//写group的m字段时,加锁保证写安全。
   if g.m == nil {
      g.m = make(map[string]*call)
   }
   if c, ok := g.m[key]; ok {//如果key已经存在,说明已经有协程在执行,则dups++,并等待其执行完毕后,返回其执行结果,执行结果保存在对应的call的val字段里
      c.dups++
      g.mu.unlock()
      c.wg.wait()
      return c.val, c.err, true
   }
   //如果key不存在,则新建一个call,并使用waitgroup来阻塞其他协程,同时在m字段里写入key和对应的call
   c := new(call)
   c.wg.add(1)
   g.m[key] = c
   g.mu.unlock()

   g.docall(c, key, fn)//第一个进来的协程来执行这个函数
   return c.val, c.err, c.dups > 0
}

继续看下g.docall里具体干了什么

func (g *group) docall(c *call, key string, fn func() (interface{}, error)) {
   c.val, c.err = fn()//执行被包装的函数
   c.wg.done()//执行完毕后,就可以通知其他协程可以拿结果了

   g.mu.lock()
   if !c.forgotten {//其实这里是为了保证执行完毕之后,对应的key被删除,group有一个方法forget(key string),可以用来主动删除key,这里是判断那个方法是否被调用过,被调用过则字段forgotten会置为true,如果没有被调用过,则在这里把key删除。
      delete(g.m, key)
   }
   for _, ch := range c.chans {//将执行结果发送到channel里,这里是给dochan方法使用的
      ch <- result{c.val, c.err, c.dups > 0}
   }
   g.mu.unlock()
}

由此看来,其实现是非常简单的。不得不赞叹一百来行代码就实现了功能。

其他

顺便附上dochan方法的使用示例:

func main() {
   var singlesetcache singleflight.group

   getandsetcache:=func (requestid int,cachekey string) (string, error) {
      log.printf("request %v start to get and set cache...",requestid)
      retchan:=singlesetcache.dochan(cachekey, func() (ret interface{}, err error) {
         log.printf("request %v is setting cache...",requestid)
         time.sleep(3*time.second)
         log.printf("request %v set cache success!",requestid)
         return "value",nil
      })

      var ret singleflight.result

      timeout := time.after(5 * time.second)

      select {//加入了超时机制
      case <-timeout:
         log.printf("time out!")
         return "",errors.new("time out")
      case ret =<- retchan://从chan中取出结果
         return ret.val.(string),ret.err
      }
      return "",nil
   }

   cachekey:="cachekey"
   for i:=1;i<10;i++{
      go func(requestid int) {
         value,_:=getandsetcache(requestid,cachekey)
         log.printf("request %v get value: %v",requestid,value)
      }(i)
   }
   time.sleep(20*time.second)
}

看下dochan的源码

func (g *group) dochan(key string, fn func() (interface{}, error)) <-chan result {
   ch := make(chan result, 1)
   g.mu.lock()
   if g.m == nil {
      g.m = make(map[string]*call)
   }
   if c, ok := g.m[key]; ok {
      c.dups++
      c.chans = append(c.chans, ch)//可以看到,每个等待的协程,都有一个结果channel。从之前的g.docall里也可以看到,每个channel都给塞了结果。为什么不所有协程共用一个channel?因为那样就得在channel里塞至少与协程数量一样的结果数量,但是你却无法保证用户一个协程只读取一次。
      g.mu.unlock()
      return ch
   }
   c := &call{chans: []chan<- result{ch}}
   c.wg.add(1)
   g.m[key] = c
   g.mu.unlock()

   go g.docall(c, key, fn)

   return ch
}