How to split strings into different Kafka topics based on some conditions

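I am trying to split strings into different Kafka topics based on a condition.
Here is the topology.

  1. Split the string into words.
  2. Match each word against the conditions (here, a set of good words and a set of bad words).
  3. If the string contains at least one word from the bad-words set, it is sent to the bad-string
    topic; otherwise it is sent to the good-string topic.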
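
The rule in step 3 can be checked without Kafka at all. The following is a minimal plain-Java sketch of it, assuming the same cleaning regex as the code further down; the class name `FeedbackRouterSketch` and the method `topicFor` are hypothetical and not part of the original code.

```java
import java.util.Arrays;
import java.util.Set;

// Hypothetical helper for illustration only: plain Java, no Kafka dependency.
class FeedbackRouterSketch {

    static final Set<String> BAD_WORDS = Set.of("angry", "sad", "bad");

    // Returns the name of the topic that one complete sentence should go to.
    static String topicFor(String text) {
        // Keep only letters and spaces, lower-case, then split on whitespace.
        String cleaned = text.replaceAll("[^a-zA-Z ]", "").toLowerCase();
        boolean hasBadWord = Arrays.stream(cleaned.split("\\s+"))
                .anyMatch(BAD_WORDS::contains);
        // At least one bad word sends the whole sentence to bad-string.
        return hasBadWord ? "bad-string" : "good-string";
    }

    public static void main(String[] args) {
        String[] inputs = {
            "Your service was good.",
            "He was angry and sad.",
            "Your service was bad but still I am happy."
        };
        for (String s : inputs) {
            System.out.println(topicFor(s) + " <- " + s);
        }
    }
}
```

Note that the decision is made per sentence, not per word, which is why the sentence does not need to be flat-mapped into separate word records before routing.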

Problem:

Every string goes to only one topic (the bad-string topic).

Input:

  1. Your service was good.

  2. He was angry and sad.

  3. Your service was bad but still I am happy.

Expected output:
good-string (topic)

Your service was good. (It contains a good word, "good".)

bad-string (topic)

  1. He was angry and sad. (It contains the bad words "angry" and "sad".)
  2. Your service was bad but still I am happy. (There is a good word, "happy", but there is at least one bad word, "bad".)

Here is the code:

@Configuration
@Slf4j
public class SplitSentence {
    
    private static final Set<String> BAD_WORDS = Set.of("angry", "sad", "bad");
    private static final Set<String> GOOD_WORDS = Set.of("happy", "good", "helpful");

    @SuppressWarnings("unchecked")
    @Bean
    public KStream<String,String> windowCount(StreamsBuilder builder)
    {
        var stringSerde = Serdes.String();
        var jsonSerde = new JsonSerde<>(CountingDemo.class);

        ((JsonDeserializer) jsonSerde.deserializer()).setUseTypeHeaders(false);

        var input = builder.stream("counting",Consumed.with(stringSerde,jsonSerde));

        var feedbackStreams = input.flatMap(splitWords()).branch(isGoodWord(), isBadWord());
        
        boolean newString = feedbackStreams[1].toString().isEmpty();

        if(newString)
            input.to("good-string");
        else    
            input.to("bad-string");

        return input;
    }

    private Predicate<? super String, ? super String> isBadWord() {
        return (key, value) -> BAD_WORDS.contains(value);
    }

    private Predicate<? super String, ? super String> isGoodWord() {
        return (key, value) -> GOOD_WORDS.contains(value);
    }

    private KeyValueMapper<String, CountingDemo, Iterable<KeyValue<String,String>>> splitWords() 
    {
        return (key,value) -> Arrays
                            .asList(value.getText().replaceAll("[^a-zA-Z ]", "").toLowerCase().split("\\s+")).stream()
                            .distinct().map(word -> KeyValue.pair(value.getText(), word)).collect(Collectors.toList());
    }
}

   
     

CountingDemo.java

public class CountingDemo {
    
    private String name;
    private String text;
}    
  

Where am I going wrong?
Is there better logic for this?

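The if condition is always false, because .toString() on a KStream object returns its metadata, which is never empty, so the else branch always runs and everything is written to bad-string. The check is also evaluated only once, while the topology is being built, not once per record.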
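
To illustrate why that check can never be true, here is a small plain-Java sketch. `FakeStream` is a hypothetical stand-in, not the Kafka Streams type; it only shows the default `Object.toString()` behaviour, which produces the class name, an `@`, and a hash code. Whatever text the real KStream implementation returns, it is a description of the object and not the records flowing through it.

```java
// Hypothetical stand-in used only for illustration; it is not the Kafka Streams KStream type.
class ToStringSketch {

    // A class that, like most objects, does not override toString().
    static class FakeStream { }

    // Mirrors the check from the question: feedbackStreams[1].toString().isEmpty()
    static boolean looksEmpty(Object stream) {
        return stream.toString().isEmpty();
    }

    public static void main(String[] args) {
        Object stream = new FakeStream();
        System.out.println(stream);             // class name, '@', and a hash code in hex
        System.out.println(looksEmpty(stream)); // false
    }
}
```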

If you want to split the full, original string between the two topics, you shouldn't flatMap at all.

That being said, it seems like you want

var feedbackStreams = input.branch(hasGoodWords(), hasBadWords());

feedbackStreams[0].to("good-string");
feedbackStreams[1].to("bad-string");

where both functions take the full input message and compare it against the sets, rather than being given single words.
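
One thing to watch with two predicates is their order. `branch` places each record in the first stream whose predicate matches and drops records that match none. The plain-Java sketch below imitates that first-match rule, assuming `hasGoodWords` and `hasBadWords` simply test for any overlap with the respective set; `BranchOrderSketch`, `firstMatch`, and `GOOD_THEN_BAD` are hypothetical names for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical illustration of first-match routing; plain Java, no Kafka dependency.
class BranchOrderSketch {

    static final Set<String> BAD_WORDS = Set.of("angry", "sad", "bad");
    static final Set<String> GOOD_WORDS = Set.of("happy", "good", "helpful");

    // Same order as input.branch(hasGoodWords(), hasBadWords()).
    static final List<Predicate<String>> GOOD_THEN_BAD =
            List.of(BranchOrderSketch::hasGoodWords, BranchOrderSketch::hasBadWords);

    static Set<String> words(String text) {
        String cleaned = text.replaceAll("[^a-zA-Z ]", "").toLowerCase();
        return new HashSet<>(Arrays.asList(cleaned.split("\\s+")));
    }

    static boolean hasGoodWords(String text) {
        return !Collections.disjoint(words(text), GOOD_WORDS);
    }

    static boolean hasBadWords(String text) {
        return !Collections.disjoint(words(text), BAD_WORDS);
    }

    // Index of the first predicate that matches, or -1 when none does (record dropped).
    static int firstMatch(String text, List<Predicate<String>> predicates) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(text)) return i;
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstMatch("Your service was bad but still I am happy.", GOOD_THEN_BAD)); // 0
        System.out.println(firstMatch("He was angry and sad.", GOOD_THEN_BAD));                      // 1
        System.out.println(firstMatch("Nothing special here.", GOOD_THEN_BAD));                      // -1
    }
}
```

With this ordering the mixed sentence lands in index 0, which would be good-string, and a sentence with no listed words matches nothing. Neither outcome fits the requirement in the question.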
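Although, I think you only need one function: it sends every message that has good words and no bad words to good-string, and all other messages (no good or bad words, both good and bad words, or only bad words) to the bad-string topic.

For example: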

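    // First predicate catches the "good" messages; the catch-all takes everything else.
    var feedbackStreams = input.branch(this::hasOnlyGoodWords, (k, v) -> true);
    feedbackStreams[0].to("good-string");
    feedbackStreams[1].to("bad-string");
    return input;
}

// True only when the text has at least one good word and no bad word.
// The value is the CountingDemo message, so getText() is available.
private boolean hasOnlyGoodWords(Object key, CountingDemo value) {
    String cleaned = value.getText().replaceAll("[^a-zA-Z ]", "").toLowerCase();
    // Copy into a HashSet (needs import java.util.HashSet) so retainAll works on a mutable set.
    Set<String> uniqueWords = new HashSet<>(Arrays.asList(cleaned.split("\\s+")));
    for (String s : BAD_WORDS) {
        if (uniqueWords.contains(s)) return false;
    }
    uniqueWords.retainAll(GOOD_WORDS);
    return !uniqueWords.isEmpty();
}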