我可以使用 Kafka 流读写不同类型的消息吗?
Can I use Kafka streams read and write messages of different types?
我正在编写一个使用 Kafka 流的应用程序。它从主题 A 读取,进行一些转换,然后写入主题 B。在转换过程中,值按键分组,因此输出键、值类型与输入值类型不同。
Kafka 流使用特定类型的 Serdes(例如 String serdes 序列化和反序列化字符串)进行序列化和反序列化,因此在数据转换后将不起作用。如何在 Streams API?
中定义不同的序列化器和反序列化器
当然可以
当您创建流、调用 groupBy 或将输出写入某个主题时,您可以提供 Serde
或 Serialized
。示例:
Serde<String> stringSerde = Serdes.String();
Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
Produced<String, YourCustomItem> produced = Produced.with(stringSerde, new JsonSerde<>(YourCustomItem.class));
KStream<String, String> kStream = streamsBuilder.stream("sourceTopicName", consumed);
KStream<String, YourCustomItem> transformedKStream = kStream.mapValues((key, value) -> new YourCustomItem());
transformedKStream.to("destinationTopicName", produced);
transformedKStream.groupByKey(Serialized.with(Serdes.String(), new JsonSerde<>(YourCustomItem.class)));
其中 JsonSerde
来自 spring-kafka
依赖项。
或者您可以使用以下 Serde
:
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
我正在编写一个使用 Kafka 流的应用程序。它从主题 A 读取,进行一些转换,然后写入主题 B。在转换过程中,值按键分组,因此输出键、值类型与输入值类型不同。 Kafka 流使用特定类型的 Serdes(例如 String serdes 序列化和反序列化字符串)进行序列化和反序列化,因此在数据转换后将不起作用。如何在 Streams API?
中定义不同的序列化器和反序列化器当然可以
当您创建流、调用 groupBy 或将输出写入某个主题时,您可以提供 Serde
或 Serialized
。示例:
Serde<String> stringSerde = Serdes.String();
Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
Produced<String, YourCustomItem> produced = Produced.with(stringSerde, new JsonSerde<>(YourCustomItem.class));
KStream<String, String> kStream = streamsBuilder.stream("sourceTopicName", consumed);
KStream<String, YourCustomItem> transformedKStream = kStream.mapValues((key, value) -> new YourCustomItem());
transformedKStream.to("destinationTopicName", produced);
transformedKStream.groupByKey(Serialized.with(Serdes.String(), new JsonSerde<>(YourCustomItem.class)));
其中 JsonSerde
来自 spring-kafka
依赖项。
或者您可以使用以下 Serde
:
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);