在 Python 3.7 中使用和重叠命名管道
Using and Overlapped Named Pipe in Python 3.7
我正在使用 Windows 和 Python 3.7,我正在尝试在 python 进程之间异步共享数据和字符串。其中一个是 运行ning 无限期(接收方),另一个可能在任何时候开始发送一些数据然后结束(发送方)。我正在尝试为此使用命名管道。
当他们 运行 同步时我设法得到了这个(接收者在阻塞的管道上等待直到他获得数据),但是接收者有其他事情要做所以理想情况下他不应该一直等待.也可能在某个时候有第二个发送者,所以阻塞的管道不是很好。
接收者的代码是:
import os
import time
import sys
import win32pipe, win32file, pywintypes
pipe_name = r'\.\pipe\mypipe'
pipe = win32pipe.CreateNamedPipe(
pipe_name,
win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED, # open mode
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT, # pipe mode
1, 65536, 65536, # max instances, out buffer size, in buffer size
0, # timeout
None)
while 1:
print("doing my other stuff")
try:
win32pipe.ConnectNamedPipe(pipe, pywintypes.OVERLAPPED())
except pywintypes.error as e:
if e.winerror == 232: #disconnected pipe
win32pipe.DisconnectNamedPipe(pipe)
print("Disconnecting pipe")
else:
print(e)
try:
retval, msg = win32file.ReadFile(pipe, 0, pywintypes.OVERLAPPED())
print(msg)
except pywintypes.error as e:
if e.winerror == 536: #Wating for connection
print("waiting")
elif e.winerror == 233: #no process on other side
continue
time.sleep(1)
发件人的代码是:
import os
import time
import sys
import win32pipe, win32file, pywintypes
pipe_name = r'\.\pipe\mypipe'
for x in range(5):
handle = win32file.CreateFile(
pipe_name,
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
0,
None,
win32file.OPEN_EXISTING,
win32file.FILE_FLAG_OVERLAPPED,
None
)
res = win32pipe.SetNamedPipeHandleState(handle, win32pipe.PIPE_READMODE_MESSAGE, None, None)
print(f"sending {x}")
win32file.WriteFile(handle, str.encode(f"hello world {x}"))
win32file.CloseHandle(handle)
time.sleep(2)
现在两者都可以 运行 并且有一些连接,但我无法真正获取数据。如果发送了一些东西,接收者可以做其他事情并断开连接并重新打开管道,但 msg
最终为空。如果我在调试器中停止它并发送一些东西 msg
的值得到 "memory at 0x0......." 我会将其解释为某种指针,但是你可能已经注意到我对管道的理解是有限的。
Here I found a good example for a synchronos pipe which worked. I changed the the creation of the pipe to the reciever but that wasn't to hard. I found some examples for asynchronous (overlapped) pipes here,这也很棒,但给我留下了我现在面临的问题。
从重叠的管道中读取数据仍然是 win32file.ReadFile
的任务,还是我缺少的其他东西?
非常感谢!
我找到了解决方案并想分享它,以防其他人遇到这个问题。事实证明 msg
得到 "memory at 0x0......." 是一个 memoryview 对象,它的数据可以通过 bytes(msg)
.
公开
我的 ReadFile
命令也有问题,因为缓冲区必须 >0 才能实现任何功能。现在它单独读取每个字节并将它们添加到一个字符串中。这可能不是很好的性能,但它对我有用,它解决了如果消息短于缓冲区长度则必须结束的问题。
msg = ''
rtnvalue, data = win32file.ReadFile(pipe, 1, pywintypes.OVERLAPPED())
while rtnvalue == 234:
msg = msg + bytes(data).decode('ASCII')
rtnvalue, data = win32file.ReadFile(pipe, 1, pywintypes.OVERLAPPED())
if rtnvalue == 0: #end of stream is reached
msg = msg + bytes(data).decode('ASCII')
return msg
发布者的代码和接受的答案不处理 ERROR_IO_PENDING。不幸的是,我在其他地方找不到好的 Python 示例,所以我花了一些时间来解决它。
针对您的环境进行修改,连接到您的管道服务器,然后调用 receive()。它将 return 来自管道的完整消息,或 None.
如果您想观察代码的工作原理,请将 read_buf 的大小设置为较小的值(例如 64 字节)并添加一些日志记录语句。
同样的代码应该同样适用于文件和套接字(经过精简修改)。
import win32file
import win32pipe
import winerror
import pywintypes
class pipe_comms:
def __init__(self):
self.pipe_open = False
self.pipe_handle = None
self.pipe_name = r”\.\pipe\your_pipe”
self.read_msg = None
self.overlapped = None
self.read_buf = win32file.AllocateReadBuffer(4096)
def connect(self):
if not self.pipe_open:
try:
self.pipe_handle = win32file.CreateFile(self.pipe_name
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
0, None, win32file.OPEN_EXISTING,
win32file.FILE_FLAG_OVERLAPPED, None)
win32pipe.SetNamedPipeHandleState(self.pipe_handle,
win32pipe.PIPE_READMODE_MESSAGE, None, None)
self.pipe_open = True
self.read_msg = None
except pywintypes.error as e:
self.handle_error(e.args[0])
return False
return True
def receive(self):
try:
result = self.receive_overlapped()
if result == winerror.ERROR_SUCCESS:
fullMsg = self.read_msg
self.read_msg = None
return fullMsg
elif result == winerror.ERROR_IO_PENDING:
return None
else:
self.handle_error(result)
except pywintypes.error as e:
self.handle_error(e.args[0])
return None
def receive_overlapped(self):
if self.overlapped:
try:
bytes_read = win32file.GetOverlappedResult(self.pipe_handle, self.overlapped, 0)
self.read_msg += bytes(self.read_buf[:bytes_read])
self.overlapped = None
return winerror.ERROR_SUCCESS
except pywintypes.error as e:
result = e.args[0]
if result = winerror.ERROR_MORE_DATA:
bytes_read = len(self.read_buf)
self.read_msg += bytes(self.read_buf[:bytes_read])
# fall thru to issue new ReadFile
else:
# ERROR_IO_PENDING is normal, anything else is not
return result
else:
self.read_msg = bytes()
self.overlapped = pywintypes.OVERLAPPED()
result, data = win32file.ReadFile(self.pipe_handle, self.read_buf, self.overlapped)
while True:
if result = winerror.ERROR_MORE_DATA:
bytes_read = len(pipe_data)
self.read_msg = bytes(data[:bytes_read])
result, data = win32file.ReadFile(self.pipe_handle, self.read_buf, self.overlapped)
continue
elif result == winerror.ERROR_SUCCESS:
bytes_read = win32file.GetOverlappedResult(self.pipe_handle, self.overlapped, 0)
self.read_msg = bytes(data[:bytes_read])
self.overlapped = None
return result
def handle_error(self, result):
reset_pipe = False
if result == winerror.ERROR_BROKEN_PIPE:
win32pipe.DisconnectNamedPipe(self.pipe_handle)
reset_pipe = True
elif result == winerror.ERROR_NO_DATA:
reset_pipe = True
if reset_pipe:
self.pipe_handle = None
self.pipe_open = False
self.read_msg = None
self.overlapped = None
我正在使用 Windows 和 Python 3.7,我正在尝试在 python 进程之间异步共享数据和字符串。其中一个是 运行ning 无限期(接收方),另一个可能在任何时候开始发送一些数据然后结束(发送方)。我正在尝试为此使用命名管道。
当他们 运行 同步时我设法得到了这个(接收者在阻塞的管道上等待直到他获得数据),但是接收者有其他事情要做所以理想情况下他不应该一直等待.也可能在某个时候有第二个发送者,所以阻塞的管道不是很好。
接收者的代码是:
import os
import time
import sys
import win32pipe, win32file, pywintypes
pipe_name = r'\.\pipe\mypipe'
pipe = win32pipe.CreateNamedPipe(
pipe_name,
win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED, # open mode
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT, # pipe mode
1, 65536, 65536, # max instances, out buffer size, in buffer size
0, # timeout
None)
while 1:
print("doing my other stuff")
try:
win32pipe.ConnectNamedPipe(pipe, pywintypes.OVERLAPPED())
except pywintypes.error as e:
if e.winerror == 232: #disconnected pipe
win32pipe.DisconnectNamedPipe(pipe)
print("Disconnecting pipe")
else:
print(e)
try:
retval, msg = win32file.ReadFile(pipe, 0, pywintypes.OVERLAPPED())
print(msg)
except pywintypes.error as e:
if e.winerror == 536: #Wating for connection
print("waiting")
elif e.winerror == 233: #no process on other side
continue
time.sleep(1)
发件人的代码是:
import os
import time
import sys
import win32pipe, win32file, pywintypes
pipe_name = r'\.\pipe\mypipe'
for x in range(5):
handle = win32file.CreateFile(
pipe_name,
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
0,
None,
win32file.OPEN_EXISTING,
win32file.FILE_FLAG_OVERLAPPED,
None
)
res = win32pipe.SetNamedPipeHandleState(handle, win32pipe.PIPE_READMODE_MESSAGE, None, None)
print(f"sending {x}")
win32file.WriteFile(handle, str.encode(f"hello world {x}"))
win32file.CloseHandle(handle)
time.sleep(2)
现在两者都可以 运行 并且有一些连接,但我无法真正获取数据。如果发送了一些东西,接收者可以做其他事情并断开连接并重新打开管道,但 msg
最终为空。如果我在调试器中停止它并发送一些东西 msg
的值得到 "memory at 0x0......." 我会将其解释为某种指针,但是你可能已经注意到我对管道的理解是有限的。
Here I found a good example for a synchronos pipe which worked. I changed the the creation of the pipe to the reciever but that wasn't to hard. I found some examples for asynchronous (overlapped) pipes here,这也很棒,但给我留下了我现在面临的问题。
从重叠的管道中读取数据仍然是 win32file.ReadFile
的任务,还是我缺少的其他东西?
非常感谢!
我找到了解决方案并想分享它,以防其他人遇到这个问题。事实证明 msg
得到 "memory at 0x0......." 是一个 memoryview 对象,它的数据可以通过 bytes(msg)
.
我的 ReadFile
命令也有问题,因为缓冲区必须 >0 才能实现任何功能。现在它单独读取每个字节并将它们添加到一个字符串中。这可能不是很好的性能,但它对我有用,它解决了如果消息短于缓冲区长度则必须结束的问题。
msg = ''
rtnvalue, data = win32file.ReadFile(pipe, 1, pywintypes.OVERLAPPED())
while rtnvalue == 234:
msg = msg + bytes(data).decode('ASCII')
rtnvalue, data = win32file.ReadFile(pipe, 1, pywintypes.OVERLAPPED())
if rtnvalue == 0: #end of stream is reached
msg = msg + bytes(data).decode('ASCII')
return msg
发布者的代码和接受的答案不处理 ERROR_IO_PENDING。不幸的是,我在其他地方找不到好的 Python 示例,所以我花了一些时间来解决它。
针对您的环境进行修改,连接到您的管道服务器,然后调用 receive()。它将 return 来自管道的完整消息,或 None.
如果您想观察代码的工作原理,请将 read_buf 的大小设置为较小的值(例如 64 字节)并添加一些日志记录语句。
同样的代码应该同样适用于文件和套接字(经过精简修改)。
import win32file
import win32pipe
import winerror
import pywintypes
class pipe_comms:
def __init__(self):
self.pipe_open = False
self.pipe_handle = None
self.pipe_name = r”\.\pipe\your_pipe”
self.read_msg = None
self.overlapped = None
self.read_buf = win32file.AllocateReadBuffer(4096)
def connect(self):
if not self.pipe_open:
try:
self.pipe_handle = win32file.CreateFile(self.pipe_name
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
0, None, win32file.OPEN_EXISTING,
win32file.FILE_FLAG_OVERLAPPED, None)
win32pipe.SetNamedPipeHandleState(self.pipe_handle,
win32pipe.PIPE_READMODE_MESSAGE, None, None)
self.pipe_open = True
self.read_msg = None
except pywintypes.error as e:
self.handle_error(e.args[0])
return False
return True
def receive(self):
try:
result = self.receive_overlapped()
if result == winerror.ERROR_SUCCESS:
fullMsg = self.read_msg
self.read_msg = None
return fullMsg
elif result == winerror.ERROR_IO_PENDING:
return None
else:
self.handle_error(result)
except pywintypes.error as e:
self.handle_error(e.args[0])
return None
def receive_overlapped(self):
if self.overlapped:
try:
bytes_read = win32file.GetOverlappedResult(self.pipe_handle, self.overlapped, 0)
self.read_msg += bytes(self.read_buf[:bytes_read])
self.overlapped = None
return winerror.ERROR_SUCCESS
except pywintypes.error as e:
result = e.args[0]
if result = winerror.ERROR_MORE_DATA:
bytes_read = len(self.read_buf)
self.read_msg += bytes(self.read_buf[:bytes_read])
# fall thru to issue new ReadFile
else:
# ERROR_IO_PENDING is normal, anything else is not
return result
else:
self.read_msg = bytes()
self.overlapped = pywintypes.OVERLAPPED()
result, data = win32file.ReadFile(self.pipe_handle, self.read_buf, self.overlapped)
while True:
if result = winerror.ERROR_MORE_DATA:
bytes_read = len(pipe_data)
self.read_msg = bytes(data[:bytes_read])
result, data = win32file.ReadFile(self.pipe_handle, self.read_buf, self.overlapped)
continue
elif result == winerror.ERROR_SUCCESS:
bytes_read = win32file.GetOverlappedResult(self.pipe_handle, self.overlapped, 0)
self.read_msg = bytes(data[:bytes_read])
self.overlapped = None
return result
def handle_error(self, result):
reset_pipe = False
if result == winerror.ERROR_BROKEN_PIPE:
win32pipe.DisconnectNamedPipe(self.pipe_handle)
reset_pipe = True
elif result == winerror.ERROR_NO_DATA:
reset_pipe = True
if reset_pipe:
self.pipe_handle = None
self.pipe_open = False
self.read_msg = None
self.overlapped = None