多处理管道 send() 块
Multiprocessing Pipe send() blocks
根据 Python 文档,只有 recv()
块而不是 send()
。我写了下面的代码试图制作一个 GUI 数独游戏。我这样做的方式是即使 tkinter
正在执行它的 mainloop
我也可以更新游戏板。但是,在测试 运行 期间,我发现如果我在游戏更新时关闭 window,pipe.send()
就会开始阻塞(我发现使用 CPython探查器。)任何人都可以告诉我为什么,如果可能的话,如何解决这个问题?
产生问题:在脚本更新时关闭弹出窗口 window。也就是说,关闭 window,同时将一些数字打印到控制台。
我的系统:macOS Sierra 10.12.5
import multiprocessing as mp
import threading
import random
import time
try:
import tkinter as tk # Python3
except ImportError:
import Tkinter as tk # Python2
class VisualizedBoard:
def __init__(self,input_string,pipe):
'''input_string: a string has a length of at least 81 that represent the board from top-left to bottom right.
empty cell is 0'''
self.update_scheduled=False
self.pipe=pipe
# create board
self.root = tk.Tk()
self.canvas = tk.Canvas(self.root, width=500, height=500)
self.canvas.create_rectangle(50, 50, 500, 500, width=2)
for i in range(1, 10):
self.canvas.create_text(25 + 50 * i, 30, text=str(i))
self.canvas.create_text(30, 25 + 50 * i, text=str(i))
self.canvas.create_line(50 + 50 * i, 50, 50 + 50 * i, 500, width=2 if i % 3 == 0 else 1)
self.canvas.create_line(50, 50 + 50 * i, 500, 50 + 50 * i, width=2 if i % 3 == 0 else 1)
for i in range(81):
if input_string[i] != '0':
self.canvas.create_text(75 + 50 * (i // 9), 75 + 50 * (i % 9), tags=str((i//9+1,i%9+1)).replace(' ',''),text=input_string[i], fill='black')
self.canvas.pack()
self.root.attributes('-topmost', True)
self.root.geometry('550x550+%d+%d' % ((self.root.winfo_screenwidth() - 550) // 2, (self.root.winfo_screenheight() - 550) // 2))
self.root.wm_protocol('WM_DELETE_WINDOW',lambda :(self.root.destroy()))
threading.Thread(target=self.listen, args=()).start()
self.root.mainloop()
def update(self,coordinate,value,color='magenta'):
"""
:parameter coordinate: a tuple (x,y)
:parameter value: single digit
:returns: None
"""
try:
assert isinstance(coordinate,tuple)
except AssertionError:
print('Update Failed. Coordinate should be a tuple (x,y)')
coordinate_tag=str(coordinate).replace(' ','')
self.canvas.delete(coordinate_tag)
if value != 0 and value != '0':
self.canvas.create_text(25+50*coordinate[0],25+50*coordinate[1],tags=coordinate_tag,text=str(value),fill=color)
self.postponed_update()
#self.canvas.update()
def postponed_update(self):
if not self.update_scheduled:
self.canvas.after(50,self.scheduled_update)
self.update_scheduled=True
def scheduled_update(self):
self.canvas.update()
self.update_scheduled=False
def new_board(self,input_string):
self.root.destroy()
return VisualizedBoard(input_string,self.pipe)
def listen(self):
try:
while True:
msg=self.pipe.recv()
self.update(*msg)
except EOFError:
self.pipe.close()
tk.Label(self.root,text='Connection to the main script has been closed.\nIt is safe to close this window now.').pack()
except Exception as m:
self.pipe.close()
print('Error during listing:',m)
class BoardConnection:
def __init__(self,input_string):
ctx = mp.get_context('spawn')
self.receive,self.pipe=ctx.Pipe(False)
self.process=ctx.Process(target=VisualizedBoard,args=(input_string,self.receive))
self.process.start()
def update(self,coordinate,value,color='magenta'):
"""
:parameter coordinate: a tuple (x,y)
:parameter value: single digit
:returns: None
"""
self.pipe.send((coordinate,value,color))
def close(self):
self.pipe.close()
self.process.terminate()
if __name__ == "__main__":
b=BoardConnection('000000000302540000050301070000000004409006005023054790000000050700810000080060009')
start = time.time()
for i in range(5000): #test updating using random numbers
b.update((random.randint(1, 9), random.randint(1, 9)), random.randrange(10))
print(i)
print(time.time() - start)
Python Pipe
是对 OS 无名管道的抽象。
一个OS管道通常被实现为内核中一定大小的内存缓冲区。默认情况下,如果缓冲区已满,则对 send/write 的下一次调用将被阻止。
如果您希望能够在没有消费者使用数据的情况下继续发布数据,您应该使用 multiprocessing.Queue
或 asyncio
设施。
multiprocessing.Queue
使用一个"limitless" 缓冲区和一个线程将数据推入OS 管道。如果管道已满,调用者将继续 运行,因为发布的数据将堆积在 Queue
缓冲区中。
IIRC,asyncio 设置管道 O_NONBLOCK
标志并等待管道被消耗。与 multiprocessing.Queue
.
一样,其他消息存储在 "limitless" 缓冲区中
根据 Python 文档,只有 recv()
块而不是 send()
。我写了下面的代码试图制作一个 GUI 数独游戏。我这样做的方式是即使 tkinter
正在执行它的 mainloop
我也可以更新游戏板。但是,在测试 运行 期间,我发现如果我在游戏更新时关闭 window,pipe.send()
就会开始阻塞(我发现使用 CPython探查器。)任何人都可以告诉我为什么,如果可能的话,如何解决这个问题?
产生问题:在脚本更新时关闭弹出窗口 window。也就是说,关闭 window,同时将一些数字打印到控制台。
我的系统:macOS Sierra 10.12.5
import multiprocessing as mp
import threading
import random
import time
try:
import tkinter as tk # Python3
except ImportError:
import Tkinter as tk # Python2
class VisualizedBoard:
def __init__(self,input_string,pipe):
'''input_string: a string has a length of at least 81 that represent the board from top-left to bottom right.
empty cell is 0'''
self.update_scheduled=False
self.pipe=pipe
# create board
self.root = tk.Tk()
self.canvas = tk.Canvas(self.root, width=500, height=500)
self.canvas.create_rectangle(50, 50, 500, 500, width=2)
for i in range(1, 10):
self.canvas.create_text(25 + 50 * i, 30, text=str(i))
self.canvas.create_text(30, 25 + 50 * i, text=str(i))
self.canvas.create_line(50 + 50 * i, 50, 50 + 50 * i, 500, width=2 if i % 3 == 0 else 1)
self.canvas.create_line(50, 50 + 50 * i, 500, 50 + 50 * i, width=2 if i % 3 == 0 else 1)
for i in range(81):
if input_string[i] != '0':
self.canvas.create_text(75 + 50 * (i // 9), 75 + 50 * (i % 9), tags=str((i//9+1,i%9+1)).replace(' ',''),text=input_string[i], fill='black')
self.canvas.pack()
self.root.attributes('-topmost', True)
self.root.geometry('550x550+%d+%d' % ((self.root.winfo_screenwidth() - 550) // 2, (self.root.winfo_screenheight() - 550) // 2))
self.root.wm_protocol('WM_DELETE_WINDOW',lambda :(self.root.destroy()))
threading.Thread(target=self.listen, args=()).start()
self.root.mainloop()
def update(self,coordinate,value,color='magenta'):
"""
:parameter coordinate: a tuple (x,y)
:parameter value: single digit
:returns: None
"""
try:
assert isinstance(coordinate,tuple)
except AssertionError:
print('Update Failed. Coordinate should be a tuple (x,y)')
coordinate_tag=str(coordinate).replace(' ','')
self.canvas.delete(coordinate_tag)
if value != 0 and value != '0':
self.canvas.create_text(25+50*coordinate[0],25+50*coordinate[1],tags=coordinate_tag,text=str(value),fill=color)
self.postponed_update()
#self.canvas.update()
def postponed_update(self):
if not self.update_scheduled:
self.canvas.after(50,self.scheduled_update)
self.update_scheduled=True
def scheduled_update(self):
self.canvas.update()
self.update_scheduled=False
def new_board(self,input_string):
self.root.destroy()
return VisualizedBoard(input_string,self.pipe)
def listen(self):
try:
while True:
msg=self.pipe.recv()
self.update(*msg)
except EOFError:
self.pipe.close()
tk.Label(self.root,text='Connection to the main script has been closed.\nIt is safe to close this window now.').pack()
except Exception as m:
self.pipe.close()
print('Error during listing:',m)
class BoardConnection:
def __init__(self,input_string):
ctx = mp.get_context('spawn')
self.receive,self.pipe=ctx.Pipe(False)
self.process=ctx.Process(target=VisualizedBoard,args=(input_string,self.receive))
self.process.start()
def update(self,coordinate,value,color='magenta'):
"""
:parameter coordinate: a tuple (x,y)
:parameter value: single digit
:returns: None
"""
self.pipe.send((coordinate,value,color))
def close(self):
self.pipe.close()
self.process.terminate()
if __name__ == "__main__":
b=BoardConnection('000000000302540000050301070000000004409006005023054790000000050700810000080060009')
start = time.time()
for i in range(5000): #test updating using random numbers
b.update((random.randint(1, 9), random.randint(1, 9)), random.randrange(10))
print(i)
print(time.time() - start)
Python Pipe
是对 OS 无名管道的抽象。
一个OS管道通常被实现为内核中一定大小的内存缓冲区。默认情况下,如果缓冲区已满,则对 send/write 的下一次调用将被阻止。
如果您希望能够在没有消费者使用数据的情况下继续发布数据,您应该使用 multiprocessing.Queue
或 asyncio
设施。
multiprocessing.Queue
使用一个"limitless" 缓冲区和一个线程将数据推入OS 管道。如果管道已满,调用者将继续 运行,因为发布的数据将堆积在 Queue
缓冲区中。
IIRC,asyncio 设置管道 O_NONBLOCK
标志并等待管道被消耗。与 multiprocessing.Queue
.