Kafka Streams: How to collect pairs of messages and write them to a new topic
This is a beginner question about Kafka Streams.
How would you use the Java Kafka Streams library to collect pairs of messages and write them to a new output topic?
I was thinking of something like this:
private void accumulateTwo(KStream<String, String> messages) {
    Optional<String> accumulator = Optional.empty();
    messages.mapValues(value -> {
        if (accumulator.isPresent()) {
            String tmp = accumulator.get();
            accumulator = Optional.empty();
            return Optional.of(new Tuple<>(tmp, value));
        } else {
            accumulator = Optional.of(value);
            return Optional.empty();
        }
    }).filter((key, value) -> value.isPresent()).to("pairs");
}
This does not work, however, because variables captured in a Java lambda expression must be effectively final.
Any ideas?
You could write an accumulator class for this:
class Accumulator implements ValueMapper<String, Optional<Tuple<String>>> {
    private String key;

    @Override
    public Optional<Tuple<String>> apply(String item) { // ValueMapper's method is apply, not get
        if (key == null) {
            key = item;
            return Optional.empty();
        }
        Optional<Tuple<String>> result = Optional.of(new Tuple<>(key, item));
        key = null;
        return result;
    }
}
Then process with:
messages.mapValues(new Accumulator())
        .filter((key, value) -> value.isPresent()) // KStream.filter takes a (key, value) predicate
        .mapValues(Optional::get)
        .to("pairs");
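The pairing logic itself can be exercised without any Kafka dependencies. Below is a minimal plain-Java sketch of the same stateful-mapper pattern (the `PairAccumulator` name and `String[]` pair representation are illustrative, not part of the answer's code):

```java
import java.util.Optional;

// Stand-alone version of the accumulator's pairing logic:
// buffer every first item, emit a pair on every second item.
class PairAccumulator {
    private String pending; // the last unpaired item, or null

    Optional<String[]> apply(String item) {
        if (pending == null) {
            pending = item;          // buffer the first of the pair
            return Optional.empty(); // nothing to emit yet
        }
        String[] pair = { pending, item };
        pending = null;              // reset for the next pair
        return Optional.of(pair);
    }
}
```

Feeding it "a", "b", "c", "d" yields empty, (a, b), empty, (c, d), which is exactly the behavior the stream topology above relies on.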
EDIT:

As suggested in the comments, three additional steps are needed:

- The Transformer must explicitly keep its state in a state store. It gets a reference to the state store from the ProcessorContext, which is passed to its init method.
- The state store must be registered with the StreamsBuilder.
- The name of the state store must be passed to the transform method.

In this example it is sufficient to store the last message we have seen. We use a KeyValueStore for that, which holds zero or one entries at any point in time.
public class PairTransformerSupplier<K,V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V,V>>> {
private String storeName;
public PairTransformerSupplier(String storeName) {
this.storeName = storeName;
}
@Override
public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() {
return new PairTransformer<>(storeName);
}
}
public class PairTransformer<K,V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> {
private ProcessorContext context;
private String storeName;
private KeyValueStore<Integer, V> stateStore;
public PairTransformer(String storeName) {
this.storeName = storeName;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
stateStore = (KeyValueStore<Integer, V>) context.getStateStore(storeName);
}
@Override
public KeyValue<K, Pair<V, V>> transform(K key, V value) {
// 1. Update the store to remember the last message seen.
if (stateStore.get(1) == null) {
stateStore.put(1, value); return null;
}
KeyValue<K, Pair<V,V>> result = KeyValue.pair(key, new Pair<>(stateStore.get(1), value));
stateStore.put(1, null);
return result;
}
@Override
public void close() { }
}
public KStream<String, String> sampleStream(StreamsBuilder builder) {
KStream<String, String> messages = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
// 2. Create the state store and register it with the streams builder.
KeyValueBytesStoreSupplier store = Stores.persistentKeyValueStore(stateStoreName);
StoreBuilder storeBuilder = new KeyValueStoreBuilder<>(
store,
new Serdes.IntegerSerde(),
new Serdes.StringSerde(),
Time.SYSTEM
);
builder.addStateStore(storeBuilder);
transformToPairs(messages);
return messages;
}
private void transformToPairs(KStream<String, String> messages) {
// 3. reference the name of the state store when calling transform(...)
KStream<String, Pair<String, String>> pairs = messages.transform(
    new PairTransformerSupplier<>(stateStoreName), // the supplier's constructor requires the store name
    stateStoreName
);
KStream<String, Pair<String, String>> filtered = pairs.filter((key, value) -> value != null);
KStream<String, String> serialized = filtered.mapValues(Pair::toString);
serialized.to(outputTopic);
}
Changes to the state store can be observed with a console consumer on the store's changelog topic (by default named <application.id>-<store-name>-changelog):
./bin/kafka-console-consumer --topic <changelog-topic-name> --bootstrap-server localhost:9092
Full source code here: https://github.com/1123/spring-kafka-stream-with-state-store
Original answer:
The JavaDoc of the org.apache.kafka.streams.kstream.ValueMapper interface states that it is intended for stateless record-by-record transformations, whereas the org.apache.kafka.streams.kstream.Transformer interface is
for stateful mapping of an input record to zero, one, or multiple new output records.
Therefore I believe the Transformer interface is the appropriate choice for collecting pairs of messages. This probably only matters when a streams application fails and restarts, so that it can recover its state from Kafka.
So here is another solution, based on the org.apache.kafka.streams.kstream.Transformer interface:
class PairTransformerSupplier<K,V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V,V>>> {
@Override
public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() {
return new PairTransformer<>();
}
}
public class PairTransformer<K,V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> {
private V left;
@Override
public void init(ProcessorContext context) {
left = null;
}
@Override
public KeyValue<K, Pair<V, V>> transform(K key, V value) {
if (left == null) { left = value; return null; }
KeyValue<K, Pair<V,V>> result = KeyValue.pair(key, new Pair<>(left, value));
left = null;
return result;
}
@Override
public KeyValue<K, Pair<V, V>> punctuate(long timestamp) {
return null;
}
public void close() { }
}
The PairTransformerSupplier is then used as follows:
private void accumulateTwo(KStream<String, String> messages) {
messages.transform(new PairTransformerSupplier<>())
.filter((key, value) -> value != null)
.mapValues(Pair::toString)
.to("pairs");
}
Trying both solutions in a single process against a topic with a single partition yields exactly the same results. I have not tried it with a topic with multiple partitions and multiple stream consumers.
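To illustrate why partitioning matters here (my own sketch, not from the original answer): if a single shared accumulator were fed records interleaved from two partitions, it would pair records across partitions. Kafka Streams avoids this by keeping one task, and therefore one state store or one `left` field, per partition:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: pairing interleaved records from two "partitions"
// with ONE shared accumulator mixes records from different partitions.
class InterleavingDemo {
    static List<String> pairUp(List<String> records) {
        List<String> pairs = new ArrayList<>();
        String pending = null;
        for (String record : records) {
            if (pending == null) {
                pending = record;
            } else {
                pairs.add("(" + pending + ", " + record + ")");
                pending = null;
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Records from partition 0 and partition 1, interleaved:
        List<String> interleaved = Arrays.asList("p0-a", "p1-x", "p0-b", "p1-y");
        System.out.println(pairUp(interleaved)); // each pair spans both partitions
    }
}
```

With per-task state, "p0-a" would instead be paired with "p0-b", and "p1-x" with "p1-y".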