无锁队列C++实现

无锁队列C++实现,第1张

实现了一个lock free queue,自测通过。有需要的拿去。

```

#ifndef __LFQUEUE_H__

#define __LFQUEUE_H__

#include <thread>

#include <iostream>

#include <atomic>

#include "dtype.h"

#include "os/Win.h"

#include "logger.h"

#define CAS2(obj, expected, desired) std::atomic::atomic_compare_exchange_weak(obj, expected, desired)

#ifdef WIN32

#define CAS(ptr, oldvalue, newvalue) InterlockedCompareExchange(ptr, newvalue, oldvalue)

#else

#define CAS(ptr, oldvalue, newvalue) __sync_val_compare_and_swap(ptr , oldvalue , newvalue)

#endif

typedef struct queue_elem

{

void* data

bool origin // 0: from GRPC, 1: from Self

queue_elem(void* e, uint32 o) { data = eorigin = o}

} QueueElem

class LockFreeQueue

{

public:

LockFreeQueue(int32 capicity)

: capicity_(capicity), header_(0), tailer_(0), guard_(0)

{

if (capicity_ <4) { capicity_ = 4}

lf_queue_ = new QueueElem*[capicity_]

for (int i = 0i <capicity_i++)

lf_queue_[i] = NULL

}

~LockFreeQueue() { if (lf_queue_) { delete [] lf_queue_} }

public:

enum QStatus {

Empty,

Full,

Normal,

Unknown

}

//bool isempty() { return header_ == tailer_}

// guard_ is the maximum dequeue item

bool isempty() { return header_ == guard_}

bool isfull() { return (internal_index(tailer_ + 1)) == header_}

int32 internal_index(int32 v) { return (v % capicity_)}

bool enqueue(QueueElem* item)

{

int32 temp,guard

assert(item)

do

{

// fetch tailer_ first and then judge isfull, else encounter concurrent problem

temp = tailer_

guard = guard_

if (isfull())

{

return false

}

// cross operate

if (CAS(&tailer_, temp, internal_index(temp + 1)) == temp)

{

lf_queue_[temp] = item

// update the guard_ for the max dequeue item

CAS(&guard_, guard, internal_index(guard + 1))

break

}

else

{

//std::cout <<"enqueue Cas failure one times" <<std::endl

}

} while (true)

return true

}

bool dequeue(QueueElem** item)

{

int32 temp

do

{

// fetch header first and then judge isempty, else encounter concurrent problem

temp = header_

*item = NULL

if (isempty())

{

return false

}

// cross operate CAS failure

*item = lf_queue_[temp]

if (CAS(&header_, temp, internal_index(temp + 1)) == temp)

{

// some producer lock one slot, but doesn't push back

// while (!lf_queue_[temp])

// {

// std::this_thread::yield()

// }

//*item = lf_queue_[temp]

lf_queue_[temp] = NULL

break

}

else

{

//std::cout <<"dequeue Cas failure one times" <<std::endl

}

} while (true)

return true

}

private:

QueueElem** lf_queue_

int32 capicity_

#ifdef WIN32

LONG header_

LONG tailer_

LONG guard_ // header <= guard <= tailer

#else

int32 header_

int32 tailer_

int32 guard_ // header <= guard <= tailer

#endif

}

#endif

```

linux 2.6.28推出了一种通用的ring-buffer实现,但某些 *** 作这种实现过多依赖锁机制,导致性能不佳。目前DPDK使用的ring是基于Steven Rostedt提出的一种无锁ring-buffer算法实现,该算法消除了写入时锁依赖,为内核的采集信息功能提供了快路径,非常高效。我们一起通过这篇文章学习下该算法的实现原理。

通用算法在linux上的使用如下:采集信息的功能由内核来完成,因此需要存储过程(写 *** 作)尽可能的耗时小,保证对处理events的影响尽可能小。读数据都是基于用户空间完成,因此对性能要求相对低些。linux内核提供的ring-buffer创建了一个环形、双向页面列表,包括一个头指针和尾指针,写基于尾指针,读基于头指针。当ring-buffer写满时,有可能writers会覆盖头指针指向的页面内容,这将会导致读 *** 作数据异常。基于上述原因,该算法提供一个独立的reader页面,该页面完全独立于双向链表,这样readers就不用担心读数据时会被writers将数据破坏。这种实现唯一的缺点是当一个reader页面处理完需要从链表中重新load一个页面时,必须要将处理完的空page插到尾指针之后,同时将当前的head page移除掉作为新的reader page,并将head page在链表上向前移动,这个过程需要依赖锁来保证安全。

Rostedt的算法中提出的ring-buffer结构如下图,H代表Header-flagged pointer:

当存在多个writer时,writer A可以被其他的writer B打断,只要A能保证在B开始前完成其写入动作。该功能可有一种称为interrupts stack机制来实现,当一个wirter初始化时,在尾指针之后预留空间来处理写入事件,当完成时更新尾指针位置,同时引入另一个指针commit指针,用来标记 最近的写入完成指针。在一个空的ring-buffer中,reader page\tail page\commit page有可能指向同一处。

为了消除写过程的锁依赖,提出了 cmpxchg() 原子 *** 作如下:

算法还假设双向链表的指针是4字节对齐,预留低2bit给flags:

-HEADER --- 指针指向当前的head page

-UPDATE --- 指针指向的page正在被写入数据,或者,即将被设为head page

当reader page消费完成时,当前的head page需要从ring-buffer中脱落下来变成新的reader page。通过在next指针中使用HEADER bit,readers可以使用cmxchg()来要求HEADER flag被标记,保证页面内容未被修改过。当writers将flag设置为UPDATE或者清除时,表示该页面内容已被修改过,cmxchg()比较失败,这样reader会重新查找一个新的head page,从而避免使用锁。

当writers更新一个新的tail page时,需要检查next 指针的HEADER flag。如果被设置,需要更新为UPDATE,表示该页面正在被writers使用,并会导致cmpxchg()检查失败,需要剥离一个新的head page为reader page。这种状态也意味着环接近写满状态,仅剩一个页面用来写数据。

参考文献:


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/yw/8680696.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-04-19
下一篇2023-04-19

发表评论

登录后才能评论

评论列表(0条)

    保存