从 kafka 发送消息到 websocket
Send messages from kafka into websocket
我在 golang 中使用 gin-framework 创建了一个 web 服务。在这个项目中,我还使用了一些来自特定主题的 kafka 消息。我想要实现的是将我从主题中收到的消息倒入 websocket 中。所以通信只是一种方式,不止一个人可以连接到网络套接字并看到传入的消息。
我 tought 使用通道,所以在接收 kafka 消息的函数中我有这样的东西:
ch <- KafkaMessage
在 gin 框架中我创建了这样的东西:
requestRouterRPCv1.GET("wf-live", wsWorkFlowLive)
func wsWorkFlowLive(c *gin.Context) {
ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Println("error get connection")
log.Fatal(err)
}
defer ws.Close()
err = ws.WriteJSON(<-ch)
if err != nil {
log.Println("error write message: " + err.Error())
}
}
var upGrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
这里是我用来测试 websocket 的 html 片段:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>index</title>
</head>
<body>
<h1>test websocket</h1>
<p id="message-json"></p>
<p id="message-text"></p>
<script>
function jsonWS() {
var ws = new WebSocket("ws://ws.acme.com/ws/v1/wf-live");
ws.onmessage = function (evt) {
console.log("Received Message: " + evt.data);
document.getElementById("message-json").innerText += evt.data;
};
ws.onclose = function (evt) {
console.log("Connection closed.");
};
}
// Start websocket
jsonWS();
</script>
</body>
</html>
但是我错过了一些东西,我是一个新手,因为我在收到第一条kafka消息后有以下错误行为:
- 只显示第一条消息,之后连接很快关闭
- 要看到第二个我必须刷新页面,这不是websocket方式
- 因为连接关闭通道它不是红色的,所以它一直卡在cosume函数中直到我读到它。我不能有这种行为
- 为了避免第 3 点,我想我必须有一种机制,只有当一个或多个 ws 连接时,我才将消息发送到通道。
你很接近。只是缺少两件事。
1- 使用 select 语句继续从 kafka 频道接收新消息。
2- 保持活动的 websocket 连接。这个answer有更多细节
让我知道这是否适合您。
Gorilla chat example 接近您的需要。该示例将从任何客户端接收到的消息广播到所有连接的客户端。执行以下操作以使代码适应您的用例:
更改 client read pump 以丢弃收到的消息而不是将消息发送到中心。
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
if _, _, err := c.NextReader(); err != nil {
break
}
}
}
更改您的 Kafka 读取循环以将消息发送到 Hub.broadcast 频道。
for {
msg, err := c.ReadMessage(xxx)
if err != nil {
// handle error
}
hub.broadcast <- msg
}
删除the code that coalesces messages in the client send queue to a single websocket message或调整客户端在单个websocket消息中处理多个Kafka消息。
我在 golang 中使用 gin-framework 创建了一个 web 服务。在这个项目中,我还使用了一些来自特定主题的 kafka 消息。我想要实现的是将我从主题中收到的消息倒入 websocket 中。所以通信只是一种方式,不止一个人可以连接到网络套接字并看到传入的消息。 我 tought 使用通道,所以在接收 kafka 消息的函数中我有这样的东西:
ch <- KafkaMessage
在 gin 框架中我创建了这样的东西:
requestRouterRPCv1.GET("wf-live", wsWorkFlowLive)
func wsWorkFlowLive(c *gin.Context) {
ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Println("error get connection")
log.Fatal(err)
}
defer ws.Close()
err = ws.WriteJSON(<-ch)
if err != nil {
log.Println("error write message: " + err.Error())
}
}
var upGrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
这里是我用来测试 websocket 的 html 片段:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>index</title>
</head>
<body>
<h1>test websocket</h1>
<p id="message-json"></p>
<p id="message-text"></p>
<script>
function jsonWS() {
var ws = new WebSocket("ws://ws.acme.com/ws/v1/wf-live");
ws.onmessage = function (evt) {
console.log("Received Message: " + evt.data);
document.getElementById("message-json").innerText += evt.data;
};
ws.onclose = function (evt) {
console.log("Connection closed.");
};
}
// Start websocket
jsonWS();
</script>
</body>
</html>
但是我错过了一些东西,我是一个新手,因为我在收到第一条kafka消息后有以下错误行为:
- 只显示第一条消息,之后连接很快关闭
- 要看到第二个我必须刷新页面,这不是websocket方式
- 因为连接关闭通道它不是红色的,所以它一直卡在cosume函数中直到我读到它。我不能有这种行为
- 为了避免第 3 点,我想我必须有一种机制,只有当一个或多个 ws 连接时,我才将消息发送到通道。
你很接近。只是缺少两件事。
1- 使用 select 语句继续从 kafka 频道接收新消息。
2- 保持活动的 websocket 连接。这个answer有更多细节
让我知道这是否适合您。
Gorilla chat example 接近您的需要。该示例将从任何客户端接收到的消息广播到所有连接的客户端。执行以下操作以使代码适应您的用例:
更改 client read pump 以丢弃收到的消息而不是将消息发送到中心。
func (c *Client) readPump() { defer func() { c.hub.unregister <- c c.conn.Close() }() c.conn.SetReadLimit(maxMessageSize) c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) for { if _, _, err := c.NextReader(); err != nil { break } } }
更改您的 Kafka 读取循环以将消息发送到 Hub.broadcast 频道。
for { msg, err := c.ReadMessage(xxx) if err != nil { // handle error } hub.broadcast <- msg }
删除the code that coalesces messages in the client send queue to a single websocket message或调整客户端在单个websocket消息中处理多个Kafka消息。