消息保存的 Kafka 主题未通过 Kafka Connector 正确保存
Messages saved Kafka Topic not saving correctly via Kafka Connector
所以我设置了 Confluent Kafka JDBC 连接器。首先,我启动一个模式注册表,例如
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
这是模式-registery.properties 文件
listeners=http://0.0.0.0:8081
kafkastore.connection.url=zookeeperhost:2181
kafkastore.bootstrap.servers=PLAINTEXT://kafkahost:9092
kafkastore.topic=_schemas
debug=false
接下来我启动一个像这样的独立连接器
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./jdbc-source.properties
connect-avro-standalone.properties 是
bootstrap.servers=kafkahost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java
jdbc-source.properties是
name=jdbc_source_oracle
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=host)(PORT=port))(CONNECT_DATA=(SERVER=dedicated)(SID=server)))
connection.user=xxx
connection.password=xxx
table.whitelist=table1, table2
mode=bulk
topic.prefix=my_topic
query=select * from table1 t1 join table1 t2 on t2.id = t1.id where t2.entereddate >='19-FEB-2019' and t2.entereddate <= '23-FEB-2019'
我使用的查询仅用于测试目的,我要使用的真正查询将实现递增模式并且不包含 where 子句。
现在这设法将数据发布到主题中,但发生了一些奇怪的事情。首先,ID 以不可读的格式保存。只是空广场。其次,数据库中填充的一些字段在主题中保存为空。第三,每当我尝试更改 JDBC 源文件中的查询日期时,什么也没有发生。它仍然包含我第一次发布时发布的相同消息 运行,因为 kafka 主题中的任何内容都没有更新我更改查询的次数。
谁能帮帮我?
编辑
我想做的是通过pyspark代码消费数据。这是我如何做的代码
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("data streaming app")\
.getOrCreate()
data_raw = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafkahost:9092")\
.option("subscribe", "my_topic")\
.load()
query = data_raw.writeStream\
.outputMode("append")\
.format("console")\
.option("truncate", "false")\
.trigger(processingTime="5 seconds")\
.start()\
.awaitTermination()
我也使用这个命令使用 kafka-avro-console-consumer 使用数据
./bin/kafka-avro-console-consumer \
--bootstrap-server kafkahost:9092 \
--property print.key=true \
--from-beginning \
--topic my_topic
这两个都给我奇怪的结果
这是 pyspark 代码给我的
这就是使用 avro 控制台给我的好处
屏蔽了一些字段和文本,因为它可能包含公司敏感信息。
如果您从 Spark 使用 Avro,则需要使用 correct deserializer。
您正在从控制台看到 Avro 数据中的字节,然后是 decimals/numerics、as detailed here 的处理。
您可以阅读更多关于 Kafka Connect 和 Avro 的序列化替代方案(包括 JSON)here。
所以我设置了 Confluent Kafka JDBC 连接器。首先,我启动一个模式注册表,例如
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
这是模式-registery.properties 文件
listeners=http://0.0.0.0:8081
kafkastore.connection.url=zookeeperhost:2181
kafkastore.bootstrap.servers=PLAINTEXT://kafkahost:9092
kafkastore.topic=_schemas
debug=false
接下来我启动一个像这样的独立连接器
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./jdbc-source.properties
connect-avro-standalone.properties 是
bootstrap.servers=kafkahost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java
jdbc-source.properties是
name=jdbc_source_oracle
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=host)(PORT=port))(CONNECT_DATA=(SERVER=dedicated)(SID=server)))
connection.user=xxx
connection.password=xxx
table.whitelist=table1, table2
mode=bulk
topic.prefix=my_topic
query=select * from table1 t1 join table1 t2 on t2.id = t1.id where t2.entereddate >='19-FEB-2019' and t2.entereddate <= '23-FEB-2019'
我使用的查询仅用于测试目的,我要使用的真正查询将实现递增模式并且不包含 where 子句。
现在这设法将数据发布到主题中,但发生了一些奇怪的事情。首先,ID 以不可读的格式保存。只是空广场。其次,数据库中填充的一些字段在主题中保存为空。第三,每当我尝试更改 JDBC 源文件中的查询日期时,什么也没有发生。它仍然包含我第一次发布时发布的相同消息 运行,因为 kafka 主题中的任何内容都没有更新我更改查询的次数。
谁能帮帮我?
编辑
我想做的是通过pyspark代码消费数据。这是我如何做的代码
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("data streaming app")\
.getOrCreate()
data_raw = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafkahost:9092")\
.option("subscribe", "my_topic")\
.load()
query = data_raw.writeStream\
.outputMode("append")\
.format("console")\
.option("truncate", "false")\
.trigger(processingTime="5 seconds")\
.start()\
.awaitTermination()
我也使用这个命令使用 kafka-avro-console-consumer 使用数据
./bin/kafka-avro-console-consumer \
--bootstrap-server kafkahost:9092 \
--property print.key=true \
--from-beginning \
--topic my_topic
这两个都给我奇怪的结果
这是 pyspark 代码给我的
这就是使用 avro 控制台给我的好处
屏蔽了一些字段和文本,因为它可能包含公司敏感信息。
如果您从 Spark 使用 Avro,则需要使用 correct deserializer。
您正在从控制台看到 Avro 数据中的字节,然后是 decimals/numerics、as detailed here 的处理。
您可以阅读更多关于 Kafka Connect 和 Avro 的序列化替代方案(包括 JSON)here。