Confluent Platform - how to properly use ksql-datagen?
I'm running the dockerized version of Confluent Platform v7.0.1:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e6963904b485 confluentinc/cp-enterprise-control-center:7.0.1 "/etc/confluent/dock…" 25 hours ago Up 25 hours 0.0.0.0:9021->9021/tcp, :::9021->9021/tcp control-center
49ade0e752b4 confluentinc/cp-ksqldb-cli:7.0.1 "/bin/sh" 25 hours ago Up 25 hours ksqldb-cli
95b0982c0159 confluentinc/ksqldb-examples:7.0.1 "bash -c 'echo Waiti…" 25 hours ago Up 25 hours ksql-datagen
e28e3b937f6e confluentinc/cp-ksqldb-server:7.0.1 "/etc/confluent/dock…" 25 hours ago Up 25 hours 0.0.0.0:8088->8088/tcp, :::8088->8088/tcp ksqldb-server
af92bfb84cb1 confluentinc/cp-kafka-rest:7.0.1 "/etc/confluent/dock…" 25 hours ago Up 25 hours 0.0.0.0:8082->8082/tcp, :::8082->8082/tcp rest-proxy
318a999e76dc cnfldemos/cp-server-connect-datagen:0.5.0-6.2.0 "/etc/confluent/dock…" 25 hours ago Up 25 hours 0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 9092/tcp connect
0c299fbda7c5 confluentinc/cp-schema-registry:7.0.1 "/etc/confluent/dock…" 25 hours ago Up 25 hours 0.0.0.0:8081->8081/tcp, :::8081->8081/tcp schema-registry
a33075002386 confluentinc/cp-server:7.0.1 "/etc/confluent/dock…" 25 hours ago Up 25 hours 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:9101->9101/tcp, :::9101->9101/tcp broker
135f832fbccb confluentinc/cp-zookeeper:7.0.1 "/etc/confluent/dock…" 25 hours ago Up 25 hours 2888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 3888/tcp zookeeper
I'm trying to use ksql-datagen as described on this Confluent page:
https://docs.ksqldb.io/en/0.7.1-ksqldb/tutorials/basics-docker/
docker run --network confluent-platform_default --rm --name datagen-orders \
confluentinc/ksqldb-examples:7.0.1 \
ksql-datagen \
bootstrap-server=localhost:9092 \
quickstart=orders \
format=json \
topic=orders \
msgRate=5
This is the configuration output it prints:
INFO KsqlConfig values:
ksql.access.validator.enable = auto
ksql.authorization.cache.expiry.time.secs = 30
ksql.authorization.cache.max.entries = 10000
ksql.cast.strings.preserve.nulls = true
ksql.connect.url = http://localhost:8083
ksql.connect.worker.config =
ksql.create.or.replace.enabled = true
ksql.error.classifier.regex =
ksql.extension.dir = ext
ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
ksql.insert.into.values.enabled = true
ksql.internal.topic.min.insync.replicas = 1
ksql.internal.topic.replicas = 1
ksql.lambdas.enabled = true
ksql.metastore.backup.location =
ksql.metrics.extension = null
ksql.metrics.tags.custom =
ksql.nested.error.set.null = true
ksql.output.topic.name.prefix =
ksql.persistence.default.format.key = KAFKA
ksql.persistence.default.format.value = null
ksql.persistence.wrap.single.values = null
ksql.persistent.prefix = query_
ksql.properties.overrides.denylist = []
ksql.pull.queries.enable = true
ksql.query.error.max.queue.size = 10
ksql.query.persistent.active.limit = 2147483647
ksql.query.persistent.max.bytes.buffering.total = -1
ksql.query.pull.enable.standby.reads = false
ksql.query.pull.interpreter.enabled = true
ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
ksql.query.pull.max.concurrent.requests = 2147483647
ksql.query.pull.max.hourly.bandwidth.megabytes = 2147483647
ksql.query.pull.max.qps = 2147483647
ksql.query.pull.metrics.enabled = true
ksql.query.pull.table.scan.enabled = false
ksql.query.pull.thread.pool.size = 100
ksql.query.push.scalable.enabled = false
ksql.query.push.scalable.interpreter.enabled = true
ksql.query.retry.backoff.initial.ms = 15000
ksql.query.retry.backoff.max.ms = 900000
ksql.query.status.running.threshold.seconds = 300
ksql.query.transient.max.bytes.buffering.total = -1
ksql.queryanonymizer.cluster_namespace = null
ksql.queryanonymizer.logs_enabled = true
ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
ksql.schema.registry.url = http://localhost:8081
ksql.security.extension.class = null
ksql.service.id = default_
ksql.sink.window.change.log.additional.retention = 1000000
ksql.streams.shutdown.timeout.ms = 300000
ksql.suppress.buffer.size.bytes = -1
ksql.suppress.enabled = false
ksql.timestamp.throw.on.invalid = false
ksql.transient.prefix = transient_
ksql.udf.collect.metrics = false
ksql.udf.enable.security.manager = true
ksql.udfs.enabled = true
ksql.variable.substitution.enable = true
metric.reporters = []
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
(io.confluent.ksql.util.KsqlConfig:376)
[2022-02-18 17:33:30,254] INFO AvroDataConfig values:
connect.meta.data = true
discard.type.doc.default = false
enhanced.avro.schema.support = false
schemas.cache.config = 1
scrub.invalid.names = false
(io.confluent.connect.avro.AvroDataConfig:376)
[2022-02-18 17:33:30,354] INFO ProcessingLogConfig values:
ksql.logging.processing.rows.include = false
ksql.logging.processing.stream.auto.create = false
ksql.logging.processing.stream.name = KSQL_PROCESSING_LOG
ksql.logging.processing.topic.auto.create = false
ksql.logging.processing.topic.name =
ksql.logging.processing.topic.partitions = 1
ksql.logging.processing.topic.replication.factor = 1
I'm getting a timeout error:
Error when sending message to topic: 'orders', with key: '[0]', and value: '[ 1489814116510L | 0 | 'Item_349' | 9.871488089870846 | Struct{city=City_56,state=State_97,zipcode=56964} ]'
org.apache.kafka.common.errors.TimeoutException: Topic orders not present in metadata after 60000 ms.
When I query ksqlDB, I do see the 'orders' topic:
show topics;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
PERSON_STATS | 1 | 1
default_ksql_processing_log | 1 | 1
docker-connect-configs | 1 | 1
docker-connect-offsets | 25 | 1
docker-connect-status | 5 | 1
leeds-users | 1 | 1
movements | 1 | 1
orders | 2 | 1
orders_local | 2 | 1
pageviews | 1 | 1
users | 1 | 1
---------------------------------------------------------------
However, no data is produced to this topic, and ksql-datagen keeps timing out.
What should I do to fix this?
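To check whether the broker is even reachable from a container on the same Docker network, a quick test like this can help (a sketch; the network name `confluent-platform_default` and the internal listener `broker:29092` are taken from this setup and may differ in yours):

```shell
# List topics from a throwaway container attached to the same Docker network.
# If this succeeds with broker:29092 but fails with localhost:9092, the
# bootstrap-server address is the problem: inside a container, "localhost"
# refers to that container itself, not to the broker.
docker run --network confluent-platform_default --rm \
  confluentinc/cp-kafka:7.0.1 \
  kafka-topics --bootstrap-server broker:29092 --list
```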
You may be running into trouble because you're running an older version of the ksqlDB quickstart (0.7.1) against Confluent Platform 7.0.1.
Things may go better if you work from a quickstart like this one: https://ksqldb.io/quickstart-platform.html.
I looked for an updated version of that data generator but couldn't find one quickly. If you're looking for more information on working with structured data, read https://docs.ksqldb.io/en/latest/how-to-guides/query-structured-data/.
It looks like I ran into a problem similar to yours.
The original docker-compose entry had no BOOTSTRAP_SERVER environment variable, so I manually added the bootstrap-server=
command-line option, and it worked: data was generated and the topic was created.
So in your case, I suspect your bootstrap-server=
value is wrong.
For example, I used: ksql-datagen bootstrap-server=broker:29092 quickstart=orders msgRate=1 topic=orders_topic
Note that I didn't use localhost as you did, but the service name defined in the docker-compose file.
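Putting that together with the original command, the full invocation would look something like this (a sketch; the internal listener `broker:29092` comes from the compose file's advertised listeners and may differ in other setups):

```shell
# Run datagen on the compose network and point it at the broker's
# *internal* listener (the service name from docker-compose), not localhost.
docker run --network confluent-platform_default --rm --name datagen-orders \
  confluentinc/ksqldb-examples:7.0.1 \
  ksql-datagen \
    bootstrap-server=broker:29092 \
    quickstart=orders \
    format=json \
    topic=orders \
    msgRate=5
```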