通过命名管道将数据发送到另一个已经 运行 python 的进程
Send data to another already running python process via named pipe
我正在尝试寻找通过命名管道将数据发送到 运行ning Python (3.7+) 进程的方法。例如,该过程是向字符串添加热心前缀并打印它。该过程有自己的事情要做,并且 运行 无限期地进行。除了拥有自己的东西之外,它还获得了新的任务。由于前面有多个任务,所以有一个队列:
from queue import Queue
import time
import tkinter as tk
import os
import asyncio
import win32pipe, win32file, pywintypes
import sys
q = Queue()
for i in range(5):
q.put(i) #own tasks
class C_Window():
def __init__(self, parent=None, windowname='Window'):
self.parent = parent
self.root = tk.Tk()
self.sendBox = tk.Text(self.root, height=1, width=30)
self.sendBox.pack()
self.buttonCommit=tk.Button(self.root, height=1, width=10, text="Commit",
command=lambda: self.commit_button())
self.buttonCommit.pack()
self.root.update()
def commit_button(self):
inputValue=self.sendBox.get("1.0","end-1c")
self.sendBox.delete('1.0', tk.END)
q.put(inputValue)
async def async_tk(self):
while True:
self.root.update()
await asyncio.sleep(0.25)
async def async_main():
while True:
if q.empty():
print ("relaxing")
else:
print ("YEAY! ", q.get())
await asyncio.sleep(1)
if __name__ == '__main__':
window_obj = C_Window()
windowtask = asyncio.ensure_future(window_obj.async_tk()) # create_task() replaces ensure_future() in 3.7+
maintask = asyncio.ensure_future(async_main())
loop = asyncio.get_event_loop()
loop.run_forever()
这按预期工作。:队列完成,当通过界面添加内容时它完成,如果队列中没有任何内容,它正在放松。
现在我想通过命名管道从外部向队列添加任务。为此,我制作了一个管道 class:
class C_Pipe():
def __init__(self):
self.pipe = win32pipe.CreateNamedPipe(r'\.\pipe\mypipe',
win32pipe.PIPE_ACCESS_DUPLEX,
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
1, # nMaxInstances
65536, # nOutBufferSize
65536, # nInBufferSize
0, # 50ms timeout (the default)
None) # securityAttributes
async def async_pipe(self):
win32pipe.ConnectNamedPipe(self.pipe)
while True:
try:
msg = win32file.ReadFile(self.pipe, 65536)[1].decode('utf-8')
print(msg)
self.main_queue.put(msg)
except pywintypes.error as e:
if e.winerror == 109: #no process on other side OR Pipe closed
win32pipe.DisconnectNamedPipe(self.pipe)
print("Reconnecting pipe")
win32pipe.ConnectNamedPipe(self.pipe)
else:
raise
await asyncio.sleep(0.25)
然后尝试 运行 它:
if __name__ == '__main__':
window_obj = C_Window()
windowtask = asyncio.ensure_future(window_obj.async_tk())
maintask = asyncio.ensure_future(async_main())
pipe_obj = C_Pipe()
pipetask = asyncio.ensure_future(pipe_obj.async_pipe())
loop = asyncio.get_event_loop()
loop.run_forever()
这是行不通的。一旦它是管道任务,它就会在阅读时阻塞,一切都会冻结。这就是为什么我试图把它放在一个单独的线程中:
if __name__ == '__main__':
loop2 = asyncio.new_event_loop()
pipe_obj = C_Pipe()
pipetask = asyncio.run_coroutine_threadsafe(pipe_obj.async_pipe(),loop2)
loop = asyncio.get_event_loop()
window_obj = C_Window()
windowtask = asyncio.ensure_future(window_obj.async_tk())
maintask = asyncio.ensure_future(async_main())
loop.run_forever()
但管道收到 1 条消息,然后在没有将数据放入队列的情况下失败。我的问题是:
- 有没有办法从外部进程将数据输出到队列中(并摆脱命名管道)?
- 有没有办法 运行 命名管道在它自己的线程中,这样它就可以保持阻塞状态?
- 这里的 asyncio 真的是正确的选择吗?我已经阅读了很多关于
asyncio
和 multiprocessing
的内容,但我找不到清晰的图片
非常感谢!
- Is there a way to run the named pipe in its own thread so it can stay blocked?
这可能是最简单的方法。您的尝试失败了,因为您实际上从未创建过不同的线程。您创建了两个事件循环,运行 只创建了一个。 run_coroutine_threadsafe
的想法是允许非异步线程将作业提交到(单个)异步事件循环。首先,通信依赖于同步API,因此可以保持同步:
# calling it sync_pipe, since it's no longer async
def sync_pipe(self, enqueue):
win32pipe.ConnectNamedPipe(self.pipe)
while True:
try:
msg = win32file.ReadFile(self.pipe, 65536)[1].decode('utf-8')
print(msg)
enqueue(msg)
except pywintypes.error as e:
if e.winerror == 109: #no process on other side OR Pipe closed
win32pipe.DisconnectNamedPipe(self.pipe)
print("Reconnecting pipe")
win32pipe.ConnectNamedPipe(self.pipe)
else:
raise
time.sleep(0.25)
请注意我们是如何向它发送一个抽象函数的,该函数定义了如何对某些内容进行排队。有了它,主块看起来像这样:
if __name__ == '__main__':
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
def enqueue(item):
loop.call_soon_threadsafe(queue.put_nowait, item)
pipe_obj = C_Pipe()
pipethread = threading.Thread(target=pipe_obj.sync_pipe, args=(enqueue,))
window_obj = C_Window()
windowtask = asyncio.ensure_future(window_obj.async_tk())
maintask = asyncio.ensure_future(async_main(queue))
loop.run_forever()
async_main
现在收到它将与 sync_pipe
共享的队列。
我正在尝试寻找通过命名管道将数据发送到 运行ning Python (3.7+) 进程的方法。例如,该过程是向字符串添加热心前缀并打印它。该过程有自己的事情要做,并且 运行 无限期地进行。除了拥有自己的东西之外,它还获得了新的任务。由于前面有多个任务,所以有一个队列:
from queue import Queue
import time
import tkinter as tk
import os
import asyncio
import win32pipe, win32file, pywintypes
import sys
q = Queue()
for i in range(5):
q.put(i) #own tasks
class C_Window():
def __init__(self, parent=None, windowname='Window'):
self.parent = parent
self.root = tk.Tk()
self.sendBox = tk.Text(self.root, height=1, width=30)
self.sendBox.pack()
self.buttonCommit=tk.Button(self.root, height=1, width=10, text="Commit",
command=lambda: self.commit_button())
self.buttonCommit.pack()
self.root.update()
def commit_button(self):
inputValue=self.sendBox.get("1.0","end-1c")
self.sendBox.delete('1.0', tk.END)
q.put(inputValue)
async def async_tk(self):
while True:
self.root.update()
await asyncio.sleep(0.25)
async def async_main():
while True:
if q.empty():
print ("relaxing")
else:
print ("YEAY! ", q.get())
await asyncio.sleep(1)
if __name__ == '__main__':
window_obj = C_Window()
windowtask = asyncio.ensure_future(window_obj.async_tk()) # create_task() replaces ensure_future() in 3.7+
maintask = asyncio.ensure_future(async_main())
loop = asyncio.get_event_loop()
loop.run_forever()
这按预期工作。:队列完成,当通过界面添加内容时它完成,如果队列中没有任何内容,它正在放松。
现在我想通过命名管道从外部向队列添加任务。为此,我制作了一个管道 class:
class C_Pipe():
def __init__(self):
self.pipe = win32pipe.CreateNamedPipe(r'\.\pipe\mypipe',
win32pipe.PIPE_ACCESS_DUPLEX,
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
1, # nMaxInstances
65536, # nOutBufferSize
65536, # nInBufferSize
0, # 50ms timeout (the default)
None) # securityAttributes
async def async_pipe(self):
win32pipe.ConnectNamedPipe(self.pipe)
while True:
try:
msg = win32file.ReadFile(self.pipe, 65536)[1].decode('utf-8')
print(msg)
self.main_queue.put(msg)
except pywintypes.error as e:
if e.winerror == 109: #no process on other side OR Pipe closed
win32pipe.DisconnectNamedPipe(self.pipe)
print("Reconnecting pipe")
win32pipe.ConnectNamedPipe(self.pipe)
else:
raise
await asyncio.sleep(0.25)
然后尝试 运行 它:
if __name__ == '__main__':
window_obj = C_Window()
windowtask = asyncio.ensure_future(window_obj.async_tk())
maintask = asyncio.ensure_future(async_main())
pipe_obj = C_Pipe()
pipetask = asyncio.ensure_future(pipe_obj.async_pipe())
loop = asyncio.get_event_loop()
loop.run_forever()
这是行不通的。一旦它是管道任务,它就会在阅读时阻塞,一切都会冻结。这就是为什么我试图把它放在一个单独的线程中:
if __name__ == '__main__':
loop2 = asyncio.new_event_loop()
pipe_obj = C_Pipe()
pipetask = asyncio.run_coroutine_threadsafe(pipe_obj.async_pipe(),loop2)
loop = asyncio.get_event_loop()
window_obj = C_Window()
windowtask = asyncio.ensure_future(window_obj.async_tk())
maintask = asyncio.ensure_future(async_main())
loop.run_forever()
但管道收到 1 条消息,然后在没有将数据放入队列的情况下失败。我的问题是:
- 有没有办法从外部进程将数据输出到队列中(并摆脱命名管道)?
- 有没有办法 运行 命名管道在它自己的线程中,这样它就可以保持阻塞状态?
- 这里的 asyncio 真的是正确的选择吗?我已经阅读了很多关于
asyncio
和multiprocessing
的内容,但我找不到清晰的图片
非常感谢!
- Is there a way to run the named pipe in its own thread so it can stay blocked?
这可能是最简单的方法。您的尝试失败了,因为您实际上从未创建过不同的线程。您创建了两个事件循环,运行 只创建了一个。 run_coroutine_threadsafe
的想法是允许非异步线程将作业提交到(单个)异步事件循环。首先,通信依赖于同步API,因此可以保持同步:
# calling it sync_pipe, since it's no longer async
def sync_pipe(self, enqueue):
win32pipe.ConnectNamedPipe(self.pipe)
while True:
try:
msg = win32file.ReadFile(self.pipe, 65536)[1].decode('utf-8')
print(msg)
enqueue(msg)
except pywintypes.error as e:
if e.winerror == 109: #no process on other side OR Pipe closed
win32pipe.DisconnectNamedPipe(self.pipe)
print("Reconnecting pipe")
win32pipe.ConnectNamedPipe(self.pipe)
else:
raise
time.sleep(0.25)
请注意我们是如何向它发送一个抽象函数的,该函数定义了如何对某些内容进行排队。有了它,主块看起来像这样:
if __name__ == '__main__':
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
def enqueue(item):
loop.call_soon_threadsafe(queue.put_nowait, item)
pipe_obj = C_Pipe()
pipethread = threading.Thread(target=pipe_obj.sync_pipe, args=(enqueue,))
window_obj = C_Window()
windowtask = asyncio.ensure_future(window_obj.async_tk())
maintask = asyncio.ensure_future(async_main(queue))
loop.run_forever()
async_main
现在收到它将与 sync_pipe
共享的队列。