使用线程的并行 TCP 连接

Parallel TCP connection using threads

我正在尝试构建一个使用线程打开并行 TCP 套接字的系统。 我的线程是使用消息队列 IPC 触发的,因此每次数据包到达消息队列时都会“唤醒”线程,打开与远程服务器的 TCP 连接并发送数据包。 我的问题是,在 Wireshark 中,我可以看到使用线程而不是一个连接发送文件所需的时间更短,但吞吐量没有改变。
我的问题是:

  1. 如何验证我的线程并行工作?
  2. 我该如何改进这段代码?, 3.How 我可以使用一个线程打开多个套接字吗?

我正在使用虚拟机 运行 多线程客户端。 IDE 我用的是Clion,语言是C。 我的代码:

#include<stdio.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<string.h>
#include <arpa/inet.h>
#include <unistd.h> // for close
#include<pthread.h>
#include <math.h>
#include<malloc.h>
#include<signal.h>
#include<stdbool.h>
#include<sys/types.h>
#include<linux/if_packet.h>
#include<netinet/in.h>
#include<netinet/if_ether.h>    // for ethernet header
#include<netinet/ip.h>      // for ip header
#include<netinet/udp.h>     // for udp header
#include<netinet/tcp.h>
#include <byteswap.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <assert.h>
#include <time.h>
#define QUEUE_NAME "/ServerDan_Queue"
#define QUEUE_PERM 0660
#define MAX_MESSAGES 10 //Max size = 10
#define MAX_MSG_SIZE 4105 //Max size = 8192B
#define MSG_BUFFER_SIZE MAX_MSG_SIZE+10
#define BSIZE 1024
#define Nbytes 4096
#define ElorServer_addr "192.168.1.54"
///params:
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
struct sockaddr_in server;
struct stat obj;
int sock;
int k, size, status;
int i = 0;

typedef struct frag
{
    int packet_number;
    int seq;
    uint8_t data[4096];
    bool lastfrag;
} fragma;

void * middlemanThread(void *arg)
{
    ///========================================///
    ///**** Waiting for message queue trigger!:///
    ///=======================================///
    long id = (long)arg;
    id+=1;
    mqd_t qd; //queue descriptor
    //open the queue for reading//
    qd= mq_open(QUEUE_NAME,O_RDONLY);
    assert(qd != -1);
    struct mq_attr attr;
    assert(mq_getattr(qd,&attr) != -1);
    uint8_t *income_buf = calloc(attr.mq_msgsize,1);
    uint8_t *cast_buf = calloc(attr.mq_msgsize,1);
    assert(income_buf);
    fragma frag;
    struct timespec timeout;
    clock_gettime(CLOCK_REALTIME,&timeout);
    timeout.tv_sec+=50;
    //bool closesoc =false;
    printf("Waiting for messages ..... \n\n");
    while(1){
        ///========================================///
        ///**** Open message queue fo receive:///
        ///=======================================///

        if((mq_timedreceive(qd,income_buf,attr.mq_msgsize,0,&timeout))<0){
            printf("Failed to receive message for 50 sec \n");
            //closesoc =true;
            pthread_exit(NULL);
        }
        else{
            cast_buf = income_buf;
            printf("Received successfully , your msg :\n");
            frag.packet_number = *cast_buf;
            cast_buf = (cast_buf + sizeof(int));
            frag.seq = *cast_buf;
            cast_buf = (cast_buf + sizeof(int));
            memccpy(frag.data,((fragma*)cast_buf)->data,0,Nbytes);
            cast_buf = cast_buf + Nbytes;
            frag.lastfrag = *cast_buf;
            uint8_t * data = frag.data;
        }
        pthread_mutex_lock(&lock);

        ///========================================///
        ///**** Connecting to Server and send Frament:///
        ///=======================================///

        int size = sizeof(( fragma *)income_buf)->packet_number + sizeof(( fragma *)income_buf)->seq + sizeof(( fragma *)income_buf)->data + sizeof(( fragma *)income_buf)->lastfrag;
        printf("In thread\n");
        int clientSocket;
        struct sockaddr_in serverAddr;
        socklen_t addr_size;

        // Create the socket.
        clientSocket = socket(PF_INET, SOCK_STREAM, 0);

        //Configure settings of the server address
        // Address family is Internet
        serverAddr.sin_family = AF_INET;

        //Set port number, using htons function
        serverAddr.sin_port = htons(8081);

        //Set IP address to localhost
        serverAddr.sin_addr.s_addr = inet_addr("192.168.14.149");
        memset(serverAddr.sin_zero, '[=10=]', sizeof serverAddr.sin_zero);

        //Connect the socket to the server using the address
        addr_size = sizeof serverAddr;
        connect(clientSocket, (struct sockaddr *) &serverAddr, addr_size);
        if(send(clientSocket , income_buf , size,0) < 0)
        {
            printf("Send failed\n");
        }
        printf("Trhead Id : %ld \n" , id);
        printf("Packet number : %d \n Seq = %d  \n lasfrag = %d\n\n",frag.packet_number,frag.seq,(int)frag.lastfrag);
        pthread_mutex_unlock(&lock);
        //if(closesoc)
        close(clientSocket);
        usleep(20000);
    }
}
int main(){
    int i = 0;
    pthread_t tid[5];

    while(i< 5)
    {
        if( pthread_create(&tid[i], NULL, middlemanThread, (void*)i) != 0 )
            printf("Failed to create thread\n");
        i++;
    }
    sleep(2);
    i = 0;
    while(i< 5)
    {
        pthread_join(tid[i++],NULL);
        printf("Thread ID : %d:\n",i);
    }
    return 0;
}

以下是对以下问题的部分回答:

3.Is the concept of parallel TCP suppose to increase the throughput?

有点。这真的取决于瓶颈是什么。

第一个可能的瓶颈是拥堵控制。 TCP 发送方对一次可以发送多少数据包有限制(在收到第一个数据包的 ACK 之前),称为拥塞 window。这个数字应该从小开始,然后随着时间的推移而增长。此外,如果一个数据包丢失,这个数字将减少一半,然后慢慢增长,直到发生下一个丢弃。但是,限制是针对每个 TCP 连接的,因此,如果您将数据分布在多个并行连接上,则总体拥塞 window(所有流量的所有 windows 之和)将增长得更快并下降到更小的数量。 (这是一个总结,详细的你需要了解拥塞控制是如何工作的,这是一个很大的话题)。无论您是否使用线程,这都应该发生。可以在一个线程中打开多个连接,达到同样的效果。

第二个可能的瓶颈是OS中的网络处理。据我所知,这是一个从 10Gb 连接开始的问题。也许是 1Gb,但可能不是。 TCP 处理发生在 OS 中,而不是在您的应用程序中。如果处理通过 OS 在处理器之间传播,您可能会获得更好的性能(应该有参数来启用它),并且由于缓存可能会获得更好的性能。

如果您从磁盘读取文件,磁盘 IO 也很可能成为瓶颈。在这种情况下,我认为在不同线程之间分散发送数据实际上没有帮助。

thus every time a packet arrive to the message queue a thread "wakes up" , open TCP connection with remote server and send the packet

如果您非常在意速度或效率,请不要这样做。使用 TCP 套接字可以做的最昂贵的事情就是初始连接。您正在进行 3 次握手只是为了发送一条消息!

然后,您在执行整个操作时持有一个全局互斥体 - 同样,这也是您程序中最慢的操作。

当前的设计有效地单线程,但采用了最复杂和最昂贵的方式。

I can see the the time it takes to send a file is smaller using threads instead of one connection , but the throughput does not change

我不知道你实际测量的是什么,而且你也不清楚你在测量什么。什么是文件?一个碎片?多个片段?与您的 MTU 相比有多大?您是否检查过片段实际上是以正确的顺序接收的,因为在我看来,唯一可能的并行性是可能中断的地方。

如何才能使单个文件具有更低的延迟和不变的吞吐量?

How can i verify my threads working parallely?

如果您在 wireshark 中看到多个具有不同源端口的 TCP 连接,并且它们的数据包交织在一起,则说明您具有有效的并行性。这不太可能,因为您使用全局互斥锁明确禁止它!

What is the best way to check the throughput in wireshark?

不要。使用 wireshark 检查数据包,使用服务器确定吞吐量。这才是结果真正重要的地方。

3.Is the concept of parallel TCP suppose to increase the throughput?

如果你不知道它的用途,为什么要实现所有这些复杂性?

单个线程(正确编码,没有虚假互斥抖动)很有可能会使您的网络饱和,所以:不会。拥有多个 I/O 线程通常是为了方便地划分您的逻辑和状态(即,每个线程有一个客户端,或不同线程中的不同不相关 I/O 子系统),而不是性能。


如果你想从消息队列中取出数据包并将它们发送到 TCP,高效的方法是:

  1. 只使用一个线程来执行此操作(您的程序可能有其他线程在执行其他操作 - 如果可能,请避免与它们同步)
  2. 打开到服务器的单个持久 TCP 连接,而不是 connect/close 每个片段
  3. 就是这样。它比您现有的要简单得多,而且性能会好得多。

您实际上可以让一个线程处理多个不同的连接,但我看不出这对您的情况有任何用处,所以请保持简单。