Kafka Streams - 分配密钥
Kafka Streams - assigning keys
我正在查看一些使用 Kafka 流的基本示例。我注意到没有设置键。我正在尝试为每条记录设置一个唯一的键。一行文本被分成记录——一条记录是一行中的一个词。我目前拥有的脚本将相同的密钥附加到每条记录。我希望每条记录都有一个唯一的键,但我看不到放置它的好地方。非常感谢一些帮助。这是我现在拥有的:
KStream<String, String> source = builder.stream(INPUT_TOPIC);
KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String line) {
return Arrays.asList(line.split("\W+"));
}
});
Long key = (long)(Math.random()*9000)+1000;
KStream<String, String> wordsKeyed = words.map((k, v) -> new KeyValue<>("xyx"+key, v));
wordsKeyed.to(OUTPUT_TOPIC);
它附加相同的密钥,因为您只计算一个随机值。
如果您希望每个密钥都是随机的,请使用new KeyValue<>("xyx" + (Math.random()*9000+1000), ...)
如果您希望每个密钥都是 唯一的(因为随机数可以重叠),请使用 AtomicInteger
和 incrementAndGet()
我正在查看一些使用 Kafka 流的基本示例。我注意到没有设置键。我正在尝试为每条记录设置一个唯一的键。一行文本被分成记录——一条记录是一行中的一个词。我目前拥有的脚本将相同的密钥附加到每条记录。我希望每条记录都有一个唯一的键,但我看不到放置它的好地方。非常感谢一些帮助。这是我现在拥有的:
KStream<String, String> source = builder.stream(INPUT_TOPIC);
KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String line) {
return Arrays.asList(line.split("\W+"));
}
});
Long key = (long)(Math.random()*9000)+1000;
KStream<String, String> wordsKeyed = words.map((k, v) -> new KeyValue<>("xyx"+key, v));
wordsKeyed.to(OUTPUT_TOPIC);
它附加相同的密钥,因为您只计算一个随机值。
如果您希望每个密钥都是随机的,请使用new KeyValue<>("xyx" + (Math.random()*9000+1000), ...)
如果您希望每个密钥都是 唯一的(因为随机数可以重叠),请使用 AtomicInteger
和 incrementAndGet()