用于将 JSON 数组转换为 json 对象的 kafka 流
kafka streams for conversion of JSON arrays to json objects
我在正常 java 中有一个代码,用于将 JSON 数组转换为 JSON 对象,我需要将此正常 java 转换为 kafka 流...如下是我的
import java.io.*;
import java.util.*;
import java.lang.*;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
public class JsonParseTest {
public static void main(String[] args) {
try {
JSONParser parser = new JSONParser();
JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json"));
for (Object o : jsonArray) {
//to get the Json object
JSONObject snap = (JSONObject) o;
System.out.println(snap);
}
}
catch (FileNotFoundException ex) {
ex.printStackTrace();
} catch (IOException ex) {
ex.printStackTrace();
} catch (ParseException ex) {
ex.printStackTrace();
} catch (NullPointerException ex) {
ex.printStackTrace();
}
}
}
如果有人帮我写逻辑部分的代码我可以继续,下面是我的逻辑部分至少我需要帮助
public class JsonParseTest {
public static void main(String[] args) {
try {
JSONParser parser = new JSONParser();
JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json"));
for (Object o : jsonArray) {
//to get the Json object
JSONObject snap = (JSONObject) o;
System.out.println(snap);
}
}
如何在 kafka 流中编写相同的代码?有人可以帮忙吗?
我会推荐两个选项:
- 您实现自己的 serializer/deserializer (Here is the javadoc)
您可以将流处理为 (String,String) 流,并使用 flatMap 解析流中的每个元素并将其转换为 (String,JSONObject) 流:
JSONParser parser = new JSONParser();
stringStream.flatMap((k,v) -> {
List<KeyValue<String,JSONOBject>> tmp = new ArrayList<KeyValue<String,JSONOBject>>();
JSONArray jsonArray = (JSONArray) parser.parse(v);
for (Object o : jsonArray) {
JSONObject snap = (JSONObject) o;
tmp.add(new KeyValue(k, snap));
}
return tmp;
});
这里我完全没有处理异常,你必须将 lambda 的代码包装成 try/catch。
我在正常 java 中有一个代码,用于将 JSON 数组转换为 JSON 对象,我需要将此正常 java 转换为 kafka 流...如下是我的
import java.io.*;
import java.util.*;
import java.lang.*;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
public class JsonParseTest {
public static void main(String[] args) {
try {
JSONParser parser = new JSONParser();
JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json"));
for (Object o : jsonArray) {
//to get the Json object
JSONObject snap = (JSONObject) o;
System.out.println(snap);
}
}
catch (FileNotFoundException ex) {
ex.printStackTrace();
} catch (IOException ex) {
ex.printStackTrace();
} catch (ParseException ex) {
ex.printStackTrace();
} catch (NullPointerException ex) {
ex.printStackTrace();
}
}
}
如果有人帮我写逻辑部分的代码我可以继续,下面是我的逻辑部分至少我需要帮助
public class JsonParseTest {
public static void main(String[] args) {
try {
JSONParser parser = new JSONParser();
JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json"));
for (Object o : jsonArray) {
//to get the Json object
JSONObject snap = (JSONObject) o;
System.out.println(snap);
}
}
如何在 kafka 流中编写相同的代码?有人可以帮忙吗?
我会推荐两个选项:
- 您实现自己的 serializer/deserializer (Here is the javadoc)
您可以将流处理为 (String,String) 流,并使用 flatMap 解析流中的每个元素并将其转换为 (String,JSONObject) 流:
JSONParser parser = new JSONParser(); stringStream.flatMap((k,v) -> { List<KeyValue<String,JSONOBject>> tmp = new ArrayList<KeyValue<String,JSONOBject>>(); JSONArray jsonArray = (JSONArray) parser.parse(v); for (Object o : jsonArray) { JSONObject snap = (JSONObject) o; tmp.add(new KeyValue(k, snap)); } return tmp; });
这里我完全没有处理异常,你必须将 lambda 的代码包装成 try/catch。