How to create a subject for ksqlDB from a Kafka topic
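I am using a MySQL database. Suppose I have a table called orders. Using the Debezium MySQL connector for Kafka, the orders topic has already been created, but I am having trouble creating a stream in ksqlDB.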
CREATE STREAM orders WITH (
kafka_topic = 'myserver.mydatabase.orders',
value_format = 'avro'
);
My docker-compose file looks like this:
zookeeper:
  image: confluentinc/cp-zookeeper:latest
  container_name: zookeeper
  privileged: true
  ports:
    - "2181:2181"
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
kafka:
  image: confluentinc/cp-kafka:latest
  container_name: kafka
  depends_on:
    - zookeeper
  ports:
    - '9092:9092'
  environment:
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
schema-registry:
  image: confluentinc/cp-schema-registry:latest
  container_name: schema-registry
  depends_on:
    - kafka
    - zookeeper
  ports:
    - "8081:8081"
  environment:
    SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
    SCHEMA_REGISTRY_HOST_NAME: schema-registry
kafka-connect:
  hostname: kafka-connect
  image: confluentinc/cp-kafka-connect:latest
  container_name: kafka-connect
  ports:
    - 8083:8083
  depends_on:
    - schema-registry
  environment:
    CONNECT_BOOTSTRAP_SERVERS: kafka:9092
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: "quickstart-avro"
    CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
    CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
    CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
    CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
    CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
  volumes:
    - $PWD/kafka/jars:/etc/kafka-connect/jars
ksqldb-server:
  image: confluentinc/ksqldb-server:latest
  hostname: ksqldb-server
  container_name: ksqldb-server
  depends_on:
    - kafka
  ports:
    - "8088:8088"
  environment:
    KSQL_LISTENERS: http://0.0.0.0:8088
    KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
    KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
    KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
    KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
ksqldb-cli:
  image: confluentinc/ksqldb-cli:latest
  container_name: ksqldb-cli
  depends_on:
    - kafka
    - ksqldb-server
    - schema-registry
  entrypoint: /bin/sh
  tty: true
A subject must first be created for this table. What is the difference between Avro and JSON here?
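"Using debezium mysql connect for Kafka"

You can configure it to use AvroConverter, and the subject will then be created automatically. (Your Compose file currently sets JsonConverter for the Connect worker, so no Avro schema is registered for the topic.)

Otherwise, you can have KSQL use VALUE_FORMAT=JSON, in which case you need to specify all of the field names manually. It is not clear which difference you are asking about (they are different serialization formats), but from KSQL's point of view, JSON on its own is treated as plain text (similar to DELIMITED) that has to be parsed, whereas with a format such as Avro the schema and fields are already known.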
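As a rough sketch of the JSON route, the stream has to declare every value column itself. The column names and types below (`id`, `customer_id`, `total`) and the stream name `orders_json` are hypothetical, since the real orders table is not shown. The sketch also assumes the topic holds flat JSON records, that is, the Debezium envelope has been unwrapped and the JsonConverter is not embedding its schema in each message.

```sql
-- Inspect a record first; PRINT reports the key and value formats it detects.
PRINT 'myserver.mydatabase.orders' FROM BEGINNING LIMIT 1;

-- Hypothetical columns: replace them with the actual columns of the orders table.
CREATE STREAM orders_json (
  id INT,
  customer_id INT,
  total DOUBLE
) WITH (
  kafka_topic = 'myserver.mydatabase.orders',
  value_format = 'json'
);
```

With Avro, the column list can be left out because ksqlDB reads it from the Schema Registry subject, which is why the original statement needs that subject to exist.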
I solved the problem. With this configuration, you can send a MySQL table to a topic without the before and after states.
CREATE SOURCE CONNECTOR final_connector WITH (
'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
'database.hostname' = 'mysql',
'database.port' = '3306',
'database.user' = 'root',
'database.password' = 'mypassword',
'database.allowPublicKeyRetrieval' = 'true',
'database.server.id' = '184055',
'database.server.name' = 'db',
'database.whitelist' = 'mydb',
'database.history.kafka.bootstrap.servers' = 'kafka:9092',
'database.history.kafka.topic' = 'mydb',
'table.whitelist' = 'mydb.user',
'include.schema.changes' = 'false',
'transforms'= 'unwrap,extractkey',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.extractkey.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractkey.field'= 'id',
'key.converter'= 'org.apache.kafka.connect.converters.IntegerConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081'
);
Then simply create your stream!
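A minimal sketch of that last step might look like the following. It assumes the connector above writes to the topic `db.mydb.user` (Debezium names topics after the server name, database, and table), and that `id` fits a 32-bit integer, which matches the IntegerConverter used for the key. The stream name `users` is an arbitrary choice.

```sql
-- The key column is declared explicitly because the key is a plain serialized integer;
-- the value columns are inferred from the Avro schema in Schema Registry.
CREATE STREAM users (
  id INT KEY
) WITH (
  kafka_topic = 'db.mydb.user',
  value_format = 'avro'
);

-- Read from the start of the topic to confirm that rows arrive.
SET 'auto.offset.reset' = 'earliest';
SELECT * FROM users EMIT CHANGES LIMIT 5;
```

If the CREATE STREAM statement still reports a missing schema, it is worth checking that the connector is running and that the topic has received at least one record, since the subject is only registered once a record is written.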
This video can help you a lot.