ksqldb - 拉取查询的一次交付,多个应用程序实例

ksqldb - exactly one delivery for pull query, multiple app instances

我正在尝试在 ksqldb 之上构建应用程序。

假设我有一个简单的制作人:

package main

import (
    "fmt"
    "github.com/rmoff/ksqldb-go"
    "net/http"
)

var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()

func init() {
    offset := `SET 'auto.offset.reset' = 'earliest';`
    if err := client.Execute(offset); err != nil {
        panic(err)
    }

    s1 := `
        CREATE OR REPLACE STREAM userEvents (
            userId VARCHAR KEY,
            eventType VARCHAR
        )
        WITH (
            kafka_topic='user_events', 
            value_format='json', 
            partitions=8
        );
    `
    if err := client.Execute(s1); err != nil {
        panic(err)
    }
}

func main() {
    http.HandleFunc("/emit", hello)
    http.ListenAndServe(":4201", nil)
}

func hello(w http.ResponseWriter, req *http.Request) {
    userId := req.URL.Query().Get("userId")
    if userId == "" {
        http.Error(w, "no userId", 400)
        return
    }
    userEvent := req.URL.Query().Get("event")
    if userEvent == "" {
        userEvent = "unknown"
    }

    err := client.Execute(fmt.Sprintf("INSERT INTO userEvents (userId, eventType) VALUES ('%s', '%s');",
        userId, userEvent))
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    w.WriteHeader(200)
    return
}

此应用创建一个数据流并公开一个端点以用数据填充该流。

另外,我有一个消费者:

package main

import (
    "context"
    "fmt"
    "github.com/rmoff/ksqldb-go"
)

var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()

func main() {
    query := `SET 'auto.offset.reset' = 'earliest';`
    if err := client.Execute(query); err != nil {
        panic(err)
    }

    ctx := context.TODO()
    rows := make(chan ksqldb.Row)
    headers := make(chan ksqldb.Header)
    go func() {
        if err := client.Push(ctx,
            "SELECT * FROM userEvents EMIT CHANGES;",
            rows,
            headers); err != nil {
            panic(err)
        }
    }()

    h := <-headers
    fmt.Printf("headers: [%v]", h)

    for {
        select {
        case r := <-rows:
            fmt.Printf("received event: [%v]", r)
        }
    }
}

而我 运行 一个生产者和多个消费者,使用相同的查询。如何(并且可能?)仅在一个消费者上接收事件?现在,通过这样的设置,我在所有可用的消费者上接收到这些事件,但我想在一个消费者上处理事件(处理时间会很长,所以我需要这个来实现并行性)。

老实说,我认为这是“标准”,所有连接的应用程序都属于同一个组,而且这种送货我是免费的。

本地集群配置(这是来自 Confluentic how-to-start 的标准配置):

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.23.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.23.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

是我配置有问题还是我误解了这个数据库的用法? 感谢您的帮助!

首先,请注意,我不再维护该客户端,您可能需要查看 https://github.com/thmeitz/ksqldb-go


现在回答你的问题。如果我的理解正确,您希望 运行 同一逻辑 consumer 的多个实例用于并行目的,因此每条消息都应由该逻辑使用者处理一次。

如果是这种情况,那么您正在描述 Kafka 中所谓的 consumer group。消费者的多个实例使用相同的客户端 ID 标识自己,Kafka 确保来自源主题分区的数据被路由到该组中的可用消费者。如果有四个消费者和八个分区,则每个消费者将从两个分区中获取数据。如果一个消费者离开了该组(它崩溃了,你缩减了规模等),那么 Kafka 会将该消费者的分区重新分配给该组的其余消费者。

这与您所看到的行为不同,您在其中有效地实例化了多个独立的使用者。通过设计,Kafka 确保订阅主题的每个消费者都能收到关于该主题的所有消息。

我这里故意说的是Kafka,而不是ksqlDB。这是因为 ksqlDB 是建立在 Kafka 之上的,为了理解您所看到的内容,解释基础知识很重要。

要获得您正在寻找的行为,您可能希望直接在您的消费者应用程序中使用 Consumer API。您可以在 this quickstart for Golang and Kafka 中查看消费者 API 的示例。要创建消费者组,您需要指定一个唯一的 group.id.