在单个线程中使用循环缓冲区的可能性

我有一个UDP线程,它通过来自不同多路复用流的recvmmsg系统调用读取多个数据报,并将它们推送到不同的循环/环形缓冲区。 这些环形缓冲区是Stream结构的一部分。 每个流每20ms发送一个语音帧。 因此UDP数据包可能如下所示:F1S1 F1S2 F1S3 F2S1依此类推或者如果出现突发情况,它可能如下所示:F1S1 F2S1 F3S1 F1S2,依此类推。 在接收之后,这些数据包将由一个按照ITP原理工作的库并行处理。 UDP线程必须将这些parllel任务与要处理的数据包列表一起分派。 这里的限制是任务不能从SAME流并行处理两个帧。任务必须有自己独立的内存用于帧处理。 所以我需要确保这些帧的FIFO执行顺序,这将在我产生这些任务之前在UDP线程中完成。 目前,当我收到这些数据包时,我查找streamId并将帧放在循环缓冲区中,该缓冲区是带有for_loop的Stream Strctures的一部分。

这是代码,显示UDP线程中发生的事情。

while (!(*thread_stop)) { int nr_datagrams = recvmmsg(socket_handle->fd_udp, datagramS, VLEN, 0, NULL); ..... for (int i = 0; i circBuff.seqNum[codecPtr->circBuff.newestIdx] = _seq_num; // Update the entry pointer to point to the newest frame codecPtr->circBuff.entries = codecPtr->circBuff.entries + codecPtr->circBuff.newestIdx * codecPtr->frameLength; // Copy the contents of the frame in entry buffer memcpy(codecPtr->circBuff.entries, 2 * sizeof(uint16_t) + datagramBuff[i], codecPtr->frameLength); // Update the newest Index codecPtr->circBuff.newestIdx = (codecPtr->circBuff.newestIdx + 1) & codecPtr->circBuffSize; } 

我的程序现在应该从最近接收数据但不是最新数据的不同流的环形缓冲区中弹出帧,因为在突发的情况下,所有最近接收的数据包可能属于相同的流。 现在我应该如何前进是我面临的困境?

(在OP澄清了有关其他细节的问题之后,这是完全重写。)

我建议对接收的数据报使用无序缓冲区,每个数据报都有流标识符,流计数器和接收计数器; 以及每个流的最新调度计数器:

 #define _GNU_SOURCE #include  #include  #include  #include  #include  #include  /* Maximum number of buffered datagrams */ #define MAX_DATAGRAMS 16 /* Maximum size of each datagram */ #define MAX_DATAGRAM_SIZE 4096 /* Maximum number of streams */ #define MAX_STREAMS 4 typedef struct { int stream; /* -1 for none */ unsigned int counter; /* Per stream counter */ unsigned int order; /* Global counter */ unsigned int size; /* Bytes of data in data[] */ char data[MAX_DATAGRAM_SIZE]; } datagram; void process(const int socketfd) { /* Per-stream counters for latest dispatched message */ unsigned int newest_dispatched[MAX_STREAMS] = { 0U }; /* Packet buffer */ datagram buffer[MAX_DATAGRAMS]; /* Sender IPv4 addresses */ struct sockaddr_in from[MAX_DATAGRAMS]; /* Vectors to refer to packet buffer .data member */ struct iovec iov[MAX_DATAGRAMS]; /* Message headers */ struct mmsghdr hdr[MAX_DATAGRAMS]; /* Buffer index; hdr[i], iov[i], from[i] all refer * to buffer[buf[i]]. */ unsigned int buf[MAX_DATAGRAMS]; /* Temporary array indicating which buffer contains * the next datagram to be dispatched for each stream */ int next[MAX_STREAMS]; /* Receive counter (not stream specific) */ unsigned int order = 0U; int i, n; /* Mark all buffers unused. */ for (i = 0; i < MAX_DATAGRAMS; i++) { buffer[i].stream = -1; buffer[i].size = 0U; } /* Clear stream dispatch counters. */ for (i = 0; i < MAX_STREAMS; i++) newest_dispatched[i] = 0U; while (1) { /* Discard datagrams received too much out of order. */ for (i = 0; i < MAX_DATAGRAMS; i++) if (buffer[i].stream >= 0) if (buffer[i].counter - newest_dispatched[buffer[i].stream] >= UINT_MAX/2) { /* Either in the past, or too far into the future */ buffer[i].stream = -1; buffer[i].size = 0U; } /* Prepare for receiving new messages. * Stream -1 indicates unused/processed message. */ for (n = 0, i = 0; i < MAX_DATAGRAMS; i++) if (buffer[i].stream == -1) { /* Prep the buffer. */ buffer[i].stream = -1; buffer[i].counter = 0U; buffer[i].order = order + n; buffer[i].size = 0U; /* Local index n refers to buffer i. */ buf[n] = i; /* Local index n refers to buffer i data. */ iov[n].iov_base = buffer[i].data; iov[n].iov_len = sizeof buffer[i].data; /* Clear received bytes counter. */ hdr[n].msg_len = 0U; /* Source address to from[] array. */ hdr[n].msg_hdr.msg_name = from + i; hdr[n].msg_hdr.msg_namelen = sizeof from[i]; /* Payload per iov[n]. */ hdr[n].msg_hdr.msg_iov = iov + n; hdr[n].msg_hdr.msg_iovlen = 1; /* No ancillary data. */ hdr[n].msg_hdr.msg_control = NULL; hdr[n].msg_hdr.msg_controllen = 0; /* Clear received message flags */ hdr[n].msg_hdr.msg_flags = 0; /* Prepared one more hdr[], from[], iov[], buf[]. */ n++; } if (n < 1) { /* Buffer is full. Find oldest received datagram. */ unsigned int max_age = 0U; int oldest = -1; for (i = 0; i < MAX_DATAGRAMS; i++) { const unsigned int age = order - buffer[i].order; if (age >= max_age) { max_age = age; oldest = i; } } /* TODO: Dispatch the oldest received datagram: * Stream buffer[oldest].stream * Data buffer[oldest].data, buffer[oldest].size bytes */ /* Update stream counters. */ newest_dispatched[buffer[oldest].stream] = buffer[oldest].counter; /* Remove buffer. */ buffer[oldest].stream = -1; buffer[oldest].size = 0; /* Need more datagrams. */ continue; } n = recvmmsg(socketfd, hdr, n, 0, NULL); if (n < 1) { /* TODO: Check for errors. */ continue; } /* Update buffer description for each received message. */ for (i = 0; i < n; i++) { const int b = buf[i]; buffer[b].order = order; /* Already set, actually */ buffer[b].size = hdr[i].msg_len; /* TODO: determine stream and counter, * based on from[i] and buffer[b].data. * This assigns them in round-robin fashion. */ buffer[b].stream = order % MAX_STREAMS; buffer[b].counter = order / MAX_STREAMS; /* Account for the message received. */ order++; } while (1) { /* Clear next-to-be-dispatched index list. */ for (i = 0; i < MAX_STREAMS; i++) next[i] = -1; /* Find next messages to be dispatched. */ for (i = 0; i < MAX_DATAGRAMS; i++) if (buffer[i].stream >= 0 && buffer[i].counter == newest_dispatched[buffer[i].stream] + 1U) next[buffer[i].stream] = i; /* Note: This is one point where you will wish to * ensure all pending dispatches are complete, * before issuing new ones. */ /* Count (n) and dispatch the messages. */ for (n = 0, i = 0; i < MAX_STREAMS; i++) if (next[i] != -1) { const int b = next[i]; const int s = buffer[b].stream; /* TODO: Dispatch buffer b, stream s. */ /* Update dispatch counters. */ newest_dispatched[s]++; n++; } /* Nothing dispatched? */ if (n < 1) break; /* Remove dispatched messages from the buffer. Also remove duplicates. */ for (i = 0; i < MAX_DATAGRAMS; i++) if (buffer[i].stream >= 0 && buffer[i].counter == newest_dispatched[buffer[i].stream]) { buffer[i].stream = -1; buffer[i].size = 0U; } } } } 

请注意,我省略了您应该等待调度消息完成的点(因为有多个选项,具体取决于您的调度方式以及您是否希望同时“工作”)。 此外,此代码仅经过编译测试,因此可能包含逻辑错误。

循环结构如下:

  1. 丢弃过去或过去太远的缓冲消息是有用的。

    计数器是循环的。 我在这里添加了计数器包装逻辑的描述。

  2. 为每个空闲缓冲区槽构建recvmmsg()头文件。

  3. 如果没有可用的缓冲区插槽,请查找和分发或丢弃最旧的缓冲区,并从步骤1开始重复。

  4. 接收一条或多条消息。

  5. 根据收到的消息,更新缓冲区插槽。

    重点是确定流,流计数器和接收消息中的字节数。

  6. 调度循环。

    这是一个循环,因为如果我们不按顺序接收消息,但稍后完成消息,我们将需要一次分派多组消息。

    在循环内,首先清除流索引数组( next[] )。

    然后,我们检查缓冲区以查找接下来要分派的消息。 为此,我们需要每个流计数器。 如果我们收到重复的UDP数据报,则在单独的步骤中完成。

    如果没有任何流已经缓冲了它们的下一条消息,我们将退出此循环,并等待新的数据报到达。

    接下来发送消息。 循环为每个流调度最多一条消息。

    发送后,我们删除已分派的邮件。 我们循环遍历整个缓冲区,而不是循环遍历每个流并删除与该流相对应的缓冲区,以便我们捕获重复的UDP消息。

请注意,缓冲区根本不会以上述顺序复制。

如果消息是压缩或未压缩的音频,则需要为未压缩的音频流提供额外的(循环)缓冲区。 为所有UDP消息提供共享的无序缓冲区的好处是,您始终可以选择接下来要进行的音频流(如果您已收到该数据报),并且不会意外地花费这么多时间推进其他流可能会耗尽数据的一个流,造成音频故障。

每个音频流的循环缓冲区大小应至少是数据报最大大小的三倍。 这使得您可以使用包装逻辑((later % LIMIT) + LIMIT - (earlier % LIMIT)) % LIMIT ,每个样本的结果> LIMIT/2表示反向顺序),并在回放/解压缩期间追加新数据。 (Dispatcher更新一个索引,音频播放另一个索引。只需确保它们以primefaces方式访问。)较大的音频流缓冲区可能会导致较大的延迟。

总之,假设音频流解复用和调度即将到来,有两个完全独立的缓冲器结构可供使用。 对于UDP数据报,使用无序的一组缓冲槽。 缓冲区插槽需要一些记账(如上面的代码所示),但是为了获得许多不同的流而调度它们非常简单。 但是,每个音频流确实需要一个循环缓冲区(至少是(解压缩)数据报最大大小的三倍)。

不幸的是,我没有看到在这里使用独立任务并行性的任何好处(例如羊毛C库 )。

实际上,为每个流添加一个结构来描述解压缩器状态可能更简单,并根据哪个循环音频缓冲区具有最少的缓冲数据来确定它们的优先级。 典型的解压缩器报告它们是否需要更多数据,因此每个流添加一个临时工作区(两个压缩数据报)将允许解压缩器消耗整个数据包,但仅在绝对必要时复制内存。


编辑添加有关循环缓冲区的详细信息:

跟踪循环缓冲区状态有两种主要方法,加上我怀疑在这里可能有用的第三种衍生方法:

  1. 使用单独的索引添加( head )和删除( tail )数据

    如果有一个生产者和一个消费者,循环缓冲区可以无锁地维护,因为生产者只增加head ,而消费者增加tail

    head == tail时缓冲区为空。 如果缓冲区有SIZE条目, head = head % SIZEtail = tail % SIZE ,则有(head + SIZE - tail) % SIZE缓冲条目。

    缺点是一个简单的实现总是在缓冲区中至少有一个空闲条目,因为上面简单的模运算不能区分所有和没有使用的条目。 稍微复杂的代码有一些变通方法。

    在简单的情况下,缓冲区有SIZE - 1 - (head + SIZE - tail) % SIZE free条目。

  2. 缓冲数据从索引start ,缓冲length条目。

    缓冲区内容在内存中总是连续的,或者在内存中分成两部分(第一部分在缓冲区空间的末尾结束,第二部分从缓冲区空间的开始处开始)。 生产者和消费者需要修改startlength ,因此无锁使用需要比较和交换primefaces操作(通常将两者都打包成一个整数)。

    在任何时候,都使用length条目,并且在循环缓冲区中释放size - length条目。

    当生产者附加n数据条目时,它会复制从索引(start + length) % SIZE的数据,最后一个索引(start + length + n - 1) % SIZE ,并将length增加n 。 如上所述,要复制的数据可能是连续的,也可以分为两部分。

    当一个消费者消费n数据条目时,它会复制从索引start的数据,在索引(start + n) % SIZE最终条目,以及更新start = (start + n) % SIZE;length = length - n; 。 同样,消耗的数据可能会在内存中分成两部分(否则它将跨越缓冲区的末尾)。

  3. 衍生品。

    如果只有一个生产者线程/任务和一个使用者,我们可以将缓冲区状态变量加倍,以允许通过DMA或异步I / O 异步地从缓冲区添加或使用数据。

    1. 使用headtailhead_pendingtail_pending索引

      head != head_pending ,正在消耗从headhead_pending-1的数据。 完成后,消费者设置head = head_pending % SIZE

      tail != tail_pending ,在tail_pending-1 (包括tail索引tail处添加了更多数据。 传输完成后,生产者设置tail = tail_pending % SIZE

      请注意,使用DMA时,通常最好在内存中使用连续的块。 在微控制器中,通常使用中断将下一个DMA’ble块加载到DMA寄存器中,在这种情况下,您实际上有headhead_pendinghead_next ,或tailtail_pendingtail_next ,每个DMA的大小选择了一个块,这样你就不会在分裂点附近(在缓冲区的物理端)发送非常短的块,而是保持中断率可接受。

      在任何时候,缓冲区中都存在(head + SIZE - tail) % SIZE条目,可以使用。 使用简单的模运算,缓冲区中至少有一个条目始终未使用,因此可添加的最大条目数为SIZE - 1 - (head + SIZE - tail) % SIZE

    2. 使用startlengthincomingoutgoing

      在这里,必须以primefaces方式修改startlength ,以便另一方无法以新的length观察旧的start ,反之亦然。 如上所述,这可以无锁地完成,但必须小心,因为这是问题的常见根源。

      在任何时候,缓冲区包含length条目,添加incoming条目(在(start + length) % SIZE(start + length + incoming - 1) % SIZE ,包括端点,如果incoming > 0 ),并且incoming > 0条目被消耗(在start(start + outgoing - 1) % SIZE ,包括端点,如果outgoing > 0 )。

      传入传输完成后,生产者通过incoming增加length

      当传出传输完成时,消费者更新start = (start + outgoing) % SIZElength = length - outgoing

至于primefaces处理:

支持C11的C编译器提供了一系列primefaces函数,可用于primefaces地更新上述变量。 使用弱版本可以在不同类型的硬件之间实现最大兼容性。 对于startlength

  uint64_t buffer_state; /* Actual atomic variable */ uint64_t old_state; /* Temporary variable */ temp_state = atomic_load(&buffer_state); do { uint32_t start = temp_state >> 32; uint32_t length = (uint32_t)temp_state; uint64_t new_state; /* Update start and length as necessary */ new_state = (uint64_t)length | ((uint64_t)state << 32); } while (!atomic_compare_exchange_weak(&buffer_state, &old_state, new_state)); 

对于按amount递增一些缓冲区状态变量state ,缓冲区大小为size ,假设所有类型都是size_t

  size_t old; /* Temporary variable */ old = atomic_load(&state) % size; do { size_t new = (old + amount) % size; } while (!atomic_compare_exchange_weak(&state, &old, new)); 

请注意,如果atomic_compare_exchange_weak()失败,它会将当前的state值复制为old 。 这就是为什么只需要一个初始primefaces负载的原因。

许多C编译器提供了非标准的C11之前的primefaces内置函数,只是许多C编译器提供的常见扩展。 例如,可以使用primefaces方式修改startlength

  uint64_t buffer_state; /* Actual atomic variable */ uint64_t old_state, new_state; /* Temporary variables */ do { uint32_t start, length; old_state = buffer_state; /* Non-atomic access */ start = old_state >> 32; length = (uint32_t)old_state; /* Update start and/or length */ new_state = (uint64_t)length | ((uint64_t)start << 32); } while (!__sync_bool_compare_and_swap(&buffer_state, old_state, new_state)); 

要在许多C11前编译器上增加一些缓冲区状态变量state ,缓冲区大小为size ,假设所有类型都是size_t ,则可以使用:

  size_t old_state, new_state; /* Temporary variables */ do { old_state = state; new_state = (old_state + amount) % size; } while (!__sync_bool_compare_and_swap(&state, old_state, new_state)); 

所有这些primefaces都基本上旋转,直到修改成功地primefaces化。 虽然看起来两个或更多并发核心可以无休止地战斗,但是当前的缓存架构使得一个核心总是会赢(第一个)。 因此,在实践中,只要每个核心在执行其中一个primefaces更新循环之间还有其他工作要做,这些工作就可以了。 (确实,在无锁C代码中无处不在。)

我要提到的最后一部分是允许部分发送数据报。 这基本上意味着每个数据报缓冲区插槽不仅具有size (表示该插槽中的字节数),而且还可以start 。 收到新数据报时, start设置为零。 如果无法完全分派数据报(复制到每个流缓冲区),则更新缓冲区槽的startsize ,但不会增加流分派计数器。 这样,在下一轮中,该数据报的其余部分被缓冲。

我可以编写一个完整的例子,展示我如何使用我在上一段中提到的部分缓冲数据报方案将输入数据报从无序数据报缓冲区解压缩成几个流,但确切的实现在很大程度上取决于编程接口解压缩器库具有。

特别是,我个人更喜欢在例如POSIX iconv()函数中使用的接口 - 但可能返回状态代码而不是转换的字符数。 各种音频和语音库具有不同的接口,甚至可能无法将它们转换为这种接口。 (作为一个来自不同领域的示例,用于保护套接字通信的大多数SSL / TLS库没有这样的接口,因为它们总是希望直接直接访问套接字描述符;这使得单线程多套接字异步SSL / TLS实现“很难”。好吧,更像是“从头开始写,如果你想要它”。)对于解压缩数据的音频分析,比如使用FFTW库进行快速傅里叶变换(或DCT,或Hartley,或其他一个变换那个优秀的库执行,特别是当该窗口大小的变换的优化智慧可用时),解压缩的数据通常需要固定大小的块。 这也会影响确切的实施。