C Pthreads – problems with a thread-safe task queue implementation

I'm new to multithreading and I'm trying to implement a simple thread-safe task queue, where each thread can pull work from it until no tasks are left. No tasks will ever be enqueued by any of the threads.

For testing purposes, each task contains just a number.

static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

typedef struct Task{
    int number;
}Task;

typedef struct Cell{
    Task t;
    struct Cell* next;
}Cell;

typedef struct TQueue{
    struct Cell* head;
    struct Cell* tail;
}TQueue;

int empty(TQueue *queue){
    return queue->head == queue->tail;
}

void startQueue(TQueue *queue){
    queue->head = malloc(sizeof(Cell));
    queue->tail = queue->head;
}

void enqueue(TQueue *queue, Task C){
    queue->tail->next = malloc(sizeof(Cell));
    queue->tail = queue->tail->next;
    queue->tail->t = C;
    queue->tail->next = NULL;
}

Task * dequeue(TQueue* queue){
    pthread_mutex_lock(&task_mutex);

    Task * t;
    if(empty(queue))
        t = NULL;
    else{
        struct Cell* p = queue->head;
        queue->head = queue->head->next;
        t = &queue->head->t;
        free(p);
    }

    pthread_mutex_unlock(&task_mutex);
    return t;
}

void * work(void* arg){
    TQueue* queue = (TQueue *)arg;
    Task* t = malloc(sizeof(Task));

    for(t = dequeue(queue); t != NULL; t = dequeue(queue))
        printf("%d ", t->number);

    free(t);
    pthread_exit(NULL);
    return 0;
}

For a simple test, I run this in main:

int main(){
    TQueue* queue = malloc(sizeof(TQueue));
    startQueue(queue);

    pthread_t threads[3];
    Task t[3];

    for(int i = 0; i < 3; i++){
        t[i].number = i + 1;
        enqueue(queue, t[i]);
    }

    for(int i = 0; i < 3; i++)
        pthread_create(&threads[i], NULL, work, (void*)queue);

    for(int i = 0; i < 3; i++)
        pthread_join(threads[i], NULL);

    return 0;
}

The expected output is 1 2 3 in any order, but every now and then it prints a sequence with a strange number in it, such as 1823219 2 3. I haven't been able to spot any race condition or related problem, so I'd appreciate any help.

I found a few bugs.

I've annotated your code. I took a bit from your first post and a bit from your second. I've fixed the code, showing before and after [please pardon the gratuitous style cleanup]:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

typedef struct Task {
    int number;
} Task;

typedef struct Cell {
    // NOTE/BUG: this should be a pointer to the task. otherwise, dequeue gets
    // messy
#if 0
    Task t;
#else
    Task *t;
#endif
    struct Cell *next;
} Cell;

typedef struct TQueue {
    struct Cell *head;
    struct Cell *tail;
} TQueue;

void
startQueue(TQueue *queue)
{
#if 0
    queue->head = malloc(sizeof(Cell));
#else
    queue->head = NULL;
#endif
    queue->tail = NULL;
}

int
empty(TQueue *queue)
{
    // NOTE/BUG: dequeue never touches tail, so this test is incorrect
#if 0
    return (queue->head == queue->tail);
#else
    return (queue->head == NULL);
#endif
}

void
enqueue(TQueue *queue, Task *t)
{
    Cell *p;

    pthread_mutex_lock(&task_mutex);

    p = malloc(sizeof(Cell));
    p->next = NULL;
    p->t = t;

    if (queue->tail == NULL) {
        queue->tail = p;
        queue->head = p;
    }
    else {
        queue->tail->next = p;
        queue->tail = p;
    }

    pthread_mutex_unlock(&task_mutex);
}

Task *
dequeue(TQueue *queue)
{
    Task *t;

    pthread_mutex_lock(&task_mutex);

    if (empty(queue))
        t = NULL;
    else {
        Cell *p = queue->head;
        if (p == queue->tail)
            queue->tail = NULL;
        queue->head = p->next;

        // NOTE/BUG: this is setting t to the second element in the list,
        // not the first
        // NOTE/BUG: this is also undefined behavior, in original code (with
        // original struct definition), because what t points to _does_ get
        // freed before return
#if 0
        t = &queue->head->t;
#else
        t = p->t;
#endif
        free(p);
    }

    pthread_mutex_unlock(&task_mutex);

    return t;
}

void *
work(void *arg)
{
    TQueue *queue = (TQueue *) arg;

    // NOTE/BUG: this gets orphaned on the first call to dequeue
#if 0
    Task *t = malloc(sizeof(Task));
#else
    Task *t;
#endif

    for (t = dequeue(queue); t != NULL; t = dequeue(queue))
        printf("%d ", t->number);

    // NOTE/BUG: this frees some cell allocated in main -- not what we want
#if 0
    free(t);
#endif

    pthread_exit(NULL);
    return 0;
}

// For a simple test I ran this in main:
int
main()
{
    TQueue *queue = malloc(sizeof(TQueue));
    startQueue(queue);

    pthread_t threads[3];
    Task t[3];

    for (int i = 0; i < 3; i++) {
        t[i].number = i + 1;
#if 0
        enqueue(queue, t);
#else
        enqueue(queue, &t[i]);
#endif
    }

    for (int i = 0; i < 3; i++)
        pthread_create(&threads[i], NULL, work, (void *) queue);

    for (int i = 0; i < 3; i++)
        pthread_join(threads[i], NULL);

    return 0;
}

UPDATE:

"Are the threads executing the tasks concurrently? I've been testing the CPU usage with htop and at most only a single core of the four ever gets used."

A few things to bear in mind. htop probably won't show much for a program with such a short run time. Even with 10,000 queue entries, this program executes in about 20 ms.

It's better to have the program itself print the information [see below]. Note that printf does thread locking on stdout, so it may contribute to the "serial" nature of the program. It also contributes significantly to the program's execution time (i.e. printf is much slower than dequeue).

Also, one thread (i.e. the first one) could monopolize the queue and drain all entries before the other threads get a chance to run.

The OS may [is free to] schedule all the threads on a single core. It may then "migrate" them later (e.g. within a second or so).

I've enhanced the program to include some timing information in the printed output, which may help show more of what you'd like to see. I've also added command line options to control the number of threads and the number of items enqueued. This is similar to what I do for some of my own programs. Divert the program output to a log file and examine it. Play around with the options across multiple runs.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>

int opt_n;                              // suppress thread output
int opt_T;                              // number of threads
int opt_Q;                              // number of queue items

static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

double tvzero;

typedef struct Task {
    int number;
} Task;

typedef struct Cell {
    Task *t;
    struct Cell *next;
} Cell;

typedef struct TQueue {
    struct Cell *head;
    struct Cell *tail;
} TQueue;

typedef struct Thread {
    pthread_t tid;
    int xid;
    TQueue *queue;
} Thread;

double
tvgetf(void)
{
    struct timespec ts;
    double sec;

    clock_gettime(CLOCK_REALTIME,&ts);
    sec = ts.tv_nsec;
    sec /= 1e9;
    sec += ts.tv_sec;

    sec -= tvzero;

    return sec;
}

void
startQueue(TQueue *queue)
{
    queue->head = NULL;
    queue->tail = NULL;
}

int
empty(TQueue *queue)
{
    return (queue->head == NULL);
}

void
enqueue(TQueue *queue, Task *t)
{
    Cell *p;

    pthread_mutex_lock(&task_mutex);

    p = malloc(sizeof(Cell));
    p->next = NULL;
    p->t = t;

    if (queue->tail == NULL) {
        queue->tail = p;
        queue->head = p;
    }
    else {
        queue->tail->next = p;
        queue->tail = p;
    }

    pthread_mutex_unlock(&task_mutex);
}

Task *
dequeue(TQueue *queue)
{
    Task *t;

    pthread_mutex_lock(&task_mutex);

    if (empty(queue))
        t = NULL;
    else {
        Cell *p = queue->head;
        if (p == queue->tail)
            queue->tail = NULL;
        queue->head = p->next;
        t = p->t;
        free(p);
    }

    pthread_mutex_unlock(&task_mutex);

    return t;
}

void *
work(void *arg)
{
    Thread *tskcur = arg;
    TQueue *queue = tskcur->queue;
    Task *t;
    double tvbef;
    double tvaft;

    while (1) {
        tvbef = tvgetf();
        t = dequeue(queue);
        tvaft = tvgetf();

        if (t == NULL)
            break;

        if (! opt_n)
            printf("[%.9f/%.9f %5.5d] %d\n",
                tvbef,tvaft - tvbef,tskcur->xid,t->number);
    }

    return (void *) 0;
}

// For a simple test I ran this in main:
int
main(int argc,char **argv)
{
    char *cp;
    TQueue *queue;
    Task *t;
    Thread *tsk;

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'n':  // suppress thread output
            opt_n = 1;
            break;

        case 'Q':  // number of queue items
            opt_Q = atoi(cp + 2);
            break;

        case 'T':  // number of threads
            opt_T = atoi(cp + 2);
            break;

        default:
            break;
        }
    }

    tvzero = tvgetf();

    queue = malloc(sizeof(TQueue));
    startQueue(queue);

    if (opt_T == 0)
        opt_T = 16;
    Thread threads[opt_T];

    if (opt_Q == 0)
        opt_Q = 10000;
    t = malloc(sizeof(Task) * opt_Q);

    for (int i = 0; i < opt_Q; i++) {
        t[i].number = i + 1;
        enqueue(queue, &t[i]);
    }

    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        tsk->xid = i + 1;
        tsk->queue = queue;
        pthread_create(&tsk->tid, NULL, work, tsk);
    }

    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        pthread_join(tsk->tid, NULL);
    }

    printf("TOTAL: %.9f\n",tvgetf());

    free(t);

    return 0;
}

UPDATE #2:

"Also, one thread (i.e. the first one) could monopolize the queue and drain all entries before the others get a chance to run." What can be done about that in such a case?

A few things.

pthread_create takes a while, allowing thread 1 to proceed while the other threads are still being created. One way to improve this is to create all the threads, and have each thread set an "I am running" flag (in its thread control block). The main thread waits for all threads to set this flag. Then, the main thread sets a global volatile "you_may_now_all_run" flag that each thread spins on before entering its main thread loop. In my experience, they all then start running within microseconds of one another [or better].

I didn't implement this in the updated code below, so you can experiment with it yourself [along with nanosleep]. See the sketch just below.
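For reference, here's a minimal sketch of that startup gate. The struct and function names are just for illustration, and the plain volatile spin assumes x86-ish memory ordering; for portability you'd use C11 atomics or a pthread_barrier_t instead:

#include <pthread.h>

// hypothetical startup gate: each thread announces itself, then spins
// until the main thread releases all of them at once
typedef struct ThreadCtl {
    pthread_t tid;
    volatile int running;               // "I am running" flag
} ThreadCtl;

volatile int you_may_now_all_run;       // global release flag

void *
worker(void *arg)
{
    ThreadCtl *tsk = arg;

    tsk->running = 1;                   // announce that we exist

    while (! you_may_now_all_run)       // spin until main releases us
        ;

    // ... main thread loop goes here ...

    return NULL;
}

// in main, after creating all threads:
//     for (i = 0; i < nthread; ++i)
//         while (! ctl[i].running);    // wait for all check-ins
//     you_may_now_all_run = 1;         // release them together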

Mutexes are, overall, fairly fair [at least under Linux], because blocked threads get queued up waiting on the mutex. As I mentioned in the comments, a nanosleep could also be used, but that [somewhat] defeats the purpose since the threads will slow down.
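If you want to experiment with the nanosleep idea anyway, here's a sketch of what it might look like; the 1 us interval is an arbitrary value for illustration:

#include <time.h>

// hypothetical backoff between dequeues -- gives the other threads a
// better shot at the mutex, at the cost of overall throughput
static void
backoff(void)
{
    struct timespec ts = { .tv_sec = 0, .tv_nsec = 1000 };  // 1 us

    nanosleep(&ts, NULL);
}

// in work(), call it after each task:
//     for (t = dequeue(queue); t != NULL; t = dequeue(queue)) {
//         printf("%d ", t->number);
//         backoff();
//     }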

The antidote to thread starvation is "fairness". As I mentioned, there is an elaborate fairness algorithm that is wait-free. It's the Kogan/Petrank algorithm: http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf This is really somewhat involved/advanced, so caveat emptor ...

However, a compromise may be a ticket lock: https://en.wikipedia.org/wiki/Ticket_lock
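In minimal form, the idea looks something like this (a sketch using C11 atomics; the struct and function names here are mine, and the full program below implements the same idea with seqreq/seqacq):

#include <stdatomic.h>

// minimal ticket lock sketch: each thread takes a ticket, then waits
// for the "now serving" counter to reach it -- FIFO order gives fairness
typedef struct {
    atomic_uint next_ticket;            // next ticket to hand out
    atomic_uint now_serving;            // ticket currently allowed in
} TicketLock;

static void
ticket_lock(TicketLock *lk)
{
    unsigned my = atomic_fetch_add(&lk->next_ticket, 1);

    while (atomic_load(&lk->now_serving) != my)
        ;                               // spin until it's our turn
}

static void
ticket_unlock(TicketLock *lk)
{
    atomic_fetch_add(&lk->now_serving, 1);
}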

I've reworked the program once again. It has options for pooled allocation of cells, ticket vs. mutex locking, and deferred printing of log entries. It also cross-checks the results between threads to ensure that none of them got duplicate entries.

Of course, the key to all of this is accurate, high-precision logging (i.e. if you can't measure it, you can't tune it).

For example, one would think that doing the free inside dequeue would be slower than simply releasing the Cell to a reusable pool (similar to a slab allocator), but the performance boost wasn't as great as expected. That could be because glibc's malloc/free is simply that fast [which is what they claim].

These various versions should give you some ideas on how to build up your own performance measurement suite.

Anyway, here's the code:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>
#include <stdatomic.h>

int opt_p;                              // print thread output immediately
int opt_T;                              // number of threads
int opt_Q;                              // number of queue items
int opt_L;                              // use ticket lock
int opt_M;                              // use fast cell alloc/free

typedef unsigned char byte;
typedef unsigned int u32;

#define sysfault(_fmt...) \
    do { \
        fprintf(stderr,_fmt); \
        exit(1); \
    } while (0)

// lock control
typedef struct AnyLock {
    pthread_mutex_t mutex;              // standard mutex
    volatile u32 seqreq;                // ticket lock request
    volatile u32 seqacq;                // ticket lock grant
} AnyLock;

// work value
typedef struct Task {
    union {
        struct Task *next;
        int number;
    };
} Task;

// queue item
typedef struct Cell {
    struct Cell *next;
    Task *t;
} Cell;

// queue control
typedef struct TQueue {
    struct Cell *head;
    struct Cell *tail;
} TQueue;

// thread log entry
typedef struct Log {
    double tvbef;
    double tvaft;
    int number;
} Log;

#define BTVOFF(_off) \
    ((_off) >> 3)
#define BTVMSK(_off) \
    (1u << ((_off) & 0x07))

#define BTVLEN(_len) \
    (((_len) + 7) >> 3)

// thread control
typedef struct Thread {
    pthread_t tid;
    int xid;
    TQueue *queue;
    Log *log;
    byte *bitv;
} Thread;

static inline byte
btvset(byte *bitv,long off)
{
    u32 msk;
    byte oval;

    bitv += BTVOFF(off);
    msk = BTVMSK(off);

    oval = *bitv & msk;
    *bitv |= msk;

    return oval;
}

AnyLock task_mutex;
AnyLock print_mutex;
double tvzero;
Cell *cellpool;                         // free pool of cells
long bitvlen;

#define BARRIER \
    __asm__ __volatile__("" ::: "memory")

// virtual function pointers
Cell *(*cellnew)(void);
void (*cellfree)(Cell *);
void (*lock_acquire)(AnyLock *lock);
void (*lock_release)(AnyLock *lock);

double
tvgetf(void)
{
    struct timespec ts;
    double sec;

    clock_gettime(CLOCK_REALTIME,&ts);
    sec = ts.tv_nsec;
    sec /= 1e9;
    sec += ts.tv_sec;

    sec -= tvzero;

    return sec;
}

void *
xalloc(size_t cnt,size_t siz)
{
    void *ptr;

    ptr = calloc(cnt,siz);
    if (ptr == NULL)
        sysfault("xalloc: calloc failure -- %s\n",strerror(errno));

    return ptr;
}

void
lock_wait_ticket(AnyLock *lock,u32 newval)
{
    u32 oldval;

    // wait for our ticket to come up
    // NOTE: atomic_load is [probably] overkill here
    while (1) {
#if 0
        oldval = atomic_load(&lock->seqacq);
#else
        oldval = lock->seqacq;
#endif
        if (oldval == newval)
            break;
    }
}

void
lock_acquire_ticket(AnyLock *lock)
{
    u32 oldval;
    u32 newval;
    int ok;

    // acquire our ticket value
    // NOTE: just use a garbage value for oldval -- the exchange will
    // update it with the correct/latest value -- this saves a separate
    // refetch within the loop
    oldval = 0;
    while (1) {
#if 0
        BARRIER;
        oldval = lock->seqreq;
#endif
        newval = oldval + 1;
        ok = atomic_compare_exchange_strong(&lock->seqreq,&oldval,newval);
        if (ok)
            break;
    }

    lock_wait_ticket(lock,newval);
}

void
lock_release_ticket(AnyLock *lock)
{
    // NOTE: atomic_fetch_add is [probably] overkill, but leave it for now
#if 1
    atomic_fetch_add(&lock->seqacq,1);
#else
    lock->seqacq += 1;
#endif
}

void
lock_acquire_mutex(AnyLock *lock)
{
    pthread_mutex_lock(&lock->mutex);
}

void
lock_release_mutex(AnyLock *lock)
{
    pthread_mutex_unlock(&lock->mutex);
}

void
lock_init(AnyLock *lock)
{
    switch (opt_L) {
    case 1:
        lock->seqreq = 0;
        lock->seqacq = 1;
        lock_acquire = lock_acquire_ticket;
        lock_release = lock_release_ticket;
        break;

    default:
        pthread_mutex_init(&lock->mutex,NULL);
        lock_acquire = lock_acquire_mutex;
        lock_release = lock_release_mutex;
        break;
    }
}

void
startQueue(TQueue *queue)
{
    queue->head = NULL;
    queue->tail = NULL;
}

int
empty(TQueue *queue)
{
    return (queue->head == NULL);
}

// cellnew_pool -- allocate a queue entry
Cell *
cellnew_pool(void)
{
    int cnt;
    Cell *p;
    Cell *pool;

    while (1) {
        // try for quick allocation
        p = cellpool;

        // bug out if we got it
        if (p != NULL) {
            cellpool = p->next;
            break;
        }

        // go to the heap to replenish the pool
        cnt = 1000;
        p = xalloc(cnt,sizeof(Cell));

        // link up the entries
        pool = NULL;
        for (;  cnt > 0;  --cnt, ++p) {
            p->next = pool;
            pool = p;
        }

        // put this "online"
        cellpool = pool;
    }

    return p;
}

// cellfree_pool -- release a queue entry
void
cellfree_pool(Cell *p)
{
    p->next = cellpool;
    cellpool = p;
}

// cellnew_std -- allocate a queue entry
Cell *
cellnew_std(void)
{
    Cell *p;

    p = xalloc(1,sizeof(Cell));

    return p;
}

// cellfree_std -- release a queue entry
void
cellfree_std(Cell *p)
{
    free(p);
}

void
enqueue(TQueue *queue, Task *t)
{
    Cell *p;

    lock_acquire(&task_mutex);

    p = cellnew();
    p->next = NULL;
    p->t = t;

    if (queue->tail == NULL) {
        queue->tail = p;
        queue->head = p;
    }
    else {
        queue->tail->next = p;
        queue->tail = p;
    }

    lock_release(&task_mutex);
}

Task *
dequeue(TQueue *queue)
{
    Task *t;

    lock_acquire(&task_mutex);

    if (empty(queue))
        t = NULL;
    else {
        Cell *p = queue->head;
        if (p == queue->tail)
            queue->tail = NULL;
        queue->head = p->next;
        t = p->t;
        cellfree(p);
    }

    lock_release(&task_mutex);

    return t;
}

void *
work(void *arg)
{
    Thread *tskcur = arg;
    TQueue *queue = tskcur->queue;
    Task *t;
    Log *log;
    long cnt;
    int tprev;
    byte *bitv;
    double tvbeg;
    double tvbef;
    double tvaft;

    log = tskcur->log;
    bitv = tskcur->bitv;

    tvbeg = tvgetf();

    tprev = 0;

    while (1) {
        tvbef = tvgetf();
        t = dequeue(queue);
        tvaft = tvgetf();

        if (t == NULL)
            break;

        // abort if we get a double entry
        if (btvset(bitv,t->number))
            sysfault("work: duplicate\n");

        if (opt_p) {
            printf("[%.9f/%.9f %5.5d] %d [%d]\n",
                tvbef,tvaft - tvbef,tskcur->xid,t->number,t->number - tprev);
            tprev = t->number;
            continue;
        }

        log->tvbef = tvbef;
        log->tvaft = tvaft;
        log->number = t->number;
        ++log;
    }

    if (! opt_p) {
        tvaft = tvgetf();

        cnt = log - tskcur->log;
        log = tskcur->log;

        lock_acquire(&print_mutex);

        printf("\n");
        printf("THREAD=%5.5d START=%.9f STOP=%.9f ELAP=%.9f TOTAL=%ld\n",
            tskcur->xid,tvbeg,tvaft,tvaft - tvbeg,cnt);

        tprev = 0;
        for (;  cnt > 0;  --cnt, ++log) {
            printf("[%.9f/%.9f %5.5d] %d [%d]\n",
                log->tvbef,log->tvaft - log->tvbef,tskcur->xid,
                log->number,log->number - tprev);
            tprev = log->number;
        }

        lock_release(&print_mutex);
    }

    return (void *) 0;
}

void
btvchk(Thread *tska,Thread *tskb)
{
    byte *btva;
    byte *btvb;
    byte aval;
    byte bval;
    int idx;

    printf("btvchk: %d ??? %d\n",tska->xid,tskb->xid);

    btva = tska->bitv;
    btvb = tskb->bitv;

    // abort if we get overlapping entries between two threads
    for (idx = 0; idx < bitvlen; ++idx) {
        aval = btva[idx];
        bval = btvb[idx];
        if (aval & bval)
            sysfault("btvchk: duplicate\n");
    }
}

// For a simple test I ran this in main:
int
main(int argc,char **argv)
{
    char *cp;
    TQueue *queue;
    Task *t;
    Thread *tsk;

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'p':  // print immediately
            opt_p = 1;
            break;

        case 'Q':  // number of queue items
            opt_Q = atoi(cp + 2);
            break;

        case 'T':  // number of threads
            opt_T = atoi(cp + 2);
            break;

        case 'L':
            opt_L = 1;
            break;

        case 'M':
            opt_M = 1;
            break;

        default:
            break;
        }
    }

    printf("p=%d -- thread log is %s\n",opt_p,opt_p ? "immediate" : "deferred");

    if (opt_T == 0)
        opt_T = 16;
    printf("T=%d (number of threads)\n",opt_T);

    if (opt_Q == 0)
        opt_Q = 1000000;
    printf("Q=%d (number of items to enqueue)\n",opt_Q);

    printf("L=%d -- lock is %s\n",opt_L,opt_L ? "ticket" : "mutex");

    printf("M=%d -- queue item allocation is %s\n",
        opt_M,opt_M ? "pooled" : "malloc/free");

    tvzero = tvgetf();

    lock_init(&task_mutex);
    lock_init(&print_mutex);

    // select queue item allocation strategy
    switch (opt_M) {
    case 1:
        cellnew = cellnew_pool;
        cellfree = cellfree_pool;
        break;

    default:
        cellnew = cellnew_std;
        cellfree = cellfree_std;
        break;
    }

    queue = xalloc(1,sizeof(TQueue));
    startQueue(queue);

    Thread threads[opt_T];

    // get byte length of bit vectors
    bitvlen = BTVLEN(opt_Q + 1);

    // allocate per-thread log buffers
    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        if (! opt_p)
            tsk->log = xalloc(opt_Q,sizeof(Log));
        tsk->bitv = xalloc(bitvlen,sizeof(byte));
    }

    // allocate "work to do"
    t = xalloc(opt_Q,sizeof(Task));

    // add to master queue
    for (int i = 0; i < opt_Q; i++) {
        t[i].number = i + 1;
        enqueue(queue, &t[i]);
    }

    // fire up the threads
    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        tsk->xid = i + 1;
        tsk->queue = queue;
        pthread_create(&tsk->tid, NULL, work, tsk);
    }

    // wait for threads to complete
    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        pthread_join(tsk->tid, NULL);
    }

    // cross-check the threads' bit vectors for duplicate entries
    for (int i = 0; i < opt_T; i++) {
        for (int j = i + 1; j < opt_T; j++)
            btvchk(&threads[i],&threads[j]);
    }

    printf("TOTAL: %.9f\n",tvgetf());

    free(t);

    return 0;
}