Lock Free堆栈实现的想法 – 目前已经破解

我想出了一个想法,我试图实现一个无锁堆栈,不依赖引用计数来解决ABA问题,并且还正确处理内存回收。 它在概念上与RCU类似,并且依赖于两个特征:将列表条目标记为已删除,以及跟踪阅读器遍历列表。 前者很简单,它只使用指针的LSB。 后者是我对实现无限制无锁堆栈的方法的“聪明”尝试。

基本上,当任何线程试图遍历列表时,一个primefaces计数器(list.entries)会递增。 遍历完成后,第二个计数器(list.exits)递增。

节点分配由push处理,释放由pop处理。

推送和弹出操作与天真无锁堆栈实现非常相似,但必须遍历标记为删除的节点才能到达未标记的条目。 因此推送基本上就像链表插入一样。

pop操作类似地遍历列表,但它使用atomic_fetch_or将节点标记为在遍历时被删除,直到它到达未标记的节点。

遍历0个或更多标记节点的列表后,弹出的线程将尝试CAS堆栈的头部。 至少有一个并发弹出的线程将成功,在此之后,进入堆栈的所有读者将不再看到以前标记的节点。

成功更新列表的线程然后加载primefaceslist.entries,并基本上自旋加载atomic.exits,直到该计数器最终超过list.entries。 这应该意味着列表的“旧”版本的所有读者都已完成。 然后,该线程只是释放它从列表顶部交换的标记节点列表。

因此,弹出操作的含义应该(我认为)可能没有ABA问题,因为释放的节点不会返回到可用的指针池,直到所有使用它们的并发读取器完成,显然内存回收问题由于同样的原因,处理也是如此。

所以,无论如何,这是理论,但我仍然在实施上摸不着头脑,因为它目前无法正常工作(在multithreading情况下)。 似乎我在免费问题之后得到了一些写作,但是我在查找问题时遇到了麻烦,或者我的假设可能存在缺陷而且无法正常工作。

无论是概念还是调试代码的方法,都会非常感谢任何见解。

这是我当前(损坏的)代码(使用gcc -D_GNU_SOURCE -std = c11 -Wall -O0 -g -pthread -o list list.c编译):

#include  #include  #include  #include  #include  #include  #include  #include  #define NUM_THREADS 8 #define NUM_OPS (1024 * 1024) typedef uint64_t list_data_t; typedef struct list_node_t { struct list_node_t * _Atomic next; list_data_t data; } list_node_t; typedef struct { list_node_t * _Atomic head; int64_t _Atomic size; uint64_t _Atomic entries; uint64_t _Atomic exits; } list_t; enum { NODE_IDLE = (0x0), NODE_REMOVED = (0x1 << 0), NODE_FREED = (0x1 <data = data; stats.mallocd++; return new; } void list_node_free(list_node_t * node) { free(node); stats.freed++; } static void list_add(list_t * list, list_data_t data) { atomic_fetch_add_explicit(&list->entries, 1, memory_order_seq_cst); list_node_t * new = list_node_new(data); list_node_t * _Atomic * next = &list->head; list_node_t * current = atomic_load_explicit(next, memory_order_seq_cst); do { stats.add_count++; while ((NODE_POINTER(current) != NULL) && NODE_IS_SET(current, NODE_REMOVED)) { stats.add_count++; current = NODE_POINTER(current); next = &current->next; current = atomic_load_explicit(next, memory_order_seq_cst); } atomic_store_explicit(&new->next, current, memory_order_seq_cst); } while(!atomic_compare_exchange_weak_explicit( next, &current, new, memory_order_seq_cst, memory_order_seq_cst)); atomic_fetch_add_explicit(&list->exits, 1, memory_order_seq_cst); atomic_fetch_add_explicit(&list->size, 1, memory_order_seq_cst); stats.added++; } static bool list_remove(list_t * list, list_data_t * pData) { uint64_t entries = atomic_fetch_add_explicit( &list->entries, 1, memory_order_seq_cst); list_node_t * start = atomic_fetch_or_explicit( &list->head, NODE_REMOVED, memory_order_seq_cst); list_node_t * current = start; stats.remove_count++; while ((NODE_POINTER(current) != NULL) && NODE_IS_SET(current, NODE_REMOVED)) { stats.remove_count++; current = NODE_POINTER(current); current = atomic_fetch_or_explicit(&current->next, NODE_REMOVED, memory_order_seq_cst); } uint64_t exits = atomic_fetch_add_explicit( &list->exits, 1, memory_order_seq_cst) + 1; bool result = false; current = NODE_POINTER(current); if (current != NULL) { result = true; *pData = current->data; current = atomic_load_explicit( &current->next, memory_order_seq_cst); atomic_fetch_add_explicit(&list->size, -1, memory_order_seq_cst); stats.removed++; } start = NODE_SET_FLAG(start, NODE_REMOVED); if (atomic_compare_exchange_strong_explicit( &list->head, &start, current, memory_order_seq_cst, memory_order_seq_cst)) { entries = atomic_load_explicit(&list->entries, memory_order_seq_cst); while ((int64_t)(entries - exits) > 0) { pthread_yield(); exits = atomic_load_explicit(&list->exits, memory_order_seq_cst); } list_node_t * end = NODE_POINTER(current); list_node_t * current = NODE_POINTER(start); while (current != end) { list_node_t * tmp = current; current = atomic_load_explicit(&current->next, memory_order_seq_cst); list_node_free(tmp); current = NODE_POINTER(current); } } return result; } static list_t list; pthread_mutex_t ioLock = PTHREAD_MUTEX_INITIALIZER; void * thread_entry(void * arg) { sleep(2); int id = *(int *)arg; for (int i = 0; i < NUM_OPS; i++) { bool insert = random() % 2; if (insert) { list_add(&list, i); } else { list_data_t data; list_remove(&list, &data); } } struct rusage u; getrusage(RUSAGE_THREAD, &u); pthread_mutex_lock(&ioLock); printf("Thread %d stats:\n", id); printf("\tadded = %lu\n", stats.added); printf("\tremoved = %lu\n", stats.removed); printf("\ttotal added = %ld\n", (int64_t)(stats.added - stats.removed)); printf("\tadded count = %lu\n", stats.add_count); printf("\tremoved count = %lu\n", stats.remove_count); printf("\tadd average = %f\n", (float)stats.add_count / stats.added); printf("\tremove average = %f\n", (float)stats.remove_count / stats.removed); printf("\tmallocd = %lu\n", stats.mallocd); printf("\tfreed = %lu\n", stats.freed); printf("\ttotal mallocd = %ld\n", (int64_t)(stats.mallocd - stats.freed)); printf("\tutime = %f\n", u.ru_utime.tv_sec + u.ru_utime.tv_usec / 1000000.0f); printf("\tstime = %f\n", u.ru_stime.tv_sec + u.ru_stime.tv_usec / 1000000.0f); pthread_mutex_unlock(&ioLock); return NULL; } int main(int argc, char ** argv) { struct { pthread_t thread; int id; } threads[NUM_THREADS]; for (int i = 0; i < NUM_THREADS; i++) { threads[i].id = i; pthread_create(&threads[i].thread, NULL, thread_entry, &threads[i].id); } for (int i = 0; i < NUM_THREADS; i++) { pthread_join(threads[i].thread, NULL); } printf("Size = %ld\n", atomic_load(&list.size)); uint32_t count = 0; list_data_t data; while(list_remove(&list, &data)) { count++; } printf("Removed %u\n", count); } 

您提到您正在尝试解决ABA问题,但描述和代码实际上是尝试解决更难的问题: 内存回收问题。

这个问题通常出现在没有垃圾收集的语言中实现的无锁集合的“删除”function中。 核心问题是从共享结构中删除节点的线程通常不知道何时释放已删除的节点是安全的,因为其他读取可能仍然具有对它的引用。 作为一个副作用,解决这个问题通常解决了ABA问题:即使底层指针(和对象的状态)在此期间至少被改变了两次,这也特别是关于CAS操作的成功,最后是原始价值但呈现完全不同的状态。

ABA问题更容易,因为ABA问题有几个直接的解决方案,特别是不会导致“内存回收”问题的解决方案。 在某种意义上,能够检测到位置修改的硬件(例如,使用LL / SC或事务存储器基元)可能根本不会出现问题。

所以说,你正在寻找内存回收问题的解决方案,它也将避免ABA问题。

您的问题的核心是这个声明:

成功更新列表的线程然后加载primefaceslist.entries,并基本上自旋加载atomic.exits,直到该计数器最终超过list.entries。 这应该意味着列表的“旧”版本的所有读者都已完成。 然后,该线程只是释放它从列表顶部交换的标记节点列表。

这个逻辑不成立。 等待list.exits (你说atomic.exits,但我认为这是一个错字,因为你只讨论list.exits在其他地方)比list.entries只告诉你现在有更多的退出总数比在指向变异线程捕获的条目计数。 然而,这些出口可能是由新的读者来来往往产生的:它并不意味着所有的老读者都按照你的要求完成了

这是一个简单的例子。 首先,写入线程T1和读取线程T2访问列表,因此list.entries为2, list.exits为0.写入线程弹出一个节点,并保存list.entries的当前值(2)并等待lists.exits大于2.现在又有三个读取线程T3T4T5到达并快速读取列表并离开。 现在lists.exits为3,满足您的条件, T1释放节点。 T2没有消失,因为它正在读取一个释放的节点!

你有的基本想法可以奏效,但你的两个反制方法尤其肯定是行不通的。

这是一个经过充分研究的问题,因此您不必创建自己的算法(请参阅上面的链接),甚至编写自己的代码,因为librcu和concurrencykit之类的东西已经存在。

用于教育目的

如果你想让这项工作用于教育目的,一种方法是使用确保在修改开始后进入的线程使用一组不同的list.entry/exit计数器。 一种方法是生成计数器,当编写者想要修改列表时,它会增加生成计数器,这会导致新读者切换到另一组list.entry/exit计数器。

现在作者只需要等待list.entry[old] == list.exists[old] ,这意味着所有读者都离开了。 你也可以每代只使用一个计数器:你没有两个entry/exit计数器(虽然它可能有助于减少争用)。

当然,你知道每一代管理这个单独计数器列表的新问题……哪种看起来像建立一个无锁列表的原始问题! 这个问题稍微容易一点,因为你可能会对“飞行中”的代数进行一些合理的限制,并且只是将它们全部预先分配,或者你可以实现一种更容易推理的有限类型的无锁列表。因为添加和删除仅发生在头部或尾部。

Interesting Posts