How do I serialize and write JSON to a file?

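I wrote an Apache Beam pipeline that reads a directory containing 99 other files, computes a checksum for each file, and creates key-value pairs of each file and its checksum. What I need to do is write these key-value pairs to a manifest.json file. I am currently running into serialization issues; any advice or help would be great.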

Here is my code:

public class BeamPipeline {
    private static final Logger log = LoggerFactory.getLogger(BeamPipeline.class);
    public static interface MyOptions extends PipelineOptions {

        @Description("Input Path(with gs:// prefix)")
        String getInput();
        void setInput(String value);
    }

    public static void main(String[] args) {

        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
        Pipeline p = Pipeline.create(options);
        JsonObject obj = new JsonObject();
        File dir = new File(options.getInput());
        for (File file : dir.listFiles()) {
                String inputString = file.toString();
                p
                        .apply("Match Files", FileIO.match().filepattern(inputString))
                        .apply("Read Files", FileIO.readMatches())
                        .apply(MapElements.via(new SimpleFunction<FileIO.ReadableFile, KV<String, String>>() {
                            public KV<String, String> apply(FileIO.ReadableFile file) {
                                String temp = null;
                                try {
                                    temp = file.readFullyAsUTF8String();
                                } catch (IOException e) {

                                }
                                String sha256hex = org.apache.commons.codec.digest.DigestUtils.sha256Hex(temp);

                                obj.addProperty(temp, sha256hex);
                                String json = obj.toString();

                                try (FileWriter fileWriter = new FileWriter("./manifest.json")) {
                                    fileWriter.write(json);
                                } catch (IOException e) {

                                }

                                return KV.of(file.getMetadata().resourceId().toString(), sha256hex);

                            }
                        }))
                        .apply("Print", ParDo.of(new DoFn<KV<String, String>, Void>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) {


                                log.info(String.format("File: %s, SHA-256 %s", c.element().getKey(), c.element().getValue()));

                            }
                        }));
                }
            p.run();
        }
}

Here is my current error:

"main" java.lang.IllegalArgumentException: unable to serialize DoFnAndMainOutput{doFn=org.apache.beam.sdk.transforms.MapElements@50756c76, mainOutputTag=Tag<output>}
Caused by: java.io.NotSerializableException: com.google.gson.JsonObject

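DoFns are serialized together with every object they access. JsonObject is not serializable. In the question it is created outside the DoFn and referenced inside it, which makes the DoFn itself non-serializable.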
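
The same failure can be reproduced without Beam or Gson. The following is a minimal sketch using plain Java serialization; `NotSerializableJson` and `SerializableFn` are hypothetical stand-ins for `JsonObject` and a Beam function, not real library types.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

public class CaptureDemo {
    // Hypothetical stand-in for JsonObject: a class that does NOT implement Serializable.
    static class NotSerializableJson {
        final StringBuilder body = new StringBuilder();
    }

    // Hypothetical stand-in for a Beam function: it must be Serializable.
    interface SerializableFn extends Function<String, String>, Serializable {}

    // Returns true if Java serialization accepts the object, false if it rejects it.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Captures an object created outside the function, like `obj` in the question.
    static SerializableFn capturing() {
        final NotSerializableJson outer = new NotSerializableJson();
        return new SerializableFn() {
            @Override
            public String apply(String s) {
                return outer.body.append(s).toString();
            }
        };
    }

    // Creates the object inside the function body, so nothing is captured.
    static SerializableFn creatingInside() {
        return new SerializableFn() {
            @Override
            public String apply(String s) {
                return new NotSerializableJson().body.append(s).toString();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("capturing: " + canSerialize(capturing()));            // prints "capturing: false"
        System.out.println("creating inside: " + canSerialize(creatingInside())); // prints "creating inside: true"
    }
}
```

The anonymous class in `capturing()` stores the captured local in a hidden field, so serializing the function also tries to serialize that field and fails. The second variant has no such field.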

You can create the JsonObject inside the DoFn to avoid this serialization problem.

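import com.google.gson.JsonObject;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamPipeline {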
    private static final Logger log = LoggerFactory.getLogger(BeamPipeline.class);
    public static interface MyOptions extends PipelineOptions {

        @Description("Input Path(with gs:// prefix)")
        String getInput();
        void setInput(String value);
    }

    public static void main(String[] args) {

        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
        Pipeline p = Pipeline.create(options);
        File dir = new File(options.getInput());
        for (File file : dir.listFiles()) {
                String inputString = file.toString();
                p
                        .apply("Match Files", FileIO.match().filepattern(inputString))
                        .apply("Read Files", FileIO.readMatches())
                        .apply(MapElements.via(new SimpleFunction<FileIO.ReadableFile, KV<String, String>>() {
                            public KV<String, String> apply(FileIO.ReadableFile file) {
                                String temp;
                                try {
                                    temp = file.readFullyAsUTF8String();
                                } catch (IOException e) {
                                    // Fail the element instead of hashing a null string.
                                    throw new java.io.UncheckedIOException(e);
                                }
                                String sha256hex = org.apache.commons.codec.digest.DigestUtils.sha256Hex(temp);

                                // Created per element inside the function, so it is never captured or serialized.
                                JsonObject obj = new JsonObject();
                                obj.addProperty(temp, sha256hex);
                                String json = obj.toString();

                                try (FileWriter fileWriter = new FileWriter("./manifest.json")) {
                                    fileWriter.write(json);
                                } catch (IOException e) {

                                }

                                return KV.of(file.getMetadata().resourceId().toString(), sha256hex);

                            }
                        }))
                        .apply("Print", ParDo.of(new DoFn<KV<String, String>, Void>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) {


                                log.info(String.format("File: %s, SHA-256 %s", c.element().getKey(), c.element().getValue()));

                            }
                        }));
                }
            p.run();
        }
}
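
Note that the code above reopens ./manifest.json for every element, so each write replaces the previous one, and on a distributed runner the file would end up on whichever worker ran the function. One way around this is to gather all the pairs first and write the manifest once. The following is a rough sketch of that idea in plain Java, without Beam; `ManifestSketch` and its methods are hypothetical names, the JSON is built by hand rather than with Gson, and the checksum is taken over the raw file bytes rather than the decoded string, so it may differ from the answer's value for files that are not valid UTF-8.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

public class ManifestSketch {
    // Hex-encoded SHA-256 of the given bytes.
    static String sha256Hex(byte[] data) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest(data)) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is a required algorithm on the Java platform
        }
    }

    // Minimal JSON string quoting: escapes quotes, backslashes and control characters.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Renders the map as one JSON object, in the map's iteration order.
    static String toJson(Map<String, String> checksums) {
        StringBuilder sb = new StringBuilder("{");
        String sep = "";
        for (Map.Entry<String, String> e : checksums.entrySet()) {
            sb.append(sep).append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            sep = ",";
        }
        return sb.append('}').toString();
    }

    // Hashes every regular file in inputDir, then writes the manifest exactly once.
    static void writeManifest(Path inputDir, Path manifest) throws IOException {
        Map<String, String> checksums = new TreeMap<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(inputDir)) {
            for (Path file : files) {
                if (Files.isRegularFile(file)) {
                    checksums.put(file.toString(), sha256Hex(Files.readAllBytes(file)));
                }
            }
        }
        Files.write(manifest, toJson(checksums).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        // args[0]: input directory, args[1]: manifest path (both assumed for this sketch)
        writeManifest(Paths.get(args[0]), Paths.get(args[1]));
    }
}
```

Inside a Beam pipeline the gathering step would have to be a combine or grouping transform rather than a local map; that part is not shown here.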