为什么 Thrift 在多个线程请求时引发 AssertionError?

Why Thrift raises AssertionError when requested by multiple threads?

我正在构建一种使用节俭协议的模拟器。 但是当我执行我的虚拟设备的多个线程发送消息时,程序在接收消息后短时间中断,认为缓冲区过载或类似的东西,或者没有,所以如果可能的话我在这里寻求一些帮助。

这是我的代码的主要部分

A class 线程:

class ThreadManager (threading.Thread):    
   def __init__(self, name, obj, client, layout):
      threading.Thread.__init__(self)
      self.name = name
      self.obj = obj
      self.client = client
      self.layout = layout


   def run(self):
       print ("Starting " + self.name)           
       while True:
           sleep(2)
           self.obj.auto_gen_msg(self.client, layout=self.layout)

消息生成方法:

def auto_gen_msg(self, client, layout='',  min_delay=15, max_delay=30):

        if not layout:
            msg = self.gen_message(self.draw_random_model())
        else:
            msg = self.gen_message(layout)
        wait = randint(min_delay, max_delay)
        sleep(wait)

        print(self.eqp_type, " delivered a message ...")
        getattr(client, msg[0])(*msg[1])

主要:

def start(layout, equipment, number):

    try:
        host = 'localhost'

        transport = TSocket.TSocket(host, port=9090)

        transport = TTransport.TBufferedTransport(transport)

        protocol = TCompactProtocol.TCompactProtocol(transport)

        client = SuiteService.Client(protocol)

        transport.open()

        equips = [Equipment(equipment) for i in range(number)]

        threads = [ThreadManager(i.eqp_type, i, client, layout) for i in equips]

        for i in range(len(threads)):
            threads[i].start()
            sleep(2)

        while True:
            pass


        transport.close() 

    except Thrift.TException as tx:
        print ("%s " % (tx.message))

困扰我的错误:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/lem4fia/Documents/sg/loki/loki-thrift/loki_thrift/loki_thrift/lib/thread_manager.py", line 39, in run
    self.obj.auto_gen_msg(self.client, layout=self.layout)
  File "/Users/lem4fia/Documents/sg/loki/loki-thrift/loki_thrift/loki_thrift/lib/virtual.py", line 281, in auto_gen_msg
    getattr(client, msg[0])(*msg[1])
  File "/Users/lem4fia/Documents/sg/loki/thrift-server/thrift_server/suite/SuiteService.py", line 4895, in v1
    self.send_v1(ir, ts, ch, o1, o2, o3, o4, o5, o6, o7)
  File "/Users/lem4fia/Documents/sg/loki/thrift-server/thrift_server/suite/SuiteService.py", line 4899, in send_v1
    self._oprot.writeMessageBegin('v1', TMessageType.CALL, self._seqid)
  File "/Users/lem4fia/Documents/sg/loki/lokiv/lib/python3.6/site-packages/thrift-0.11.0-py3.6-macosx-10.6-intel.egg/thrift/protocol/TCompactProtocol.py", line 156, in writeMessageBegin
    assert self.state == CLEAR
AssertionError

奇怪的是,如果在线程中实例化 2 个虚拟设备不会出错,但是 10 个虚拟设备(有时少于此数量)就足以引发此错误。

有人可以给我一盏灯吗? :)

这里的问题是您似乎必须为每个线程使用一个不同的传输对象。这可能与Thrift的实现有关!

此处参考:http://grokbase.com/t/thrift/user/134s16ks4m/single-connection-and-multiple-threads

作为一般规则 1),Thrift 不适用于跨多个线程。

至少据我所知,这对于所有当前支持的语言都是正确的。

每个线程一个实例即可。


1) 除了像 TThreadedServerTThreadPoolServer

这样的服务器端的东西