如何从多个线程 read/write 到临时文件
How to read/write to tempfile from multiple threads
上下文:
操作系统:Windows8.1
python --version: Python 2.7.8
我正在尝试 read/write from/to 一个流,并使用一个线程写入该流,另一个线程从中读取已写入的新数据位。我正在使用 tempfile 模块,使用一种方法将二进制数据写入其中,并使用另一种方法从中读取。
在下面的代码中,t1 运行写入,t2 运行读取线程方法。 t3 & t4 exits 使 t1 和 t2 循环退出。
我预期的读取输出是:
READ @0: 0|1|2|3|.........|N --- L 字节读取
READ @L: N|N+1|N+2|......|M --- J 字节读取
READ @L+J: M|M+1|M+3|.....|P --- K 字节读取
READ @L+J+K: --- 0 字节读取(写入线程未写入任何内容)
等等,只要临时文件中有更多数据,它就会被读取和输出,但同时写入必须由另一个线程继续进行,以便写入从流中接收到的数据。
问题:
当我运行它时,我得到的输出是 变化
收到的输出之一:
> python.exe tmp.py
Exception in thread Thread-1:
Traceback (most recent call last):
[truncated output]
File "C:/tmp.py", line 11, in write_temp
READ @ 0 : 264261|975|263976|263977|...[truncated output]...|263972|263
self.myf.write(str(self.current_count)+"|")
IOError: [Errno 0] Error
另一个输出:
> python.exe tmp.py
READ @ 0 : 0|289721|289722|...[truncated output]...289718|28971
Exception in thread Thread-1:
Traceback (most recent call last):
[truncated output]
self.myf.write(str(self.current_count)+"|")
IOError: [Errno 0] Error
其他输出或多或少是上述输出的变体。
我认为 问题是由于读取更改了文件描述符指针,但我认为追加总是写入末尾文件。
源代码
下面是对流的实际代码的抽象,流是从一个子进程stdio读取数据的二进制流,并写入另一个子进程stdio。
import threading, tempfile
class MultipleThreadTIO:
def __init__(self):
self.myf = tempfile.TemporaryFile(mode='a+b')
self.current_count = 0
self.do_write = True
self.do_read = True
def write_temp(self):
while self.do_write:
self.myf.write(str(self.current_count)+"|")
self.current_count += 1
def read_temp(self):
read_at = 0L
while self.do_read:
self.myf.seek(read_at)
d = self.myf.read()
if len(d.strip()) > 0:
print "READ @",read_at,": ", self.myf.read()
read_at = self.myf.tell()
def stop_write(self):
self.do_write = False
def stop_read(self):
self.do_read = False
def __del__(self):
#self.myf.seek(0)
#print ":::DATA CONTENT:::\n"
#print self.myf.read()
#print ":::END DATA CONTENT:::"
self.myf.close()
mtio = MultipleThreadTIO()
t1 = threading.Timer(0.1, mtio.write_temp)
t2 = threading.Timer(0.5, mtio.read_temp)
t3 = threading.Timer(5, mtio.stop_write)
t4 = threading.Timer(3, mtio.stop_read)
t1.start()
t2.start()
t3.start()
t4.start()
问题:
问题 1:以上问题有解决办法吗?
问题 2:我应该使用 queues/os.pipe(/other?) 而不是临时文件吗?
问题 3:对于这种情况还有其他更好的方法吗?
重要:解决方案必须是跨平台的。
这是使用 Queue
, I think this is better if you're using a queue/pipe behaviour. I'm not sure what you're actually writing, so I just added the running counter to the queue. Writing to the queue is stopped using an Event
的抽象,通过发送停止消息停止从队列中读取(但如果您想要更好的控制,可以添加信号):
import threading
import Queue
class MultipleThreadTIO:
def __init__(self):
self.queue = Queue.Queue()
self.current_count = 0
self.stop_write = threading.Event()
def write_temp(self):
while not self.stop_write.isSet():
self.queue.put(str(self.current_count)+"|")
self.current_count += 1
self.stop()
def read_temp(self):
while True:
msg = self.queue.get()
if msg == 'close':
break
else:
print "READ @: " + msg
def stop(self):
self.queue.put('close')
mtio = MultipleThreadTIO()
t1 = threading.Timer(0.1, mtio.write_temp)
t2 = threading.Timer(0.5, mtio.read_temp)
t1.start()
t2.start()
t3 = threading.Timer(5, mtio.stop_write.set)
t3.start()
我无法在 windows 上进行测试,但我认为它应该可以工作,因为这是非常标准的。它在 Ubuntu 14.04 x86_64
上运行
上下文:
操作系统:Windows8.1
python --version: Python 2.7.8
我正在尝试 read/write from/to 一个流,并使用一个线程写入该流,另一个线程从中读取已写入的新数据位。我正在使用 tempfile 模块,使用一种方法将二进制数据写入其中,并使用另一种方法从中读取。
在下面的代码中,t1 运行写入,t2 运行读取线程方法。 t3 & t4 exits 使 t1 和 t2 循环退出。
我预期的读取输出是:
READ @0: 0|1|2|3|.........|N --- L 字节读取 READ @L: N|N+1|N+2|......|M --- J 字节读取 READ @L+J: M|M+1|M+3|.....|P --- K 字节读取 READ @L+J+K: --- 0 字节读取(写入线程未写入任何内容)
等等,只要临时文件中有更多数据,它就会被读取和输出,但同时写入必须由另一个线程继续进行,以便写入从流中接收到的数据。
问题:
当我运行它时,我得到的输出是 变化 收到的输出之一:
> python.exe tmp.py
Exception in thread Thread-1:
Traceback (most recent call last):
[truncated output]
File "C:/tmp.py", line 11, in write_temp
READ @ 0 : 264261|975|263976|263977|...[truncated output]...|263972|263
self.myf.write(str(self.current_count)+"|")
IOError: [Errno 0] Error
另一个输出:
> python.exe tmp.py
READ @ 0 : 0|289721|289722|...[truncated output]...289718|28971
Exception in thread Thread-1:
Traceback (most recent call last):
[truncated output]
self.myf.write(str(self.current_count)+"|")
IOError: [Errno 0] Error
其他输出或多或少是上述输出的变体。
我认为 问题是由于读取更改了文件描述符指针,但我认为追加总是写入末尾文件。
源代码
下面是对流的实际代码的抽象,流是从一个子进程stdio读取数据的二进制流,并写入另一个子进程stdio。
import threading, tempfile
class MultipleThreadTIO:
def __init__(self):
self.myf = tempfile.TemporaryFile(mode='a+b')
self.current_count = 0
self.do_write = True
self.do_read = True
def write_temp(self):
while self.do_write:
self.myf.write(str(self.current_count)+"|")
self.current_count += 1
def read_temp(self):
read_at = 0L
while self.do_read:
self.myf.seek(read_at)
d = self.myf.read()
if len(d.strip()) > 0:
print "READ @",read_at,": ", self.myf.read()
read_at = self.myf.tell()
def stop_write(self):
self.do_write = False
def stop_read(self):
self.do_read = False
def __del__(self):
#self.myf.seek(0)
#print ":::DATA CONTENT:::\n"
#print self.myf.read()
#print ":::END DATA CONTENT:::"
self.myf.close()
mtio = MultipleThreadTIO()
t1 = threading.Timer(0.1, mtio.write_temp)
t2 = threading.Timer(0.5, mtio.read_temp)
t3 = threading.Timer(5, mtio.stop_write)
t4 = threading.Timer(3, mtio.stop_read)
t1.start()
t2.start()
t3.start()
t4.start()
问题:
问题 1:以上问题有解决办法吗?
问题 2:我应该使用 queues/os.pipe(/other?) 而不是临时文件吗?
问题 3:对于这种情况还有其他更好的方法吗?
重要:解决方案必须是跨平台的。
这是使用 Queue
, I think this is better if you're using a queue/pipe behaviour. I'm not sure what you're actually writing, so I just added the running counter to the queue. Writing to the queue is stopped using an Event
的抽象,通过发送停止消息停止从队列中读取(但如果您想要更好的控制,可以添加信号):
import threading
import Queue
class MultipleThreadTIO:
def __init__(self):
self.queue = Queue.Queue()
self.current_count = 0
self.stop_write = threading.Event()
def write_temp(self):
while not self.stop_write.isSet():
self.queue.put(str(self.current_count)+"|")
self.current_count += 1
self.stop()
def read_temp(self):
while True:
msg = self.queue.get()
if msg == 'close':
break
else:
print "READ @: " + msg
def stop(self):
self.queue.put('close')
mtio = MultipleThreadTIO()
t1 = threading.Timer(0.1, mtio.write_temp)
t2 = threading.Timer(0.5, mtio.read_temp)
t1.start()
t2.start()
t3 = threading.Timer(5, mtio.stop_write.set)
t3.start()
我无法在 windows 上进行测试,但我认为它应该可以工作,因为这是非常标准的。它在 Ubuntu 14.04 x86_64
上运行