从 kafka 发送消息到 websocket

Send messages from kafka into websocket

我在 golang 中使用 gin-framework 创建了一个 web 服务。在这个项目中,我还使用了一些来自特定主题的 kafka 消息。我想要实现的是将我从主题中收到的消息倒入 websocket 中。所以通信只是一种方式,不止一个人可以连接到网络套接字并看到传入的消息。 我 tought 使用通道,所以在接收 kafka 消息的函数中我有这样的东西:

ch <- KafkaMessage

在 gin 框架中我创建了这样的东西:

requestRouterRPCv1.GET("wf-live", wsWorkFlowLive)

func wsWorkFlowLive(c *gin.Context) {
    ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
    if err != nil {
        log.Println("error get connection")
        log.Fatal(err)
    }
    defer ws.Close()

    err = ws.WriteJSON(<-ch)

    if err != nil {
        log.Println("error write message: " + err.Error())
    }
}

var upGrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

这里是我用来测试 websocket 的 html 片段:

 <!DOCTYPE html>
   <html lang="en">
     <head>
       <meta charset="UTF-8" />
       <title>index</title>
    </head>
     <body>
       <h1>test websocket</h1>
       <p id="message-json"></p>
      <p id="message-text"></p>
            <script>
        function jsonWS() {
          var ws = new WebSocket("ws://ws.acme.com/ws/v1/wf-live");

          ws.onmessage = function (evt) {
            console.log("Received Message: " + evt.data);
            document.getElementById("message-json").innerText += evt.data;
          };

          ws.onclose = function (evt) {
            console.log("Connection closed.");
          };
        }

        // Start websocket
        jsonWS();
      </script>
    </body>
  </html>

但是我错过了一些东西,我是一个新手,因为我在收到第一条kafka消息后有以下错误行为:

  1. 只显示第一条消息,之后连接很快关闭
  2. 要看到第二个我必须刷新页面,这不是websocket方式
  3. 因为连接关闭通道它不是红色的,所以它一直卡在cosume函数中直到我读到它。我不能有这种行为
  4. 为了避免第 3 点,我想我必须有一种机制,只有当一个或多个 ws 连接时,我才将消息发送到通道。

你很接近。只是缺少两件事。

1- 使用 select 语句继续从 kafka 频道接收新消息。

2- 保持活动的 websocket 连接。这个answer有更多细节

让我知道这是否适合您。

Gorilla chat example 接近您的需要。该示例将从任何客户端接收到的消息广播到所有连接的客户端。执行以下操作以使代码适应您的用例:

  • 更改 client read pump 以丢弃收到的消息而不是将消息发送到中心。

    func (c *Client) readPump() {
        defer func() {
            c.hub.unregister <- c
            c.conn.Close()
        }()
        c.conn.SetReadLimit(maxMessageSize)
        c.conn.SetReadDeadline(time.Now().Add(pongWait))
        c.conn.SetPongHandler(func(string) error { 
        c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
        for {
            if _, _, err := c.NextReader(); err != nil {
              break
            }
        }
    }
    
  • 更改您的 Kafka 读取循环以将消息发送到 Hub.broadcast 频道。

    for {
        msg, err := c.ReadMessage(xxx)
        if err != nil {
              // handle error
        }
        hub.broadcast <- msg
    }
    
  • 删除the code that coalesces messages in the client send queue to a single websocket message或调整客户端在单个websocket消息中处理多个Kafka消息。