How do I set the coder for a PCollection<List<String>> in Apache Beam?

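I'm teaching myself Apache Beam, specifically for parsing JSON. I was able to build a simple example that parses JSON into a POJO and then turns the POJO into CSV. That example required me to call .setCoder() for my simple POJO class: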

        pipeline
            .apply("Read source JSON file.", TextIO.read().from(options.getInput()))
            .apply("Parse to POJO matching schema", ParseJsons.of(Person.class))
            .setCoder(SerializableCoder.of(Person.class))
            .apply("Create comma delimited string", new PersonToCsvRow())
            .apply("Write out to file", TextIO.write().to(options.getOutput())
                .withoutSharding());

Problem

Now I'm trying to skip the POJO parsing step by using some custom transforms. My pipeline looks like this:

        pipeline
            .apply("Read Json", TextIO.read().from("src/main/resources/family_tree.json"))
            .apply("Traverse Json tree", new JSONTreeToPaths())
            .apply("Format tree paths", new PathsToCSV())
            .apply("Write to CSV", TextIO.write().to("src/main/resources/paths.csv")
                .withoutSharding());

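This pipeline should take a deeply nested JSON structure and print every individual path through the tree. I get the same error as in the POJO example above: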

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Traverse Json tree/MapElements/Map/ParMultiDo(Anonymous).output [PCollection@331122245]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().

What I've tried

So I tried adding a coder in a few different ways:

.setCoder(SerializableCoder.of(List<String>.class))

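This fails to compile with "cannot select from parameterized type". I found another instance of this error arising from a different use case, but the accepted answer there only seems to apply to that use case.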

Then I started combing through the Beam docs and found ListCoder.of(), which has literally no description. It looked promising, though, so I tried it:

.setCoder(ListCoder.of(SerializableCoder.of(String.class)))

But that brought me right back to the original error saying that no Coder had been manually specified.
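For the List<String> itself, a coder can be assembled from Beam's built-in coders: ListCoder.of(StringUtf8Coder.of()), where StringUtf8Coder is the usual coder for String (SerializableCoder falls back to Java serialization). The minimal check below assumes the Beam Java core SDK is on the classpath; ListCoderCheck and roundTrip are illustrative names, not part of Beam. It encodes a list of paths with CoderUtils and decodes it again to confirm the coder works on its own.

```java
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.CoderUtils;

public class ListCoderCheck {

    // Coder for List<String>: ListCoder handles the list, StringUtf8Coder encodes each element as UTF-8.
    static final ListCoder<String> PATHS_CODER = ListCoder.of(StringUtf8Coder.of());

    // Encodes the list to a byte array, then decodes it back into a List<String>.
    public static List<String> roundTrip(List<String> paths) {
        try {
            byte[] bytes = CoderUtils.encodeToByteArray(PATHS_CODER, paths);
            return CoderUtils.decodeFromByteArray(PATHS_CODER, bytes);
        } catch (CoderException e) {
            // CoderException is an IOException; rethrow unchecked to keep the API simple.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList("1,2,4", "1,3");
        System.out.println(roundTrip(paths).equals(paths)); // prints true
    }
}
```

In a pipeline the same coder would be attached with .setCoder(ListCoder.of(StringUtf8Coder.of())). If the missing-coder error persists after setting it on the List<String> output, the PCollection named in the error is probably a different one.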

Question

How do I satisfy the requirement to set a coder for List<String> objects?

Code

The transform that triggers the setCoder error is this one:

package transforms;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

import java.util.ArrayList;
import java.util.List;

public class JSONTreeToPaths extends PTransform<PCollection<String>, PCollection<List<String>>> {

    public static class ExtractPathsFromTree extends SimpleFunction<JsonNode, List<String>> {
        @Override
        public List<String> apply(JsonNode root) {
            List<String> pathContainer = new ArrayList<>();
            getPaths(root, "", pathContainer);
            return pathContainer;
        }
    }

    public static class GetRootNode extends SimpleFunction<String, JsonNode> {
        @Override
        public JsonNode apply(String jsonString) {
            try {
                return getRoot(jsonString);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
                return null;
            }
        }
    }

    @Override
    public PCollection<List<String>> expand(PCollection<String> input) {
        return input
            .apply(MapElements.via(new GetRootNode()))
            .apply(MapElements.via(new ExtractPathsFromTree()));
    }

    private static JsonNode getRoot(String jsonString) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readTree(jsonString);
    }

    private static void getPaths(JsonNode node, String currentPath, List<String> paths) {
        //check if leaf:
        if (node.path("children").isMissingNode()) {
            currentPath += node.get("Id");
            paths.add(currentPath);
            System.out.println(currentPath);
            return;
        }

        // recursively iterate over children
        currentPath += (node.get("Id") + ",");
        for (JsonNode child : node.get("children")) {
            getPaths(child, currentPath, paths);
        }
    }
}



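Although the error message seems to imply that the list of strings is what needs a coder, it is actually the JsonNode. I just needed to read further into the error message, because its opening line about what is wrong is a bit misleading: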

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Traverse Json tree/MapElements/Map/ParMultiDo(Anonymous).output [PCollection@1324829744]. 
...
...
Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder 
for com.fasterxml.jackson.databind.JsonNode.
Building a Coder using a registered CoderProvider failed.

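After discovering this, I solved the problem by extending Beam's CustomCoder class. This abstract class is convenient because you only need to write the code that serializes and deserializes the object: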

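import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class JsonNodeCoder extends CustomCoder<JsonNode> {

    // ObjectMapper is thread-safe once configured; static, so it is not serialized with the coder.
    private static final ObjectMapper MAPPER = new ObjectMapper();
    // StringUtf8Coder writes UTF-8 with a length prefix, so the encoding stays
    // self-delimiting even when this coder is nested inside another coder.
    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();

    @Override
    public void encode(JsonNode node, OutputStream outStream) throws IOException {
        STRING_CODER.encode(MAPPER.writeValueAsString(node), outStream);
    }

    @Override
    public JsonNode decode(InputStream inStream) throws IOException {
        return MAPPER.readTree(STRING_CODER.decode(inStream));
    }
}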
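One detail worth checking in any hand-written coder: Beam can nest one coder inside another (for example as the element coder of a ListCoder or as a component of a KV), so several encoded values may share a single stream. A decode that reads to end-of-stream, as IOUtils.toByteArray does, cannot tell where one value ends and the next begins. The stdlib-only sketch below (SelfDelimitingDemo and its methods are hypothetical names) writes two JSON strings back to back and decodes them both ways. The 4-byte writeInt prefix is only a stand-in; Beam's StringUtf8Coder uses a variable-length integer prefix instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SelfDelimitingDemo {

    // Not self-delimiting: raw bytes with no boundary marker.
    static void encodeRaw(String json, DataOutputStream out) throws IOException {
        out.write(json.getBytes(StandardCharsets.UTF_8));
    }

    // Not self-delimiting: consumes everything left in the stream (Java 9+ readAllBytes).
    static String decodeRaw(InputStream in) throws IOException {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }

    // Self-delimiting: length prefix first, then exactly that many UTF-8 bytes.
    static void encodePrefixed(String json, DataOutputStream out) throws IOException {
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    // Reads the prefix, then only the bytes that belong to this value.
    static String decodePrefixed(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Encodes all values into one stream, then decodes the same number of values from it.
    public static List<String> roundTrip(List<String> values, boolean prefixed) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            for (String value : values) {
                if (prefixed) {
                    encodePrefixed(value, out);
                } else {
                    encodeRaw(value, out);
                }
            }
            out.flush();
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
            List<String> decoded = new ArrayList<>();
            for (int i = 0; i < values.size(); i++) {
                decoded.add(prefixed ? decodePrefixed(in) : decodeRaw(in));
            }
            return decoded;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("{\"Id\":1}", "{\"Id\":2}");
        System.out.println(roundTrip(nodes, true));  // prints [{"Id":1}, {"Id":2}]
        System.out.println(roundTrip(nodes, false)); // prints [{"Id":1}{"Id":2}, ]
    }
}
```

The prefixed round trip returns both documents intact, while the raw one returns them glued together, followed by an empty string for the second value.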

I hope this helps other Beam newcomers.