Streaming large JSON from input stream efficiently in Java

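To save memory and avoid OOM errors, I want to stream a large JSON document from an input stream and extract only the values I need from it. More precisely, I want to extract some strings from the JSON:

  1. files.content.fileContent.subList.text = "some text in file"
  2. files.content.fileContent.subList.text = "some text in file2"

and save them into a String variable: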

String result = "some text in file \r\nsome text in file2"

I tried to parse the JSON with Jackson:

        JsonFactory jsonFactory = new JsonFactory();

        StringBuilder result = new StringBuilder();
        try (JsonParser jsonParser = jsonFactory.createParser(jsonAsInputStream)) {
            String fieldName;
            while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
                jsonParser.nextToken();
                fieldName = jsonParser.getCurrentName();
                if ("files".equals(fieldName)) {

                    while (true) {
                        jsonParser.nextToken();
                        fieldName = jsonParser.getCurrentName();
                        if ("content".equals(fieldName)) {
                            jsonParser.nextToken();
                            fieldName = jsonParser.getCurrentName();
                            while (true) {
                                if ("text".equals(fieldName)) {
                                    result.append(jsonParser.getText());
                                }
                            }
                        }
                    }
                }
            }
            LOGGER.info("result: {}", result);
        } catch (JsonParseException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

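The above does not work at all, and the solution is getting complicated. Is there a simple way to parse a JSON InputStream and extract some text from it?

The JSON is attached below: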

{
"id": "1",
"name": "TestFile.xlsx",
"files": [
    {
        "id": "1",
        "fileName": "TestFile.xlsx",
        "types": {
            "fileId": "1",
            "context": [
                {
                    "id": 1,
                    "contextId": "xyz",
                    "metadata": {
                        "abc": "1"
                    }
                },
                {
                    "id": 2,
                    "contextId": "abc"
                }
            ],
            "fileSettings": [
                {
                    "id": 1,
                    "settingId": 1
                },
                {
                    "id": 2,
                    "settingId": 2
                }
                
            ],
            "fileAttachments": [
                {
                    "id": 1,
                    "canDelete": true,
                    "canAttach": []
                }
            ],
            "replacements": [
                {
                    "id": 1,
                    "replacementText": "xcv"
                }
            ]
        },
        "content": [
            {
                "id": "1",
                "contextList": [
                    1,
                    2,
                    3
                ],
                "fileContent": {
                    "contentType": "text",
                    "subList": [
                        {
                            "id": "1",
                            "subList": [
                                {
                                    "id": "1",
                                    "text": "some text in file",
                                    "type": "text"
                                }
                            ]
                        }
                    ]
                },
                "externalContent": {
                    "id": "1",
                    "children": [
                        {
                            "id": "1",
                            "contentType": "text corrupted",
                            "children": []
                        }
                    ]
                }
            },
            {
                "id": "2",
                "contextList": [
                    1,
                    2
                ],
                "fileContent": {
                    "contentType": "text",
                    "subList": [
                        {
                            "id": "2",
                            "subList": [
                                {
                                    "id": "1",
                                    "text": "some text in file2",
                                    "type": "text"
                                }
                            ]
                        }
                    ]
                },
                "externalContent": {
                    "id": "2",
                    "children": [
                        {
                            "id": "2",
                            "contentType": "text corrupted2",
                            "children": []
                        }
                    ]
                }
            }
        ]
    }
]

}

Have you looked at JsonPath? You can use Gson or Jackson as the provider, but by default it uses Json-smart, which is performance-focused.

Here is an example based on the JSON you attached.

InputStream inputStream = Main.class.getClassLoader().getResourceAsStream("file.json");
String[] textArray = JsonPath.parse(inputStream).read("files[*].content[*].fileContent.subList[*].subList[*].text", String[].class);
Arrays.stream(textArray).forEach(System.out::println);

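JsonPath consumes a lot of memory. If you do not have enough memory to process large files, you can use a stream/token approach instead. As long as you do not store the texts, the code below can process a 6 GB JSON file with a heap of 900 MB or less.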

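import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;

import static com.google.gson.stream.JsonToken.*;

public class Main {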

public static void main(String[] args) throws Exception {

    try (InputStream inputStream = getJsonAsInputStream()) {
        EnumMap<JsonToken, JsonTokenHandler> map = getJsonTokenHandler();

        Context context = new Context();
        JsonReader reader = new JsonReader(new InputStreamReader(inputStream));

        while (true) {
            JsonToken token = reader.peek();
            JsonTokenHandler jsonTokenHandler = map.get(token);
            jsonTokenHandler.handle(reader, context);
            if (token.equals(END_DOCUMENT)) {
                break;
            }
        }

        context.getTexts().forEach(System.out::println);
    }
}

private static EnumMap<JsonToken, JsonTokenHandler> getJsonTokenHandler() {
    EnumMap<JsonToken, JsonTokenHandler> map = new EnumMap<>(JsonToken.class);
    map.put(BEGIN_ARRAY, (reader, context) -> reader.beginArray());
    map.put(END_ARRAY, (reader, context) -> reader.endArray());
    map.put(BEGIN_OBJECT, (reader, context) -> reader.beginObject());
    map.put(END_OBJECT, (reader, context) -> reader.endObject());
    map.put(NAME, (reader, context) -> {
        reader.nextName();
        context.setCurrentPath(reader.getPath());
    });
    map.put(STRING, (reader, context) -> {
        String string = reader.nextString();
        if (context.isTextAttribute()) {
            context.addText(string);
        }

    });
    map.put(NUMBER, (reader, context) -> reader.nextString());
    map.put(BOOLEAN, (reader, context) -> reader.nextBoolean());
    map.put(NULL, (reader, context) -> reader.nextNull());
    map.put(END_DOCUMENT, (reader, context) -> {
    });
    return map;
}

private static InputStream getJsonAsInputStream() throws FileNotFoundException {
    File inFile = new File("/path/to/your/large/file.json");
    ReadableByteChannel rChannel = new RandomAccessFile(inFile, "r").getChannel();
    return Channels.newInputStream(rChannel);
}


static class Context {
    private String currentPath;
    private List<String> texts = new ArrayList<>();

    public void addText(String text) {
        texts.add(text);
    }

    public List<String> getTexts() {
        return texts;
    }

    public void setCurrentPath(String path) {
        this.currentPath = path;
    }

    public boolean isTextAttribute() {
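        // Backslashes must be doubled inside a Java string literal.
        return currentPath != null && currentPath.matches("\\$\\.files\\[\\d+\\]\\.content\\[\\d+\\]\\.fileContent\\.subList\\[\\d+\\]\\.subList\\[\\d+\\]\\.text");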
    }
}

interface JsonTokenHandler {
    void handle(JsonReader reader, Context context) throws IOException;
}

}
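
The path check in Context.isTextAttribute() can be tried on its own, without Gson on the classpath. The following is a minimal sketch that uses only the JDK. It assumes that JsonReader.getPath() reports paths in the form $.files[0].content[0]...; the class name PathRegexDemo and the sample paths are made up for illustration. Precompiling the pattern is a small variation on the answer's String.matches() call, which recompiles the expression on every string token.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical demo class; it is not part of the answer's code.
final class PathRegexDemo {

    // Same expression as in Context.isTextAttribute(), compiled once.
    // In Java source every regex backslash has to be written as "\\".
    static final Pattern TEXT_PATH = Pattern.compile(
            "\\$\\.files\\[\\d+\\]\\.content\\[\\d+\\]\\.fileContent"
                    + "\\.subList\\[\\d+\\]\\.subList\\[\\d+\\]\\.text");

    static boolean isTextPath(String path) {
        return path != null && TEXT_PATH.matcher(path).matches();
    }

    public static void main(String[] args) {
        // Assumed path format: "$" for the root, ".name" for keys, "[n]" for indexes.
        List<String> samples = List.of(
                "$.files[0].content[1].fileContent.subList[0].subList[0].text",
                "$.files[0].content[1].fileContent.subList[0].subList[0].type",
                "$.files[0].content[1].externalContent.children[0].contentType");
        for (String path : samples) {
            System.out.println(isTextPath(path) + "  " + path);
        }
    }
}
```

Only the first sample path is accepted; the "type" sibling and the externalContent branch are rejected because the whole path has to match.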

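In short,

  • your code does not work because it implements the wrong algorithm;
  • JsonPath, as suggested, seems to be a nice DSL implementation, but it uses a DOM approach that collects the whole JSON tree in memory, so you will run into OOM again.

You have two options:

  • implement a proper algorithm within your current approach (I agree that your approach is the right one);
  • try to implement something similar to what JsonPath does, breaking the problem down into smaller ones while supporting a truly streaming approach.

I won't document most of my code because it is easy to understand and to adapt to other libraries, but you can develop a more advanced version of the following code using Java 17 (with preview features enabled) and javax.json (plus some Lombok for the Java boilerplate):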
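
For the first option, one possible shape of a corrected algorithm is sketched below. It keeps the Jackson streaming parser from the question, but reads all tokens in a single flat loop and asks the parser where it currently is instead of tracking the nesting by hand. This is only a sketch under assumptions: it needs Jackson 2.9 or later for JsonStreamContext.pathAsPointer(), and the class name, method name and pointer pattern are invented for this example.

```java
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonStreamContext;
import com.fasterxml.jackson.core.JsonToken;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical helper; the names are illustrative only.
final class JacksonTextExtractor {

    // JSON Pointer form of files[*].content[*].fileContent.subList[*].subList[*].text
    private static final Pattern TEXT_POINTER = Pattern.compile(
            "/files/\\d+/content/\\d+/fileContent/subList/\\d+/subList/\\d+/text");

    static String extractTexts(InputStream json) throws IOException {
        StringBuilder result = new StringBuilder();
        try (JsonParser parser = new JsonFactory().createParser(json)) {
            JsonToken token;
            // nextToken() returns null once the input is exhausted.
            while ((token = parser.nextToken()) != null) {
                if (token != JsonToken.VALUE_STRING) {
                    continue;
                }
                JsonStreamContext context = parser.getParsingContext();
                // Cheap pre-check so the pointer is only built for "text" fields.
                if (!"text".equals(context.getCurrentName())) {
                    continue;
                }
                if (TEXT_POINTER.matcher(context.pathAsPointer().toString()).matches()) {
                    if (result.length() > 0) {
                        result.append("\r\n");
                    }
                    result.append(parser.getText());
                }
            }
        }
        return result.toString();
    }

    public static void main(String[] args) throws IOException {
        // Tiny stand-in for the large document from the question.
        String json = "{\"files\":[{\"content\":[{\"fileContent\":{\"subList\":"
                + "[{\"subList\":[{\"id\":\"1\",\"text\":\"some text in file\"}]}]}}]}]}";
        InputStream in = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
        System.out.println(extractTexts(in));
    }
}
```

Only the matched strings are kept, so memory use should depend on the amount of extracted text rather than on the size of the input.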

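import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;

import javax.annotation.Nullable; // assumed: JSR-305; any equivalent @Nullable annotation works
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonValue;
import javax.json.stream.JsonLocation;
import javax.json.stream.JsonParser;
import java.math.BigDecimal;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
import java.util.stream.Stream;

@RequiredArgsConstructor(access = AccessLevel.PRIVATE)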
public final class PathJsonParser
        implements JsonParser, Iterator<JsonParser.Event> {

    private static final int DEFAULT_PATH_LENGTH = 32;

    private final JsonParser jsonParser;
    private final AbstractPathElement[] path;
    private int last;

    public static PathJsonParser create(final JsonParser jsonParser) {
        final int maxPathLength = DEFAULT_PATH_LENGTH;
        final PathJsonParser pathJsonParser = new PathJsonParser(jsonParser, new AbstractPathElement[maxPathLength]);
        pathJsonParser.path[0] = AbstractPathElement.Root.instance;
        for ( int i = 1; i < maxPathLength; i++ ) {
            pathJsonParser.path[i] = new AbstractPathElement.Container();
        }
        return pathJsonParser;
    }

    @Override
    public Event next() {
        final Event event = jsonParser.next();
        switch ( event ) {
        case START_ARRAY -> {
            path[last].tryIncreaseIndex();
            path[++last].reset(JsonValue.ValueType.ARRAY);
        }
        case START_OBJECT -> {
            path[last].tryIncreaseIndex();
            path[++last].reset(JsonValue.ValueType.OBJECT);
        }
        case KEY_NAME -> path[last].setKeyName(jsonParser.getString());
        case VALUE_STRING -> path[last].tryIncreaseIndex();
        case VALUE_NUMBER -> path[last].tryIncreaseIndex();
        case VALUE_TRUE -> path[last].tryIncreaseIndex();
        case VALUE_FALSE -> path[last].tryIncreaseIndex();
        case VALUE_NULL -> path[last].tryIncreaseIndex();
        case END_OBJECT -> --last;
        case END_ARRAY -> --last;
        default -> throw new AssertionError(event);
        }
        return event;
    }

    public boolean matchesRoot(final int at) {
        @Nullable
        final AbstractPathElement e = tryElementAt(at);
        return e != null && e.matchesRoot();
    }

    public boolean matchesIndex(final int at, final IntPredicate predicate) {
        @Nullable
        final AbstractPathElement e = tryElementAt(at);
        return e != null && e.matchesIndex(predicate);
    }

    public boolean matchesName(final int at, final Predicate<? super String> predicate) {
        @Nullable
        final AbstractPathElement e = tryElementAt(at);
        return e != null && e.matchesName(predicate);
    }

    // @formatter:off
    @Override public boolean hasNext() { return jsonParser.hasNext(); }
    @Override public String getString() { return jsonParser.getString(); }
    @Override public boolean isIntegralNumber() { return jsonParser.isIntegralNumber(); }
    @Override public int getInt() { return jsonParser.getInt(); }
    @Override public long getLong() { return jsonParser.getLong(); }
    @Override public BigDecimal getBigDecimal() { return jsonParser.getBigDecimal(); }
    @Override public JsonLocation getLocation() { return jsonParser.getLocation(); }
    @Override @SuppressWarnings("MethodDoesntCallSuperMethod") public JsonObject getObject() { return jsonParser.getObject(); }
    @Override @SuppressWarnings("MethodDoesntCallSuperMethod") public JsonValue getValue() { return jsonParser.getValue(); }
    @Override @SuppressWarnings("MethodDoesntCallSuperMethod") public JsonArray getArray() { return jsonParser.getArray(); }
    @Override @SuppressWarnings("MethodDoesntCallSuperMethod") public Stream<JsonValue> getArrayStream() { return jsonParser.getArrayStream(); }
    @Override @SuppressWarnings("MethodDoesntCallSuperMethod") public Stream<Map.Entry<String, JsonValue>> getObjectStream() { return jsonParser.getObjectStream(); }
    @Override @SuppressWarnings("MethodDoesntCallSuperMethod") public Stream<JsonValue> getValueStream() { return jsonParser.getValueStream(); }
    @Override @SuppressWarnings("MethodDoesntCallSuperMethod") public void skipArray() { jsonParser.skipArray(); }
    @Override @SuppressWarnings("MethodDoesntCallSuperMethod") public void skipObject() { jsonParser.skipObject(); }
    @Override public void close() { jsonParser.close(); }
    // @formatter:on

    @Nullable
    private AbstractPathElement tryElementAt(final int at) {
        final int pathAt;
        if ( at >= 0 ) {
            pathAt = at;
        } else {
            pathAt = last + at + 1;
        }
        if ( pathAt < 0 || pathAt > last ) {
            return null;
        }
        return path[pathAt];
    }

    private abstract static sealed class AbstractPathElement
            permits AbstractPathElement.Root, AbstractPathElement.Container {

        abstract void reset(JsonValue.ValueType valueType);

        abstract void setKeyName(String keyName);

        abstract void tryIncreaseIndex();

        abstract boolean matchesRoot();

        abstract boolean matchesIndex(IntPredicate predicate);

        abstract boolean matchesName(Predicate<? super String> predicate);

        @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
        private static final class Root
                extends AbstractPathElement {

            private static final AbstractPathElement instance = new Root();

            @Override
            void reset(final JsonValue.ValueType valueType) {
                throw new UnsupportedOperationException();
            }

            @Override
            void setKeyName(final String keyName) {
                throw new UnsupportedOperationException();
            }

            @Override
            void tryIncreaseIndex() {
                // do nothing
            }

            @Override
            boolean matchesRoot() {
                return true;
            }

            @Override
            boolean matchesIndex(final IntPredicate predicate) {
                return false;
            }

            @Override
            boolean matchesName(final Predicate<? super String> predicate) {
                return false;
            }

        }

        @RequiredArgsConstructor(access = AccessLevel.PACKAGE)
        private static final class Container
                extends AbstractPathElement {

            private static final String NO_KEY_NAME = null;
            private static final int NO_INDEX = -1;

            private JsonValue.ValueType valueType;
            private String keyName = NO_KEY_NAME;
            private int index = NO_INDEX;

            @Override
            void reset(final JsonValue.ValueType valueType) {
                this.valueType = valueType;
                keyName = NO_KEY_NAME;
                index = NO_INDEX;
            }

            @Override
            void setKeyName(final String keyName) {
                this.keyName = keyName;
            }

            @Override
            void tryIncreaseIndex() {
                if ( valueType == JsonValue.ValueType.ARRAY ) {
                    index++;
                }
            }

            @Override
            boolean matchesRoot() {
                return false;
            }

            @Override
            boolean matchesIndex(final IntPredicate predicate) {
                return switch ( valueType ) {
                    case ARRAY -> index != NO_INDEX && predicate.test(index);
                    case OBJECT -> false;
                    case STRING, NUMBER, TRUE, FALSE, NULL -> throw new AssertionError(valueType);
                };
            }

            @Override
            boolean matchesName(final Predicate<? super String> predicate) {
                return switch ( valueType ) {
                    case ARRAY -> false;
                    case OBJECT -> !Objects.equals(keyName, NO_KEY_NAME) && predicate.test(keyName);
                    case STRING, NUMBER, TRUE, FALSE, NULL -> throw new AssertionError(valueType);
                };
            }

        }

    }

}

Example of use:

public final class PathJsonParserTest {

    // $.files.0.content.0.fileContent.subList.0.subList.0.text
    private static boolean matches(final PathJsonParser parser) {
        return parser.matchesName(-1, name -> name.equals("text"))
                && parser.matchesIndex(-2, index -> true)
                && parser.matchesName(-3, name -> name.equals("subList"))
                && parser.matchesIndex(-4, index -> true)
                && parser.matchesName(-5, name -> name.equals("subList"))
                && parser.matchesName(-6, name -> name.equals("fileContent"))
                && parser.matchesIndex(-7, index -> true)
                && parser.matchesName(-8, name -> name.equals("content"))
                && parser.matchesIndex(-9, index -> true)
                && parser.matchesName(-10, name -> name.equals("files"))
                && parser.matchesRoot(-11);
    }

    @Test
    public void test()
            throws IOException {
        try ( final PathJsonParser parser = PathJsonParser.create(JsonParsers.openFromResource(PathJsonParserTest.class, "input.json")) ) {
            for ( ; parser.hasNext(); parser.next() ) {
                if ( matches(parser) ) {
                    parser.next();
                    System.out.println(parser.getValue());
                }
            }
        }
    }

}

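Of course, it is not as cool-looking as JsonPath, but you can do the following:

  • implement a matcher builder API to make it look nicer;
  • implement a JSON Path-compliant parser to build matchers;
  • wrap the for/if/next() pattern into a generic algorithm (similar to what BufferedReader.readLine() does, or wrap it for the Stream API);
  • implement some kind of simple JSON-to-objects deserializer.

Or, if possible, find a good code generator that can generate a streaming parser with as little runtime cost as possible (its output would look very similar to your code, but it would work). (If you know of one, please let me know.)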
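
As an illustration of the first item, a matcher builder could look roughly like the sketch below. It uses only the JDK. PathView is a hypothetical interface that mirrors the three matches*() methods of PathJsonParser, and ListPath is a list-backed stand-in used only to exercise the builder; none of these names come from the code above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.Predicate;

// Hypothetical view of the matches*() methods that PathJsonParser exposes.
interface PathView {
    boolean matchesRoot(int at);
    boolean matchesIndex(int at, IntPredicate predicate);
    boolean matchesName(int at, Predicate<? super String> predicate);
}

final class PathMatcher {

    @FunctionalInterface
    private interface Step {
        boolean test(PathView path, int at);
    }

    private final List<Step> steps;

    private PathMatcher(List<Step> steps) {
        this.steps = steps;
    }

    static Builder root() {
        return new Builder();
    }

    // Steps are checked from the end of the path (negative positions), so the
    // path must have exactly as many elements as the matcher has steps.
    boolean matches(PathView path) {
        int n = steps.size();
        for (int i = 0; i < n; i++) {
            if (!steps.get(i).test(path, i - n)) {
                return false;
            }
        }
        return true;
    }

    static final class Builder {
        private final List<Step> steps = new ArrayList<>();

        private Builder() {
            steps.add((path, at) -> path.matchesRoot(at));
        }

        Builder name(String expected) {
            steps.add((path, at) -> path.matchesName(at, expected::equals));
            return this;
        }

        Builder anyIndex() {
            steps.add((path, at) -> path.matchesIndex(at, index -> true));
            return this;
        }

        PathMatcher build() {
            return new PathMatcher(new ArrayList<>(steps));
        }
    }
}

// List-backed PathView for the demo: Strings are object keys, Integers are array indexes.
final class ListPath implements PathView {
    private static final Object ROOT = new Object();
    private final List<Object> elements = new ArrayList<>();

    ListPath(Object... afterRoot) {
        elements.add(ROOT);
        Collections.addAll(elements, afterRoot);
    }

    private Object elementAt(int at) {
        int i = at >= 0 ? at : elements.size() + at;
        return i < 0 || i >= elements.size() ? null : elements.get(i);
    }

    @Override public boolean matchesRoot(int at) { return elementAt(at) == ROOT; }

    @Override public boolean matchesIndex(int at, IntPredicate p) {
        Object e = elementAt(at);
        return e instanceof Integer && p.test((Integer) e);
    }

    @Override public boolean matchesName(int at, Predicate<? super String> p) {
        Object e = elementAt(at);
        return e instanceof String && p.test((String) e);
    }
}

final class PathMatcherDemo {
    static PathMatcher textMatcher() {
        return PathMatcher.root().name("files").anyIndex().name("content").anyIndex()
                .name("fileContent").name("subList").anyIndex().name("subList").anyIndex()
                .name("text").build();
    }

    public static void main(String[] args) {
        System.out.println(textMatcher().matches(new ListPath(
                "files", 0, "content", 1, "fileContent", "subList", 0, "subList", 0, "text")));
    }
}
```

With such a builder, the eleven-line matches() method in the test would shrink to the single chained expression in textMatcher().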
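
The third item in the list above, wrapping the for/if/next() pattern, might be approached as in the following sketch. It is deliberately generic and uses only the JDK: it assumes nothing about the parser beyond it being an Iterator, and it takes the match check and the value reader as lambdas. MatchingValues, stream and Cursor are hypothetical names, and Cursor is just a stand-in for a real parser.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class MatchingValues {

    private MatchingValues() {
    }

    // Lazily advances the events; whenever matches reports true after an event,
    // one more event (the value) is consumed and readValue.get() is emitted.
    static <R> Stream<R> stream(Iterator<?> events, BooleanSupplier matches, Supplier<R> readValue) {
        Spliterator<R> spliterator =
                new Spliterators.AbstractSpliterator<R>(Long.MAX_VALUE, Spliterator.ORDERED) {
                    @Override
                    public boolean tryAdvance(Consumer<? super R> action) {
                        while (events.hasNext()) {
                            events.next();
                            if (matches.getAsBoolean() && events.hasNext()) {
                                events.next();
                                action.accept(readValue.get());
                                return true;
                            }
                        }
                        return false;
                    }
                };
        return StreamSupport.stream(spliterator, false);
    }

    // Stand-in for a real parser: remembers the event returned by the last next() call.
    static final class Cursor implements Iterator<String> {
        private final Iterator<String> source;
        String current;

        Cursor(List<String> events) {
            this.source = events.iterator();
        }

        @Override public boolean hasNext() { return source.hasNext(); }
        @Override public String next() { return current = source.next(); }
    }

    static List<String> demo() {
        Cursor cursor = new Cursor(List.of(
                "key:id", "1", "key:text", "some text in file", "key:type", "text",
                "key:text", "some text in file2"));
        return stream(cursor, () -> "key:text".equals(cursor.current), () -> cursor.current)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

With PathJsonParser, the call might read MatchingValues.stream(parser, () -> matches(parser), parser::getValue), which keeps the streaming behaviour because the spliterator only pulls events when the stream asks for the next element.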