Is Ruby pipe streaming asynchronous?

I have a map/reduce streaming pipeline written in Ruby, and it is behaving strangely. The pipeline looks like this:

mapper | sort | reducer | expander | sort | splitter | uploader

The mapper writes to STDOUT (via puts), the reducer reads from STDIN (via ARGF.each) and writes to STDOUT (via puts), and so on.
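To make that stage contract concrete, here is a minimal sketch of a reducer-style stage. The real reducer.rb is not shown, so the input format (tab-separated key/value lines already grouped by sort) and the method name reduce_stream are assumptions for illustration. Taking the input and output streams as parameters keeps the logic testable with StringIO.

```ruby
# Hypothetical reducer stage. Assumes each input line is "key<TAB>value" and
# that the upstream `sort` has grouped identical keys together.
# Emits one "key<TAB>count" line per key.
def reduce_stream(input, output)
  current_key = nil
  count = 0

  input.each_line do |line|
    next if line.strip.empty?          # ignore blank lines

    key, = line.chomp.split("\t", 2)
    if key != current_key
      # A new key starts: emit the finished group (if any), then reset.
      output.puts "#{current_key}\t#{count}" unless current_key.nil?
      current_key = key
      count = 0
    end
    count += 1
  end

  # Emit the last group once the input is exhausted.
  output.puts "#{current_key}\t#{count}" unless current_key.nil?
end
```

Inside reducer.rb the call would be reduce_stream(ARGF, $stdout), so the stage reads from STDIN (or files named on the command line) and writes to STDOUT like the other stages.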

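It seems that by the time the uploader runs, the files that the splitter is supposed to create have not been created yet, so the uploader doesn't upload anything.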

Here is my Pipeline class:

class Pipeline

  def run(context)
    raise ArgumentError, 'context is nil' unless context
    raise ArgumentError, 'context[:logger] is nil' unless context[:logger]

    current_path = File.dirname(__FILE__)
    mapper       = File.join(current_path, 'mapper.rb')
    reducer      = File.join(current_path, 'reducer.rb')
    expander     = File.join(current_path, 'expander.rb')
    splitter     = File.join(current_path, 'splitter.rb')
    uploader     = File.join(current_path, 'uploader.rb')

    mapper_args = context[:order_id] == nil ? nil : " #{context[:order_id]}"

    command_line = "ruby #{mapper}#{mapper_args} | sort | ruby #{reducer} | ruby #{expander} | sort | ruby #{splitter} | ruby #{uploader}"

    context[:logger].debug command_line

    %x{#{command_line}}
  end

end

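If pipe streaming in Ruby is asynchronous, I wonder whether doing what RubyMine does would solve the problem. For example, before running a Ruby script, RubyMine prefixes the command line like this: ruby -e $stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)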

I have updated my code to use this technique, but I'm not sure it is correct. Or is there a better way?

class Pipeline

  def run(context)
    raise ArgumentError, 'context is nil' unless context
    raise ArgumentError, 'context[:logger] is nil' unless context[:logger]

    current_path = File.dirname(__FILE__)
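    # The -e script is single-quoted so the shell run by %x passes it to Ruby
    # as one argument instead of splitting it at ';' and expanding $stdout.
    ruby         = %q{ruby -e '$stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)'}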
    mapper       = File.join(current_path, 'mapper.rb')
    reducer      = File.join(current_path, 'reducer.rb')
    expander     = File.join(current_path, 'expander.rb')
    splitter     = File.join(current_path, 'splitter.rb')
    uploader     = File.join(current_path, 'uploader.rb')

    mapper_args = context[:order_id] == nil ? nil : " #{context[:order_id]}"

    create_reports_command_line = "#{ruby} #{mapper}#{mapper_args} | sort | #{ruby} #{reducer} | #{ruby} #{expander} | sort | #{ruby} #{splitter}"

    context[:logger].debug create_reports_command_line

    %x{#{create_reports_command_line}}

    sleep 60 # Sleep for 1 min, just in case...

    upload_reports_command_line = "#{ruby} #{uploader}"

    context[:logger].debug upload_reports_command_line

    %x{#{upload_reports_command_line}}
  end

end

You need to enable sync. This tells Ruby not to buffer the output but to send it on as soon as it is written.

Sets the “sync mode” to true or false. When sync mode is true, all output is immediately flushed to the underlying operating system and is not buffered internally.

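The pipeline should still work with sync disabled (false), but you won't see anything until the first stage, and then each later stage, either sees its input closed or fills its buffer and flushes it, which can take a while.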
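The buffering described above can be observed without running the whole pipeline. This sketch (the method name visible_after_write is made up for illustration) writes one short line to the write end of an IO.pipe and checks what the read end can see before anything has been flushed explicitly.

```ruby
# Writes one short line to a pipe with the given sync mode and returns what
# the read end can see immediately, without any explicit flush.
def visible_after_write(sync)
  reader, writer = IO.pipe
  writer.sync = sync
  writer.write("record\n")
  # With exception: false, read_nonblock returns :wait_readable instead of
  # raising when no data has reached the pipe yet.
  reader.read_nonblock(64, exception: false)
ensure
  writer&.close   # closing flushes anything still buffered
  reader&.close
end

p visible_after_write(false) # => :wait_readable (line still in Ruby's buffer)
p visible_after_write(true)  # => "record\n" (line already handed to the OS)
```

With sync off, the line only reaches the pipe when Ruby's internal buffer fills, flush is called, or the IO is closed; with sync on, each write goes straight to the operating system. Inside each pipeline script, a $stdout.sync = true line near the top has the same effect on standard output as the RubyMine -e prefix.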

For details, see IO.sync=.
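On the question of a better way than splitting the command and sleeping: all stages of a shell pipeline are started at the same time, so in the original single command the uploader can start looking for files while the splitter is still writing them. The sketch below is one alternative, not something the answer above proposes: it uses Ruby's standard Open3.pipeline, which runs argv-array stages without a shell and returns only after every stage has exited, and then runs the uploader as a separate step. Assumptions: uploader.rb reads only the files splitter.rb writes (not its standard input), RbConfig.ruby (the currently running interpreter) stands in for plain ruby, and run_pipeline! is a hypothetical helper name.

```ruby
require "open3"
require "rbconfig"

# Hypothetical helper: runs the stages as one pipeline (like `a | b | c`) and
# blocks until every stage has exited. Each stage is an argv array, so no
# shell quoting is involved. Raises if any stage exits unsuccessfully.
def run_pipeline!(*stages)
  statuses = Open3.pipeline(*stages)
  failed = stages.zip(statuses).reject { |_stage, status| status.success? }
  return statuses if failed.empty?

  raise "pipeline stage(s) failed: " +
        failed.map { |stage, status| "#{stage.join(' ')} (#{status})" }.join("; ")
end

class Pipeline
  # Sketch of the question's run method restructured around run_pipeline!.
  def run(context)
    raise ArgumentError, 'context is nil' unless context
    raise ArgumentError, 'context[:logger] is nil' unless context[:logger]

    dir  = File.dirname(__FILE__)
    # Builds an argv array that runs a sibling script with the same Ruby.
    ruby = ->(script, *args) { [RbConfig.ruby, File.join(dir, script), *args] }

    mapper_args = context[:order_id].nil? ? [] : [context[:order_id].to_s]
    create_reports = [
      ruby.('mapper.rb', *mapper_args), ['sort'], ruby.('reducer.rb'),
      ruby.('expander.rb'), ['sort'], ruby.('splitter.rb')
    ]

    context[:logger].debug create_reports.map { |stage| stage.join(' ') }.join(' | ')
    run_pipeline!(*create_reports)  # returns only after splitter.rb has exited

    context[:logger].debug 'uploading reports'
    run_pipeline!(ruby.('uploader.rb'))
  end
end
```

Because run_pipeline! waits for every stage, including splitter.rb, to exit before returning, the upload step needs no sleep, and a failing stage raises instead of being silently ignored.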