用于套接字客户端通信的每个线程的状态机

State machine per thread for socket client communication

我正在开发用于处理多套接字客户端连接的线程 TCP 套接字服务器。客户端可以与服务器异步连接和断开连接,连接后,客户端应以预定义的自定义数据包协议格式发送一些数据。
该协议定义了帧开始 (SOP) 和帧结束 (EOP)。

我写了一个 C 代码,这样对于每个成功的客户端连接,都会创建一个线程,该线程以预定义的数据包格式继续从客户端接收字节,线程有一个线程本地状态机,因为每个客户端都可以异步连接,因此每个客户端的状态可能不同。
下面是从客户端接收数据并根据接收到的字节类型维护状态的线程:

static void *receive_handler(void *args) {

  struct thread_args *local_args = args;
  struct sockaddr_in6 *client_address = local_args->client_address;
  //struct itimerval timer_val;
  int32_t conn_fd = local_args->conn_fd;
  int32_t val_read = 0;
  int32_t resp_code = 0;
  uint32_t sendBuffLen = 0;
  int8_t buffer[BUFFER_SIZE] = { 0 };
  uint8_t RetBuff[1024] = { 0 };
  int8_t rx_addr_str[INET6_ADDRSTRLEN];
  int8_t byte = 0;
  int16_t idx = ePacketType;
  int16_t packet_len = 0;
  int16_t calculated_crc = 0, recv_crc = 0;
  uint16_t num_bytes = 0;

  memset(rx_addr_str, 0, INET6_ADDRSTRLEN);
  inet_ntop(AF_INET6, &(client_address->sin6_addr), rx_addr_str, INET6_ADDRSTRLEN);
  printf("\nRx Thread (%d) Created for %s\n", local_args->connection_no, rx_addr_str);

  int eState = eStart_Frame;

  memcpy(rx_Packet_Info[local_args->connection_no].inet6, rx_addr_str, INET6_ADDRSTRLEN);

  //timerclear(&timer_val.it_interval); /* zero interval means no reset of timer */
  //timerclear(&timer_val.it_value);
  //timer_val.it_value.tv_sec = 10; /* 10 second timeout */

  //(void) signal(SIGALRM, state_reset_handler);

  while (1) {

    if (eState != eChecksum_Verify) {
      val_read = -1;
      val_read = recv(conn_fd, &byte, sizeof(byte), 0);
      debug_printf(INFO, "Amount Read: %d Byte Rxd: 0x%x => 0x%X\n", val_read, (byte & 0xFF), byte);
      if (val_read <= 0) {
        if (parse_packet("ERR_DISCONNECT", rx_addr_str, local_args->connection_no) < 0) {
          debug_printf(ERR, "Error parsing packet: %s\n", strerror(errno));
        }
        debug_printf(ERR, "May be closed by client %s: %s\n", rx_addr_str, strerror(errno));
        debug_printf(ERR, "Exiting Rx Thread: ConnIdx: %d", num_connections);
        close(conn_fd);
        pthread_exit(NULL);
      }
    }

    switch (eState) {

      case eStart_Frame:
        debug_printf(DEBG, "Current State: %d\n", eState);
        if ((val_read > 0) && (byte & 0xFF) == SOP) {
          memset(buffer, 0, BUFFER_SIZE);
          val_read = -1;
          buffer[eSOP] = (byte & 0xFF);
          eState = eFrame_Len;
        }
        break;

      case eFrame_Len: {
        static char MSB_Rxd = 0;
        debug_printf(DEBG, "Current State: %d\n", eState);
        if (val_read > 0) {
          if (MSB_Rxd == 0) {
            buffer[ePacket_length] = byte;
            MSB_Rxd = 1;
          }
          else {
            buffer[ePacket_length + 1] = byte;
            eState = eFrame;
            num_bytes = 0;
            MSB_Rxd = 0;
            packet_len = (buffer[ePacket_length] & 0xFF << 8) | (buffer[ePacket_length + 1]);
            debug_printf(INFO, "Packet Length: %d : 0x%x 0x%x\n", packet_len,
                buffer[ePacket_length], buffer[ePacket_length + 1]);
          }
        }
      }
        break;

      case eFrame:
        debug_printf(DEBG, "Current State: %d\n", eState);
        num_bytes++;
        buffer[idx] = byte;
        if (num_bytes == packet_len) {
          eState = eEnd_Frame;
          debug_printf(DEBG, "Num bytes: 0x%x\n", num_bytes);
        }
        else {
          debug_printf(ERR, "Num bytes: 0x%x Pkt Len: 0x%x\n", num_bytes, packet_len);
        }
        idx++;
        break;

      case eEnd_Frame:
        debug_printf(ERR, "Current State: %d val read %d\n", eState, val_read);
        if ((val_read > 0) && (byte & 0xFF) == EOP) {
          val_read = -1;
          eState = eChecksum_Verify;
        }
        break;

      case eChecksum_Verify: {

        calculated_crc = crc_16(&buffer[ePacket_length], (num_bytes));
        recv_crc = buffer[num_bytes + 1] << 8 | (buffer[num_bytes + 2] & 0xFF);

        if (calculated_crc != recv_crc) {
          debug_printf(ERR, "CRC Error! CRC do not match!!\n");
          debug_printf(ERR, "Calculated CRC: 0x%X\nCRC Rxd: 0x%X\n", calculated_crc, recv_crc);
          resp_code = CRC_ERR;
          send(conn_fd, &resp_code, sizeof(resp_code), 0);
        }
        else {
          if (rx_Packet_Info[local_args->connection_no].packetUUID != NULL) {
            free(rx_Packet_Info[local_args->connection_no].packetUUID);
            rx_Packet_Info[local_args->connection_no].packetUUID = NULL;
          }

          rx_Packet_Info[local_args->connection_no].packetUUID = calloc(buffer[ePacketUUIDLen],
              sizeof(uint8_t));
          memcpy(rx_Packet_Info[local_args->connection_no].packetUUID, &buffer[ePacketUUID],
              buffer[ePacketUUIDLen]);
          rx_Packet_Info[local_args->connection_no].packetUUIDlength = buffer[ePacketUUIDLen];

          printf("\nRX-Thread-UUID %d: ConnNo: %d\n", buffer[ePacketUUIDLen],
              local_args->connection_no);
          for (char i = 0; i < buffer[ePacketUUIDLen]; i++) {
            printf("0x%x ", rx_Packet_Info[local_args->connection_no].packetUUID[i]);
          }
          printf("\n");
          if (parse_packet(buffer, rx_addr_str, local_args->connection_no) < 0) {
            debug_printf(ERR, "Error parsing packet: %s\n", strerror(errno));
          }
        }
        num_bytes = 0;
        eState = eStart_Frame;
        idx = ePacketType;
      }
        break;

      default:
        debug_printf(DEBG, "Invalid State!! Should not come here.\n");
        num_bytes = 0;
        eState = eStart_Frame;
        idx = ePacketType;
        break;
    }
  }

  return NULL;
}

我的问题是,如果客户端在接收到帧开始后卡住并且无法发送帧长度或完整帧直到帧结束,我应该如何重置此状态机?

我想到的一种方法是实现定时器回调,但我不确定我应该如何跟踪多线程的状态机。 任何人都可以建议我在这种情况下应该做什么,或者我做错了什么吗?

如果我正确地解析了问题,那么您是在询问如何优雅地处理连接客户端未及时发送数据的情况——即它已发送消息的第一部分,但是(由于网络问题或客户端错误或其他原因)永远不会发送其余部分,使您的服务器端 I/O 线程在 recv() 调用中阻塞 long/indefinite 时间.

如果是这样,首先要问的问题是:这真的是个问题吗?如果每个连接都有自己的线程,那么阻塞一个特定的 thread/connection 应该不会对其他线程造成任何问题,因为它们都彼此独立执行。所以也许你可以完全忽略这个问题?

然而,更可能的答案是忽略这个问题还不够好,因为随后会出现一些不容易忽略的问题:(a) 如果客户端连接过多怎么办"freeze up" 同时?一两个停顿的 TCP connections/threads 没什么大不了的,但如果同样的问题不断发生,最终你会 运行 资源不足来产生更多的线程或 TCP 连接,然后你的服务器可以不再起作用。 (b) 如果服务器进程现在想退出怎么办? (即因为服务器的用户给它发送了一个SIGINT中断或类似的)如果一个或多个线程无限期阻塞,那么服务器不可能及时可控地退出,因为主线程需要等待所有 TCP 线程在它可以清理其进程范围的资源之前首先退出,并且任何阻塞的线程都不会退出很长时间,如果有的话。

因此,假设问题 确实 需要解决,我发现最可靠的解决方法是永远不要阻塞 recv()(或send()) 首先。相反,请确保将每个套接字置于非阻塞模式,并且只在 select() 调用中让线程的 while 循环阻塞。这样做会使您的状态机更加复杂(因为它现在必须处理部分发送和部分接收),但补偿的好处是线程现在可以更好地控制其自身的阻塞行为。特别是,您可以告诉 select() 在一定时间后总是 return,无论如何,并且(更好的是)您可以随时告诉 select() 到 return许多套接字上都有准备好读取的字节。这意味着如果您的主线程想要退出,它可以使用 pipe()socketpair() 向每个 TCP 线程发送一个虚拟字节,并​​且 TCP 线程(可能在 select(),等待来自其客户端或来自 pipe/socketpair 套接字的数据)将立即从 select() return,看到主线程已向其发送一个字节,并通过立即退出做出响应.

这应该足够了——根据我的经验,如果可以避免,最好不要强加固定超时,因为很难预测在所有情况下网络性能和任何经验法则你可能想出的 (like "a client that doesn't send the whole message in 5 seconds must be broken") 很可能是错误的,如果你试图执行该规则,你最终会遇到误报问题。最好只让每个客户端花费 wants/needs 的时间,同时还具有一种机制,主线程可以通过该机制请求特定的客户端线程立即退出 if/when 这变得必要(例如在服务器期间 -进程关闭,或者如果有太多的 TCP 线程处于活动状态并且你想 p运行e 中的一些 old/inactive 在产生更多线程之前)