
```
#ifndef __LFQUEUE_H__
#define __LFQUEUE_H__
#include <thread>
#include <iostream>
#include <atomic>
#include "dtype.h"
#include "os/Win.h"
#include "logger.h"
#define CAS2(obj, expected, desired) std::atomic::atomic_compare_exchange_weak(obj, expected, desired)
#ifdef WIN32
#define CAS(ptr, oldvalue, newvalue) InterlockedCompareExchange(ptr, newvalue, oldvalue)
#else
#define CAS(ptr, oldvalue, newvalue) __sync_val_compare_and_swap(ptr , oldvalue , newvalue)
#endif
typedef struct queue_elem
{
void* data
bool origin // 0: from GRPC, 1: from Self
queue_elem(void* e, uint32 o) { data = eorigin = o}
} QueueElem
class LockFreeQueue
{
public:
LockFreeQueue(int32 capicity)
: capicity_(capicity), header_(0), tailer_(0), guard_(0)
{
if (capicity_ <4) { capicity_ = 4}
lf_queue_ = new QueueElem*[capicity_]
for (int i = 0i <capicity_i++)
lf_queue_[i] = NULL
}
~LockFreeQueue() { if (lf_queue_) { delete [] lf_queue_} }
public:
enum QStatus {
Empty,
Full,
Normal,
Unknown
}
//bool isempty() { return header_ == tailer_}
// guard_ is the maximum dequeue item
bool isempty() { return header_ == guard_}
bool isfull() { return (internal_index(tailer_ + 1)) == header_}
int32 internal_index(int32 v) { return (v % capicity_)}
bool enqueue(QueueElem* item)
{
int32 temp,guard
assert(item)
do
{
// fetch tailer_ first and then judge isfull, else encounter concurrent problem
temp = tailer_
guard = guard_
if (isfull())
{
return false
}
// cross operate
if (CAS(&tailer_, temp, internal_index(temp + 1)) == temp)
{
lf_queue_[temp] = item
// update the guard_ for the max dequeue item
CAS(&guard_, guard, internal_index(guard + 1))
break
}
else
{
//std::cout <<"enqueue Cas failure one times" <<std::endl
}
} while (true)
return true
}
bool dequeue(QueueElem** item)
{
int32 temp
do
{
// fetch header first and then judge isempty, else encounter concurrent problem
temp = header_
*item = NULL
if (isempty())
{
return false
}
// cross operate CAS failure
*item = lf_queue_[temp]
if (CAS(&header_, temp, internal_index(temp + 1)) == temp)
{
// some producer lock one slot, but doesn't push back
// while (!lf_queue_[temp])
// {
// std::this_thread::yield()
// }
//*item = lf_queue_[temp]
lf_queue_[temp] = NULL
break
}
else
{
//std::cout <<"dequeue Cas failure one times" <<std::endl
}
} while (true)
return true
}
private:
QueueElem** lf_queue_
int32 capicity_
#ifdef WIN32
LONG header_
LONG tailer_
LONG guard_ // header <= guard <= tailer
#else
int32 header_
int32 tailer_
int32 guard_ // header <= guard <= tailer
#endif
}
#endif
```
linux 2.6.28推出了一种通用的ring-buffer实现,但某些 *** 作这种实现过多依赖锁机制,导致性能不佳。目前DPDK使用的ring是基于Steven Rostedt提出的一种无锁ring-buffer算法实现,该算法消除了写入时锁依赖,为内核的采集信息功能提供了快路径,非常高效。我们一起通过这篇文章学习下该算法的实现原理。
通用算法在linux上的使用如下:采集信息的功能由内核来完成,因此需要存储过程(写 *** 作)尽可能的耗时小,保证对处理events的影响尽可能小。读数据都是基于用户空间完成,因此对性能要求相对低些。linux内核提供的ring-buffer创建了一个环形、双向页面列表,包括一个头指针和尾指针,写基于尾指针,读基于头指针。当ring-buffer写满时,有可能writers会覆盖头指针指向的页面内容,这将会导致读 *** 作数据异常。基于上述原因,该算法提供一个独立的reader页面,该页面完全独立于双向链表,这样readers就不用担心读数据时会被writers将数据破坏。这种实现唯一的缺点是当一个reader页面处理完需要从链表中重新load一个页面时,必须要将处理完的空page插到尾指针之后,同时将当前的head page移除掉作为新的reader page,并将head page在链表上向前移动,这个过程需要依赖锁来保证安全。
Rostedt的算法中提出的ring-buffer结构如下图,H代表Header-flagged pointer:
当存在多个writer时,writer A可以被其他的writer B打断,只要A能保证在B开始前完成其写入动作。该功能可有一种称为interrupts stack机制来实现,当一个wirter初始化时,在尾指针之后预留空间来处理写入事件,当完成时更新尾指针位置,同时引入另一个指针commit指针,用来标记 最近的写入完成指针。在一个空的ring-buffer中,reader page\tail page\commit page有可能指向同一处。
为了消除写过程的锁依赖,提出了 cmpxchg() 原子 *** 作如下:
算法还假设双向链表的指针是4字节对齐,预留低2bit给flags:
-HEADER --- 指针指向当前的head page
-UPDATE --- 指针指向的page正在被写入数据,或者,即将被设为head page
当reader page消费完成时,当前的head page需要从ring-buffer中脱落下来变成新的reader page。通过在next指针中使用HEADER bit,readers可以使用cmxchg()来要求HEADER flag被标记,保证页面内容未被修改过。当writers将flag设置为UPDATE或者清除时,表示该页面内容已被修改过,cmxchg()比较失败,这样reader会重新查找一个新的head page,从而避免使用锁。
当writers更新一个新的tail page时,需要检查next 指针的HEADER flag。如果被设置,需要更新为UPDATE,表示该页面正在被writers使用,并会导致cmpxchg()检查失败,需要剥离一个新的head page为reader page。这种状态也意味着环接近写满状态,仅剩一个页面用来写数据。
参考文献:
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)