ksqldb - 拉取查询的一次交付,多个应用程序实例
ksqldb - exactly one delivery for pull query, multiple app instances
我正在尝试在 ksqldb 之上构建应用程序。
假设我有一个简单的制作人:
package main
import (
"fmt"
"github.com/rmoff/ksqldb-go"
"net/http"
)
var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()
func init() {
offset := `SET 'auto.offset.reset' = 'earliest';`
if err := client.Execute(offset); err != nil {
panic(err)
}
s1 := `
CREATE OR REPLACE STREAM userEvents (
userId VARCHAR KEY,
eventType VARCHAR
)
WITH (
kafka_topic='user_events',
value_format='json',
partitions=8
);
`
if err := client.Execute(s1); err != nil {
panic(err)
}
}
func main() {
http.HandleFunc("/emit", hello)
http.ListenAndServe(":4201", nil)
}
func hello(w http.ResponseWriter, req *http.Request) {
userId := req.URL.Query().Get("userId")
if userId == "" {
http.Error(w, "no userId", 400)
return
}
userEvent := req.URL.Query().Get("event")
if userEvent == "" {
userEvent = "unknown"
}
err := client.Execute(fmt.Sprintf("INSERT INTO userEvents (userId, eventType) VALUES ('%s', '%s');",
userId, userEvent))
if err != nil {
http.Error(w, err.Error(), 500)
return
}
w.WriteHeader(200)
return
}
此应用创建一个数据流并公开一个端点以用数据填充该流。
另外,我有一个消费者:
package main
import (
"context"
"fmt"
"github.com/rmoff/ksqldb-go"
)
var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()
func main() {
query := `SET 'auto.offset.reset' = 'earliest';`
if err := client.Execute(query); err != nil {
panic(err)
}
ctx := context.TODO()
rows := make(chan ksqldb.Row)
headers := make(chan ksqldb.Header)
go func() {
if err := client.Push(ctx,
"SELECT * FROM userEvents EMIT CHANGES;",
rows,
headers); err != nil {
panic(err)
}
}()
h := <-headers
fmt.Printf("headers: [%v]", h)
for {
select {
case r := <-rows:
fmt.Printf("received event: [%v]", r)
}
}
}
而我 运行 一个生产者和多个消费者,使用相同的查询。如何(并且可能?)仅在一个消费者上接收事件?现在,通过这样的设置,我在所有可用的消费者上接收到这些事件,但我想在一个消费者上处理事件(处理时间会很长,所以我需要这个来实现并行性)。
老实说,我认为这是“标准”,所有连接的应用程序都属于同一个组,而且这种送货我是免费的。
本地集群配置(这是来自 Confluentic how-to-start 的标准配置):
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.0.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
ksqldb-server:
image: confluentinc/ksqldb-server:0.23.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: broker:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.23.1
container_name: ksqldb-cli
depends_on:
- broker
- ksqldb-server
entrypoint: /bin/sh
tty: true
是我配置有问题还是我误解了这个数据库的用法?
感谢您的帮助!
首先,请注意,我不再维护该客户端,您可能需要查看 https://github.com/thmeitz/ksqldb-go。
现在回答你的问题。如果我的理解正确,您希望 运行 同一逻辑 consumer 的多个实例用于并行目的,因此每条消息都应由该逻辑使用者处理一次。
如果是这种情况,那么您正在描述 Kafka 中所谓的 consumer group。消费者的多个实例使用相同的客户端 ID 标识自己,Kafka 确保来自源主题分区的数据被路由到该组中的可用消费者。如果有四个消费者和八个分区,则每个消费者将从两个分区中获取数据。如果一个消费者离开了该组(它崩溃了,你缩减了规模等),那么 Kafka 会将该消费者的分区重新分配给该组的其余消费者。
这与您所看到的行为不同,您在其中有效地实例化了多个独立的使用者。通过设计,Kafka 确保订阅主题的每个消费者都能收到关于该主题的所有消息。
我这里故意说的是Kafka,而不是ksqlDB。这是因为 ksqlDB 是建立在 Kafka 之上的,为了理解您所看到的内容,解释基础知识很重要。
要获得您正在寻找的行为,您可能希望直接在您的消费者应用程序中使用 Consumer API。您可以在 this quickstart for Golang and Kafka 中查看消费者 API 的示例。要创建消费者组,您需要指定一个唯一的 group.id
.
我正在尝试在 ksqldb 之上构建应用程序。
假设我有一个简单的制作人:
package main
import (
"fmt"
"github.com/rmoff/ksqldb-go"
"net/http"
)
var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()
func init() {
offset := `SET 'auto.offset.reset' = 'earliest';`
if err := client.Execute(offset); err != nil {
panic(err)
}
s1 := `
CREATE OR REPLACE STREAM userEvents (
userId VARCHAR KEY,
eventType VARCHAR
)
WITH (
kafka_topic='user_events',
value_format='json',
partitions=8
);
`
if err := client.Execute(s1); err != nil {
panic(err)
}
}
func main() {
http.HandleFunc("/emit", hello)
http.ListenAndServe(":4201", nil)
}
func hello(w http.ResponseWriter, req *http.Request) {
userId := req.URL.Query().Get("userId")
if userId == "" {
http.Error(w, "no userId", 400)
return
}
userEvent := req.URL.Query().Get("event")
if userEvent == "" {
userEvent = "unknown"
}
err := client.Execute(fmt.Sprintf("INSERT INTO userEvents (userId, eventType) VALUES ('%s', '%s');",
userId, userEvent))
if err != nil {
http.Error(w, err.Error(), 500)
return
}
w.WriteHeader(200)
return
}
此应用创建一个数据流并公开一个端点以用数据填充该流。
另外,我有一个消费者:
package main
import (
"context"
"fmt"
"github.com/rmoff/ksqldb-go"
)
var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()
func main() {
query := `SET 'auto.offset.reset' = 'earliest';`
if err := client.Execute(query); err != nil {
panic(err)
}
ctx := context.TODO()
rows := make(chan ksqldb.Row)
headers := make(chan ksqldb.Header)
go func() {
if err := client.Push(ctx,
"SELECT * FROM userEvents EMIT CHANGES;",
rows,
headers); err != nil {
panic(err)
}
}()
h := <-headers
fmt.Printf("headers: [%v]", h)
for {
select {
case r := <-rows:
fmt.Printf("received event: [%v]", r)
}
}
}
而我 运行 一个生产者和多个消费者,使用相同的查询。如何(并且可能?)仅在一个消费者上接收事件?现在,通过这样的设置,我在所有可用的消费者上接收到这些事件,但我想在一个消费者上处理事件(处理时间会很长,所以我需要这个来实现并行性)。
老实说,我认为这是“标准”,所有连接的应用程序都属于同一个组,而且这种送货我是免费的。
本地集群配置(这是来自 Confluentic how-to-start 的标准配置):
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.0.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
ksqldb-server:
image: confluentinc/ksqldb-server:0.23.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: broker:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.23.1
container_name: ksqldb-cli
depends_on:
- broker
- ksqldb-server
entrypoint: /bin/sh
tty: true
是我配置有问题还是我误解了这个数据库的用法? 感谢您的帮助!
首先,请注意,我不再维护该客户端,您可能需要查看 https://github.com/thmeitz/ksqldb-go。
现在回答你的问题。如果我的理解正确,您希望 运行 同一逻辑 consumer 的多个实例用于并行目的,因此每条消息都应由该逻辑使用者处理一次。
如果是这种情况,那么您正在描述 Kafka 中所谓的 consumer group。消费者的多个实例使用相同的客户端 ID 标识自己,Kafka 确保来自源主题分区的数据被路由到该组中的可用消费者。如果有四个消费者和八个分区,则每个消费者将从两个分区中获取数据。如果一个消费者离开了该组(它崩溃了,你缩减了规模等),那么 Kafka 会将该消费者的分区重新分配给该组的其余消费者。
这与您所看到的行为不同,您在其中有效地实例化了多个独立的使用者。通过设计,Kafka 确保订阅主题的每个消费者都能收到关于该主题的所有消息。
我这里故意说的是Kafka,而不是ksqlDB。这是因为 ksqlDB 是建立在 Kafka 之上的,为了理解您所看到的内容,解释基础知识很重要。
要获得您正在寻找的行为,您可能希望直接在您的消费者应用程序中使用 Consumer API。您可以在 this quickstart for Golang and Kafka 中查看消费者 API 的示例。要创建消费者组,您需要指定一个唯一的 group.id
.