如何处理 ZeroMQ + Ruby 中的线程问题?

How to handle a Thread Issue in ZeroMQ + Ruby?

无意中阅读 ZeroMQ FAQ 关于线程安全的内容。

My multi-threaded program keeps crashing in weird places inside the ZeroMQ library. What am I doing wrong?

ZeroMQ sockets are not thread-safe. This is covered in some detail in the Guide.

The short version is that sockets should not be shared between threads. We recommend creating a dedicated socket for each thread.

For those situations where a dedicated socket per thread is infeasible, a socket may be shared if and only if each thread executes a full memory barrier before accessing the socket. Most languages support a Mutex or Spinlock which will execute the full memory barrier on your behalf.

我的多线程程序总是在 ZeroMQ 库中奇怪的地方崩溃。
我究竟做错了什么?

以下是我的后续代码:

Celluloid::ZMQ.init
module Scp
    module DataStore
    class DataSocket
        include Celluloid::ZMQ 
            def pull_socket(socket)
                @read_socket = Socket::Pull.new.tap do |read_socket|
                    ## IPC socket
                    read_socket.connect(socket)
                end
            end

            def push_socket(socket)
                @write_socket = Socket::Push.new.tap do |write_socket|
                    ## IPC socket
                    write_socket.connect(socket)
                end
            end

            def run
                pull_socket and push_socket and loopify!
            end

            def loopify!
                loop {
                   async.evaluate_response(read_socket.read_multipart)
                }
            end

            def evaluate_response(data)
                return_response(message_id,routing,Parser.parser(data))
            end

            def return_response(message_id,routing,object)
                data = object.to_response
                write_socket.send([message_id,routing,data])
            end
        end
    end
end  

DataSocket.new.run 

现在,有几件事我不清楚:

1) 假设 async 产生一个新的 Thread (每次)并且 write_socket 在所有线程之间共享并且ZeroMQ 说他们的套接字不是线程安全的。我当然看到 write_socket 运行 进入线程安全问题。
(顺便说一句,到目前为止,在所有端到端测试中都没有遇到这个问题。)

问题一:我的理解对吗?

为了解决这个问题,ZeroMQ 要求我们使用 Mutex、Semaphore 来实现。

这导致问题 2

2) 上下文切换。

鉴于线程应用程序可以随时切换上下文。 查看ffi-rzmq代码Celluloid::ZMQ.send()内部调用send_strings(),内部调用send_multiple()

问题 2:上下文切换可以发生在内部(任何地方)(甚至在关键部分)(此处)[https://github.com/chuckremes/ffi-rzmq/blob/master/lib/ffi-rzmq/socket.rb#L510]

这也可能导致数据排序问题。

我的以下观察是否正确?

注:

Operating system ( MacOS, Linux and CentOS )  
Ruby - MRI 2.2.2/2.3.0

这个答案不是解决您问题的好方法,绝对符合 user3666197 的建议。我认为此解决方案有可能奏效,但在大规模情况下可能会因互斥拥塞而产生性能成本。

Question 1: Assuming that async spawn new Thread(every time) and write_socket is shared between the all threads and zeromq says that their socket is not threaded safe. I certainly see write_socket running into threads safety issue. (Btw hasn't faced this issue in all end to end testing thus far.) Is my understanding correct on this?

根据我对文档的理解,是的,这可能是个问题,因为套接字不是线程安全的。即使您没有遇到此问题,它也可能会在稍后弹出。

Question 2: Context Switching can happen(anywhere) inside(even on critical section)

是的,所以我们可能解决这个问题的一种方法是使用 mutex/semaphore 来确保我们不会在错误的时间发生上下文切换。

我会做这样的事情,但可能会有更好的方法,具体取决于调用的方法不是线程安全的:

Celluloid::ZMQ.init
module Scp
  module DataStore
    class DataSocket
      include Celluloid::ZMQ

      def initialize
        @mutex = Mutex.new
      end

      def pull_socket(socket)
        Thread.new do
          @mutex.synchronize do
            @read_socket = Socket::Pull.new.tap do |read_socket|
              ## IPC socket
              read_socket.connect(socket)
            end
          end
        end.join
      end

      def push_socket(socket)
        Thread.new do
          @mutex.synchronize do
            @write_socket = Socket::Push.new.tap do |write_socket|
              ## IPC socket
              write_socket.connect(socket)
            end
          end
        end.join
      end

      def run
        # Missing socket arguments here
        pull_socket and push_socket and loopify!
      end

      def loopify!
        Thread.new do
          @mutex.synchronize do
            loop {
              async.evaluate_response(read_socket.read_multipart)
            }
          end
        end.join
      end

      def evaluate_response(data)
        return_response(message_id,routing,Parser.parser(data))
      end

      def return_response(message_id,routing,object)
        data = object.to_response
        write_socket.send([message_id,routing,data])
      end
    end
  end
end

DataSocket.new.run

任何人都不应轻视应用程序的稳健性而冒险

请原谅这个故事读起来有点长,但作者一生的经验表明 原因 为什么 远比尝试通过实验找到 how

的任何几个 SLOC(可能令人怀疑或看起来神秘或无知的根本​​原因)

初始笔记

虽然 ZeroMQ 几十年来一直被宣传为零共享(零阻塞,(几乎)-零延迟和更多的设计准则。了解优缺点的最佳地点是 Pieter HINTJENS'书籍,不仅是神话般的 "Code Connected, Volume 1",还有真实社会领域的高级设计和工程)哲学,最近的 API 文档介绍并宣传了一些恕我直言的功能,这些功能与这些角落的关系松散- 分布式计算的石头原则,在零共享上不要那么尖锐地吹口哨那么响亮。这就是说,我仍然是一个零共享的人,所以请从这个角度来看post的其余部分。

答案1:
不,先生。 -- 或者更好 -- 是和否,先生。

ZeroMQ 不要求使用 Mutex/Semaphore 障碍。这与 ZeroMQ 设计准则相矛盾。

是的,最近的 API 更改开始提到 (在某些附加条件下) 可能会开始使用共享-套接字……有(许多)附加措施……所以含义是相反的。如果一个 "wants",这个人还采取了所有额外的步骤和措施(并支付了 "allowing" 共享玩具的所有最初隐藏的设计和实施成本,以(希望)在主要(不必要的)战斗中幸存下来与其他不可控的分布式系统环境——因此突然也承担了失败的风险(出于许多明智的原因,在最初的 ZeroMQ 零共享传播中并非如此)——因此,用户决定走哪条路.这很公平。)。

健全和健壮的设计恕我直言,最好还是按照最初的 ZeroMQ API 和福音主义进行开发,其中零共享是一个原则。

答案 2:
ZeroMQ 数据流排序的设计总是存在一个主要的不确定性,ZeroMQ 设计准则之一让设计人员不要依赖关于消息排序和许多其他不受支持的假设(适用例外情况)。可以肯定的是,任何发送到 ZeroMQ 基础设施的消息要么作为完整消息传递,要么根本不传递。因此,可以肯定的是,交付时从未出现过碎片残骸。有关更多详细信息,请阅读下文。


ThreadId 不能证明什么 (除非 inproc transport-class 使用)

鉴于 ZeroMQ 数据泵引擎的内部设计,
的实例化 zmq.Context( number_of_IO_threads ) 决定生成多少线程来处理未来的数据流。这可以是任何地方 { 0, 1: default, 2, .. } 直到几乎耗尽内核固定的最大线程数。 0 的值给出了不浪费资源的合理选择,其中 inproc:// transport-class 实际上是数据流的直接内存区域映射处理(实际上永远不会流 ang 被确定直接进入接收套接字抽象的着陆台 :o) ) 并且这样的工作不需要任何线程。
除此之外,<aSocket>.setsockopt( zmq.AFFINITY, <anIoThreadEnumID#> ) 允许微调与数据相关的 IO-"hydraulics",以便优先级、负载平衡、性能调整线程加载到 zmq.Context() 实例的 IO 线程的枚举池中,并从上面列出的设计和数据流操作方面的更好和最佳设置中获益。


基石元素是 Context() 的实例,
不是 Socket() 的实例

一旦 Context() 的实例被实例化和配置(参考上面的原因和方式),它(几乎)是可以免费共享的(如果设计无法抗拒共享或有一个需要避免设置完全成熟的分布式计算基础设施)。

换句话说,大脑总是在zmq.Context()的实例中——所有套接字相关的dFSA引擎都在那里设置/配置/操作(是的,尽管语法是 <aSocket>.setsockopt(...),但这种效果是在大脑 内部实现的 ——在各自的 zmq.Context 中——而不是在一些来自 A 的线路中-到-B.

最好永远不要分享 <aSocket> (即使 API-4.2.2+ 承诺你可以)

到目前为止,人们可能已经看到了很多代码片段,其中 ZeroMQ 上下文及其套接字被实例化并迅速处理掉,仅连续提供几个 SLOC-s,但是 - 这确实并不意味着这种做法是明智的或根据任何其他需要进行调整,而不是一个非常学术的例子(由于图书出版商的政策,这只是为了在尽可能少的 SLOC 中印刷而制作的)。

即使在这种情况下,也应该提出关于 zmq.Context 基础设施设置/拆除的巨大成本的公平警告,从而避免任何泛化,越少 copy/paste 此类副本代码,只是为了说明目的而临时使用的。

想象一下任何单个 Context 实例需要进行的实际设置——准备好各自的 dFSA 引擎池,维护它们各自的配置设置以及所有套接字端点池相关传输-class 特定硬件 + 外部 O/S-services 处理程序、循环事件扫描器、缓冲区内存池分配 + 它们的动态分配器等。这都需要时间和 O/S 资源,因此如果不影响性能,请明智地处理这些(自然)成本并注意调整后的间接费用。

如果仍然怀疑为什么要提到这一点,试想一下,如果有人坚持要在数据包发送后立即拆除所有 LAN 电缆,并且需要等到新电缆安装好之后才需要发送下一个数据包出现。希望这个 "reasonable-instantiation" 观点现在可以被更好地理解,并成为共享(如果有的话)一个 zmq.Context()-实例的论据,而无需为尝试共享 ZeroMQ 套接字实例而进行任何进一步的斗争(即使新成为(几乎)线程安全本身)。

如果将 ZeroMQ 理念作为高性能分布式计算基础设施的先进设计传播,它是强大的。仅仅调整一个(次要的)方面通常不会调整所有的努力和成本,因为从全球角度来看如何设计安全和高性能的系统,结果不会有一点改善 (即使是绝对可共享的无风险(如果可能的话)套接字实例也不会改变这一点,而声音设计、干净代码和合理可实现的测试能力和调试的所有好处都将丢失) 如果只是这个细节发生了变化——那么,宁愿从现有的大脑中拉出另一根电线到这样一个新线程,或者为一个新线程配备它自己的大脑,这将在本地处理它的资源并允许它连接自己的电线到分布式系统中的所有其他大脑——必要时与之通信)。

如果仍有疑问,请想象一下如果您的国家奥林匹克曲棍球队在比赛期间只共用一根曲棍球棒会怎样。或者,如果您家乡的所有邻居都共享同一个 phone 号码来接听所有许多来电(是的,所有 phone 和手机都响铃,共享同一个号,同时)。 效果如何?


语言绑定不需要反映所有可用的 API 功能

在这里,可以提出,并且在某些情况下是正确的,并非所有 ZeroMQ 语言绑定或所有流行的框架包装器都会将所有 API 细节暴露给用户以进行应用程序级编程(作者of this post 长期以来一直在与此类遗留冲突作斗争,由于这个原因,这些冲突仍然无法解决,不得不绞尽脑汁寻找任何可行的方法来解决这个问题 - 所以(几乎)总是可行的)


结语:

值得一提的是,ZeroMQ API 4.2.2+ 的最新版本开始逐渐沿用最初的传播原则。

尽管如此,值得记住的前世memento mori

(强调,不大写)

Thread safety

ØMQ has both thread safe socket type and not thread safe socket types. Applications MUST NOT use a not thread safe socket from multiple threads except after migrating a socket from one thread to another with a "full fence" memory barrier.

Following are the thread safe sockets: * ZMQ_CLIENT * ZMQ_SERVER * ZMQ_DISH * ZMQ_RADIO * ZMQ_SCATTER * ZMQ_GATHER

虽然这篇文章在某些人看来可能听起来很有前途,但调用服务障碍是设计高级分布式计算系统时最糟糕的事情,性能是必须的。

人们最不想看到的是阻止自己的代码,因为这样的代理进入了一个基本上无法控制的阻塞状态,没有人可以阻止它(无论是内部的代理本身,还是来自外部的任何人),以防远程代理永远不会提供一个刚刚预期的事件(在分布式系统中,这种情况可能由于多种原因或在许多无法控制的情况下发生)。

构建一个容易挂起的系统(对支持的(但天真地使用)语法可能性微笑)确实不是一件令人愉快的事情,更不是认真的设计工作。

人们在这里也不会感到惊讶,许多额外的(最初不可见的)限制适用于使用共享-{ hockey-stick | 的新动作。 telephones } API:

ZMQ_CLIENT sockets are threadsafe. They do not accept the ZMQ_SNDMORE option on sends not ZMQ_RCVMORE on receives. This limits them to single part data. The intention is to extend the API to allow scatter/gather of multi-part data.

c/a

Celluloid::ZMQ 没有报告任何这些新的-API-(共享几乎是宽容的罪恶)套接字类型在其关于支持的套接字类型的部分所以没有好消息可以期待 a- priori 和 Celluloid::ZMQ master activity 似乎在 2015 年的某个地方淡出,所以从这个角度来看,期望应该有点现实。

这就是说,一个有趣的点可能会在通知后面被发现:

before you go building your own distributed Celluloid systems with Celluloid::ZMQ, be sure to give DCell a look and decide if it fits your purposes.


最后但同样重要的是,将事件循环系统组合到另一个事件循环中是一项痛苦的工作。试图将一个嵌入式硬实时系统集成到另一个硬实时系统中甚至可能在数学上证明自己是不可能的。

类似地,如果遇到相同的资源,使用另一个基于代理的组件构建多代理系统会带来其他类型的冲突和竞争条件(无论是有意还是 "just" 某些功能副作用)来自两个(多个)基于代理的框架。

不可挽救的相互死锁只是这些冲突中的一种,它会在无意识的设计尝试中引入最初看不见的麻烦。单代理系统设计之外的第一步会使人失去更多的保证,这些保证在进入多代理(分布式)之前未被注意到,因此开放思想并准备好学习许多 "new" 概念专注于许多需要仔细观察并努力避免的新问题是一个非常重要的先决条件,以免(不知不觉地)引入模式,这些模式现在实际上是分布式系统(多代理)领域中的反模式.

至少
你被警告过
:o)