如何将 ruby 实例变量设置为 RxRuby Observable?

How should one set a ruby instance variable to an RxRuby Observable?

我计划将来自 tcp 端口的传入数据视为来自 "view" 的数据。我想设置一些 RxRuby Observables,然后根据我从 tcp 端口获得的数据,select 适当的 Observable 并通过调用 on_next 方法向它发布一些东西。

下面的代码可以工作,但看起来很笨拙。传递给 Rx::Observable.create 的块只是将一个实例变量设置为传递给它的可观察对象。这不是大量的样板,但似乎有些不对劲。我错过了什么吗?

require 'rx'
class GUIMessagePublisher
  attr_accessor :handshake, :handshake_stream, :remote_button, :remote_button_stream

  def initialize
    self.handshake_stream = Rx::Observable.create { |o| self.handshake = o }
    self.remote_button_stream = Rx::Observable.create { |o| self.remote_button = o }
  end

  def publish_handshake
    handshake.on_next("hello")
  end

  def publish_remote_button
    remote_button.on_next(nil)
  end

end

publisher = GUIMessagePublisher.new
publisher.handshake_stream.subscribe { |m| puts "message = #{m}"}
publisher.remote_button_stream.subscribe { puts "remote button clicked" }
publisher.publish_handshake
publisher.publish_remote_button

阅读了更多有关 Rx::Subject 的信息后,我认为这是处理此问题的首选方法

require 'rx'
require 'forwardable'
class GUIMessagePublisher
  extend Forwardable
  attr_accessor :handshake_subject, :remote_button_subject
  def_delegator :handshake_subject, :as_observable, :handshake_stream
  def_delegator :remote_button_subject, :as_observable, :remote_button_stream

  def initialize
    self.handshake_subject = Rx::Subject.new 
    self.remote_button_subject = Rx::Subject.new
  end

  def publish_handshake
    handshake_subject.on_next("hello")
  end

  def publish_remote_button
    remote_button_subject.on_next("remote button")
  end

end

publisher = GUIMessagePublisher.new
publisher.handshake_stream.subscribe { |m| puts "message = #{m}"}
publisher.remote_button_stream.subscribe { |m| puts "remote button clicked, message = #{m}" }
publisher.publish_handshake
publisher.publish_remote_button

Forwardable 的使用是可选的。我本可以通过方法进行委托,甚至可以在公开的主题上调用 .as_observable,但这似乎是正确的。