如何使用共享内存在进程之间共享信号量

我必须将N个客户端进程与一个服务器同步。 这些进程由一个main函数分叉,我在其中声明了3个信号量。 我决定使用POSIX信号量,但我不知道如何在这些进程之间共享它们。 我认为共享内存应该正常工作,但我有一些问题:

  • 如何在我的细分中分配正确的内存空间?
  • 我可以在shmget size_t字段中使用sizeof(sem_t)来准确分配我需要的空间吗?
  • 有没有人有类似这种情况的例子?

共享命名的POSIX信号量很容易

  • 选择信号量的名称

     #define SNAME "/mysem" 
  • 在创建它们的过程sem_openO_CREAT一起使用

     sem_t *sem = sem_open(SNAME, O_CREAT, 0644, 3); /* Initial value is 3. */ 
  • 在其他进程中打开信号量

     sem_t *sem = sem_open(SEM_NAME, 0); /* Open a preexisting semaphore. */ 

如果你坚持使用共享内存,那肯定是可能的。

 int fd = shm_open("shmname", O_CREAT, O_RDWR); ftruncate(fd, sizeof(sem_t)); sem_t *sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); sem_init(sem, 1, 1); 

我没有测试过上面所以它可能是完全疯狂的。

我编写了一个示例应用程序,其中父(生产者)生成N个子(消费者)线程并使用全局数组在它们之间传递数据。 全局数组是循环存储器,当数据写入arrays时受到保护。

头文件:

 #define TRUE 1 #define FALSE 0 #define SUCCESS 0 #define FAILURE -1 /* * Function Declaration. */ void Parent( void ); int CalcBitmap(unsigned int Num); void InitSemaphore( void ); void DeInitSemaphore( void ); int ComputeComplexCalc( int op1, int op2); /* * Thread functions. */ void *Child( void *arg ); void *Report( void *arg1 ); /* * Macro Definition. */ #define INPUT_FILE "./input.txt" #define NUM_OF_CHILDREN 10 #define SIZE_CIRCULAR_BUF 256 #define BUF_SIZE 128 //Set the bits corresponding to all threads. #define SET_ALL_BITMAP( a ) a = uiBitmap; //Clears the bit corresponding to a thread after //every read. #define CLEAR_BIT( a, b ) a &= ~( 1 << b ); /* * Have a dedicated semaphore for each buffer * to synchronize the acess to the shared memory * between the producer (parent) and the consumers. */ sem_t ReadCntrMutex; sem_t semEmptyNode[SIZE_CIRCULAR_BUF]; /* * Global Variables. */ unsigned int uiBitmap = 0; //Counter to track the number of processed buffers. unsigned int Stats; unsigned int uiParentCnt = 0; char inputfile[BUF_SIZE]; int EndOfFile = FALSE; /* * Data Structure Definition. ( Circular Buffer ) */ struct Message { int id; unsigned int BufNo1; unsigned int BufNo2; /* Counter to check if all threads read the buffer. */ unsigned int uiBufReadCntr; }; struct Message ShdBuf[SIZE_CIRCULAR_BUF]; 

源代码:

  /* # # ipc_communicator is a IPC binary where a parent # create ten child threads, read the input parameters # from a file, sends that parameters to all the children. # Each child computes the operations on those input parameters # and writes them on to their own file. # # Author: Ashok Vairavan Date: 8/8/2014 */ /* * Generic Header Files. */ #include "stdio.h" #include "unistd.h" #include "string.h" #include "pthread.h" #include "errno.h" #include "malloc.h" #include "fcntl.h" #include "getopt.h" #include "stdlib.h" #include "math.h" #include "semaphore.h" /* * Private Header Files. */ #include "ipc*.h" /* * Function to calculate the bitmap based on the * number of threads. This bitmap is used to * track which thread read the buffer and the * pending read threads. */ int CalcBitmap(unsigned int Num) { unsigned int uiIndex; for( uiIndex = 0; uiIndex < Num; uiIndex++ ) { uiBitmap |= 1 << uiIndex; } return uiBitmap; } /* * Function that performs complex computation on * numbers. */ int ComputeComplexCalc( int op1, int op2) { return sqrt(op1) + log(op2); } /* * Function to initialise the semaphores. * semEmptyNode indicates if the buffer is available * for write. semFilledNode indicates that the buffer * is available for read. */ void InitSemaphore( void ) { unsigned int uiIndex=0; CalcBitmap(NUM_OF_CHILDREN); sem_init( &ReadCntrMutex, 0, 1); while( uiIndex uiTotalCnt ) { /* Access the shared memory */ op1 = ShdBuf[uiCnt].BufNo1; op2 = ShdBuf[uiCnt].BufNo2; fprintf(fp, "%d %d = %d\n", op1, op2, ComputeComplexCalc( op1, op2) ); sem_wait( &ReadCntrMutex ); ShdBuf[uiCnt].uiBufReadCntr &= ~( 1 << ThreadID ); if( ShdBuf[uiCnt].uiBufReadCntr == 0 ) { __sync_add_and_fetch(&Stats, 1); /* Release the semaphore lock if all readers read the data */ sem_post(&semEmptyNode[uiCnt]); } sem_post( &ReadCntrMutex ); uiTotalCnt++; uiCnt = uiTotalCnt % SIZE_CIRCULAR_BUF; /* If the parent reaches the end of file and if the child thread read all the data then break out */ if( isEOF() && ( uiTotalCnt == uiParentCnt ) ) { //printf("Thid %dp %dc %d\n", ThreadID, uiParentCnt, uiCnt ); break; } } else { /* Sleep for ten micro seconds before checking the shared memory */ usleep(10); } } fclose( fp ); return NULL; } void usage( void ) { printf(" Usage:\n"); printf(" -f - the absolute path of the input file where the input parameters are read from.\n\t\t Default input file: ./input.txt"); printf(" -? - Help Menu. \n" ); exit(1); } int main( int argc, char *argv[]) { pthread_attr_t ThrAttr; pthread_t ThrChild[NUM_OF_CHILDREN], ThrReport; unsigned int uiThdIndex = 0; unsigned int *pThreadID; int c; strncpy( inputfile, INPUT_FILE, BUF_SIZE ); while( ( c = getopt( argc, argv, "f:" )) != -1 ) { switch( c ) { case 'f': strncpy( inputfile, optarg, BUF_SIZE ); break; default: usage(); } } if( access( inputfile, F_OK ) == -1 ) { perror( "access" ); return FAILURE; } InitSemaphore(); pthread_attr_init( &ThrAttr ); while( uiThdIndex< NUM_OF_CHILDREN ) { /* Allocate the memory from the heap and pass it as an argument * to each thread to avoid race condition and invalid reference * issues with the local variables. */ pThreadID = (unsigned int *) malloc( sizeof( unsigned int ) ); if( pThreadID == NULL ) { perror( "malloc" ); /* Cancel pthread operation */ return FAILURE; } *pThreadID = uiThdIndex; if( pthread_create( &ThrChild[uiThdIndex], NULL, &Child, pThreadID) != 0 ) { printf( "pthread %d creation failed. Error: %d\n", uiThdIndex, errno); perror( "pthread_create" ); return FAILURE; } uiThdIndex++; } /* Report thread reports the statistics of IPC communication * between parent and childs threads. */ if( pthread_create( &ThrReport, NULL, &Report, NULL) != 0 ) { perror( "pthread_create" ); return FAILURE; } Parent(); uiThdIndex = 0; while( uiThdIndex < NUM_OF_CHILDREN ) { pthread_join( ThrChild[uiThdIndex], NULL ); uiThdIndex++; } /* * Cancel the report thread function when all the * children exits. */ pthread_cancel( ThrReport ); DeInitSemaphore(); return SUCCESS; } 

我真的怀疑共享内存方法是否会起作用。

AFAIK信号量实际上只是一个句柄,对它打开的进程有效。真正的信号量信息在于内核内存。

因此在其他过程中使用相同的句柄值是行不通的。

是的,我们可以在两个进程之间使用命名信号量。 我们可以打开一个文件。

 snprintf(sem_open_file, 128, "/sem_16"); ret_value = sem_open((const char *)sem_open_file, 0, 0, 0); if (ret_value == SEM_FAILED) { /* * No such file or directory */ if (errno == ENOENT) { ret_value = sem_open((const char *)sem_open_file, O_CREAT | O_EXCL, 0777, 1); } } 

而其他过程可以使用这个。