通过命名管道将数据发送到另一个已经 运行 python 的进程

Send data to another already running python process via named pipe

我正在尝试寻找通过命名管道将数据发送到 运行ning Python (3.7+) 进程的方法。例如,该过程是向字符串添加热心前缀并打印它。该过程有自己的事情要做,并且 运行 无限期地进行。除了拥有自己的东西之外,它还获得了新的任务。由于前面有多个任务,所以有一个队列:

from queue import Queue 
import time
import tkinter as tk 
import os
import asyncio
import win32pipe, win32file, pywintypes
import sys

q = Queue()

for i in range(5):
    q.put(i) #own tasks

class C_Window():
    def __init__(self, parent=None, windowname='Window'): 
        self.parent = parent
        self.root =  tk.Tk()
        self.sendBox = tk.Text(self.root, height=1, width=30)
        self.sendBox.pack()
        self.buttonCommit=tk.Button(self.root, height=1, width=10, text="Commit", 
                            command=lambda: self.commit_button())
        self.buttonCommit.pack()
        self.root.update()

    def commit_button(self):
        inputValue=self.sendBox.get("1.0","end-1c")
        self.sendBox.delete('1.0', tk.END)
        q.put(inputValue)

    async def async_tk(self):
        while True:
            self.root.update()
            await asyncio.sleep(0.25) 

async def async_main():
    while True:
        if q.empty():
            print ("relaxing")
        else: 
            print ("YEAY! ", q.get())
        await asyncio.sleep(1) 


if __name__ == '__main__':
    window_obj = C_Window()
    windowtask = asyncio.ensure_future(window_obj.async_tk()) # create_task() replaces ensure_future() in 3.7+ 
    maintask = asyncio.ensure_future(async_main()) 
    loop = asyncio.get_event_loop()
    loop.run_forever()

这按预期工作。:队列完成,当通过界面添加内容时它完成,如果队列中没有任何内容,它正在放松。

现在我想通过命名管道从外部向队列添加任务。为此,我制作了一个管道 class:

class C_Pipe():
    def __init__(self): 
        self.pipe = win32pipe.CreateNamedPipe(r'\.\pipe\mypipe',
                                    win32pipe.PIPE_ACCESS_DUPLEX,
                                    win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
                                    1,  # nMaxInstances
                                    65536,  # nOutBufferSize
                                    65536,  # nInBufferSize
                                    0, # 50ms timeout (the default)
                                    None) # securityAttributes

    async def async_pipe(self):
        win32pipe.ConnectNamedPipe(self.pipe)
        while True:
            try:
                msg = win32file.ReadFile(self.pipe, 65536)[1].decode('utf-8')
                print(msg)
                self.main_queue.put(msg)
            except pywintypes.error as e:
                if e.winerror == 109: #no process on other side OR Pipe closed
                    win32pipe.DisconnectNamedPipe(self.pipe)
                    print("Reconnecting pipe")
                    win32pipe.ConnectNamedPipe(self.pipe)
            else:
                raise
            await asyncio.sleep(0.25)   

然后尝试 运行 它:

if __name__ == '__main__':
    window_obj = C_Window()
    windowtask = asyncio.ensure_future(window_obj.async_tk()) 
    maintask = asyncio.ensure_future(async_main()) 
    pipe_obj = C_Pipe()
    pipetask = asyncio.ensure_future(pipe_obj.async_pipe()) 
    loop = asyncio.get_event_loop()
    loop.run_forever()

这是行不通的。一旦它是管道任务,它就会在阅读时阻塞,一切都会冻结。这就是为什么我试图把它放在一个单独的线程中:

if __name__ == '__main__':

    loop2 = asyncio.new_event_loop()
    pipe_obj = C_Pipe()
    pipetask = asyncio.run_coroutine_threadsafe(pipe_obj.async_pipe(),loop2) 

    loop = asyncio.get_event_loop()
    window_obj = C_Window()
    windowtask = asyncio.ensure_future(window_obj.async_tk()) 
    maintask = asyncio.ensure_future(async_main()) 

    loop.run_forever()

但管道收到 1 条消息,然后在没有将数据放入队列的情况下失败。我的问题是:

  1. 有没有办法从外部进程将数据输出到队列中(并摆脱命名管道)?
  2. 有没有办法 运行 命名管道在它自己的线程中,这样它就可以保持阻塞状态?
  3. 这里的 asyncio 真的是正确的选择吗?我已经阅读了很多关于 asynciomultiprocessing 的内容,但我找不到清晰的图片

非常感谢!

  1. Is there a way to run the named pipe in its own thread so it can stay blocked?

这可能是最简单的方法。您的尝试失败了,因为您实际上从未创建过不同的线程。您创建了两个事件循环,运行 只创建了一个。 run_coroutine_threadsafe 的想法是允许非异步线程将作业提交到(单个)异步事件循环。首先,通信依赖于同步API,因此可以保持同步:

    # calling it sync_pipe, since it's no longer async
    def sync_pipe(self, enqueue):
        win32pipe.ConnectNamedPipe(self.pipe)
        while True:
            try:
                msg = win32file.ReadFile(self.pipe, 65536)[1].decode('utf-8')
                print(msg)
                enqueue(msg)
            except pywintypes.error as e:
                if e.winerror == 109: #no process on other side OR Pipe closed
                    win32pipe.DisconnectNamedPipe(self.pipe)
                    print("Reconnecting pipe")
                    win32pipe.ConnectNamedPipe(self.pipe)
            else:
                raise
            time.sleep(0.25)   

请注意我们是如何向它发送一个抽象函数的,该函数定义了如何对某些内容进行排队。有了它,主块看起来像这样:

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    def enqueue(item):
        loop.call_soon_threadsafe(queue.put_nowait, item)
    pipe_obj = C_Pipe()
    pipethread = threading.Thread(target=pipe_obj.sync_pipe, args=(enqueue,))

    window_obj = C_Window()
    windowtask = asyncio.ensure_future(window_obj.async_tk()) 
    maintask = asyncio.ensure_future(async_main(queue)) 

    loop.run_forever()

async_main 现在收到它将与 sync_pipe 共享的队列。