从中间主题将 KStream 与 KTable 连接会导致异常
Joining KStream with KTable from intermediate topic results in exception
我正在尝试使用 KTable 加入 KStream。没有连接,我可以毫无问题地阅读中间主题 "book-attribute-by-id"。
KTable 的示例消息:
{key: {id: 1}
value: {id: 1, attribute_name: "weight"}}
KStream 的示例消息:
{key: {id: 1},
value: {id: 1, book_id: 1, attribute_id: 1, value: 200}}
'final aggregation' 主题的期望输出:
{key: {id: 1},
value: {book_id: 1, attribute_name: "weight", value: 200}}
{key: {id: 1},
value: {book_id: 1, attribute_name: "number_of_pages", value: 450}}
这是代码
KStream<DefaultId, BookAttribute> bookAttributeStream = builder.stream(bookAttributeTopic, Consumed.with(defaultIdSerde, bookAttributeSerde));
KStream<DefaultId, BookValueInt> bookValueIntStream = builder.stream(bookValueIntTopic, Consumed.with(defaultIdSerde, bookValueIntSerde));
bookAttributeStream
.selectKey((k, v) -> k.getId())
.to("book-attribute-by-id", Produced.with(Serdes.Integer(), bookAttributeSerde));
KTable<Integer, BookAttribute> bookAttributeByIdTable = builder.table("book-attribute-by-id", Consumed.with(Serdes.Integer(), bookAttributeSerde));
// when the snippet below is commented out, consuming "book-attribute-by-id" works.
bookValueIntStream
.selectKey((k, v) -> v.getAttribute_id())
.join(bookAttributeByIdTable, (intValue, attribute) -> {
System.out.println("intValue: " + intValue);
System.out.println("attribute: " + attribute);
return new BookAttributeValue(intValue, attribute);
});
加入 KStream 和 KTable 时出现异常:
Exception in thread "xxx-StreamThread-1"
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid
topology building: stream-thread [xxx-StreamThread-1]Topic not found:
book-attribute-by-id at
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor$CopartitionedTopicsValidator.validate(StreamPartitionAssignor.java:792)
我假设您使用的是 kafka-streams 1.0.0
问题是您必须为流创建输入主题。
在您的案例中,主题是:book-attribute-by-id
以及作为变量值的主题:bookAttributeTopic
、bookValueIntTopic
。
对于连接,Kafka Streams 必须确保连接主题中的分区数量相等。当它尝试获取主题的元数据时抛出异常:book-attribute-by-id
.
在 运行 您的应用程序之前,您必须手动创建 book-attribute-by-id
主题
在较新版本的 kafka-streams 中,在验证分区数之前会检查是否存在主题。
我正在尝试使用 KTable 加入 KStream。没有连接,我可以毫无问题地阅读中间主题 "book-attribute-by-id"。
KTable 的示例消息:
{key: {id: 1}
value: {id: 1, attribute_name: "weight"}}
KStream 的示例消息:
{key: {id: 1},
value: {id: 1, book_id: 1, attribute_id: 1, value: 200}}
'final aggregation' 主题的期望输出:
{key: {id: 1},
value: {book_id: 1, attribute_name: "weight", value: 200}}
{key: {id: 1},
value: {book_id: 1, attribute_name: "number_of_pages", value: 450}}
这是代码
KStream<DefaultId, BookAttribute> bookAttributeStream = builder.stream(bookAttributeTopic, Consumed.with(defaultIdSerde, bookAttributeSerde));
KStream<DefaultId, BookValueInt> bookValueIntStream = builder.stream(bookValueIntTopic, Consumed.with(defaultIdSerde, bookValueIntSerde));
bookAttributeStream
.selectKey((k, v) -> k.getId())
.to("book-attribute-by-id", Produced.with(Serdes.Integer(), bookAttributeSerde));
KTable<Integer, BookAttribute> bookAttributeByIdTable = builder.table("book-attribute-by-id", Consumed.with(Serdes.Integer(), bookAttributeSerde));
// when the snippet below is commented out, consuming "book-attribute-by-id" works.
bookValueIntStream
.selectKey((k, v) -> v.getAttribute_id())
.join(bookAttributeByIdTable, (intValue, attribute) -> {
System.out.println("intValue: " + intValue);
System.out.println("attribute: " + attribute);
return new BookAttributeValue(intValue, attribute);
});
加入 KStream 和 KTable 时出现异常:
Exception in thread "xxx-StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: stream-thread [xxx-StreamThread-1]Topic not found: book-attribute-by-id at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor$CopartitionedTopicsValidator.validate(StreamPartitionAssignor.java:792)
我假设您使用的是 kafka-streams 1.0.0
问题是您必须为流创建输入主题。
在您的案例中,主题是:book-attribute-by-id
以及作为变量值的主题:bookAttributeTopic
、bookValueIntTopic
。
对于连接,Kafka Streams 必须确保连接主题中的分区数量相等。当它尝试获取主题的元数据时抛出异常:book-attribute-by-id
.
在 运行 您的应用程序之前,您必须手动创建 book-attribute-by-id
主题
在较新版本的 kafka-streams 中,在验证分区数之前会检查是否存在主题。