非阻塞fifo
Nonblocking fifo
我如何在两个 python 进程之间创建一个 fifo,如果 reader 无法处理输入,允许丢弃行?
- 如果 reader 尝试
read
或 readline
比作者写入的速度快,它应该阻止。
- 如果reader不能像作者写的那么快,作者不应该阻止。不应缓冲行(一次一行除外),并且 reader 在其下一次
readline
尝试时仅应接收最后一行。
这是否可以通过命名的 fifo 实现,或者是否有任何其他简单的方法来实现这一点?
好吧,据我所知,这实际上不是 FIFO(队列)- 它是单个变量。我想如果你设置一个最大大小为 1 的队列或管道它可能是可以实现的,但似乎使用 Lock
on a single object in one of the processes, which the other process references via a proxy object 会更好。 reader每次读取都会设置为None
,写入器每次写入都会覆盖内容。
您可以通过将对象的代理和锁的代理作为所有相关进程的参数传递给其他进程。为了更方便地获取它,您可以使用 Manager
, which provides a single object with proxy that you can pass in, which contains and provides proxies for whatever other objects (including locks) you want to put in it. This answer 提供了一个有用的示例,该示例正确使用管理器将对象传递到新进程中。
以下代码使用命名的 FIFO 允许两个脚本之间进行通信。
- 如果 reader 试图
read
比作者更快,它会阻塞。
- 如果reader跟不上作者,作者不会阻止。
- 操作是面向缓冲区的。当前未实现面向行的操作。
- 此代码应被视为概念验证。延迟和缓冲区大小是任意的。
代码
import argparse
import errno
import os
from select import select
import time
class OneFifo(object):
def __init__(self, name):
self.name = name
def __enter__(self):
if os.path.exists(self.name):
os.unlink(self.name)
os.mkfifo(self.name)
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
if os.path.exists(self.name):
os.unlink(self.name)
def write(self, data):
print "Waiting for client to open FIFO..."
try:
server_file = os.open(self.name, os.O_WRONLY | os.O_NONBLOCK)
except OSError as exc:
if exc.errno == errno.ENXIO:
server_file = None
else:
raise
if server_file is not None:
print "Writing line to FIFO..."
try:
os.write(server_file, data)
print "Done."
except OSError as exc:
if exc.errno == errno.EPIPE:
pass
else:
raise
os.close(server_file)
def read_nonblocking(self):
result = None
try:
client_file = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK)
except OSError as exc:
if exc.errno == errno.ENOENT:
client_file = None
else:
raise
if client_file is not None:
try:
rlist = [client_file]
wlist = []
xlist = []
rlist, wlist, xlist = select(rlist, wlist, xlist, 0.01)
if client_file in rlist:
result = os.read(client_file, 1024)
except OSError as exc:
if exc.errno == errno.EAGAIN or exc.errno == errno.EWOULDBLOCK:
result = None
else:
raise
os.close(client_file)
return result
def read(self):
try:
with open(self.name, 'r') as client_file:
result = client_file.read()
except OSError as exc:
if exc.errno == errno.ENOENT:
result = None
else:
raise
if not len(result):
result = None
return result
def parse_argument():
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--client', action='store_true',
help='Set this flag for the client')
parser.add_argument('-n', '--non-blocking', action='store_true',
help='Set this flag to read without blocking')
result = parser.parse_args()
return result
if __name__ == '__main__':
args = parse_argument()
if not args.client:
with OneFifo('known_name') as one_fifo:
while True:
one_fifo.write('one line')
time.sleep(0.1)
else:
one_fifo = OneFifo('known_name')
while True:
if args.non_blocking:
result = one_fifo.read_nonblocking()
else:
result = one_fifo.read()
if result is not None:
print result
server
检查client
是否打开了FIFO。如果client
打开了FIFO,server
写入一行。否则,server
继续 运行。我已经实现了非阻塞读取,因为阻塞读取会导致问题:如果 server
重新启动,大多数时候 client
会保持阻塞状态并且永远不会恢复。使用非阻塞 client
,更容易容忍 server
重启。
输出
[user@machine:~] python onefifo.py
Waiting for client to open FIFO...
Waiting for client to open FIFO...
Writing line to FIFO...
Done.
Waiting for client to open FIFO...
Writing line to FIFO...
Done.
[user@machine:~] python onefifo.py -c
one line
one line
备注
启动时,如果 server
检测到 FIFO 已经存在,则会将其删除。这是通知 clients
server
已重新启动的最简单方法。 client
.
的阻塞版本通常会忽略此通知
我如何在两个 python 进程之间创建一个 fifo,如果 reader 无法处理输入,允许丢弃行?
- 如果 reader 尝试
read
或readline
比作者写入的速度快,它应该阻止。 - 如果reader不能像作者写的那么快,作者不应该阻止。不应缓冲行(一次一行除外),并且 reader 在其下一次
readline
尝试时仅应接收最后一行。
这是否可以通过命名的 fifo 实现,或者是否有任何其他简单的方法来实现这一点?
好吧,据我所知,这实际上不是 FIFO(队列)- 它是单个变量。我想如果你设置一个最大大小为 1 的队列或管道它可能是可以实现的,但似乎使用 Lock
on a single object in one of the processes, which the other process references via a proxy object 会更好。 reader每次读取都会设置为None
,写入器每次写入都会覆盖内容。
您可以通过将对象的代理和锁的代理作为所有相关进程的参数传递给其他进程。为了更方便地获取它,您可以使用 Manager
, which provides a single object with proxy that you can pass in, which contains and provides proxies for whatever other objects (including locks) you want to put in it. This answer 提供了一个有用的示例,该示例正确使用管理器将对象传递到新进程中。
以下代码使用命名的 FIFO 允许两个脚本之间进行通信。
- 如果 reader 试图
read
比作者更快,它会阻塞。 - 如果reader跟不上作者,作者不会阻止。
- 操作是面向缓冲区的。当前未实现面向行的操作。
- 此代码应被视为概念验证。延迟和缓冲区大小是任意的。
代码
import argparse
import errno
import os
from select import select
import time
class OneFifo(object):
def __init__(self, name):
self.name = name
def __enter__(self):
if os.path.exists(self.name):
os.unlink(self.name)
os.mkfifo(self.name)
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
if os.path.exists(self.name):
os.unlink(self.name)
def write(self, data):
print "Waiting for client to open FIFO..."
try:
server_file = os.open(self.name, os.O_WRONLY | os.O_NONBLOCK)
except OSError as exc:
if exc.errno == errno.ENXIO:
server_file = None
else:
raise
if server_file is not None:
print "Writing line to FIFO..."
try:
os.write(server_file, data)
print "Done."
except OSError as exc:
if exc.errno == errno.EPIPE:
pass
else:
raise
os.close(server_file)
def read_nonblocking(self):
result = None
try:
client_file = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK)
except OSError as exc:
if exc.errno == errno.ENOENT:
client_file = None
else:
raise
if client_file is not None:
try:
rlist = [client_file]
wlist = []
xlist = []
rlist, wlist, xlist = select(rlist, wlist, xlist, 0.01)
if client_file in rlist:
result = os.read(client_file, 1024)
except OSError as exc:
if exc.errno == errno.EAGAIN or exc.errno == errno.EWOULDBLOCK:
result = None
else:
raise
os.close(client_file)
return result
def read(self):
try:
with open(self.name, 'r') as client_file:
result = client_file.read()
except OSError as exc:
if exc.errno == errno.ENOENT:
result = None
else:
raise
if not len(result):
result = None
return result
def parse_argument():
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--client', action='store_true',
help='Set this flag for the client')
parser.add_argument('-n', '--non-blocking', action='store_true',
help='Set this flag to read without blocking')
result = parser.parse_args()
return result
if __name__ == '__main__':
args = parse_argument()
if not args.client:
with OneFifo('known_name') as one_fifo:
while True:
one_fifo.write('one line')
time.sleep(0.1)
else:
one_fifo = OneFifo('known_name')
while True:
if args.non_blocking:
result = one_fifo.read_nonblocking()
else:
result = one_fifo.read()
if result is not None:
print result
server
检查client
是否打开了FIFO。如果client
打开了FIFO,server
写入一行。否则,server
继续 运行。我已经实现了非阻塞读取,因为阻塞读取会导致问题:如果 server
重新启动,大多数时候 client
会保持阻塞状态并且永远不会恢复。使用非阻塞 client
,更容易容忍 server
重启。
输出
[user@machine:~] python onefifo.py
Waiting for client to open FIFO...
Waiting for client to open FIFO...
Writing line to FIFO...
Done.
Waiting for client to open FIFO...
Writing line to FIFO...
Done.
[user@machine:~] python onefifo.py -c
one line
one line
备注
启动时,如果 server
检测到 FIFO 已经存在,则会将其删除。这是通知 clients
server
已重新启动的最简单方法。 client
.