无锁队列实现最终会在压力下产生循环
我有一个用C语言编写的无锁队列,它以链表的forms存在,包含发布到单个线程并在单个线程中处理的多个线程的请求。 经过几个小时的压力后,我最终得到了最后一个请求的下一个指针指向自身,这创建了一个无限循环并锁定了处理线程。
应用程序在Linux和Windows上运行(并失败)。 我在Windows上调试,我的COMPARE_EXCHANGE_PTR
映射到InterlockedCompareExchangePointer 。
这是将请求推送到列表头部的代码,并从多个线程调用:
void push_request(struct request * volatile * root, struct request * request) { assert(request); do { request->next = *root; } while(COMPARE_EXCHANGE_PTR(root, request, request->next) != request->next); }
这是从列表末尾获取请求的代码,仅由处理它们的单个线程调用:
struct request * pop_request(struct request * volatile * root) { struct request * volatile * p; struct request * request; do { p = root; while(*p && (*p)->next) p = &(*p)->next; // next == NULL); return request; }
请注意,我没有使用尾指针,因为我想避免在push_request
处理尾指针的push_request
。 但是我怀疑问题可能在于我找到列表末尾的方式。
有几个地方将请求推送到队列中,但它们看起来都像这样:
// device->requests is defined as struct request * volatile requests; struct request * request = malloc(sizeof(struct request)); if(request) { // fill out request fields push_request(&device->requests, request); sem_post(device->request_sem); }
处理请求的代码所做的不止于此,但实质上是在循环中执行此操作:
if(sem_wait_timeout(device->request_sem, timeout) == sem_success) { struct request * request = pop_request(&device->requests); // handle request free(request); }
我还添加了一个函数,它在每个操作之前和之后检查列表中的重复项,但是我担心这个检查会改变时间,以便我永远不会遇到它失败的地方。 (我正在等待它打破,因为我正在写这篇文章。)
当我打破挂起程序时,处理程序线程在标记位置的pop_request
中循环。 我有一个或多个请求的有效列表,最后一个指针指向自己。 请求队列通常很短,我从未见过超过10,只有1和3这两次我可以在调试器中查看这个失败。
我尽可能地想到了这一点,我得出的结论是,除非我两次推送相同的请求,否则我永远不能在列表中找到循环。 我很确定这种情况永远不会发生。 我也很确定(虽然不完全)它不是ABA问题 。
我知道我可能会同时发出多个请求,但我相信这在这里是无关紧要的,而且我从未见过这种情况。 (我也会解决这个问题)
我想我怎么能破坏我的function,但是我没有想到一种方法来结束循环。
所以问题是:有人能看到一种方式可以打破这种方式吗? 有人可以certificate这不可以吗?
最终我会解决这个问题(可能通过使用尾指针或其他一些解决方案 – 锁定将是一个问题,因为发布的线程不应该被锁定,虽然我手边有一个RW锁)但我想确保改变列表实际上解决了我的问题(而不是因为不同的时间使它更不可能)。
这很微妙,但你确实有竞争条件。
从包含一个元素的列表开始, req1
。 所以我们有:
device->requests == req1; req1->next == NULL;
现在,我们推送一个新的元素req2
,同时尝试弹出队列。 推动req2
首先开始。 while循环体运行,所以我们现在有:
device->requests == req1; req1->next == NULL; req2->next == req1;
然后COMPARE_EXCHANGE_PTR
运行,所以我们有:
device->requests == req2; req1->next == NULL; req2->next == req1;
…并且COMPARE_EXCHANGE_PTR()
返回req1
。 现在,此时, 在while
条件的比较之前 ,推送被中断并且pop开始运行。
pop正常运行完成,弹出req1
– 这意味着我们有:
device->requests == req2; req2->next == NULL;
推重启。 它现在获取request->next
进行比较 – 然后它获取req2->next
的新值,即NULL
。 它将req1
与NULL
进行比较,比较成功,while循环再次运行,现在我们有:
device->requests == req2; req2->next == req2;
这次测试失败,while循环退出,你有循环。
这应该解决它:
void push_request(struct request * volatile * root, struct request * request) { struct request *oldroot; assert(request); do { request->next = oldroot = *root; } while(COMPARE_EXCHANGE_PTR(root, request, oldroot) != oldroot); }