Spring 云流处理器单输入,多行输出

Spring Cloud Stream Processor Single Input, Multiple Row Output

我一直在尝试从 kafka 流式传输我的 json 事件,将其展平,然后使用 Spring Cloud Stream 将其推回另一个主题。

输入:

{
    "major": "Computer Science",
    "books": [{
        "title": "Learn C",
        "author": "Prof C"
    },
    {
        "title": "Learn Java",
        "author": "Java Expert"
    },{
        "title": "Learn Python",
        "author": "Python Master"
    },]
}

展平过程:

@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String(String event){
    JSONArray result = new JSONArray();

    JSONObject rawEvent = new JSONObject(event);

    String major = rawEvent.get("major");
    JSONArray books = rawEvent.get("books");

    for (int i = 0; i < books.length; i++){
        JSONObject book = books.get(i);
        book.put("major", major);
        result.put(book)
    }

    return result.toString();
}

只生产:

    [{"major":"Computer Science", "books.title":"Learn C", "books.author":"Prof C"}, 
{"major":"Computer Science", "books.title":"Learn Java", "books.author":"Java Expert"}, 
{"major":"Computer Science", "books.title":"Learn Python", "books.author":"Python Master"}]

我的问题是如何让它变成这样

{"major":"Computer Science", "books.title":"Learn C", "books.author":"Prof C"}
{"major":"Computer Science", "books.title":"Learn Java", "books.author":"Java Expert"}
{"major":"Computer Science", "books.title":"Learn Python", "books.author":"Python Master"}

所以我可以推回 mutilple JSONObject 而不是像我所做的那样是单个 JSONArray?

Afaik,Spring Cloud Stream 输出只是一个事件,不符合我上面的情况,无法向 Kafka 产生 3 个事件。

谢谢。

您生成的是一个有效的 JSON 数组。您尝试生成的内容无效 JSON。但只要你同意,你可以简单地使用 StringBuilder 并将每个 book.toString() 附加到它(而不是 JSONArray)。这至少会产生你正在寻找的东西。

此外,这确实不是与 Spring Cloud Stream 相关的问题,而是一般性的 Java/JSON 问题,因此我想 Whosebug 上有更具体的论坛可以为您提供更好的答案。

确实是我误会了。 所以在这种情况下,我建议引入 Spring Integration and Enterprise Integration Patterns。您基本上有一个明确的 Splitter 案例。那里有很多例子,但这里有一个简短的片段: @Splitter public List<String> split(String input) { // basically split your input into a collection and splitter will send out each element as a separate message } 希望有帮助