无法从 debezium-postgres 的 kafka-stream 读取 kafka-stream 数据
Unable to read kafka-stream data from debezium-postgres's kafka-stream
我使用以下命令启动了 kafka 连接器:
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-postgres/connect-postgres.properties
connect-avro-standalone.properties 中的序列化属性是:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
我创建了一个 java 后端来监听这个 kafka 流主题并且它能够从每个 add/update/delete.
的 postgres 中获取数据
但是数据以某种未知的编码格式传入,这就是我无法正确读取数据的原因。
这是相关的代码片段:
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.ByteArray().getClass().getName());
StreamsBuilder streamsBuilder = new StreamsBuilder();
final Serde<String> stringSerde = Serdes.String();
final Serde<byte[]> byteSerde = Serdes.ByteArray();
streamsBuilder.stream(Pattern.compile(getTopic()), Consumed.with(stringSerde, byteSerde))
.mapValues(data -> {
System.out.println("->"+new String(data));
return data;
});
我对需要更改的地方和内容感到困惑;在 avro connector prop 或 java side code
此处的 Kafka Connect 配置意味着 Kafka 主题上的消息将被 Avro 序列化:
value.converter=io.confluent.connect.avro.AvroConverter
这意味着您需要在 Streams 应用中使用 Avro 进行反序列化。详情请看这里:https://docs.confluent.io/current/streams/developer-guide/datatypes.html#avro
我使用以下命令启动了 kafka 连接器:
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-postgres/connect-postgres.properties
connect-avro-standalone.properties 中的序列化属性是:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
我创建了一个 java 后端来监听这个 kafka 流主题并且它能够从每个 add/update/delete.
的 postgres 中获取数据
但是数据以某种未知的编码格式传入,这就是我无法正确读取数据的原因。
这是相关的代码片段:
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.ByteArray().getClass().getName());
StreamsBuilder streamsBuilder = new StreamsBuilder();
final Serde<String> stringSerde = Serdes.String();
final Serde<byte[]> byteSerde = Serdes.ByteArray();
streamsBuilder.stream(Pattern.compile(getTopic()), Consumed.with(stringSerde, byteSerde))
.mapValues(data -> {
System.out.println("->"+new String(data));
return data;
});
我对需要更改的地方和内容感到困惑;在 avro connector prop 或 java side code
此处的 Kafka Connect 配置意味着 Kafka 主题上的消息将被 Avro 序列化:
value.converter=io.confluent.connect.avro.AvroConverter
这意味着您需要在 Streams 应用中使用 Avro 进行反序列化。详情请看这里:https://docs.confluent.io/current/streams/developer-guide/datatypes.html#avro