AttributeError: Can't get attribute 'journalerReader' on <module '__mp_main__
AttributeError: Can't get attribute 'journalerReader' on <module '__mp_main__
我试图在python中实现Lmax。我试图在4个进程中处理数据
import disruptor
import multiprocessing
import random
if __name__ == '__main__':
cb = disruptor.CircularBuffer(5)
def receiveWriter():
while(True):
n = random.randint(5,20)
cb.receive(n)
def ReplicatorReader():
while(True):
cb.replicator()
def journalerReader():
while(True):
cb.journaler()
def unmarshallerReader():
while(True):
cb.unmarshaller()
def consumeReader():
while(True):
print(cb.consume())
p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
p1.start()
p0 = multiprocessing.Process(name="p0",target=receiveWriter)
p0.start()
p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
p1.start()
p2 = multiprocessing.Process(name="p2",target=journalerReader)
p2.start()
p3 = multiprocessing.Process(name="p3",target=unmarshallerReader)
p3.start()
p4 = multiprocessing.Process(name="p4",target=consumeReader)
p4.start()
但是我的代码中出现了这个错误:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "<string>", line 1, in <module>
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
exitcode = _main(fd, parent_sentinel)
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'unmarshallerReader' on <module '__mp_main__' from 'd:\python\RunDisruptor.py'>
AttributeError: Can't get attribute 'consumeReader' on <module '__mp_main__' from 'd:\python\RunDisruptor.py'>
您的第一个问题是 Process
调用的 target 不能在 if __name__ == '__main__':
块内。但是:
正如我在您之前的 post 中提到的,我看到您可以在多个 processess 之间共享 CircularBuffer
实例的唯一方法是实施 managed class,令人惊讶的是,这并不难做到。但是,当您创建托管 class 并创建该 class 的实例时,您实际上拥有的是对该对象的 proxy 引用。这有两个含义:
- 每个方法调用更像是对由您将启动的管理器创建的特殊服务器进程的远程过程调用,因此比本地方法调用具有更多的开销。
- 如果打印引用,class的
__str__
方法将不会被调用;您将打印代理指针的表示。您可能应该将方法 __str__
重命名为 dump
之类的名称,并在需要实例表示时显式调用它。
您还应该明确地等待正在创建的进程完成,以便管理器服务不会过早关闭,这意味着每个进程都应该分配给一个唯一的变量并具有唯一的名称。
import disruptor
import multiprocessing
from multiprocessing.managers import BaseManager
import random
class CircularBufferManager(BaseManager):
pass
def receiveWriter(cb):
while(True):
n = random.randint(5,20)
cb.receive(n)
def ReplicatorReader(cb):
while(True):
cb.replicator()
def journalerReader(cb):
while(True):
cb.journaler()
def unmarshallerReader(cb):
while(True):
cb.unmarshaller()
def consumeReader(cb):
while(True):
print(cb.consume())
if __name__ == '__main__':
# Create managed class
CircularBufferManager.register('CircularBuffer', disruptor.CircularBuffer)
# create and start manager:
with CircularBufferManager() as manager:
cb = manager.CircularBuffer(5)
p1 = multiprocessing.Process(name="p1", target=ReplicatorReader, args=(cb,))
p1.start()
p0 = multiprocessing.Process(name="p0",target=receiveWriter, args=(cb,))
p0.start()
p1a = multiprocessing.Process(name="p1a",target=ReplicatorReader, args=(cb,))
p1a.start()
p2 = multiprocessing.Process(name="p2",target=journalerReader, args=(cb,))
p2.start()
p3 = multiprocessing.Process(name="p3",target=unmarshallerReader, args=(cb,))
p3.start()
p4 = multiprocessing.Process(name="p4",target=consumeReader, args=(cb,))
p4.start()
p1.join()
p0.join()
p1a.join()
p2.join()
p3.join()
p4.join()
我试图在python中实现Lmax。我试图在4个进程中处理数据
import disruptor
import multiprocessing
import random
if __name__ == '__main__':
cb = disruptor.CircularBuffer(5)
def receiveWriter():
while(True):
n = random.randint(5,20)
cb.receive(n)
def ReplicatorReader():
while(True):
cb.replicator()
def journalerReader():
while(True):
cb.journaler()
def unmarshallerReader():
while(True):
cb.unmarshaller()
def consumeReader():
while(True):
print(cb.consume())
p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
p1.start()
p0 = multiprocessing.Process(name="p0",target=receiveWriter)
p0.start()
p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
p1.start()
p2 = multiprocessing.Process(name="p2",target=journalerReader)
p2.start()
p3 = multiprocessing.Process(name="p3",target=unmarshallerReader)
p3.start()
p4 = multiprocessing.Process(name="p4",target=consumeReader)
p4.start()
但是我的代码中出现了这个错误:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "<string>", line 1, in <module>
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
exitcode = _main(fd, parent_sentinel)
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'unmarshallerReader' on <module '__mp_main__' from 'd:\python\RunDisruptor.py'>
AttributeError: Can't get attribute 'consumeReader' on <module '__mp_main__' from 'd:\python\RunDisruptor.py'>
您的第一个问题是 Process
调用的 target 不能在 if __name__ == '__main__':
块内。但是:
正如我在您之前的 post 中提到的,我看到您可以在多个 processess 之间共享 CircularBuffer
实例的唯一方法是实施 managed class,令人惊讶的是,这并不难做到。但是,当您创建托管 class 并创建该 class 的实例时,您实际上拥有的是对该对象的 proxy 引用。这有两个含义:
- 每个方法调用更像是对由您将启动的管理器创建的特殊服务器进程的远程过程调用,因此比本地方法调用具有更多的开销。
- 如果打印引用,class的
__str__
方法将不会被调用;您将打印代理指针的表示。您可能应该将方法__str__
重命名为dump
之类的名称,并在需要实例表示时显式调用它。
您还应该明确地等待正在创建的进程完成,以便管理器服务不会过早关闭,这意味着每个进程都应该分配给一个唯一的变量并具有唯一的名称。
import disruptor
import multiprocessing
from multiprocessing.managers import BaseManager
import random
class CircularBufferManager(BaseManager):
pass
def receiveWriter(cb):
while(True):
n = random.randint(5,20)
cb.receive(n)
def ReplicatorReader(cb):
while(True):
cb.replicator()
def journalerReader(cb):
while(True):
cb.journaler()
def unmarshallerReader(cb):
while(True):
cb.unmarshaller()
def consumeReader(cb):
while(True):
print(cb.consume())
if __name__ == '__main__':
# Create managed class
CircularBufferManager.register('CircularBuffer', disruptor.CircularBuffer)
# create and start manager:
with CircularBufferManager() as manager:
cb = manager.CircularBuffer(5)
p1 = multiprocessing.Process(name="p1", target=ReplicatorReader, args=(cb,))
p1.start()
p0 = multiprocessing.Process(name="p0",target=receiveWriter, args=(cb,))
p0.start()
p1a = multiprocessing.Process(name="p1a",target=ReplicatorReader, args=(cb,))
p1a.start()
p2 = multiprocessing.Process(name="p2",target=journalerReader, args=(cb,))
p2.start()
p3 = multiprocessing.Process(name="p3",target=unmarshallerReader, args=(cb,))
p3.start()
p4 = multiprocessing.Process(name="p4",target=consumeReader, args=(cb,))
p4.start()
p1.join()
p0.join()
p1a.join()
p2.join()
p3.join()
p4.join()