欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

最快线程间数据交换算法,有效避免锁竞争 -- TwoQueues

程序员文章站 2022-07-12 15:42:29
...
本人CSDN博客地址:http://blog.csdn.net/hzdiy/article/details/8694642
处理多线程数据共享问题注意的几个要点:
1、锁竞争:尽量减少锁竞争的时间和次数。
2、内存:尽量是使用已分配内存,减少内存分配和释放的次数。尽量是用连续内存,减少共享占用的内存量。


多线程数据交换简单方案A:
定义一个list,再所有操作list的地方进行加锁和解锁。
简单模拟代码:

class CSimpleQueue  
{  
public:  
    CSimpleQueue()  
    {  
        InitializeCriticalSection(&m_crit);  
    }  
    ~CSimpleQueue()  
    {  
        DeleteCriticalSection(&m_crit);  
    }  
  
    // 添加数据  
    void AddData(DATA pData)  
    {  
        EnterCriticalSection(&m_crit);  
        m_listDATA.push_back(pData);  
        LeaveCriticalSection(&m_crit);  
    }  
    // 获取数据  
    bool GetData(DATA data)  
    {  
        EnterCriticalSection(&m_crit);  
        if (m_listDATA.size() > 0)  
        {  
            data = m_listDATA.front();  
            m_listDATA.pop_front();  
            LeaveCriticalSection(&m_crit);  
  
            return true;  
        }  
        else  
        {  
            LeaveCriticalSection(&m_crit);  
            return false;  
        }  
    }  
  
private:  
    list<DATA> m_listDATA;  
    CRITICAL_SECTION m_crit;  
};  


多线程数据交换优化方案B -- TwoQueues:
文章最后有TwoQueues的详细实现源代码。

TwoQueues的实现分析:
锁竞争:TwoQueues使用双内存块交换的方式减少锁竞争,数据写入时会进行加锁和解锁操作,但读取数据时,只是当当前读取队列的数据读取完毕时,进行两个队列交换时才进行加锁和解锁操作。可以说,数据的读取是不进行加锁的。这样,最大限度的降低了锁竞争问题。
内存:TwoQueues采用预先分配内存块的方式,初始化TwoQueues时就已经将存放数据的内存分配好了。之后数据存放于已经分配的内存块上。无需再次分配内存,不会再次进行内存分配。
TwoQueues的进一步优化,TwoQueues在存放数据时,完全可以模仿【数据长度+数据】的方式存放数据。但是这种方式会增加数据存在性检测的效率。TwoQueues则采用了一种链表的方式进行存放数据。
链表结构:

typedef struct tagDataHead  
    {  
        tagDataHead()  
        {  
            pHead = NULL;  
            nLen = 0;  
            pNext = NULL;  
        }  
        void ReSet()  
        {  
            pHead = NULL;  
            nLen = 0;  
            pNext = NULL;  
        }  
        char* pHead;  
        unsigned int nLen;  
        tagDataHead* pNext;  
    }DATAHEAD;  

此链表存放了数据头指针和数据长度。当TwoQueues中存在数据时,可以直接通过链表节点拿数据。因为链表每次创建时也是需要进行申请内存,而内存申请是一个比较耗效率的事情,TwoQueues再此做了一个小小的处理,当链表头不存在时,会进行ReAlloc调用,一性申请MALLOC_SIZE个链表头结构。并且链表从链表上解下时并不会释放内存而是放入一个pFreeDataHead链表上,当需要申请链表头结构时,会先从pFreeDataHead链表上取链表结构。此处理减少了内存分配和释放的次数,提高了多线程数据共享的效率。
以上的各种优化处理,使TwoQueues的效率得到了极大的提升。
之前做过的内存共享的测试。A线程不停的写入数据,B线程不停的读取数据。
debug版本 release版本
TwoQueues 80-100万次/秒 170-180万次/秒
CSimpleQueues 20万次/秒 5万次/秒

双队列的使用:
创建对象:
CTwoQueues m_cTwoQueues;
// 初始化双队列大小,因为是双队列,所以内存占用量是两份。
m_cTwoQueues.Init(0x0FFFFFFF);

写入方式:
m_cTwoQueues.PushData(sz, strlen(sz)+1);

读取方式:
const void* pData = NULL;

unsigned int nLen = 0;
if (m_cTwoQueues.PrepareData(pData, nLen))
{
// 处理数据  ...
// 确认(丢弃)此数据
m_cTwoQueues.ConfimData();
}

// 下面是TwoQueues的所有源代码  源代码下载
#pragma once  
#include <assert.h>  
  
namespace clwCore  
{  
#define MALLOC_SIZE 128  
  
    typedef struct tagDataHead  
    {  
        tagDataHead()  
        {  
            pHead = NULL;  
            nLen = 0;  
            pNext = NULL;  
        }  
        void ReSet()  
        {  
            pHead = NULL;  
            nLen = 0;  
            pNext = NULL;  
        }  
        char* pHead;  
        unsigned int nLen;  
        tagDataHead* pNext;  
    }DATAHEAD;  
  
    typedef struct tagDataMem  
    {  
        tagDataMem(unsigned int nSize)  
        {  
            if (0 == nSize)  
            {  
                assert(false);  
                return ;  
            }  
  
            nMaxSize = nSize;  
            pDataMem = (char*)malloc(nSize*sizeof(char));  
            nDataPos = 0;  
  
            if (NULL == pDataMem)  
            {  
                // CTwoQueues申请malloc内存失败。  
                assert(false);  
            }  
  
            pListDataHead = NULL;  
            pCurDataHead = NULL;  
            pFreeDataHead = NULL;  
        }  
        ~tagDataMem()  
        {  
            // 释放节点  
            ReSet();  
            while(NULL != pFreeDataHead)  
            {  
                DATAHEAD* pTempDataHead = pFreeDataHead;  
                pFreeDataHead = pFreeDataHead->pNext;  
  
                delete pTempDataHead;  
                pTempDataHead = NULL;  
            }  
  
            free(pDataMem);  
            pDataMem = NULL;  
            nDataPos = 0;  
        }  
        bool ReAlloc()  
        {  
            for (unsigned short i=0; i<MALLOC_SIZE; ++i)  
            {  
                DATAHEAD* pTempDataHead = new DATAHEAD;  
                //pTempDataHead->ReSet();    //构造时已经初始化  
  
                if (NULL == pTempDataHead)  
                {  
                    // 申请DATAHEAD内存失败。  
                    assert(false);  
                    return false;  
                }  
  
                pTempDataHead->pNext = pFreeDataHead;  
                pFreeDataHead = pTempDataHead;  
            }  
  
            return true;  
        }  
        DATAHEAD* GetDataHead()  
        {  
            if (NULL != pFreeDataHead)  
            {  
                DATAHEAD* pTempDataHead = pFreeDataHead;  
                pFreeDataHead = pFreeDataHead->pNext;  
  
                return pTempDataHead;  
            }  
  
            if (ReAlloc())  
            {  
                if (NULL != pFreeDataHead)  
                {  
                    DATAHEAD* pTempDataHead = pFreeDataHead;  
                    pFreeDataHead = pFreeDataHead->pNext;  
  
                    return pTempDataHead;  
                }  
            }  
  
            // ASSERT("GetDataHead返回NULL。");  
            assert(false);  
            return NULL;  
        }  
        unsigned int GetFreeLen()   //空闲内存长度  
        {  
            return nMaxSize-nDataPos;  
        }  
        bool PushData(void* pData, unsigned int nLen)  
        {  
            if (nDataPos+nLen >= nMaxSize)  
            {  
                return false;  
            }  
  
            DATAHEAD* pTempDataHead = GetDataHead();  
  
            if (NULL == pTempDataHead)  
            {  
                return false;  
            }  
  
            // 构造数据头结构  
            pTempDataHead->pHead = (pDataMem+nDataPos);  
            pTempDataHead->nLen = nLen;  
            pTempDataHead->pNext = NULL;  
  
            // 拷贝数据  
            memcpy(pDataMem+nDataPos, pData, nLen);  
            nDataPos += nLen;  
  
            if (NULL == pListDataHead)  
            {  
                pListDataHead = pTempDataHead;  
                pCurDataHead = pTempDataHead;  
                return true;  
            }  
            else  
            {  
                pCurDataHead->pNext = pTempDataHead;  
                pCurDataHead = pCurDataHead->pNext;  
                return true;  
            }  
        }  
  
        bool IsEmpty()  //判断是否有数据  
        {  
            return (NULL==pListDataHead)?true:false;  
        }  
  
        bool PrepareData(const void*& pData, unsigned int& nLen)    //准备一条数据  
        {  
            if (NULL != pListDataHead)  
            {  
                pData = pListDataHead->pHead;  
                nLen = pListDataHead->nLen;  
                return true;  
            }  
            else  
            {  
                return false;  
            }  
        }  
        void ConfimData()   //删除一条数据  
        {  
            if (NULL == pListDataHead)  
            {  
                return ;  
            }  
  
            DATAHEAD* pTempDataHead = pListDataHead;  
            pListDataHead = pListDataHead->pNext;  
  
            pTempDataHead->ReSet();  
            pTempDataHead->pNext = pFreeDataHead;  
            pFreeDataHead = pTempDataHead;  
        }  
        void ReSet()    //重置内存存储对象  
        {  
            while(NULL != pListDataHead)  
            {  
                DATAHEAD* pTempDataHead = pListDataHead;  
                pListDataHead = pListDataHead->pNext;  
  
                pTempDataHead->ReSet();  
                pTempDataHead->pNext = pFreeDataHead;  
                pFreeDataHead = pTempDataHead;  
            }  
  
            nDataPos = 0;  
            pCurDataHead = NULL;  
        }  
  
        char* pDataMem;         //数据内存区域  
        unsigned int nDataPos;  //数据存储位置  
        unsigned int nMaxSize;  //最大存储区域大小  
  
        DATAHEAD* pListDataHead;  
        DATAHEAD* pCurDataHead;  
        DATAHEAD* pFreeDataHead;    //空闲头结构队列  
    }DATAMEM;  
  
    class CTwoQueues  
    {  
    public:  
        CTwoQueues(void)  
        {  
            InitializeCriticalSection(&m_crit);  
            m_pDataMemPush = NULL;  
            m_pDataMemPop = NULL;  
        }  
        ~CTwoQueues(void)  
        {  
            if (NULL != m_pDataMemPush)  
            {  
                delete m_pDataMemPush;  
                m_pDataMemPush = NULL;  
            }  
  
            if (NULL != m_pDataMemPop)  
            {  
                delete m_pDataMemPop;  
                m_pDataMemPop = NULL;  
            }  
            DeleteCriticalSection(&m_crit);  
        }  
  
    public:  
        void Init(unsigned int nSize)  
        {  
            if (0 == nSize)  
            {  
                // 初始化CTwoQueues对象失败。  
                assert(false);  
                return ;  
            }  
  
            m_pDataMemPush = new DATAMEM(nSize);  
            m_pDataMemPop = new DATAMEM(nSize);  
        }  
  
        bool PushData(void* pData, unsigned int nLen)  
        {  
            //unsigned int nFreeLen = m_pDataMemPush->GetFreeLen();  
  
            bool bResult = false;  
  
            EnterCriticalSection(&m_crit);  
            bResult = m_pDataMemPush->PushData(pData, nLen);  
            LeaveCriticalSection(&m_crit);  
  
            return bResult;  
        }  
        bool PrepareData(const void*& pData, unsigned int& nLen)  
        {  
            bool bCanRead = true;  
            if (m_pDataMemPop->IsEmpty())  
            {  
                // 队列没有数据了  
                EnterCriticalSection(&m_crit);  
                if (m_pDataMemPush->IsEmpty())  
                {  
                    // Push队列为空  
                    LeaveCriticalSection(&m_crit);  
                    bCanRead = false;  
                }  
                else  
                {  
                    m_pDataMemPop->ReSet();  //充值读取队列  
                    DATAMEM* pTempDataMem = m_pDataMemPush;  
                    m_pDataMemPush = m_pDataMemPop;  
                    m_pDataMemPop = pTempDataMem;  
                    LeaveCriticalSection(&m_crit);  
                    bCanRead = true;  
                }  
            }  
  
            if (bCanRead)  
            {  
                return m_pDataMemPop->PrepareData(pData, nLen);  
            }  
            else  
            {  
                return false;  
            }  
        }  
  
        void ConfimData()  
        {  
            m_pDataMemPop->ConfimData();  
        }  
  
    private:  
        DATAMEM* m_pDataMemPush;  
        DATAMEM* m_pDataMemPop;  
        CRITICAL_SECTION m_crit;  
  
    };  
}