在并发 TCP 请求中收到大量数据包时如何识别完整响应

How can I identify a complete response when receive lot of packets in concurrent TCP requests

我有一个到服务器的单一 TCP 连接,但可能同时有多个请求。大多数时候,响应会非常大,以至于我会不断收到大量数据块。我可以检查数据长度以确定它是 END OF STREAM。但是对于多个请求,有时数据包会与其他请求“混合”在一起,从而导致大量失败。

例如,

对于正常请求:

现实生活中:

目前,我唯一的想法是通过一个接一个地处理请求来使其串行化。但是它不能完全解决我的问题,因为有时我订阅的事件会不受控制地进入。

一般如何解决这个 TCP 问题?

我在下面展示了一些代码(以防有人知道 erlang 和 elixir)

# Create TCP connection
{:ok, socket} =
      :gen_tcp.connect(host_charlist, port, [:binary, active: true, keepalive: true])

# Send request
def handle_call({:send, msg}, from, state) do
  :ok = :gen_tcp.send(state.socket, msg)
  new_state = %{state | from: from, msg: ""}

  {:noreply, new_state}
end

# When it receive packet
def handle_info({:tcp, _socket, msg}, state) do
  new_state = %{state | msg: state.msg <> msg}
  current_msg_size = byte_size(new_state.msg)
  defined_msg_size = Response.get_body_size(new_state.msg) # this msg size can read from the first packet's header

  cond do
    current_msg_size == defined_msg_size ->
      GenServer.reply(new_state.from, {:ok, new_state.msg})
      {:noreply, %{new_state | msg: ""}}

    current_msg_size > defined_msg_size ->
      GenServer.reply(new_state.from, {:error, "Message size exceed."})
      {:noreply, %{new_state | msg: ""}}

    true ->
      {:noreply, new_state}
  end
end

在 TCP 级别,在一个连接中,requestresponse 不存在,它是一个单管,按顺序从一侧传输字节到另一侧。

为了处理单个连接上的交错,您必须在堆栈上一级处理它。

可能的解决方案包括:

  1. 序列化
  2. 成帧:你对响应进行成帧并以某种方式保证帧被完全发送而不会在服务器中交错,然后你的接收者可以检查每个帧(可能跨越多个 receives)并将其分配给相应的请求.
  3. 每个请求一个连接:让 OS 每次以套接字和握手为代价处理网络上的交错

您可以将 tcp 选项从 active = true 更改为 active = falseactive = once

:gen_tcp.connect(host_charlist, port, [:binary, active: true, keepalive: true])

然后你可以自己控制消息的接收,而不是被传入的消息淹没。

1、接收上层调用,处理客户端请求,设置active = false,等待服务器响应

2、尽量不要只接收相关消息,在进程的消息队列中忽略这些消息或将它们保存在缓冲区中。

3、收到服务器的响应,处理,然后设置active = once.

4、超时设置active = once.

https://www.erlang.org/doc/man/gen_tcp.html#controlling_process-2

controlling_process(Socket, Pid) -> ok | {error, Reason} Types Socket = socket() Pid = pid() Reason = closed | not_owner | badarg | inet:posix() Assigns a new controlling process Pid to Socket. The controlling process is the process that receives messages from the socket. If called by any other process than the current controlling process, {error, not_owner} is returned. If the process identified by Pid is not an existing local pid, {error, badarg} is returned. {error, badarg} may also be returned in some cases when Socket is closed during the execution of this function.

If the socket is set in active mode, this function will transfer any messages in the mailbox of the caller to the new controlling process. If any other process is interacting with the socket while the transfer is happening, the transfer may not work correctly and messages may remain in the caller's mailbox. For instance changing the sockets active mode before the transfer is complete may cause this.