Confluent Platform: how to properly use ksql-datagen?

I am running the dockerized version of Confluent Platform 7.0.1:

CONTAINER ID   IMAGE                                             COMMAND                  CREATED        STATUS        PORTS                                                                                  NAMES
e6963904b485   confluentinc/cp-enterprise-control-center:7.0.1   "/etc/confluent/dock…"   25 hours ago   Up 25 hours   0.0.0.0:9021->9021/tcp, :::9021->9021/tcp                                              control-center
49ade0e752b4   confluentinc/cp-ksqldb-cli:7.0.1                  "/bin/sh"                25 hours ago   Up 25 hours                                                                                          ksqldb-cli
95b0982c0159   confluentinc/ksqldb-examples:7.0.1                "bash -c 'echo Waiti…"   25 hours ago   Up 25 hours                                                                                          ksql-datagen
e28e3b937f6e   confluentinc/cp-ksqldb-server:7.0.1               "/etc/confluent/dock…"   25 hours ago   Up 25 hours   0.0.0.0:8088->8088/tcp, :::8088->8088/tcp                                              ksqldb-server
af92bfb84cb1   confluentinc/cp-kafka-rest:7.0.1                  "/etc/confluent/dock…"   25 hours ago   Up 25 hours   0.0.0.0:8082->8082/tcp, :::8082->8082/tcp                                              rest-proxy
318a999e76dc   cnfldemos/cp-server-connect-datagen:0.5.0-6.2.0   "/etc/confluent/dock…"   25 hours ago   Up 25 hours   0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 9092/tcp                                    connect
0c299fbda7c5   confluentinc/cp-schema-registry:7.0.1             "/etc/confluent/dock…"   25 hours ago   Up 25 hours   0.0.0.0:8081->8081/tcp, :::8081->8081/tcp                                              schema-registry
a33075002386   confluentinc/cp-server:7.0.1                      "/etc/confluent/dock…"   25 hours ago   Up 25 hours   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:9101->9101/tcp, :::9101->9101/tcp   broker
135f832fbccb   confluentinc/cp-zookeeper:7.0.1                   "/etc/confluent/dock…"   25 hours ago   Up 25 hours   2888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 3888/tcp                          zookeeper

I am trying to use ksql-datagen as described on this ksqlDB documentation page:

https://docs.ksqldb.io/en/0.7.1-ksqldb/tutorials/basics-docker/


docker run --network confluent-platform_default --rm --name datagen-orders \
    confluentinc/ksqldb-examples:7.0.1 \
    ksql-datagen \
        bootstrap-server=localhost:9092 \
        quickstart=orders \
        format=json \
        topic=orders \
        msgRate=5

These are the configuration values it prints:

INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.authorization.cache.expiry.time.secs = 30
    ksql.authorization.cache.max.entries = 10000
    ksql.cast.strings.preserve.nulls = true
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.create.or.replace.enabled = true
    ksql.error.classifier.regex = 
    ksql.extension.dir = ext
    ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.min.insync.replicas = 1
    ksql.internal.topic.replicas = 1
    ksql.lambdas.enabled = true
    ksql.metastore.backup.location = 
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.nested.error.set.null = true
    ksql.output.topic.name.prefix = 
    ksql.persistence.default.format.key = KAFKA
    ksql.persistence.default.format.value = null
    ksql.persistence.wrap.single.values = null
    ksql.persistent.prefix = query_
    ksql.properties.overrides.denylist = []
    ksql.pull.queries.enable = true
    ksql.query.error.max.queue.size = 10
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.persistent.max.bytes.buffering.total = -1
    ksql.query.pull.enable.standby.reads = false
    ksql.query.pull.interpreter.enabled = true
    ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
    ksql.query.pull.max.concurrent.requests = 2147483647
    ksql.query.pull.max.hourly.bandwidth.megabytes = 2147483647
    ksql.query.pull.max.qps = 2147483647
    ksql.query.pull.metrics.enabled = true
    ksql.query.pull.table.scan.enabled = false
    ksql.query.pull.thread.pool.size = 100
    ksql.query.push.scalable.enabled = false
    ksql.query.push.scalable.interpreter.enabled = true
    ksql.query.retry.backoff.initial.ms = 15000
    ksql.query.retry.backoff.max.ms = 900000
    ksql.query.status.running.threshold.seconds = 300
    ksql.query.transient.max.bytes.buffering.total = -1
    ksql.queryanonymizer.cluster_namespace = null
    ksql.queryanonymizer.logs_enabled = true
    ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.schema.registry.url = http://localhost:8081
    ksql.security.extension.class = null
    ksql.service.id = default_
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.streams.shutdown.timeout.ms = 300000
    ksql.suppress.buffer.size.bytes = -1
    ksql.suppress.enabled = false
    ksql.timestamp.throw.on.invalid = false
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ksql.variable.substitution.enable = true
    metric.reporters = []
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:376)
[2022-02-18 17:33:30,254] INFO AvroDataConfig values: 
    connect.meta.data = true
    discard.type.doc.default = false
    enhanced.avro.schema.support = false
    schemas.cache.config = 1
    scrub.invalid.names = false
 (io.confluent.connect.avro.AvroDataConfig:376)
[2022-02-18 17:33:30,354] INFO ProcessingLogConfig values: 
    ksql.logging.processing.rows.include = false
    ksql.logging.processing.stream.auto.create = false
    ksql.logging.processing.stream.name = KSQL_PROCESSING_LOG
    ksql.logging.processing.topic.auto.create = false
    ksql.logging.processing.topic.name = 
    ksql.logging.processing.topic.partitions = 1
    ksql.logging.processing.topic.replication.factor = 1

Then I get a timeout error:

Error when sending message to topic: 'orders', with key: '[0]', and value: '[ 1489814116510L | 0 | 'Item_349' | 9.871488089870846 | Struct{city=City_56,state=State_97,zipcode=56964} ]'
org.apache.kafka.common.errors.TimeoutException: Topic orders not present in metadata after 60000 ms.

When I query ksqlDB, I do see the 'orders' topic:

show topics;

 Kafka Topic                 | Partitions | Partition Replicas 
---------------------------------------------------------------
 PERSON_STATS                | 1          | 1                  
 default_ksql_processing_log | 1          | 1                  
 docker-connect-configs      | 1          | 1                  
 docker-connect-offsets      | 25         | 1                  
 docker-connect-status       | 5          | 1                  
 leeds-users                 | 1          | 1                  
 movements                   | 1          | 1                  
 orders                      | 2          | 1                  
 orders_local                | 2          | 1                  
 pageviews                   | 1          | 1                  
 users                       | 1          | 1                  
---------------------------------------------------------------

However, no data is produced to this topic, and ksql-datagen keeps timing out.

What should I do to fix this?

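You are probably running into problems because you are following an old version of the ksqlDB quickstart (0.7.1) while running Confluent Platform 7.0.1.

If you follow a current quickstart such as https://ksqldb.io/quickstart-platform.html, things will probably go better.

I looked for a newer version of that data generator but could not quickly find one. If you are looking for more information about structured data, see https://docs.ksqldb.io/en/latest/how-to-guides/query-structured-data/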

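It looks like the problem I ran into may be similar to yours.

The original docker-compose entry had no BOOTSTRAP_SERVER environment variable, so I added the bootstrap-server= command-line option manually, and it worked: the topic was created and data was generated.

So in your case, I think your bootstrap-server= value may be wrong.

For example, I used ksql-datagen bootstrap-server=broker:29092 quickstart=orders msgRate=1 topic=orders_topic

Note that I did not use localhost as you did. Inside a container, localhost refers to the container itself, not to the broker, so I used the host name defined in the docker-compose file instead.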
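
Applied to the command from the question, the change might look like the sketch below. It assumes your broker advertises an in-network listener at broker:29092, as mine does; that value is not confirmed by the question, so check the listener settings in your own docker-compose file. The NETWORK, BOOTSTRAP, and RUN variables and the datagen_orders function are names I made up for illustration. By default the script only prints the command, so you can inspect it first; set RUN=1 to actually start the container.

```shell
#!/bin/sh
# Sketch only. Assumption: the broker advertises an in-network listener at
# broker:29092 (the value from the answer above); verify it in your compose file.
NETWORK="${NETWORK:-confluent-platform_default}"  # network name taken from the question
BOOTSTRAP="${BOOTSTRAP:-broker:29092}"            # assumed internal listener, not localhost

# Dry run by default: the command is printed, not executed. Set RUN=1 to execute it.
if [ "${RUN:-0}" = "1" ]; then runner=""; else runner="echo"; fi

# Same arguments as in the question; only bootstrap-server is changed.
datagen_orders() {
  $runner docker run --network "$NETWORK" --rm --name datagen-orders \
    confluentinc/ksqldb-examples:7.0.1 \
    ksql-datagen \
      bootstrap-server="$BOOTSTRAP" \
      quickstart=orders \
      format=json \
      topic=orders \
      msgRate=5
}

datagen_orders
```

If your listener is on a different host or port, override it without editing the script, for example BOOTSTRAP=myhost:1234 RUN=1 sh datagen.sh (the file name here is hypothetical).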