如何将MySql表数据集成到Ksql流或表中?
How to Integrate MySql tables Data To Ksql Stream or Tables?
我正在尝试构建从 MySql 到 Ksql.
的数据管道
用例:数据源是MySql。我在 MySql.
中创建了一个 table
我正在使用
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
启动独立连接器。它工作正常。
我正在使用主题名称启动消费者,即
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1Category --from-beginning
当我在 MySQL table 中插入数据时,我也在消费者中得到了结果。 我已经创建了具有相同主题名称的 KSQL Stream。我也希望在我的 Kstream 中得到相同的结果,但是我在做
时没有得到任何结果
select * from <streamName>
连接器配置--source-quickstart-mysql.properties
name=jdbc_source_mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:mysql://localhost:3306/testDB?user=root&password=cloudera
#comment=Which table(s) to include
table.whitelist=ftest
mode=incrementing
incrementing.column.name=id
topic.prefix=ftopic
示例数据
- MySql
1.) 创建数据库:
CREATE DATABASE testDB;
2.) 使用数据库:
USE testDB;
3.) 创建 table:
CREATE TABLE products (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
4.) 插入数据到table:
INSERT INTO products(id,name,description,weight)
VALUES (103,'car','Small car',20);
- KSQL
1.) 创建流:
CREATE STREAM pro_original (id int, name varchar, description varchar,weight bigint) WITH \
(kafka_topic='proproducts', value_format='DELIMITED');
2.) Select 查询:
Select * from pro_original;
预期输出
- 消费者
正在获取插入到 MySQL table.
中的数据
我在这里获取 MySQL 中的数据。
- Ksql
应填充插入 Mysql table 并反映在 Kafka 主题中的 In-Stream 数据。
我在 ksql 中没有得到预期的结果
帮我处理这个数据管道。
您的数据是 AVRO 格式,但您定义的 DELIMITED
是 VALUE_FORMAT
而不是 AVRO
。重要的是要指示 KSQL 存储在主题中的值的格式。以下应该为您解决问题。
CREATE STREAM pro_original_v2 \
WITH (KAFKA_TOPIC='products', VALUE_FORMAT='AVRO');
数据插入kafka主题在执行
之后
SELECT * FROM pro_original_v2;
现在应该可以在您的 ksql 控制台中看到 window。
您可以在 KSQL here 中查看一些 Avro 示例。
我正在尝试构建从 MySql 到 Ksql.
的数据管道用例:数据源是MySql。我在 MySql.
中创建了一个 table我正在使用
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
启动独立连接器。它工作正常。
我正在使用主题名称启动消费者,即
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1Category --from-beginning
当我在 MySQL table 中插入数据时,我也在消费者中得到了结果。 我已经创建了具有相同主题名称的 KSQL Stream。我也希望在我的 Kstream 中得到相同的结果,但是我在做
时没有得到任何结果select * from <streamName>
连接器配置--source-quickstart-mysql.properties
name=jdbc_source_mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:mysql://localhost:3306/testDB?user=root&password=cloudera
#comment=Which table(s) to include
table.whitelist=ftest
mode=incrementing
incrementing.column.name=id
topic.prefix=ftopic
示例数据
- MySql
1.) 创建数据库:
CREATE DATABASE testDB;
2.) 使用数据库:
USE testDB;
3.) 创建 table:
CREATE TABLE products (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
4.) 插入数据到table:
INSERT INTO products(id,name,description,weight)
VALUES (103,'car','Small car',20);
- KSQL
1.) 创建流:
CREATE STREAM pro_original (id int, name varchar, description varchar,weight bigint) WITH \
(kafka_topic='proproducts', value_format='DELIMITED');
2.) Select 查询:
Select * from pro_original;
预期输出
- 消费者
正在获取插入到 MySQL table.
中的数据我在这里获取 MySQL 中的数据。
- Ksql
应填充插入 Mysql table 并反映在 Kafka 主题中的 In-Stream 数据。
我在 ksql 中没有得到预期的结果
帮我处理这个数据管道。
您的数据是 AVRO 格式,但您定义的 DELIMITED
是 VALUE_FORMAT
而不是 AVRO
。重要的是要指示 KSQL 存储在主题中的值的格式。以下应该为您解决问题。
CREATE STREAM pro_original_v2 \
WITH (KAFKA_TOPIC='products', VALUE_FORMAT='AVRO');
数据插入kafka主题在执行
之后SELECT * FROM pro_original_v2;
现在应该可以在您的 ksql 控制台中看到 window。
您可以在 KSQL here 中查看一些 Avro 示例。