异步文件复制 - 为什么文件描述符不好?
Async file copy - why is the file descriptor bad?
想要实现 Python 代码来读取和写入(即复制)文件。目标是并发读取和写入,因此减少了执行文件复制的时间。这是我熟悉 async/await 范式的学习练习。
这是我当前的实现,但是代码出错了。第二个或第三个文件读取操作抛出“Bad File Descriptor”。输入文件确实存在,我可以使用 .read()
正常读取它
想知道这里出了什么问题吗?使用异步文件是否有一些意想不到的副作用?
import asyncio
import queue
from aiofile import async_open
async def copy_file(input_fname, output_fname):
CHUNK_SIZE = 4096
chunk_queue = queue.Queue()
read_complete = False
SLEEP_DURATION = 1
async def read_chunks(file_object):
nonlocal CHUNK_SIZE, chunk_queue, read_complete
while True:
chunk = await file_object.read(CHUNK_SIZE)
chunk_queue.put(chunk)
if len(chunk) < CHUNK_SIZE: # Detect last chunk
read_complete = True
return
async def write_chunks(file_object):
nonlocal CHUNK_SIZE, chunk_queue, read_complete, SLEEP_DURATION
while True:
status = chunk_queue.empty()
if not status:
chunk = chunk_queue.get()
await file_object.write(chunk)
else:
await asyncio.sleep(SLEEP_DURATION)
async with async_open(input_fname, "rb") as input_file:
async with async_open(output_fname, "wb+") as output_file:
asyncio.create_task(read_chunks(input_file))
asyncio.create_task(write_chunks(output_file))
print("Copy complete")
async def main():
await copy_file("input.bin", "output.bin")
if __name__ == '__main__':
asyncio.run(main())
这是完整的堆栈跟踪:
Task exception was never retrieved
future: <Task finished name='Task-4' coro=<copy_file.<locals>.write_chunks() done, defined at /Users/kosa/PycharmProjects/copyFile/main.py:21> exception=ValueError('I/O operation on closed file')>
Traceback (most recent call last):
File "/Users/kosa/PycharmProjects/copyFile/main.py", line 27, in write_chunks
await file_object.write(chunk)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/utils.py", line 217, in write
await operation
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/aio.py", line 243, in write_bytes
data[written:], self.fileno(), offset + written,
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/aio.py", line 173, in fileno
return self.__file_obj.fileno()
ValueError: I/O operation on closed file
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<copy_file.<locals>.read_chunks() done, defined at /Users/kosa/PycharmProjects/copyFile/main.py:12> exception=SystemError('Bad file descriptor')>
Traceback (most recent call last):
File "/Users/kosa/PycharmProjects/copyFile/main.py", line 15, in read_chunks
chunk = await file_object.read(CHUNK_SIZE)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/utils.py", line 211, in read
return await self.__read(length)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/utils.py", line 205, in __read
data = await self.file.read_bytes(length, self._offset)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/aio.py", line 202, in read_bytes
return await self.__context.read(size, self.fileno(), offset)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/caio/asyncio_base.py", line 88, in submit
return op.get_value()
SystemError: Bad file descriptor
Copy complete
Process finished with exit code 0
啊哈,我发现错误在这里。问题是我们启动了任务,但在任务完成之前关闭了文件。
相反,我们需要等待完成,像这样:
async with async_open(input_fname, "rb") as input_file:
async with async_open(output_fname, "wb+") as output_file:
reads = asyncio.create_task(read_chunks(input_file))
writes = asyncio.create_task(write_chunks(output_file))
await reads
await writes
print("Copy complete")
以上代码按预期复制了文件
想要实现 Python 代码来读取和写入(即复制)文件。目标是并发读取和写入,因此减少了执行文件复制的时间。这是我熟悉 async/await 范式的学习练习。
这是我当前的实现,但是代码出错了。第二个或第三个文件读取操作抛出“Bad File Descriptor”。输入文件确实存在,我可以使用
想知道这里出了什么问题吗?使用异步文件是否有一些意想不到的副作用?
import asyncio
import queue
from aiofile import async_open
async def copy_file(input_fname, output_fname):
CHUNK_SIZE = 4096
chunk_queue = queue.Queue()
read_complete = False
SLEEP_DURATION = 1
async def read_chunks(file_object):
nonlocal CHUNK_SIZE, chunk_queue, read_complete
while True:
chunk = await file_object.read(CHUNK_SIZE)
chunk_queue.put(chunk)
if len(chunk) < CHUNK_SIZE: # Detect last chunk
read_complete = True
return
async def write_chunks(file_object):
nonlocal CHUNK_SIZE, chunk_queue, read_complete, SLEEP_DURATION
while True:
status = chunk_queue.empty()
if not status:
chunk = chunk_queue.get()
await file_object.write(chunk)
else:
await asyncio.sleep(SLEEP_DURATION)
async with async_open(input_fname, "rb") as input_file:
async with async_open(output_fname, "wb+") as output_file:
asyncio.create_task(read_chunks(input_file))
asyncio.create_task(write_chunks(output_file))
print("Copy complete")
async def main():
await copy_file("input.bin", "output.bin")
if __name__ == '__main__':
asyncio.run(main())
这是完整的堆栈跟踪:
Task exception was never retrieved
future: <Task finished name='Task-4' coro=<copy_file.<locals>.write_chunks() done, defined at /Users/kosa/PycharmProjects/copyFile/main.py:21> exception=ValueError('I/O operation on closed file')>
Traceback (most recent call last):
File "/Users/kosa/PycharmProjects/copyFile/main.py", line 27, in write_chunks
await file_object.write(chunk)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/utils.py", line 217, in write
await operation
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/aio.py", line 243, in write_bytes
data[written:], self.fileno(), offset + written,
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/aio.py", line 173, in fileno
return self.__file_obj.fileno()
ValueError: I/O operation on closed file
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<copy_file.<locals>.read_chunks() done, defined at /Users/kosa/PycharmProjects/copyFile/main.py:12> exception=SystemError('Bad file descriptor')>
Traceback (most recent call last):
File "/Users/kosa/PycharmProjects/copyFile/main.py", line 15, in read_chunks
chunk = await file_object.read(CHUNK_SIZE)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/utils.py", line 211, in read
return await self.__read(length)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/utils.py", line 205, in __read
data = await self.file.read_bytes(length, self._offset)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/aio.py", line 202, in read_bytes
return await self.__context.read(size, self.fileno(), offset)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/caio/asyncio_base.py", line 88, in submit
return op.get_value()
SystemError: Bad file descriptor
Copy complete
Process finished with exit code 0
啊哈,我发现错误在这里。问题是我们启动了任务,但在任务完成之前关闭了文件。 相反,我们需要等待完成,像这样:
async with async_open(input_fname, "rb") as input_file:
async with async_open(output_fname, "wb+") as output_file:
reads = asyncio.create_task(read_chunks(input_file))
writes = asyncio.create_task(write_chunks(output_file))
await reads
await writes
print("Copy complete")
以上代码按预期复制了文件