How to split strings into different Kafka topics based on conditions
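I am trying to split strings into different Kafka topics based on a condition.
Here is the topology:
- Split the string into words.
- Match each word against the conditions (here, a set of good words and a set of bad words).
- If at least one word from the bad-words set is found in the string, the string is sent to the bad-string topic; otherwise it is sent to the good-string topic.
Problem:
Every string goes to only one topic (the bad-string topic).
Input: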
Your service was good.
He was angry and sad.
Your service was bad but still I am happy.
Expected output:
good-string (topic)
- Your service was good. (It contains a good word, "good".)
bad-string (topic)
- He was angry and sad. (It contains bad words, "angry" and "sad".)
- Your service was bad but still I am happy. (There is a good word, "happy", but there is at least one bad word, "bad".)
The code is as follows:
@Configuration
@Slf4j
public class SplitSentence {
    private static final Set<String> BAD_WORDS = Set.of("angry", "sad", "bad");
    private static final Set<String> GOOD_WORDS = Set.of("happy", "good", "helpful");

    @SuppressWarnings("unchecked")
    @Bean
    public KStream<String,String> windowCount(StreamsBuilder builder)
    {
        var stringSerde = Serdes.String();
        var jsonSerde = new JsonSerde<>(CountingDemo.class);
        ((JsonDeserializer) jsonSerde.deserializer()).setUseTypeHeaders(false);
        var input = builder.stream("counting", Consumed.with(stringSerde, jsonSerde));
        var feedbackStreams = input.flatMap(splitWords()).branch(isGoodWord(), isBadWord());
        boolean newString = feedbackStreams[1].toString().isEmpty();
        if (newString)
            input.to("good-string");
        else
            input.to("bad-string");
        return input;
    }

    private Predicate<? super String, ? super String> isBadWord() {
        return (key, value) -> BAD_WORDS.contains(value);
    }

    private Predicate<? super String, ? super String> isGoodWord() {
        return (key, value) -> GOOD_WORDS.contains(value);
    }

    private KeyValueMapper<String, CountingDemo, Iterable<KeyValue<String,String>>> splitWords()
    {
        return (key, value) -> Arrays
            .asList(value.getText().replaceAll("[^a-zA-Z ]", "").toLowerCase().split("\\s+")).stream()
            .distinct().map(word -> KeyValue.pair(value.getText(), word)).collect(Collectors.toList());
    }
}
CountingDemo.java
public class CountingDemo {
    private String name;
    private String text;
    // Accessors are not shown in the post; the topology above calls getText().
}
Where am I going wrong?
Is there a better approach?
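The if statement is always false because .toString() on a KStream object returns its metadata, not the records flowing through it, so it is never empty. The bean method also runs only once, while the topology is being built and before any record has been consumed, so a plain Java if cannot make a per-message decision.
If you want to route the complete original string to one of two topics, you should not flatMap at all.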
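To illustrate why that check can never succeed, here is a minimal sketch in plain Java. `FakeStream` is a hypothetical stand-in for a stream handle, not a Kafka Streams class, and the exact text that `KStream.toString()` returns is not shown in the post; the sketch only assumes that the handle's string form describes the object rather than its data.

```java
public class ToStringCheck {
    // Hypothetical stand-in for a stream handle; not a Kafka Streams class.
    // It does not override toString(), so it inherits Object.toString().
    static final class FakeStream { }

    // Mirrors the check in the question: is the string form of the handle empty?
    static boolean looksEmpty(Object streamHandle) {
        return streamHandle.toString().isEmpty();
    }

    public static void main(String[] args) {
        // Object.toString() yields "<class name>@<hash code in hex>", which is never empty.
        System.out.println(looksEmpty(new FakeStream())); // prints "false"
    }
}
```

Because the result is always false, the question's else branch is the only one that ever runs, which matches the observed behaviour of everything landing in bad-string.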
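That said, it looks like you want something like this:
    var feedbackStreams = input.branch(hasGoodWords(), hasBadWords());
    feedbackStreams[0].to("good-string");
    feedbackStreams[1].to("bad-string");
Here both functions receive the complete input message and compare it against the sets, rather than being given individual words.
However, I think you only need one function that sends every message containing only good words to the good-string topic, and every other message (no good or bad words, a mix of good and bad words, or only bad words) to the bad-string topic.
For example: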
    // The first predicate that matches wins; the catch-all sends everything else to bad-string.
    var feedbackStreams = input.branch(this::hasOnlyGoodWords, (k, v) -> true);
    feedbackStreams[0].to("good-string");
    feedbackStreams[1].to("bad-string");
    return input;
}

// True when the message contains at least one good word and no bad word.
private boolean hasOnlyGoodWords(Object key, CountingDemo value) {
    String cleaned = value.getText().replaceAll("[^a-zA-Z ]", "").toLowerCase();
    // Copy into a HashSet so that retainAll() below is guaranteed to work on a mutable set.
    Set<String> uniqueWords = new HashSet<>(Arrays.asList(cleaned.split("\\s+")));
    for (String s : BAD_WORDS) {
        if (uniqueWords.contains(s)) return false;
    }
    uniqueWords.retainAll(GOOD_WORDS);
    return !uniqueWords.isEmpty();
}
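The routing rule can be checked without a running Kafka cluster. The following is a minimal sketch in plain Java, assuming the same word sets as the question; `FeedbackRouter`, `words` and `topicFor` are hypothetical names introduced only for illustration, and the method works on the raw text rather than on a `CountingDemo` value.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

public class FeedbackRouter {
    private static final Set<String> BAD_WORDS = Set.of("angry", "sad", "bad");
    private static final Set<String> GOOD_WORDS = Set.of("happy", "good", "helpful");

    // Strips everything except letters and spaces, lower-cases the text,
    // and returns the distinct words.
    static Set<String> words(String text) {
        String cleaned = text.replaceAll("[^a-zA-Z ]", "").toLowerCase(Locale.ROOT).trim();
        return new HashSet<>(Arrays.asList(cleaned.split("\\s+")));
    }

    // True when the text has at least one good word and no bad word.
    static boolean hasOnlyGoodWords(String text) {
        Set<String> unique = words(text);
        boolean anyBad = unique.stream().anyMatch(BAD_WORDS::contains);
        boolean anyGood = unique.stream().anyMatch(GOOD_WORDS::contains);
        return anyGood && !anyBad;
    }

    // Hypothetical helper: names the topic a message would be routed to.
    static String topicFor(String text) {
        return hasOnlyGoodWords(text) ? "good-string" : "bad-string";
    }

    public static void main(String[] args) {
        String[] inputs = {
            "Your service was good.",
            "He was angry and sad.",
            "Your service was bad but still I am happy."
        };
        for (String s : inputs) {
            System.out.println(topicFor(s) + " <- " + s);
        }
    }
}
```

With the three sample inputs from the question, the first goes to good-string and the other two go to bad-string. Note that under this rule a message with neither good nor bad words also goes to bad-string, which is stricter than the rule stated in the question; if such messages should go to good-string instead, returning `!anyBad` would be the change to make.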