如何使用 Apache Beam 和 Jackson 解析常规(不是换行符分隔)json?

How to parse regular (not newline delimited) json with Apache Beam and Jackson?

我正在尝试学习如何使用 Apache Beam 和 Jackson 将 JSON 数据解析为 CSV 格式。我从一个非常简单的 JSON 文件开始:

{
    "firstName": "John", 
    "lastName": "Smith", 
    "isAlive": true, 
    "age": 27
}

我有对应的POJO结构:

import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class Person implements Serializable {
    
    private String firstName;
    private String lastName;
    private int age;

    public Person() {}

    public String getFirstName() {
        return firstName;
    }

   ... getters & setters ...

但是,当我尝试解析此 json 时,出现格式错误:

Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected close marker '}': expected ']' (for root starting at [Source: }; line: 1, column: 0])
 at [Source: }; line: 1, column: 2]

我通过将 json 转换为这种格式解决了这个问题:

{"firstName": "John", "lastName": "Smith", "isAlive": true, "age": 27}

我最终的需要是处理普通的旧 json。有没有办法做到这一点,如果有,怎么做?

Apache Beam 代码就是这个简单的管道:

public class DataToModel {

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class);
        Pipeline p = Pipeline.create(options);

        // read data from json
        PCollection<String>  json = p.apply(TextIO.read().from("src/main/resources/test.json"));
        PCollection<Person> person = json
            .apply(ParseJsons.of(Person.class))
            .setCoder(SerializableCoder.of(Person.class));

        // parse json
        PCollection<String> names = person.apply(MapElements
            .into(TypeDescriptors.strings())
            .via(Person::getFirstName)
        );

        // write information to file.
        names.apply(TextIO.write().to("src/main/resources/test_out"));

        p.run().waitUntilFinish();

    }

您可以使用org.json库;简单易用。

请记住(在转换或使用 getJSONObject 和 getJSONArray 等方法时)在 JSON 表示法中 [ … ] 表示一个数组,所以库会将其解析为 JSONArray { … } 表示一个对象,所以库会将其解析为 JSONObject

你可以看到更多information关于。

您可以在下面看到一个简单的例子:

JSON 文件:

{
       "pageInfo": {
             "pageName": "abc",
             "pagePic": "http://example.com/content.jpg"
        },
        "posts": [
             {
                  "post_id": "123456789012_123456789012",
                  "actor_id": "1234567890",
                  "picOfPersonWhoPosted": "http://example.com/photo.jpg",
                  "nameOfPersonWhoPosted": "Jane Doe",
                  "message": "Sounds cool. Can't wait to see it!",
                  "likesCount": "2",
                  "comments": [],
                  "timeOfPost": "1234567890"
             }
        ]
    }

代码示例:

import org.json.*;

String jsonString = ... ; //assign your JSON String here
JSONObject obj = new JSONObject(jsonString);
String pageName = obj.getJSONObject("pageInfo").getString("pageName");

JSONArray arr = obj.getJSONArray("posts"); // notice that `"posts": [...]`
for (int i = 0; i < arr.length(); i++)
{
    String post_id = arr.getJSONObject(i).getString("post_id");
    ......
}

您可以在此处找到更多示例:Parse JSON in Java

Downloadable jar.

问题是您正在使用 TextIO.read() 读取 json 文件。 TextIO 将文本文件的每一行读入一个单独的元素,因此多行 JSON 对象被拆分为多个元素。这意味着您的解析函数尝试解析 JSON 字符串,例如 };。这也解释了为什么如果您将对象完全格式化在一行中它会成功。

根据可用的方法,您可以采用两种方法。

  1. 如果您的 JSON 文件可能,您可以使用 withDelimiter 方法来使用默认换行符之外的自定义分隔符。然而,这非常脆弱,需要非常具体地格式化您的文件。

  2. 您可以从 TextIO 切换到 FileIO,并将每个文件读入单个字符串以发送到 ParseJsons。这是稍微多一点的工作,但远没有那么脆弱,这是我推荐的。