如何在不消耗所有内存的情况下将巨大的 JSON 文件作为 Ruby 中的流进行处理?

How can I process huge JSON files as streams in Ruby, without consuming all memory?

我在 Ruby 中处理一个巨大的 JSON 文件时遇到问题。我正在寻找的是一种逐项处理它而不会在内存中保留太多数据的方法。

我认为 yajl-ruby gem would do the work but it consumes all my memory. I've also looked at Yajl::FFI 和 JSON:Stream gems 但那里明确指出:

For larger documents we can use an IO object to stream it into the parser. We still need room for the parsed object, but the document itself is never fully read into memory.

这是我对 Yajl 所做的:

file_stream = File.open(file, "r")
json = Yajl::Parser.parse(file_stream)
json.each do |entry|
    entry.do_something
end
file_stream.close

内存使用率不断升高,直到进程被终止。

我不明白为什么 Yajl 会在内存中保留已处理的条目。我能以某种方式释放它们吗,还是我只是误解了 Yajl 解析器的功能?

如果无法使用 Yajl 完成:是否可以通过任何库在 Ruby 中完成此操作?

问题

json = Yajl::Parser.parse(file_stream)

当您像这样调用 Yajl::Parser 时,整个流将加载到内存中以创建您的数据结构。不要那样做。

解决方案

Yajl 提供了 Parser#parse_chunk, Parser#on_parse_complete, and other related methods that enable you to trigger parsing events on a stream without requiring that the whole IO stream be parsed at once. The README contains an example 如何使用分块。

README 中给出的例子是:

Or lets say you didn't have access to the IO object that contained JSON data, but instead only had access to chunks of it at a time. No problem!

(Assume we're in an EventMachine::Connection instance)

def post_init
  @parser = Yajl::Parser.new(:symbolize_keys => true)
end

def object_parsed(obj)
  puts "Sometimes one pays most for the things one gets for nothing. - Albert Einstein"
  puts obj.inspect
end

def connection_completed
  # once a full JSON object has been parsed from the stream
  # object_parsed will be called, and passed the constructed object
  @parser.on_parse_complete = method(:object_parsed)
end

def receive_data(data)
  # continue passing chunks
  @parser << data
end

Or if you don't need to stream it, it'll just return the built object from the parse when it's done. NOTE: if there are going to be multiple JSON strings in the input, you must specify a block or callback as this is how yajl-ruby will hand you (the caller) each object as it's parsed off the input.

obj = Yajl::Parser.parse(str_or_io)

不管怎样,您一次只能解析 JSON 数据的一个子集。否则,您只是在内存中实例化一个巨大的 Hash,这正是您描述的行为。

在不知道您的数据是什么样子以及您的 JSON 对象是如何组成的情况下,不可能给出比这更详细的解释;因此,您的里程可能会有所不同。但是,这至少应该让您指明正确的方向。

您的解决方案似乎是json-stream and yajl-ffi。两者都有一个非常相似的例子(它们来自同一个人):

def post_init
  @parser = Yajl::FFI::Parser.new
  @parser.start_document { puts "start document" }
  @parser.end_document   { puts "end document" }
  @parser.start_object   { puts "start object" }
  @parser.end_object     { puts "end object" }
  @parser.start_array    { puts "start array" }
  @parser.end_array      { puts "end array" }
  @parser.key            {|k| puts "key: #{k}" }
  @parser.value          {|v| puts "value: #{v}" }
end

def receive_data(data)
  begin
    @parser << data
  rescue Yajl::FFI::ParserError => e
    close_connection
  end
end

在那里,他设置了流解析器可能遇到的数据事件的回调。

给定一个 json 文档,如下所示:

{
  1: {
    name: "fred",
    color: "red",
    dead: true,
  },
  2: {
    name: "tony",
    color: "six",
    dead: true,
  },
  ...
  n: {
    name: "erik",
    color: "black",
    dead: false,
  },
}

可以像这样使用 yajl-ffi 对其进行流式分析:

def parse_dudes file_io, chunk_size
  parser = Yajl::FFI::Parser.new
  object_nesting_level = 0
  current_row = {}
  current_key = nil

  parser.start_object { object_nesting_level += 1 }
  parser.end_object do
    if object_nesting_level.eql? 2
      yield current_row #here, we yield the fully collected record to the passed block
      current_row = {}
    end
    object_nesting_level -= 1
  end

  parser.key do |k|
    if object_nesting_level.eql? 2
      current_key = k
    elsif object_nesting_level.eql? 1
      current_row["id"] = k
    end
  end

  parser.value { |v| current_row[current_key] = v }

  file_io.each(chunk_size) { |chunk| parser << chunk }
end

File.open('dudes.json') do |f|
  parse_dudes f, 1024 do |dude|
    pp dude
  end
end

@CodeGnome 和@A。 Rager 的回答帮助我理解了解决方案。

我最终创建了 gem json-streamer,它提供了一种通用方法,无需为每个场景手动定义回调。