用于将 JSON 数组转换为 json 对象的 kafka 流

kafka streams for conversion of JSON arrays to json objects

我在正常 java 中有一个代码,用于将 JSON 数组转换为 JSON 对象,我需要将此正常 java 转换为 kafka 流...如下是我的

import java.io.*;
import java.util.*;
import java.lang.*;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
public class JsonParseTest {
    public static void main(String[] args) {
        try {
            JSONParser parser = new JSONParser();
            JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json"));
             for (Object o : jsonArray) {
//to get the Json object
              JSONObject snap = (JSONObject) o;
              System.out.println(snap);
             }
}
 catch (FileNotFoundException ex) {
            ex.printStackTrace();
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (ParseException ex) {
            ex.printStackTrace();
        } catch (NullPointerException ex) {
            ex.printStackTrace();
        }
    }
}

如果有人帮我写逻辑部分的代码我可以继续,下面是我的逻辑部分至少我需要帮助

public class JsonParseTest {
    public static void main(String[] args) {
        try {
            JSONParser parser = new JSONParser();
            JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json"));
             for (Object o : jsonArray) {
//to get the Json object
              JSONObject snap = (JSONObject) o;
              System.out.println(snap);
             }
}

如何在 kafka 流中编写相同的代码?有人可以帮忙吗?

我会推荐两个选项:

  • 您实现自己的 serializer/deserializer (Here is the javadoc)
  • 您可以将流处理为 (String,String) 流,并使用 flatMap 解析流中的每个元素并将其转换为 (String,JSONObject) 流:

    JSONParser parser = new JSONParser();
    stringStream.flatMap((k,v) -> {
        List<KeyValue<String,JSONOBject>> tmp = new ArrayList<KeyValue<String,JSONOBject>>();
        JSONArray jsonArray = (JSONArray) parser.parse(v);
        for (Object o : jsonArray) {
             JSONObject snap = (JSONObject) o;
             tmp.add(new KeyValue(k, snap));
        }     
        return tmp;
    });
    

这里我完全没有处理异常,你必须将 lambda 的代码包装成 try/catch。