Websockets messages are only sent at the end, not during each iteration, when using async / await and yield in nested for loops

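I have a computationally heavy process that takes several minutes to complete on the server, so I want to send the result of each iteration to the client over websockets.

The application as a whole works, but my problem is that all the messages arrive at the client in one big chunk after the entire simulation has finished. I must be missing something here, because I expect await websocket.send_json() to send each message during the process rather than all of them at the end.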

Server, Python (FastAPI)

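# A very simplified abstraction of the actual app.
# distributions(), interval() and NumpyEncoder are defined elsewhere in the app.
import json

from fastapi import FastAPI, WebSocket

app = FastAPI()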

def simulate_intervals(data):
  for t in range(data.n_intervals):
    state = interval(data) # returns a JAX NumPy array
    yield state

def simulate(data):
  for key in range(data.n_trials):
    trial = simulate_intervals(data)
    yield trial

@app.websocket("/ws")
async def socket(websocket: WebSocket):

  await websocket.accept()
  while True:
    # Get model inputs from client
    data = await websocket.receive_text()
    # Minimal computation
    nodes = distributions(data)

    nodosJson = json.dumps(nodes, cls=NumpyEncoder)
    # I expect this message to be sent early on,
    # but the client gets it at the end with all the other messages. 
    await websocket.send_json({"tipo": "nodos", "datos": json.loads(nodosJson)})
    
    # Heavy computation
    trials = simulate(data)

    for trialI, trial in enumerate(trials):
      for stateI, state in enumerate(trial):
        stateString = json.dumps(state, cls=NumpyEncoder)

        await websocket.send_json(
          {
            "tipo": "estado",
            "datos": json.loads(stateString),
            "trialI": trialI,
            "stateI": stateI,
          }
        )

    await websocket.send_json({"tipo": "estado", "msg": "fin"})

For completeness, here is the basic client code.

Client

const ws = new WebSocket('ws://localhost:8000/ws');

ws.onopen = () => {
  console.log('Conexión exitosa');
};

ws.onmessage = (e) => {
  const mensaje = JSON.parse(e.data);
  console.log(mensaje);
};

botonEnviarDatos.onclick = () => {
   ws.send(JSON.stringify({...}));
}

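I could not get it to work as posted in my question, and I am still interested in hearing from anyone who understands why it is not possible to send multiple async messages without them being held back.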
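
One likely explanation, offered here as an assumption rather than something confirmed in this thread: the simulation steps are ordinary blocking calls, so the coroutine never hands control back to the event loop between sends, and an await that completes without suspending does not give other tasks (including whatever actually writes to the socket) a turn. The sketch below reproduces that effect with plain asyncio and no websockets at all. heavy_step, sender and run are hypothetical names, and the queue consumer merely stands in for the network side.

```python
import asyncio
import time


def heavy_step(seconds: float = 0.05) -> None:
    """Hypothetical stand-in for one blocking simulation step."""
    time.sleep(seconds)  # blocks the whole event loop, like CPU-bound work


async def sender(queue: asyncio.Queue, log: list) -> None:
    """Stand-in for the network side: records when each message is handled."""
    while True:
        item = await queue.get()
        if item is None:  # sentinel: no more messages
            return
        log.append(time.perf_counter())


async def run(yield_control: bool, steps: int = 5) -> list:
    """Return, for each message, the seconds elapsed before it was handled."""
    queue: asyncio.Queue = asyncio.Queue()
    log: list = []
    task = asyncio.create_task(sender(queue, log))
    start = time.perf_counter()
    for i in range(steps):
        heavy_step()
        # The queue is unbounded, so this await completes without suspending.
        await queue.put(i)
        if yield_control:
            # Explicitly give other tasks one turn on the event loop.
            await asyncio.sleep(0)
    await queue.put(None)
    await task
    return [t - start for t in log]


if __name__ == "__main__":
    print("no yield:  ", [round(t, 2) for t in asyncio.run(run(False))])
    print("with yield:", [round(t, 2) for t in asyncio.run(run(True))])
```

Without the explicit yield, every message is handled only after the last step has finished; with it, the handling times are spread across the run. In the handler from the question, the equivalent experiment would be to add await asyncio.sleep(0) right after each await websocket.send_json(...). Whether that alone is enough depends on the server stack, so treat it as something to try rather than a guaranteed fix.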

For anyone interested, this is my current solution:

Ping-pong messages between client and server

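I changed the logic so that the server and client keep sending messages to each other, instead of trying to stream all the data in response to a single request from the client.

This actually works better than my original attempt, because I can detect when the socket disconnects and stop processing on the server. Basically, if a client disconnects, no new data requests arrive from that client, and the server does not continue with the heavy computation.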

Server

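# A very simplified abstraction of the actual app.
# distributions(), interval() and NumpyEncoder are defined elsewhere in the app.
import json

from fastapi import FastAPI, WebSocket

app = FastAPI()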

def simulate_intervals(data):
  for t in range(data.n_intervals):
    state = interval(data) # returns a JAX NumPy array
    yield state

def simulate(data):
  for key in range(data.n_trials):
    trial = simulate_intervals(data)
    yield trial

@app.websocket("/ws")
async def socket(websocket: WebSocket):

  await websocket.accept()
  while True:
    # Get messages from client (parsed from JSON so that data["tipo"] works below)
    data = await websocket.receive_json()
    
    # "tipo" is basically the type of data being sent from client or server to the other one.
    # In this case, "tipo": "inicio" is the client sending inputs and requesting for a certain data in response.
    if data["tipo"] == "inicio":
      # Minimal computation
      nodes = distributions(data)

      nodosJson = json.dumps(nodes, cls=NumpyEncoder)
      # In this first interaction, the client gets the first message without delay. 
      await websocket.send_json({"tipo": "nodos", "datos": json.loads(nodosJson)})

      # simulate() is a generator function, so calling it only creates the
      # generator; the computationally heavy work has not started yet.
      trials = simulate(data)
      
      # define some initial variables to count the iterations
      trialI = 0
      stateI = 0
      trialsLen = args.number_trials
      statesLen = 600
      
      # load the first trial (also a generator)
      # without the for loop used before, the counters and next()
      # allow us to do the same as being done before in the for loop
      trial = next(trials)

      # With the use of generators and next() it is possible to keep
      # this first message light on the server and send the first response
      # as quickly as possible.
    
    # This type of message asks for the next instance of the simulation
    # without processing the entire model.
    elif data["tipo"] == "sim":
      # check if we are within the limits (before this was a nested for loop)
      if trialI < trialsLen and stateI < statesLen:
        # Trigger the next instance of the simulation
        state = next(trial)
        # update counter
        stateI = stateI + 1
        
        # Send a message with one instance of the simulation.
        stateString = json.dumps(state, cls=NumpyEncoder)
        await websocket.send_json(
          {
             "tipo": "estado",
             "datos": json.loads(stateString),
             "trialI": trialI,
             "stateI": stateI,
          }
        )
        
        # Check if the second loop is done
        if stateI == statesLen:
          # update counter of first loop
          trialI = trialI + 1
          # update counter of second loop
          stateI = 0
          
          # Check if there are more pending trials,
          # otherwise stop and notify the client we are done.
          try:
            trial = next(trials)
          except StopIteration:
            await websocket.send_json({"tipo": "fin"})
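
The manual counters and next() calls in the handler above could also be folded into a single flat generator, so that each "sim" request just pulls one item. This is only a sketch of that idea with the websocket removed: flat_states, Session and handle_message are hypothetical names, the states are placeholder dicts, the "nodos" reply omits its data, and n_trials / n_intervals are assumed to arrive in the "inicio" message.

```python
from typing import Any, Dict, Iterator, List, Optional, Tuple


def simulate_intervals(trial_i: int, n_intervals: int, log: List) -> Iterator[Dict]:
    for state_i in range(n_intervals):
        log.append((trial_i, state_i))  # record that this step really ran
        yield {"value": trial_i * 10 + state_i}  # placeholder for the real state


def simulate(n_trials: int, n_intervals: int, log: List) -> Iterator[Iterator[Dict]]:
    for trial_i in range(n_trials):
        yield simulate_intervals(trial_i, n_intervals, log)


def flat_states(trials) -> Iterator[Tuple[int, int, Any]]:
    """Lazily flatten the nested generators into (trialI, stateI, state)."""
    for trial_i, trial in enumerate(trials):
        for state_i, state in enumerate(trial):
            yield trial_i, state_i, state


class Session:
    """Per-connection state kept between messages."""

    def __init__(self) -> None:
        self.states: Optional[Iterator[Tuple[int, int, Any]]] = None
        self.log: List = []  # which steps have been computed so far


def handle_message(session: Session, message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the reply for one incoming message, or None if there is none."""
    if message["tipo"] == "inicio":
        # Creating the generators is cheap; nothing is simulated yet.
        trials = simulate(message["n_trials"], message["n_intervals"], session.log)
        session.states = flat_states(trials)
        return {"tipo": "nodos"}
    if message["tipo"] == "sim" and session.states is not None:
        step = next(session.states, None)  # compute exactly one more state
        if step is None:
            return {"tipo": "fin"}
        trial_i, state_i, state = step
        return {"tipo": "estado", "datos": state, "trialI": trial_i, "stateI": state_i}
    return None
```

In this variant the indices are zero-based and the "fin" reply goes out on the request that follows the last state, which differs slightly from the handler above. Because each state is computed only when a "sim" message arrives, a client that stops asking also stops the computation.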

Client

Only the part that actually changed:

ws.onmessage = (e) => {
  const mensaje = JSON.parse(e.data);
  
  // Simply check the type of incoming message so it can be processed
  if (mensaje.tipo === 'fin') {
    viz.calcularResultados();
  } else if (mensaje.tipo === 'nodos') {
    viz.pintarNodos(mensaje.datos);
  } else if (mensaje.tipo === 'estado') {
    viz.sumarEstado(mensaje.datos);
  }

  // After receiving a message, ping the server for the next one 
  ws.send(
    JSON.stringify({
      tipo: 'sim',
    })
  );
};

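This seems like a reasonable solution for keeping the server and client working together. I am able to show the progress of a long simulation on the client, and the user experience is much better than waiting a long time for the server to respond. I hope it helps others with a similar problem.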
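
For anyone who would still like to stream everything from a single request, another option that may be worth trying is to run each blocking step in a worker thread with asyncio.to_thread (Python 3.9+), so the event loop stays free between sends. This is a sketch under assumptions, not something tested against the app above: blocking_step, stream_states and fake_send are hypothetical names, fake_send stands in for websocket.send_json, and how much a thread helps depends on whether the real computation releases the GIL.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List, Tuple


def blocking_step(i: int) -> int:
    """Hypothetical stand-in for one heavy simulation step."""
    time.sleep(0.02)
    return i * i


async def stream_states(
    send: Callable[[Dict[str, Any]], Awaitable[None]], steps: int
) -> None:
    for state_i in range(steps):
        # The step runs in a worker thread; awaiting it suspends this coroutine
        # and lets the event loop do other work in the meantime.
        state = await asyncio.to_thread(blocking_step, state_i)
        await send({"tipo": "estado", "datos": state, "stateI": state_i})
    await send({"tipo": "fin"})


async def demo(steps: int = 5) -> Tuple[List[Dict[str, Any]], int]:
    sent: List[Dict[str, Any]] = []
    ticks = 0

    async def fake_send(message: Dict[str, Any]) -> None:
        sent.append(message)

    async def heartbeat() -> None:
        # Counts how often the loop gets to run something else.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.005)

    beat = asyncio.create_task(heartbeat())
    await stream_states(fake_send, steps)
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return sent, ticks


if __name__ == "__main__":
    messages, ticks = asyncio.run(demo())
    print(len(messages), "messages sent;", ticks, "heartbeat ticks")
```

The heartbeat task is only there to show that the loop keeps running while the steps are computed. Unlike the ping-pong approach, this does not by itself stop the computation when the client disconnects.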