Spring 云流处理器单输入,多行输出
Spring Cloud Stream Processor Single Input, Multiple Row Output
我一直在尝试从 kafka 流式传输我的 json 事件,将其展平,然后使用 Spring Cloud Stream 将其推回另一个主题。
输入:
{
"major": "Computer Science",
"books": [{
"title": "Learn C",
"author": "Prof C"
},
{
"title": "Learn Java",
"author": "Java Expert"
},{
"title": "Learn Python",
"author": "Python Master"
},]
}
展平过程:
@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String(String event){
JSONArray result = new JSONArray();
JSONObject rawEvent = new JSONObject(event);
String major = rawEvent.get("major");
JSONArray books = rawEvent.get("books");
for (int i = 0; i < books.length; i++){
JSONObject book = books.get(i);
book.put("major", major);
result.put(book)
}
return result.toString();
}
只生产:
[{"major":"Computer Science", "books.title":"Learn C", "books.author":"Prof C"},
{"major":"Computer Science", "books.title":"Learn Java", "books.author":"Java Expert"},
{"major":"Computer Science", "books.title":"Learn Python", "books.author":"Python Master"}]
我的问题是如何让它变成这样
{"major":"Computer Science", "books.title":"Learn C", "books.author":"Prof C"}
{"major":"Computer Science", "books.title":"Learn Java", "books.author":"Java Expert"}
{"major":"Computer Science", "books.title":"Learn Python", "books.author":"Python Master"}
所以我可以推回 mutilple JSONObject 而不是像我所做的那样是单个 JSONArray?
Afaik,Spring Cloud Stream 输出只是一个事件,不符合我上面的情况,无法向 Kafka 产生 3 个事件。
谢谢。
您生成的是一个有效的 JSON 数组。您尝试生成的内容无效 JSON。但只要你同意,你可以简单地使用 StringBuilder 并将每个 book.toString() 附加到它(而不是 JSONArray)。这至少会产生你正在寻找的东西。
此外,这确实不是与 Spring Cloud Stream 相关的问题,而是一般性的 Java/JSON 问题,因此我想 Whosebug 上有更具体的论坛可以为您提供更好的答案。
确实是我误会了。
所以在这种情况下,我建议引入 Spring Integration and Enterprise Integration Patterns。您基本上有一个明确的 Splitter 案例。那里有很多例子,但这里有一个简短的片段:
@Splitter
public List<String> split(String input) {
// basically split your input into a collection and splitter will send out each element as a separate message
}
希望有帮助
我一直在尝试从 kafka 流式传输我的 json 事件,将其展平,然后使用 Spring Cloud Stream 将其推回另一个主题。
输入:
{
"major": "Computer Science",
"books": [{
"title": "Learn C",
"author": "Prof C"
},
{
"title": "Learn Java",
"author": "Java Expert"
},{
"title": "Learn Python",
"author": "Python Master"
},]
}
展平过程:
@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String(String event){
JSONArray result = new JSONArray();
JSONObject rawEvent = new JSONObject(event);
String major = rawEvent.get("major");
JSONArray books = rawEvent.get("books");
for (int i = 0; i < books.length; i++){
JSONObject book = books.get(i);
book.put("major", major);
result.put(book)
}
return result.toString();
}
只生产:
[{"major":"Computer Science", "books.title":"Learn C", "books.author":"Prof C"},
{"major":"Computer Science", "books.title":"Learn Java", "books.author":"Java Expert"},
{"major":"Computer Science", "books.title":"Learn Python", "books.author":"Python Master"}]
我的问题是如何让它变成这样
{"major":"Computer Science", "books.title":"Learn C", "books.author":"Prof C"}
{"major":"Computer Science", "books.title":"Learn Java", "books.author":"Java Expert"}
{"major":"Computer Science", "books.title":"Learn Python", "books.author":"Python Master"}
所以我可以推回 mutilple JSONObject 而不是像我所做的那样是单个 JSONArray?
Afaik,Spring Cloud Stream 输出只是一个事件,不符合我上面的情况,无法向 Kafka 产生 3 个事件。
谢谢。
您生成的是一个有效的 JSON 数组。您尝试生成的内容无效 JSON。但只要你同意,你可以简单地使用 StringBuilder 并将每个 book.toString() 附加到它(而不是 JSONArray)。这至少会产生你正在寻找的东西。
此外,这确实不是与 Spring Cloud Stream 相关的问题,而是一般性的 Java/JSON 问题,因此我想 Whosebug 上有更具体的论坛可以为您提供更好的答案。
确实是我误会了。
所以在这种情况下,我建议引入 Spring Integration and Enterprise Integration Patterns。您基本上有一个明确的 Splitter 案例。那里有很多例子,但这里有一个简短的片段:
@Splitter
public List<String> split(String input) {
// basically split your input into a collection and splitter will send out each element as a separate message
}
希望有帮助