使用线程的并行 TCP 连接
Parallel TCP connection using threads
我正在尝试构建一个使用线程打开并行 TCP 套接字的系统。
我的线程是使用消息队列 IPC 触发的,因此每次数据包到达消息队列时都会“唤醒”线程,打开与远程服务器的 TCP 连接并发送数据包。
我的问题是,在 Wireshark 中,我可以看到使用线程而不是一个连接发送文件所需的时间更短,但吞吐量没有改变。
我的问题是:
- 如何验证我的线程并行工作?
- 我该如何改进这段代码?,
3.How 我可以使用一个线程打开多个套接字吗?
我正在使用虚拟机 运行 多线程客户端。
IDE 我用的是Clion,语言是C。
我的代码:
#include<stdio.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<string.h>
#include <arpa/inet.h>
#include <unistd.h> // for close
#include<pthread.h>
#include <math.h>
#include<malloc.h>
#include<signal.h>
#include<stdbool.h>
#include<sys/types.h>
#include<linux/if_packet.h>
#include<netinet/in.h>
#include<netinet/if_ether.h> // for ethernet header
#include<netinet/ip.h> // for ip header
#include<netinet/udp.h> // for udp header
#include<netinet/tcp.h>
#include <byteswap.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <assert.h>
#include <time.h>
#define QUEUE_NAME "/ServerDan_Queue"
#define QUEUE_PERM 0660
#define MAX_MESSAGES 10 //Max size = 10
#define MAX_MSG_SIZE 4105 //Max size = 8192B
#define MSG_BUFFER_SIZE MAX_MSG_SIZE+10
#define BSIZE 1024
#define Nbytes 4096
#define ElorServer_addr "192.168.1.54"
///params:
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
struct sockaddr_in server;
struct stat obj;
int sock;
int k, size, status;
int i = 0;
typedef struct frag
{
int packet_number;
int seq;
uint8_t data[4096];
bool lastfrag;
} fragma;
void * middlemanThread(void *arg)
{
///========================================///
///**** Waiting for message queue trigger!:///
///=======================================///
long id = (long)arg;
id+=1;
mqd_t qd; //queue descriptor
//open the queue for reading//
qd= mq_open(QUEUE_NAME,O_RDONLY);
assert(qd != -1);
struct mq_attr attr;
assert(mq_getattr(qd,&attr) != -1);
uint8_t *income_buf = calloc(attr.mq_msgsize,1);
uint8_t *cast_buf = calloc(attr.mq_msgsize,1);
assert(income_buf);
fragma frag;
struct timespec timeout;
clock_gettime(CLOCK_REALTIME,&timeout);
timeout.tv_sec+=50;
//bool closesoc =false;
printf("Waiting for messages ..... \n\n");
while(1){
///========================================///
///**** Open message queue fo receive:///
///=======================================///
if((mq_timedreceive(qd,income_buf,attr.mq_msgsize,0,&timeout))<0){
printf("Failed to receive message for 50 sec \n");
//closesoc =true;
pthread_exit(NULL);
}
else{
cast_buf = income_buf;
printf("Received successfully , your msg :\n");
frag.packet_number = *cast_buf;
cast_buf = (cast_buf + sizeof(int));
frag.seq = *cast_buf;
cast_buf = (cast_buf + sizeof(int));
memccpy(frag.data,((fragma*)cast_buf)->data,0,Nbytes);
cast_buf = cast_buf + Nbytes;
frag.lastfrag = *cast_buf;
uint8_t * data = frag.data;
}
pthread_mutex_lock(&lock);
///========================================///
///**** Connecting to Server and send Frament:///
///=======================================///
int size = sizeof(( fragma *)income_buf)->packet_number + sizeof(( fragma *)income_buf)->seq + sizeof(( fragma *)income_buf)->data + sizeof(( fragma *)income_buf)->lastfrag;
printf("In thread\n");
int clientSocket;
struct sockaddr_in serverAddr;
socklen_t addr_size;
// Create the socket.
clientSocket = socket(PF_INET, SOCK_STREAM, 0);
//Configure settings of the server address
// Address family is Internet
serverAddr.sin_family = AF_INET;
//Set port number, using htons function
serverAddr.sin_port = htons(8081);
//Set IP address to localhost
serverAddr.sin_addr.s_addr = inet_addr("192.168.14.149");
memset(serverAddr.sin_zero, '[=10=]', sizeof serverAddr.sin_zero);
//Connect the socket to the server using the address
addr_size = sizeof serverAddr;
connect(clientSocket, (struct sockaddr *) &serverAddr, addr_size);
if(send(clientSocket , income_buf , size,0) < 0)
{
printf("Send failed\n");
}
printf("Trhead Id : %ld \n" , id);
printf("Packet number : %d \n Seq = %d \n lasfrag = %d\n\n",frag.packet_number,frag.seq,(int)frag.lastfrag);
pthread_mutex_unlock(&lock);
//if(closesoc)
close(clientSocket);
usleep(20000);
}
}
int main(){
int i = 0;
pthread_t tid[5];
while(i< 5)
{
if( pthread_create(&tid[i], NULL, middlemanThread, (void*)i) != 0 )
printf("Failed to create thread\n");
i++;
}
sleep(2);
i = 0;
while(i< 5)
{
pthread_join(tid[i++],NULL);
printf("Thread ID : %d:\n",i);
}
return 0;
}
以下是对以下问题的部分回答:
3.Is the concept of parallel TCP suppose to increase the throughput?
有点。这真的取决于瓶颈是什么。
第一个可能的瓶颈是拥堵控制。 TCP 发送方对一次可以发送多少数据包有限制(在收到第一个数据包的 ACK 之前),称为拥塞 window。这个数字应该从小开始,然后随着时间的推移而增长。此外,如果一个数据包丢失,这个数字将减少一半,然后慢慢增长,直到发生下一个丢弃。但是,限制是针对每个 TCP 连接的,因此,如果您将数据分布在多个并行连接上,则总体拥塞 window(所有流量的所有 windows 之和)将增长得更快并下降到更小的数量。 (这是一个总结,详细的你需要了解拥塞控制是如何工作的,这是一个很大的话题)。无论您是否使用线程,这都应该发生。可以在一个线程中打开多个连接,达到同样的效果。
第二个可能的瓶颈是OS中的网络处理。据我所知,这是一个从 10Gb 连接开始的问题。也许是 1Gb,但可能不是。 TCP 处理发生在 OS 中,而不是在您的应用程序中。如果处理通过 OS 在处理器之间传播,您可能会获得更好的性能(应该有参数来启用它),并且由于缓存可能会获得更好的性能。
如果您从磁盘读取文件,磁盘 IO 也很可能成为瓶颈。在这种情况下,我认为在不同线程之间分散发送数据实际上没有帮助。
thus every time a packet arrive to the message queue a thread "wakes up" , open TCP connection with remote server and send the packet
如果您非常在意速度或效率,请不要这样做。使用 TCP 套接字可以做的最昂贵的事情就是初始连接。您正在进行 3 次握手只是为了发送一条消息!
然后,您在执行整个操作时持有一个全局互斥体 - 同样,这也是您程序中最慢的操作。
当前的设计有效地单线程,但采用了最复杂和最昂贵的方式。
I can see the the time it takes to send a file is smaller using threads instead of one connection , but the throughput does not change
我不知道你实际测量的是什么,而且你也不清楚你在测量什么。什么是文件?一个碎片?多个片段?与您的 MTU 相比有多大?您是否检查过片段实际上是以正确的顺序接收的,因为在我看来,唯一可能的并行性是可能中断的地方。
如何才能使单个文件具有更低的延迟和不变的吞吐量?
How can i verify my threads working parallely?
如果您在 wireshark 中看到多个具有不同源端口的 TCP 连接,并且它们的数据包交织在一起,则说明您具有有效的并行性。这不太可能,因为您使用全局互斥锁明确禁止它!
What is the best way to check the throughput in wireshark?
不要。使用 wireshark 检查数据包,使用服务器确定吞吐量。这才是结果真正重要的地方。
3.Is the concept of parallel TCP suppose to increase the throughput?
如果你不知道它的用途,为什么要实现所有这些复杂性?
单个线程(正确编码,没有虚假互斥抖动)很有可能会使您的网络饱和,所以:不会。拥有多个 I/O 线程通常是为了方便地划分您的逻辑和状态(即,每个线程有一个客户端,或不同线程中的不同不相关 I/O 子系统),而不是性能。
如果你想从消息队列中取出数据包并将它们发送到 TCP,高效的方法是:
- 只使用一个线程来执行此操作(您的程序可能有其他线程在执行其他操作 - 如果可能,请避免与它们同步)
- 打开到服务器的单个持久 TCP 连接,而不是 connect/close 每个片段
- 就是这样。它比您现有的要简单得多,而且性能会好得多。
您实际上可以让一个线程处理多个不同的连接,但我看不出这对您的情况有任何用处,所以请保持简单。
我正在尝试构建一个使用线程打开并行 TCP 套接字的系统。
我的线程是使用消息队列 IPC 触发的,因此每次数据包到达消息队列时都会“唤醒”线程,打开与远程服务器的 TCP 连接并发送数据包。
我的问题是,在 Wireshark 中,我可以看到使用线程而不是一个连接发送文件所需的时间更短,但吞吐量没有改变。
我的问题是:
- 如何验证我的线程并行工作?
- 我该如何改进这段代码?, 3.How 我可以使用一个线程打开多个套接字吗?
我正在使用虚拟机 运行 多线程客户端。 IDE 我用的是Clion,语言是C。 我的代码:
#include<stdio.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<string.h>
#include <arpa/inet.h>
#include <unistd.h> // for close
#include<pthread.h>
#include <math.h>
#include<malloc.h>
#include<signal.h>
#include<stdbool.h>
#include<sys/types.h>
#include<linux/if_packet.h>
#include<netinet/in.h>
#include<netinet/if_ether.h> // for ethernet header
#include<netinet/ip.h> // for ip header
#include<netinet/udp.h> // for udp header
#include<netinet/tcp.h>
#include <byteswap.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <assert.h>
#include <time.h>
#define QUEUE_NAME "/ServerDan_Queue"
#define QUEUE_PERM 0660
#define MAX_MESSAGES 10 //Max size = 10
#define MAX_MSG_SIZE 4105 //Max size = 8192B
#define MSG_BUFFER_SIZE MAX_MSG_SIZE+10
#define BSIZE 1024
#define Nbytes 4096
#define ElorServer_addr "192.168.1.54"
///params:
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
struct sockaddr_in server;
struct stat obj;
int sock;
int k, size, status;
int i = 0;
typedef struct frag
{
int packet_number;
int seq;
uint8_t data[4096];
bool lastfrag;
} fragma;
void * middlemanThread(void *arg)
{
///========================================///
///**** Waiting for message queue trigger!:///
///=======================================///
long id = (long)arg;
id+=1;
mqd_t qd; //queue descriptor
//open the queue for reading//
qd= mq_open(QUEUE_NAME,O_RDONLY);
assert(qd != -1);
struct mq_attr attr;
assert(mq_getattr(qd,&attr) != -1);
uint8_t *income_buf = calloc(attr.mq_msgsize,1);
uint8_t *cast_buf = calloc(attr.mq_msgsize,1);
assert(income_buf);
fragma frag;
struct timespec timeout;
clock_gettime(CLOCK_REALTIME,&timeout);
timeout.tv_sec+=50;
//bool closesoc =false;
printf("Waiting for messages ..... \n\n");
while(1){
///========================================///
///**** Open message queue fo receive:///
///=======================================///
if((mq_timedreceive(qd,income_buf,attr.mq_msgsize,0,&timeout))<0){
printf("Failed to receive message for 50 sec \n");
//closesoc =true;
pthread_exit(NULL);
}
else{
cast_buf = income_buf;
printf("Received successfully , your msg :\n");
frag.packet_number = *cast_buf;
cast_buf = (cast_buf + sizeof(int));
frag.seq = *cast_buf;
cast_buf = (cast_buf + sizeof(int));
memccpy(frag.data,((fragma*)cast_buf)->data,0,Nbytes);
cast_buf = cast_buf + Nbytes;
frag.lastfrag = *cast_buf;
uint8_t * data = frag.data;
}
pthread_mutex_lock(&lock);
///========================================///
///**** Connecting to Server and send Frament:///
///=======================================///
int size = sizeof(( fragma *)income_buf)->packet_number + sizeof(( fragma *)income_buf)->seq + sizeof(( fragma *)income_buf)->data + sizeof(( fragma *)income_buf)->lastfrag;
printf("In thread\n");
int clientSocket;
struct sockaddr_in serverAddr;
socklen_t addr_size;
// Create the socket.
clientSocket = socket(PF_INET, SOCK_STREAM, 0);
//Configure settings of the server address
// Address family is Internet
serverAddr.sin_family = AF_INET;
//Set port number, using htons function
serverAddr.sin_port = htons(8081);
//Set IP address to localhost
serverAddr.sin_addr.s_addr = inet_addr("192.168.14.149");
memset(serverAddr.sin_zero, '[=10=]', sizeof serverAddr.sin_zero);
//Connect the socket to the server using the address
addr_size = sizeof serverAddr;
connect(clientSocket, (struct sockaddr *) &serverAddr, addr_size);
if(send(clientSocket , income_buf , size,0) < 0)
{
printf("Send failed\n");
}
printf("Trhead Id : %ld \n" , id);
printf("Packet number : %d \n Seq = %d \n lasfrag = %d\n\n",frag.packet_number,frag.seq,(int)frag.lastfrag);
pthread_mutex_unlock(&lock);
//if(closesoc)
close(clientSocket);
usleep(20000);
}
}
int main(){
int i = 0;
pthread_t tid[5];
while(i< 5)
{
if( pthread_create(&tid[i], NULL, middlemanThread, (void*)i) != 0 )
printf("Failed to create thread\n");
i++;
}
sleep(2);
i = 0;
while(i< 5)
{
pthread_join(tid[i++],NULL);
printf("Thread ID : %d:\n",i);
}
return 0;
}
以下是对以下问题的部分回答:
3.Is the concept of parallel TCP suppose to increase the throughput?
有点。这真的取决于瓶颈是什么。
第一个可能的瓶颈是拥堵控制。 TCP 发送方对一次可以发送多少数据包有限制(在收到第一个数据包的 ACK 之前),称为拥塞 window。这个数字应该从小开始,然后随着时间的推移而增长。此外,如果一个数据包丢失,这个数字将减少一半,然后慢慢增长,直到发生下一个丢弃。但是,限制是针对每个 TCP 连接的,因此,如果您将数据分布在多个并行连接上,则总体拥塞 window(所有流量的所有 windows 之和)将增长得更快并下降到更小的数量。 (这是一个总结,详细的你需要了解拥塞控制是如何工作的,这是一个很大的话题)。无论您是否使用线程,这都应该发生。可以在一个线程中打开多个连接,达到同样的效果。
第二个可能的瓶颈是OS中的网络处理。据我所知,这是一个从 10Gb 连接开始的问题。也许是 1Gb,但可能不是。 TCP 处理发生在 OS 中,而不是在您的应用程序中。如果处理通过 OS 在处理器之间传播,您可能会获得更好的性能(应该有参数来启用它),并且由于缓存可能会获得更好的性能。
如果您从磁盘读取文件,磁盘 IO 也很可能成为瓶颈。在这种情况下,我认为在不同线程之间分散发送数据实际上没有帮助。
thus every time a packet arrive to the message queue a thread "wakes up" , open TCP connection with remote server and send the packet
如果您非常在意速度或效率,请不要这样做。使用 TCP 套接字可以做的最昂贵的事情就是初始连接。您正在进行 3 次握手只是为了发送一条消息!
然后,您在执行整个操作时持有一个全局互斥体 - 同样,这也是您程序中最慢的操作。
当前的设计有效地单线程,但采用了最复杂和最昂贵的方式。
I can see the the time it takes to send a file is smaller using threads instead of one connection , but the throughput does not change
我不知道你实际测量的是什么,而且你也不清楚你在测量什么。什么是文件?一个碎片?多个片段?与您的 MTU 相比有多大?您是否检查过片段实际上是以正确的顺序接收的,因为在我看来,唯一可能的并行性是可能中断的地方。
如何才能使单个文件具有更低的延迟和不变的吞吐量?
How can i verify my threads working parallely?
如果您在 wireshark 中看到多个具有不同源端口的 TCP 连接,并且它们的数据包交织在一起,则说明您具有有效的并行性。这不太可能,因为您使用全局互斥锁明确禁止它!
What is the best way to check the throughput in wireshark?
不要。使用 wireshark 检查数据包,使用服务器确定吞吐量。这才是结果真正重要的地方。
3.Is the concept of parallel TCP suppose to increase the throughput?
如果你不知道它的用途,为什么要实现所有这些复杂性?
单个线程(正确编码,没有虚假互斥抖动)很有可能会使您的网络饱和,所以:不会。拥有多个 I/O 线程通常是为了方便地划分您的逻辑和状态(即,每个线程有一个客户端,或不同线程中的不同不相关 I/O 子系统),而不是性能。
如果你想从消息队列中取出数据包并将它们发送到 TCP,高效的方法是:
- 只使用一个线程来执行此操作(您的程序可能有其他线程在执行其他操作 - 如果可能,请避免与它们同步)
- 打开到服务器的单个持久 TCP 连接,而不是 connect/close 每个片段
- 就是这样。它比您现有的要简单得多,而且性能会好得多。
您实际上可以让一个线程处理多个不同的连接,但我看不出这对您的情况有任何用处,所以请保持简单。