欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

KFiFo浅析

程序员文章站 2022-07-14 16:22:10
...

#1.前言
这段时间在写一个基于PCIE实现FPGA与上位机通信,利用多线程实现读写同步,本次使用到了KFIFO无锁队列,以下是本人自己对kfifo无锁队列原理的理解。
#2.概述
提供一个无边界的字节流服务,最重要的一点是,它使用并行无锁编程技术,即当它用于只有一个入队线程和一个出队线程的场情时,两个线程可以并发操作,而不需要任何加锁行为。当然,若是采用其他消费模式自然是要加锁了。对应kfifo结构体如下:


struct kfifo {
    unsigned char *buffer;	/* the begaining address for the buffer holding the data, not changed */
    unsigned int size;	/* the size of the allocated buffer */
    unsigned int in;	/* data is added at offset (in % size) */
    unsigned int out;	/* data is extracted from off. (out % size) */
};

buffer:用于存放数据缓存;size:buffer空间大小,初始化时将其向上扩展为2的幂;in:指buffer队头;out:指队尾。其中由buffer、in、out实现循环队列。
#3.kfifo功能描述及实现
##3.1kfifo_alloc 分配kfifo内存和初始化工作
kfifo中尤为重要的一个是对size(buffer空间大小)向上取2的幂。对于设置的size,我们首先要做的是判断这个数是不是2的次幂,关于这个问题,参考这个链接,这个链接也含了kfifo的原理:关于2的次幂问题
下面是roundup_pow_of_two()函数代码:

//kfifo内存分配
struct kfifo *KFiFo::kfifo_alloc(unsigned int size)
{
    unsigned char *buffer = NULL;
    struct kfifo *fifo = NULL;
    /*
     * round up to the next power of 2, since our 'let the indices
     * wrap' tachnique works only in this case.
     */
    if (size & (size - 1)) {
        if(size > 0x80000000)
            return NULL;
        size = roundup_pow_of_two(size);
    }
    fifo = (struct kfifo *) malloc(sizeof(struct kfifo));
    if (!fifo)
        return NULL;
    buffer = (unsigned char *)malloc(size);
    if (!buffer)
    {
        free(fifo);
        return NULL;
    }
    fifo->buffer = buffer;
    fifo->size = size;
    fifo->in = fifo->out = 0;
    return fifo;
}

unsigned int KFiFo::roundup_pow_of_two(const unsigned int x)
{
    if (x == 0){ return 0; }
    if (x == 1){ return 2; }
    unsigned int ret = 1;
    while (ret < x)
    {
        ret = ret << 1;
    }
    return ret;
}

##3.2kfifo中的入队与出队
kfifo的入队操作:kfifo_in,它首先将数据放入buffer中,然后改变kfifo->in的值。同样的对于kfifo的出队操作:kfifo_out,也是先将数据从buffer中取出,再改变的kfifo->out的值。那么kfifo的入队与出队是怎么样实现队列的循环的呢?下面将结合下图进行解释,
kfifo在进行初始化时,定义的kfifo的地址是*buffer,该缓存空间大小为size,队头为in,队尾为out。此时,如下图所示fifo->in=fifo->out=0(在操作过程中,只要fifo->in=fifo->out即代表队列为空):
KFiFo浅析
假设,在第一次放入一些数据后,此时的fifo空间如下图所示,我们可以看到放入一定长度的数据后,fifo-in的值在数据放入buffer后随之修改,由于没有取数据,所以fifo->out的值并没有发生改变。
此时,buffer中数据的长度:data=fifo->in-fifo->out。buffer剩余空闲空间长度为:fifo->size-(fifo->in-fifo->out)。
KFiFo浅析
在以上的基础上取出一定长度数据后,如下图所示,可见,此时fifo->out的值发生了变化。
KFiFo浅析
对于kfifo_in()函数,对应代码如下:

/**
 * kfifo_in - puts some data into the FIFO queue
 * @from: the data to be pushed in the queue head.
 * @len: the length of the data wanted to be pushed in.
 * return: number of bytes coied into the queue.
 *
 * This function copies at most @len bytes from the @from buffer into
 * the FIFO queue depending on the free space, and returns the number of
 * bytes copied. If there is no lefted space in the FIFO queue, no bytes
 * will be copied.
 *
 */
unsigned int KFiFo::kfifo_in(const void *from, unsigned int len)
{
    /*lefted is the avaiable size in the fifo buffer*/
    unsigned int lefted = fifo->size - fifo->in + fifo->out;

    /*get the min value between the lefted data size in the buffer and the wanted size */
    len = min(len, lefted);

    /*off is the length from fifo->buffer, the buffer beginning to the queue head*/
    unsigned int off = fifo->in & (fifo->size - 1);

    /* first put l data starting from the queue head to the buffer end */
    unsigned int l = min(len, fifo->size - off);
    memcpy(fifo->buffer + off, from, l);
    
    /* then put the rest (if any) at fifo->buffer, the beginning of the buffer */
    memcpy(fifo->buffer, (unsigned char *)from + l, len - l);
    
    fifo->in += len;
    return len;
}

那么在下一次向buffer中放入数据之前,需要考虑由in到buffer_end的空闲空间长度是否足够存放这一组数据。从上面的kfifo_in()函数, lefted = fifo->size - fifo->in + fifo->out计算出整个fifo剩余的空闲空间,由min()判断剩余的空闲空间能否完全存储这一组数据。再计算出buffer的初始位置到队列头部in这一段的长度off(off = fifo->in & (fifo->size - 1)),最后将数据由in至buffer_end放入数据,若还有剩余数据未放入队列中,则将剩余数据从buffer初始位置开始放入buffer中。此过程示意图如下:
KFiFo浅析
KFiFo浅析
对于kfifo_out()函数,与kfifo_in()函数类似,此处不详解,代码如下:

/**
 * kfifo_out - gets some data from the FIFO queue
 * @to: where the data copied.
 * @len: the size of the buffer wanted to be copied.
 * return: number of bytes copied out of the queue.
 *
 * This function copies at most @len bytes from the FIFO into the
 * @to buffer and returns the number of copied bytes.
 * If there is no lefted bytes in the FIFO queue, no bytes will be copied out. *
 */
unsigned int KFiFo::kfifo_out(void *to, unsigned int len)
{
    /*buffered is the length of the queue*/
    unsigned int buffered = fifo->in - fifo->out;

    /*get the min value between the buffered size and the wanted size */
    len = min(len, buffered);

    /*off is the length from the buffer beginning to the queue tail*/
    unsigned int off = fifo->out & (fifo->size - 1);

    /* first get the data from (fifo->buffer + off), the queue tail to the end of the buffer */
    unsigned int l = min(len, fifo->size - off);;
    memcpy(to, fifo->buffer + off, l);

    /* then get the rest (if any) from fifo->buffer, the beginning of the buffer */
    memcpy((unsigned char *)to + l, fifo->buffer, len - l);
    
    fifo->out += len;
    return len;
}

当前只是实现了单生产者和单消费者模式的无锁队列,那么对于环形队列和多生产和多消费者模式下的应用又该如何实现。