Python: 检查命名管道是否有数据
Python: Check if named pipe has data
我的 Unix 系统上有一个 Python3 进程总是 运行ning,我希望能够从仅 [=24] 的其他进程通过命名管道随机向它发送数据=] 偶尔。如果命名管道没有数据,我希望我的进程继续做其他事情,所以我需要检查它是否有数据而不阻塞。
我不知道如何在不打开它的情况下进行检查,但打开块除非我设置非阻塞标志。如果我设置了标志,如果我碰巧在读取之前或期间写入管道,它就会崩溃。
这是我做过的最好的事情:
import os
fifo = "pipe_test.fifo"
done = False
fd = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)
while not done:
try:
s = os.read(fd, 1024) # buffer size may need tweaking
print(s)
done = True
except BlockingIOError as e:
pass
os.close(fd)
如果管道中没有数据,我会收到 b""
,然后退出。如果管道中有数据,它会得到一次异常,重试,然后获取数据。好像我做错了什么,可能 运行 进入奇怪的竞争条件。有更好的方法吗?
如果您可以更改客户端代码,我不会使用命名管道,而是 UNIX domain sockets,因为它们支持数据报:
import errno, fcntl, os, socket
服务器:
# bind socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.bind('pipe_test.fifo')
# set socket non-blocking
fcntl.fcntl(sock.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
# get a datagram
try:
datagram = sock.recv(1024)
except (OSError, socket.error) as ex:
if ex.errno not in (errno.EINTR, errno.EAGAIN):
raise
else:
print('Datagram: %r' % datagram)
客户:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.sendto('Hello!', 'pipe_test.fifo')
但您可能想查看 multithreading
而不是使用非阻塞套接字。
这不是真正的答案,但如果它对任何人都有用,下面是我在另一个线程中的做法。
class QueryThread(threading.Thread):
def __init__(self, args=(), kwargs=None):
threading.Thread.__init__(self, args=(), kwargs=None)
self.daemon = True
self.buf = []
if not general.f_exists("pipe"):
os.mkfifo("pipe")
def run(self):
f = open("pipe")
while True:
try:
query = next(f).replace("\n", "")
if query != "":
self.buf.append(query)
print("Read in new query from pipe: {}, buf = {}".format(query, self.buf))
except StopIteration: # not a pipe error, just means no data is left, so time to re-open
f.close()
f = open("pipe")
f.close()
def get_query(self):
if len(self.buf) == 0: return ""
query = self.buf[0]
self.buf.__delitem__(0)
return query
它将以换行符分隔的消息保存在缓冲区中。您可以从另一个线程调用 get_query
方法并获取收到的最后一条消息。
我的 Unix 系统上有一个 Python3 进程总是 运行ning,我希望能够从仅 [=24] 的其他进程通过命名管道随机向它发送数据=] 偶尔。如果命名管道没有数据,我希望我的进程继续做其他事情,所以我需要检查它是否有数据而不阻塞。
我不知道如何在不打开它的情况下进行检查,但打开块除非我设置非阻塞标志。如果我设置了标志,如果我碰巧在读取之前或期间写入管道,它就会崩溃。
这是我做过的最好的事情:
import os
fifo = "pipe_test.fifo"
done = False
fd = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)
while not done:
try:
s = os.read(fd, 1024) # buffer size may need tweaking
print(s)
done = True
except BlockingIOError as e:
pass
os.close(fd)
如果管道中没有数据,我会收到 b""
,然后退出。如果管道中有数据,它会得到一次异常,重试,然后获取数据。好像我做错了什么,可能 运行 进入奇怪的竞争条件。有更好的方法吗?
如果您可以更改客户端代码,我不会使用命名管道,而是 UNIX domain sockets,因为它们支持数据报:
import errno, fcntl, os, socket
服务器:
# bind socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.bind('pipe_test.fifo')
# set socket non-blocking
fcntl.fcntl(sock.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
# get a datagram
try:
datagram = sock.recv(1024)
except (OSError, socket.error) as ex:
if ex.errno not in (errno.EINTR, errno.EAGAIN):
raise
else:
print('Datagram: %r' % datagram)
客户:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.sendto('Hello!', 'pipe_test.fifo')
但您可能想查看 multithreading
而不是使用非阻塞套接字。
这不是真正的答案,但如果它对任何人都有用,下面是我在另一个线程中的做法。
class QueryThread(threading.Thread):
def __init__(self, args=(), kwargs=None):
threading.Thread.__init__(self, args=(), kwargs=None)
self.daemon = True
self.buf = []
if not general.f_exists("pipe"):
os.mkfifo("pipe")
def run(self):
f = open("pipe")
while True:
try:
query = next(f).replace("\n", "")
if query != "":
self.buf.append(query)
print("Read in new query from pipe: {}, buf = {}".format(query, self.buf))
except StopIteration: # not a pipe error, just means no data is left, so time to re-open
f.close()
f = open("pipe")
f.close()
def get_query(self):
if len(self.buf) == 0: return ""
query = self.buf[0]
self.buf.__delitem__(0)
return query
它将以换行符分隔的消息保存在缓冲区中。您可以从另一个线程调用 get_query
方法并获取收到的最后一条消息。