为什么在工作交错时TCP写入延迟会更糟?

我一直在分析TCP延迟(特别是从用户空间到小消息的内核空间的write ),以便对write的延迟有所了解(确认这可能是特定于上下文的)。 我注意到测试之间存在很大的不一致,这对我来说似乎很相似,而且我很想知道差异来自何处。 我知道微基准测试可能会有问题,但我仍然觉得我缺少一些基本的理解(因为延迟差异是~10倍)。

设置是我有一个C ++ TCP服务器接受一个客户端连接(来自同一CPU上的另一个进程),并且在与客户端连接时进行20次系统调用以write套接字,一次发送一个字节。 服务器的完整代码将在本文末尾复制。 这是使用boost/timer每次write时间输出(增加~1 mic的噪声):

 $ clang++ -std=c++11 -stdlib=libc++ tcpServerStove.cpp -O3; ./a.out 18 mics 3 mics 3 mics 4 mics 3 mics 3 mics 4 mics 3 mics 5 mics 3 mics ... 

我可靠地发现第一次write明显慢于其他write 。 如果我在一个计时器中包装10,000个write调用,则每次write的平均值为2微秒,但第一个调用始终是15个以上的麦克风。 为什么会出现这种“升温”现象?

相关地,我运行了一个实验,在每个write调用之间我做了一些阻止CPU工作(计算一个大的素数)。 这会导致所有 write调用都很慢:

 $ clang++ -std=c++11 -stdlib=libc++ tcpServerStove.cpp -O3; ./a.out 20 mics 23 mics 23 mics 30 mics 23 mics 21 mics 21 mics 22 mics 22 mics ... 

鉴于这些结果,我想知道在将字节从用户缓冲区复制到内核缓冲区的过程中是否存在某种批处理。 如果多次write调用快速连续发生,它们是否会合并为一个内核中断?

特别是我正在寻找一些关于将缓冲区从用户空间复制到内核空间需要多长时间的概念。 如果有一些合并效应允许平均write在我连续执行10,000次时只占用2个麦克风,那么得出结论write延迟是2个麦克风是不公平的乐观; 似乎我的直觉应该是每次write需要20微秒。 对于没有内核旁路的最低延迟(一个字节的原始write调用),这似乎非常慢。

最后一条数据是,当我在我的计算机上的两个进程(TCP服务器和TCP客户端)之间设置乒乓测试时,我平均每次往返6个麦克风(包括readwrite ,以及通过localhost网络移动)。 这似乎与上面看到的单个写入的20个麦克风延迟不一致。

TCP服务器的完整代码:

 // Server side C/C++ program to demonstrate Socket programming // #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  // Set up some blocking work. bool isPrime(int n) { if (n < 2) { return false; } for (int i = 2; i = n) { return i; } } i++; } } int main(int argc, char const *argv[]) { int server_fd, new_socket, valread; struct sockaddr_in address; int opt = 1; int addrlen = sizeof(address); // Create socket for TCP server server_fd = socket(AF_INET, SOCK_STREAM, 0); // Prevent writes from being batched setsockopt(server_fd, SOL_SOCKET, TCP_NODELAY, &opt, sizeof(opt)); setsockopt(server_fd, SOL_SOCKET, TCP_NOPUSH, &opt, sizeof(opt)); setsockopt(server_fd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)); setsockopt(server_fd, SOL_SOCKET, SO_SNDLOWAT, &opt, sizeof(opt)); address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons(8080); bind(server_fd, (struct sockaddr *)&address, sizeof(address)); listen(server_fd, 3); // Accept one client connection new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen); char sendBuffer[1] = {0}; int primes[20] = {0}; // Make 20 sequential writes to kernel buffer. for (int i = 0; i < 20; i++) { sendBuffer[0] = i; boost::timer t; write(new_socket, sendBuffer, 1); printf("%d mics\n", int(1e6 * t.elapsed())); // For some reason, doing some blocking work between the writes // The following work slows down the writes by a factor of 10. // primes[i] = getPrime(10000 + i); } // Print a prime to make sure the compiler doesn't optimize // away the computations. printf("prime: %d\n", primes[8]); } 

TCP客户端代码:

 // Server side C/C++ program to demonstrate Socket programming // #include  #include  #include  #include  #include  #include  #include  #include  #include  int main(int argc, char const *argv[]) { int sock, valread; struct sockaddr_in address; int opt = 1; int addrlen = sizeof(address); // We'll be passing uint32's back and forth unsigned char recv_buffer[1024] = {0}; // Create socket for TCP server sock = socket(AF_INET, SOCK_STREAM, 0); setsockopt(sock, SOL_SOCKET, TCP_NODELAY, &opt, sizeof(opt)); address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons(8080); // Accept one client connection if (connect(sock, (struct sockaddr *)&address, (socklen_t)addrlen) != 0) { throw("connect failed"); } read(sock, buffer_pointer, num_left); for (int i = 0; i < 10; i++) { printf("%d\n", recv_buffer[i]); } } 

我尝试使用和不使用标志TCP_NODELAYTCP_NOPUSHSO_SNDBUFSO_SNDLOWAT ,并认为这可能会阻止批处理(但我的理解是这种批处理发生在内核缓冲区和网络之间,而不是在用户缓冲区和内核缓冲区之间) 。

这是乒乓测试的服务器代码:

 // Server side C/C++ program to demonstrate Socket programming // #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  __inline__ uint64_t rdtsc(void) { uint32_t lo, hi; __asm__ __volatile__ ( "xorl %%eax,%%eax \n cpuid" ::: "%rax", "%rbx", "%rcx", "%rdx"); __asm__ __volatile__ ("rdtsc" : "=a" (lo), "=d" (hi)); return (uint64_t)hi << 32 | lo; } // Big Endian (network order) unsigned int fromBytes(unsigned char b[4]) { return b[3] | b[2]<<8 | b[1]<<16 | b[0]<>8; b[1] = x>>16; b[0] = x>>24; } int main(int argc, char const *argv[]) { int server_fd, new_socket, valread; struct sockaddr_in address; int opt = 1; int addrlen = sizeof(address); unsigned char recv_buffer[4] = {0}; unsigned char send_buffer[4] = {0}; // Create socket for TCP server server_fd = socket(AF_INET, SOCK_STREAM, 0); address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons(8080); bind(server_fd, (struct sockaddr *)&address, sizeof(address)); listen(server_fd, 3); // Accept one client connection new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen); printf("Connected with client!\n"); int counter = 0; unsigned int x = 0; auto start = rdtsc(); boost::timer t; int n = 10000; while (counter < n) { valread = read(new_socket, recv_buffer, 4); x = fromBytes(recv_buffer); toBytes(x+1, send_buffer); write(new_socket, send_buffer, 4); ++counter; } printf("%f clock cycles per round trip (rdtsc)\n", (rdtsc() - start) / double(n)); printf("%f mics per round trip (boost timer)\n", 1e6 * t.elapsed() / n); } 

这是乒乓测试的客户端代码:

 // #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  // Big Endian (network order) unsigned int fromBytes(unsigned char b[4]) { return b[3] | b[2]<<8 | b[1]<<16 | b[0]<>8; b[1] = x>>16; b[0] = x>>24; } int main(int argc, char const *argv[]) { int sock, valread; struct sockaddr_in address; int opt = 1; int addrlen = sizeof(address); // We'll be passing uint32's back and forth unsigned char recv_buffer[4] = {0}; unsigned char send_buffer[4] = {0}; // Create socket for TCP server sock = socket(AF_INET, SOCK_STREAM, 0); // Set TCP_NODELAY so that writes won't be batched setsockopt(sock, SOL_SOCKET, TCP_NODELAY, &opt, sizeof(opt)); address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons(8080); // Accept one client connection if (connect(sock, (struct sockaddr *)&address, (socklen_t)addrlen) != 0) { throw("connect failed"); } unsigned int lastReceived = 0; while (true) { toBytes(++lastReceived, send_buffer); write(sock, send_buffer, 4); valread = read(sock, recv_buffer, 4); lastReceived = fromBytes(recv_buffer); } } 

这里有一些问题。

为了更接近答案,您需要让客户端做两件事:1。接收所有数据。 2.记录每次阅读的大小。 我是这样做的:

  int loc[N+1]; int nloc, curloc; for (nloc = curloc = 0; curloc < N; nloc++) { int n = read(sock, recv_buffer + curloc, sizeof recv_buffer-curloc); if (n <= 0) { break; } curloc += n; loc[nloc] = curloc; } int last = 0; for (int i = 0; i < nloc; i++) { printf("%*.*s ", loc[i] - last, loc[i] - last, recv_buffer + last); last = loc[i]; } printf("\n"); 

并将N定义为20(抱歉,成长),并将服务器更改为一次写入一个字节。 现在,当这打印出类似于:

  abcdefghijklmnopqrs 

我们知道服务器正在发送1个字节的数据包; 但是当它打印出类似于:

  a bcde fghi jklm nop qrs 

我们怀疑服务器主要发送4字节数据包。

根本问题是TCP_NODELAY不会执行您怀疑的操作。 Nagle的算法,当存在未确认的已发送数据包时累积输出; TCP_NODELAY控制是否应用此选项。

无论TCP_NODELAY如何,您仍然是STREAM_SOCKET,这意味着N-writes可以合并为一个。 sockets正在为设备供电,但同时您正在为sockets供电。 一旦数据包[mbuf,skbuff,...]已经提交给设备,套接字需要在下一个write()s上创建一个新的数据包。 一旦设备准备好接收新数据包,套接字就可以提供它,但在此之前,数据包将充当缓冲区。 在缓冲模式下,写入非常快,因为所有必要的数据结构都可用[如评论和其他答案中所述]。

您可以通过调整SO_SNDBUF和SO_SNDLOWAT套接字选项来控制此缓冲。 请注意,但accept返回的缓冲区不会inheritance提供的套接字的缓冲区大小。 通过将SNDBUF减少到1

输出如下:

 abcdefghijklmnopqrst a bcdefgh ijkl mno pqrst ab cdefg hij klm nop qrst abcdefghijklmnopqrst 

对应从默认开始,然后在后续连接上连续向服务器端添加:TCP_NODELAY,TCP_NOPUSH,SO_SNDBUF(= 1),SO_SNDLOWAT(= 1)。 每次迭代都有比以前更平坦的时间差。

您的里程可能会有所不同,这是在MacOS 10.12上; 并且我用rdtsc()将程序更改为C ++,因为我有信任问题。

 /* srv.c */ // Server side C/C++ program to demonstrate Socket programming // #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  #ifndef N #define N 20 #endif int nap = 0; int step = 0; extern long rdtsc(void); void xerror(char *f) { perror(f); exit(1); } #define Z(x) if ((x) == -1) { xerror(#x); } void sopt(int fd, int opt, int val) { Z(setsockopt(fd, SOL_SOCKET, opt, &val, sizeof(val))); } int gopt(int fd, int opt) { int val; socklen_t r = sizeof(val); Z(getsockopt(fd, SOL_SOCKET, opt, &val, &r)); return val; } #define POPT(fd, x) printf("%s %d ", #x, gopt(fd, x)) void popts(char *tag, int fd) { printf("%s: ", tag); POPT(fd, SO_SNDBUF); POPT(fd, SO_SNDLOWAT); POPT(fd, TCP_NODELAY); POPT(fd, TCP_NOPUSH); printf("\n"); } void stepsock(int fd) { switch (step++) { case 7: step = 2; case 6: sopt(fd, SO_SNDLOWAT, 1); case 5: sopt(fd, SO_SNDBUF, 1); case 4: sopt(fd, TCP_NOPUSH, 1); case 3: sopt(fd, TCP_NODELAY, 1); case 2: break; } } int main(int argc, char const *argv[]) { int server_fd, new_socket, valread; struct sockaddr_in address; int opt = 1; int addrlen = sizeof(address); // Create socket for TCP server server_fd = socket(AF_INET, SOCK_STREAM, 0); popts("original", server_fd); // Set TCP_NODELAY so that writes won't be batched while ((opt = getopt(argc, argv, "sn:o:")) != -1) { switch (opt) { case 's': step = ! step; break; case 'n': nap = strtol(optarg, NULL, 0); break; case 'o': for (int i = 0; optarg[i]; i++) { switch (optarg[i]) { case 't': sopt(server_fd, TCP_NODELAY, 1); break; case 'p': sopt(server_fd, TCP_NOPUSH, 0); break; case 's': sopt(server_fd, SO_SNDBUF, 1); break; case 'l': sopt(server_fd, SO_SNDLOWAT, 1); break; default: exit(1); } } } } address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons(8080); if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) == -1) { xerror("bind"); } popts("ready", server_fd); while (1) { if (listen(server_fd, 3) == -1) { xerror("listen"); } // Accept one client connection new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen); if (new_socket == -1) { xerror("accept"); } popts("accepted: ", new_socket); sopt(new_socket, SO_SNDBUF, gopt(server_fd, SO_SNDBUF)); sopt(new_socket, SO_SNDLOWAT, gopt(server_fd, SO_SNDLOWAT)); if (step) { stepsock(new_socket); } long tick[21]; tick[0] = rdtsc(); // Make N sequential writes to kernel buffer. for (int i = 0; i < N; i++) { char ch = 'a' + i; write(new_socket, &ch, 1); tick[i+1] = rdtsc(); // For some reason, doing some blocking work between the writes // The following work slows down the writes by a factor of 10. if (nap) { sleep(nap); } } for (int i = 1; i < N+1; i++) { printf("%ld\n", tick[i] - tick[i-1]); } printf("_\n"); // Print a prime to make sure the compiler doesn't optimize // away the computations. close(new_socket); } } 

clnt.c:

 #include  #include  #include  #include  #include  #include  #ifndef N #define N 20 #endif int nap = 0; int main(int argc, char const *argv[]) { int sock, valread; struct sockaddr_in address; int opt = 1; int addrlen = sizeof(address); // We'll be passing uint32's back and forth unsigned char recv_buffer[1024] = {0}; // Create socket for TCP server sock = socket(AF_INET, SOCK_STREAM, 0); // Set TCP_NODELAY so that writes won't be batched setsockopt(sock, SOL_SOCKET, TCP_NODELAY, &opt, sizeof(opt)); while ((opt = getopt(argc,argv,"n:")) != -1) { switch (opt) { case 'n': nap = strtol(optarg, NULL, 0); break; default: exit(1); } } opt = 1; address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons(8080); // Accept one client connection if (connect(sock, (struct sockaddr *)&address, (socklen_t)addrlen) != 0) { perror("connect failed"); exit(1); } if (nap) { sleep(nap); } int loc[N+1]; int nloc, curloc; for (nloc = curloc = 0; curloc < N; nloc++) { int n = read(sock, recv_buffer + curloc, sizeof recv_buffer-curloc); if (n <= 0) { perror("read"); break; } curloc += n; loc[nloc] = curloc; } int last = 0; for (int i = 0; i < nloc; i++) { int t = loc[i] - last; printf("%*.*s ", t, t, recv_buffer + last); last = loc[i]; } printf("\n"); return 0; } 

rdtsc.s:

 .globl _rdtsc _rdtsc: rdtsc shl $32, %rdx or %rdx,%rax ret 

(不是一个答案,但需要比评论更多的空间……)

这听起来像Nagle的算法或其变体,控制TCP数据包实际发送的时间。

对于第一次写入,当“管道”中没有未经证实的数据时,它将立即发送,这需要一些时间。 对于之后不久的后续写入,管道中仍然会有未经证实的数据,因此可以在发送缓冲区中排队少量数据,这样会更快。

在传输中断后,当所有发送都有机会赶上时,管道将准备好再次发送。

您可以使用Wireshark之​​类的东西来确认这一点,以查看实际的TCP数据包 – 这将显示write()请求如何组合在一起。

公平地说,我希望TCP_NODELAY标志可以绕过这一点 – 正如你所说的那样,导致更均匀的时间传播。 如果您可以检查TCP数据包,那么它们是否值得查看是否显示PSH标志设置,以强制立即发送。

(不确定这是否有帮助,但我没有足够的声誉发表评论)

微观标记是棘手的,特别是对于操作系统调用 – 根据我的经验,在确定数字之前,必须考虑并过滤或测量很少的因素。

其中一些因素是:

  1. 缓存命中/未命中

  2. 多任务抢占

  3. 操作系统在API调用的某些时刻分配内存(内存分配很容易导致微秒的延迟)

  4. 延迟加载(某些API在connect调用fe期间可能不会做太多,直到实际数据进入)

  5. 此刻CPU的实际时钟速度(动态时钟缩放,一直发生)

  6. 最近在此核心或相邻核心上执行的命令(fe,重AVX512指令可能会将CPU切换到L2(许可证2)模式,这会减慢时钟速度以避免过热)。

  7. 使用虚拟化,可以在同一物理CPU上运行其他任何东西。

您可以尝试通过在一个循环中重复运行相同的命令来减轻因子1,2,6和7的影响。 但是,在您的情况下,这可能意味着您需要一次打开多个套接字并在一个周期内测量每个套接字的第一次写入。 这样,进入内核的缓存将在第一次调用时被预热,并且进一步调用将具有“更干净”的时间。 你可以平均一下。

为了帮助5,您可以尝试“预热”CPU时钟 – 在测试之前和测试循环内运行一个长的阻塞周期,但是在该循环中不要做任何花哨的事情以避免过热 – 最安全的是调用__asm("nop")在那个循环里面。

起初,我没有注意到你只发送1个字节,并认为这可能是由于TCP缓慢启动 。 但是你的第二次素数测试也不支持这个。 所以,这听起来更像是我的清单中的因素1,5或6。