Kafka 流式传输来自给定主题的 KTable 创建
Kafka streams KTable creation from a given topic
我正在做一个项目,但卡在了 KTable 上。
我想从一个主题中获取记录并将它们放入 KTable(存储)中,这样我就有 1 个记录对应 1 个键。
static KafkaStreams streams;
final Serde<Long> longSerde = Serdes.Long();
final Serde<byte[]> byteSerde = Serdes.ByteArray();
static String topicName;
static String storeName;
final StreamsBuilder builder = new StreamsBuilder();
KStream<Long, byte[]> streamed = builder.stream(topicName, Consumed.with(longSerde, byteSerde));
KTable<Long, byte[]> records = streamed.groupByKey().reduce(
new Reducer<Long>() {
@Override
public Long apply(Long aggValue, Long newValue) {
return newValue;
}
},
storeName);
这是我认为最接近的答案。
您的方法是正确的,但您需要使用正确的 serdes。
在.reduce()函数中,值类型应该是byte[]
。
KStream<Long, byte[]> streamed = builder.stream(topicName, Consumed.with(longSerde, byteSerde));
KTable<Long, byte[]> records = streamed.groupByKey().reduce(
new Reducer<byte[]>() {
@Override
public byte[] apply(byte[] aggValue, byte[] newValue) {
return newValue;
}
},
Materialized.as(storename).with(longSerde,byteSerde));
我正在做一个项目,但卡在了 KTable 上。
我想从一个主题中获取记录并将它们放入 KTable(存储)中,这样我就有 1 个记录对应 1 个键。
static KafkaStreams streams;
final Serde<Long> longSerde = Serdes.Long();
final Serde<byte[]> byteSerde = Serdes.ByteArray();
static String topicName;
static String storeName;
final StreamsBuilder builder = new StreamsBuilder();
KStream<Long, byte[]> streamed = builder.stream(topicName, Consumed.with(longSerde, byteSerde));
KTable<Long, byte[]> records = streamed.groupByKey().reduce(
new Reducer<Long>() {
@Override
public Long apply(Long aggValue, Long newValue) {
return newValue;
}
},
storeName);
这是我认为最接近的答案。
您的方法是正确的,但您需要使用正确的 serdes。
在.reduce()函数中,值类型应该是byte[]
。
KStream<Long, byte[]> streamed = builder.stream(topicName, Consumed.with(longSerde, byteSerde));
KTable<Long, byte[]> records = streamed.groupByKey().reduce(
new Reducer<byte[]>() {
@Override
public byte[] apply(byte[] aggValue, byte[] newValue) {
return newValue;
}
},
Materialized.as(storename).with(longSerde,byteSerde));