在pthreads中实现FIFO互斥

我正在尝试实现支持并发插入的二叉树(甚至可能在节点之间发生),但无需为每个节点分配全局锁或单独的互斥锁或互斥锁。 相反,分配的此类锁的数量应该是使用树的线程数量级

因此,我最终遇到了一种锁定护航问题。 更简单地解释一下,当两个或多个线程执行以下操作时, 可能会发生这种情况:

 1 for(;;){
 2锁(互斥锁)
 3 do_stuff
 4解锁(互斥)
 5}

也就是说,如果线程#1在一个“cpu突发”中执行指令4-> 5-> 1-> 2,那么线程#2将从执行中匮乏。

另一方面,如果pthreads中存在用于互斥锁的FIFO类型锁定选项,则可以避免这样的问题。 那么,有没有办法在pthreads中实现FIFO类型的互斥锁? 可以改变线程优先级吗?

你可以这样做:

  • 定义一个“排队锁”,它由一个忙/闲标志和一个pthread条件变量的链表组成。 对queued_lock的访问受互斥锁保护

  • 锁定queued_lock:

    • 抓住互斥锁
    • 检查’忙’标志
    • 如果不忙; set busy = true; 释放互斥; DONE
    • 如果忙 创建一个新的条件@结束队列并等待它(释放互斥锁)
  • 开锁:

    • 抓住互斥锁
    • 如果没有其他线程排队,busy = false; 释放互斥; DONE
    • pthread_cond_signal第一个等待条件
    • 不要清除’busy’标志 – 所有权传递给另一个线程
    • 释放互斥
  • 当等待线程被pthread_cond_signal解除阻塞时:

    • 从队列头部删除我们的条件var
    • 释放互斥

请注意,只有在更改queued_lock的状态时才会锁定互斥锁,而不是在保持queued_lock的整个持续时间内。

您可以实现一个公平的排队系统,其中每个线程在阻塞时都会添加到队列中,并且队列中的第一个线程在资源可用时始终获取该资源。 在pthreads基元上构建的这种“公平”票证锁可能如下所示:

 #include  typedef struct ticket_lock { pthread_cond_t cond; pthread_mutex_t mutex; unsigned long queue_head, queue_tail; } ticket_lock_t; #define TICKET_LOCK_INITIALIZER { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER } void ticket_lock(ticket_lock_t *ticket) { unsigned long queue_me; pthread_mutex_lock(&ticket->mutex); queue_me = ticket->queue_tail++; while (queue_me != ticket->queue_head) { pthread_cond_wait(&ticket->cond, &ticket->mutex); } pthread_mutex_unlock(&ticket->mutex); } void ticket_unlock(ticket_lock_t *ticket) { pthread_mutex_lock(&ticket->mutex); ticket->queue_head++; pthread_cond_broadcast(&ticket->cond); pthread_mutex_unlock(&ticket->mutex); } 

发布时的示例没有解决方案。 基本上你只有一个关键部分,并没有平行的地方。

也就是说,您会发现将线程持有互斥锁的时间减少到最少,只需要少量指令就很重要。 这很难插入动态数据结构,例如树。 概念上最简单的解决方案是每个树节点有一个读写锁。

如果您不希望每个树节点具有单独的锁,则每个树的级别可以有一个锁结构。 我会尝试使用读写锁。 当您遍历树时,您可以仅使用手中节点级别的读取锁定(加上下一级别)。 然后当你找到合适的一个插入锁定该级别进行写入。

解决方案可能是使用primefaces操作 。 没有锁定,没有上下文切换,没有睡眠,并且比互斥锁或条件变量快得多。 primefaces操作不是最终所有解决方案,但我们使用primefaces操作创建了许多通用数据结构的线程安全版本。 它们非常快。

primefaces操作是一系列简单的操作,如增量,减量或赋值,保证在multithreading环境中以primefaces方式执行。 如果两个线程同时命中op,则cpu确保一个线程一次执行op。 primefaces操作是硬件指令,因此它们很快。 “比较和交换”对于线程安全数据结构非常有用。 在我们的测试中,primefaces比较和交换与32位整数赋值一样快。 也许2倍慢。 当您考虑使用互斥锁消耗多少CPU时,primefaces操作会无限快。

通过旋转来平衡树与primefaces操作并非易事,但并非不可能。 我在过去遇到过这个要求,并且通过制作一个线程安全的跳过列表来欺骗,因为跳过列表可以通过primefaces操作轻松完成。 对不起,我不能给你一份我们的代码……我的公司会解雇我,但它很容易自己做。

简单线程安全链表示例可以显示primefaces操作如何使无锁数据结构可视化。 在不使用锁的情况下将项添加到全局链接列表(_pHead)。 首先保存_pHead,pOld的副本。 在执行并发操作时,我认为这些副本是“世界状态”。 接下来创建一个新节点pNew,并将其pNext设置为COPY 。 然后使用primefaces“比较和交换”将_pHead改为pNew仅在pHead仍然存在的情况下。 只有当_pHead没有改变时,primefaces操作才会成功。 如果失败,则循环返回以获取新_pHead的副本并重复。

如果操作成功,世界其他地方现在将看到一个新的头脑。 如果一个线程在之前的一个纳秒内得到了旧的头部,那个线程将不会看到新的项目,但列表仍然可以安全地迭代。 由于我们将pNext预设为旧头,因此我们将新项目添加到列表中,如果线程在添加新头之后的纳秒内看到新头,则列表可以安全遍历。

全球的东西:

 typedef struct _TList { int data; struct _TList *pNext; } TList; TList *_pHead; 

添加到列表:

 TList *pOld, *pNew; ... // allocate/fill/whatever to make pNew ... while (1) { // concurrency loop pOld = _pHead; // copy the state of the world. We operate on the copy pNew->pNext = pOld; // chain the new node to the current head of recycled items if (CAS(&_pHead, pOld, pNew)) // switch head of recycled items to new node break; // success } 

CAS是__sync_bool_compare_and_swap等的简写。 看看有多容易? 没有互斥锁……没有锁! 在极少数情况下,2个线程同时命中该代码,一个只是第二次循环。 我们只看到第二个循环,因为调度程序在并发循环中交换了一个线程。 所以这是罕见而无关紧要的。

事情可以以类似的方式从链表的头部拉出来。 如果使用联合,则可以primefaces地更改多个值,并且可以使用uup到128位primefaces操作。 我们在32位redhat linux上测试了128位,它们的速度与32位,64位primefaces操作相同。

您将不得不弄清楚如何在树上使用这种技术。 b树节点将具有两个子节点的ptrs。 你可以CAS来改变它们。 平衡问题很棘手。 我可以看到在添加内容之前如何分析树分支并从某个点创建分支的副本。 当你完成更改分支时,CAS就是新的分支。这对于大分支来说是个问题。 当线程没有在树上战斗时,也许可以“稍后”进行平衡。 也许你可以做到这样,即使你没有一直级联旋转,树仍然是可搜索的…换句话说,如果线程A添加了一个节点并且递归地旋转节点,则线程b仍然可以读取或添加节点。 只是一些想法。 在某些情况下,我们在pNext的32位之后的32位中创建一个具有版本号或锁定标志的结构。 我们然后使用64位CAS。 也许你可以在没有锁的情况下随时安全地阅读树,但你可能不得不在正在修改的分支上使用版本控制技术。

以下是我讨论过primefaces操作优点的一堆post:

Pthreads和互斥体; 锁定数组的一部分

线程参数的高效快捷方式

使用pthreads配置自动重新加载

使用条件变量优于互斥锁的优点

单位操作

linux中的内存分配是非阻塞的吗?

您可以使用@caf概述的想法获得公平的Mutex,但执行实际锁定之前使用primefaces操作来获取票证。

 #if defined(_MSC_VER) typedef volatile LONG Sync32_t; #define SyncFetchAndIncrement32(V) (InterlockedIncrement(V) - 1) #elif (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__) > 40100 typedef volatile uint32_t Sync32_t; #define SyncFetchAndIncrement32(V) __sync_fetch_and_add(V, 1) #else #error No atomic operations #endif class FairMutex { private: Sync32_t _nextTicket; Sync32_t _curTicket; pthread_mutex_t _mutex; pthread_cond_t _cond; public: inline FairMutex() : _nextTicket(0), _curTicket(0), _mutex(PTHREAD_MUTEX_INITIALIZER), _cond(PTHREAD_COND_INITIALIZER) { } inline ~FairMutex() { pthread_cond_destroy(&_cond); pthread_mutex_destroy(&_mutex); } inline void lock() { unsigned long myTicket = SyncFetchAndIncrement32(&_nextTicket); pthread_mutex_lock(&_mutex); while (_curTicket != myTicket) { pthread_cond_wait(&_cond, &_mutex); } } inline void unlock() { _curTicket++; pthread_cond_broadcast(&_cond); pthread_mutex_unlock(&_mutex); } }; 

更广泛地说,我不会称之为FIFO Mutex,因为它给人的印象是维持一个首先不存在的订单。 如果您的线程并行调用lock(),则在调用锁之前它们不能有一个订单,因此创建一个保留不存在的订单关系的互斥锁是没有意义的。

您可以查看pthread_mutexattr_setprioceiling函数。

 int pthread_mutexattr_setprioceiling ( pthread_mutexatt_t * attr, int prioceiling, int * oldceiling ); 

从文档:

pthread_mutexattr_setprioceiling(3THR)设置互斥锁属性对象的优先级上限属性。

attr指向由先前调用pthread_mutexattr_init()创建的互斥锁属性对象。

prioceiling指定初始化互斥锁的优先级上限。 上限定义了执行互斥锁保护的临界区的最低优先级。 prioceiling将在SCHED_FIFO定义的最大优先级范围内。 为了避免优先级倒置,prioceiling将被设置为高于或等于可能锁定特定互斥锁的所有线程的最高优先级的优先级。

oldceiling包含旧的优先级上限值。