Python: inotify, concurrent.futures - 如何添加现有文件
Python: inotify, concurrent.futures - how to add existing files
我有一个使用 inotify 模块和多线程(multi-threading)来处理文件的简单脚本:
import concurrent.futures
import inotify.adapters
def main():
    """Watch /data and hand every finished write to a worker thread.

    Each IN_CLOSE_WRITE event is submitted to a five-worker thread pool as
    ``process(name)`` with ``future_callback`` attached. ``process`` and
    ``future_callback`` are expected to be defined elsewhere. The watch is
    removed when the event loop exits for any reason.
    """
    # Imported locally because this snippet's import block has no ``os``.
    import os

    # Value of IN_CLOSE_WRITE in <sys/inotify.h>.
    in_close_write = 0x00000008

    i = inotify.adapters.Inotify()
    i.add_watch(b'/data')
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        try:
            for event in i.event_gen():
                # The generator can yield None; there is nothing to do then.
                if event is None:
                    continue
                (header, type_names, watch_path, filename) = event
                # Test the flag bit rather than comparing the whole mask to
                # a bare 8, so extra bits in the mask do not hide the event.
                if not header.mask & in_close_write:
                    continue
                # fsdecode uses the filesystem encoding and error handler,
                # so a name that is not valid UTF-8 does not abort the loop;
                # it also returns str input unchanged.
                name = os.fsdecode(filename)
                future = executor.submit(process, name)
                future.add_done_callback(future_callback)
        finally:
            i.remove_watch(b'/data')


if __name__ == '__main__':
    main()
我遇到的问题是,在脚本实际启动之前,监视目录可能有很多文件。
我考虑过类似于下面示例的做法,但这样在处理完所有现有文件之前,inotify 生成器不会开始产出("yielding")事件,并且还会错过在此期间产生的新事件:
import concurrent.futures
import inotify.adapters
def main():
    """Queue the files already in /data, then process new ones via inotify.

    The watch is registered before the directory is listed. Existing entries
    are submitted as ``run(name)`` and each IN_CLOSE_WRITE event as
    ``process(name)``; both report through ``future_callback``. ``run``,
    ``process`` and ``future_callback`` are expected to be defined elsewhere.
    The watch is removed when the event loop exits for any reason.
    """
    # The original called os.listdir without ever importing os, which fails
    # with NameError; import it here because the snippet's import block
    # does not provide it.
    import os

    # Value of IN_CLOSE_WRITE in <sys/inotify.h>.
    in_close_write = 0x00000008

    i = inotify.adapters.Inotify()
    i.add_watch(b'/data')
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # submit() only schedules the job and returns a Future, so this loop
        # does not wait for the existing files to be processed. An empty
        # listing simply skips the loop, so no separate emptiness check.
        for existing in os.listdir('/data'):
            future = executor.submit(run, existing)
            future.add_done_callback(future_callback)
        try:
            for event in i.event_gen():
                # The generator can yield None; there is nothing to do then.
                if event is None:
                    continue
                (header, type_names, watch_path, filename) = event
                # Test the flag bit rather than comparing the whole mask to
                # a bare 8, so extra bits in the mask do not hide the event.
                if not header.mask & in_close_write:
                    continue
                # fsdecode uses the filesystem encoding and error handler,
                # so a name that is not valid UTF-8 does not abort the loop;
                # it also returns str input unchanged.
                name = os.fsdecode(filename)
                future = executor.submit(process, name)
                future.add_done_callback(future_callback)
        finally:
            i.remove_watch(b'/data')


if __name__ == '__main__':
    main()
有没有办法手动发送 inotify 事件,或者将这些文件添加到 i.event_gen() 生成器中?
这是一个在其中一个工作线程(worker)中处理已有旧文件的示例,这样可以在处理旧文件的同时并行捕获新事件。顺便说明一下,即使使用您的线性代码,我也没有遇到丢失事件的问题。
此外,根据我所使用的 inotify 模块的说明,PyInotify 模块已经 "defunct and no longer available"。
#!/usr/bin/env python3
import concurrent.futures
import inotify.adapters
import time
import os
from functools import partial
# Directory that is listed at start-up and watched for new files.
DIRECTORY = '.'


def run(filename, suffix='', delay=1):
    """Simulate processing *filename* and return a report string.

    Args:
        filename: Name of the file being "processed".
        suffix: Text appended to the report, used to tag the job's origin.
        delay: Seconds to sleep as stand-in work. Defaults to 1, the value
            that was previously hard-coded.

    Returns:
        The string ``'run: ' + filename + suffix``.
    """
    time.sleep(delay)
    return 'run: ' + filename + suffix
def process(filename):
    """Run the job for a file reported by inotify.

    Delegates to ``run`` with the ``' (inotify)'`` suffix and returns
    whatever ``run`` returns.
    """
    report = run(filename, suffix=' (inotify)')
    return report
def future_callback(fut):
    """Print the outcome of a finished future.

    Args:
        fut: A completed ``concurrent.futures.Future``. On success its
            result must be a string, since it is concatenated into the
            printed message.

    A cancelled or failed future is reported explicitly. Calling
    ``fut.result()`` on such a future would re-raise inside the callback
    instead of printing anything.
    """
    if fut.cancelled():
        print('future_callback: cancelled')
        return
    error = fut.exception()
    if error is not None:
        print('future_callback: failed: ' + repr(error))
        return
    print('future_callback: ' + fut.result())
def do_directory(executor):
    """Queue a ``run`` job for every entry currently listed in DIRECTORY.

    Each job carries the ``' (dir list)'`` suffix and reports through
    ``future_callback`` when it finishes.

    Args:
        executor: The executor the jobs are submitted to.
    """
    for entry_name in os.listdir(DIRECTORY):
        # submit() forwards keyword arguments to the callable, so the
        # suffix is passed directly.
        pending = executor.submit(run, entry_name, suffix=' (dir list)')
        pending.add_done_callback(future_callback)
def main():
    """Process existing files and newly written ones concurrently.

    A watch on DIRECTORY is registered first. The directory listing is then
    handed to a pool worker (``do_directory``) so the inotify loop below
    starts right away. Every IN_CLOSE_WRITE event is submitted as
    ``process(name)`` with ``future_callback`` attached. Ctrl-C ends the
    loop quietly, and the watch is removed on any exit.
    """
    # Value of IN_CLOSE_WRITE in <sys/inotify.h>.
    in_close_write = 0x00000008
    watched = DIRECTORY.encode()

    i = inotify.adapters.Inotify()
    i.add_watch(watched)
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Process the directory in a thread or locally. Not sure if it
        # is safe to submit to the executor from within one of its workers.
        # Seems like it should be.
        # NOTE(review): the returned future is discarded, so an exception
        # raised inside do_directory is never reported.
        executor.submit(do_directory, executor)
        # Local alternative: call do_directory(executor) here instead.
        try:
            for event in i.event_gen():
                # The generator can yield None; there is nothing to do then.
                if event is None:
                    continue
                (header, type_names, watch_path, filename) = event
                # Test the flag bit rather than comparing the whole mask to
                # a bare 8, so extra bits in the mask do not hide the event.
                if not header.mask & in_close_write:
                    continue
                # Decode once. fsdecode uses the filesystem encoding and
                # error handler, so a name that is not valid UTF-8 does not
                # abort the loop; it also returns str input unchanged.
                name = os.fsdecode(filename)
                future = executor.submit(process, name)
                future.add_done_callback(future_callback)
                print('Submitted inotify for', name)
        except KeyboardInterrupt:
            # Ctrl-C is the intended way to stop the watcher.
            pass
        finally:
            i.remove_watch(watched)


if __name__ == '__main__':
    main()
测试:
从包含 10 个文件的目录开始。启动程序,等待2秒,然后新建5个文件。查找 "submit" 消息以查看事件已接收并排队,同时仍在处理初始文件以及最终处理新文件。
~/p/TEST $ touch A1 A2 A3 A4 A5 A6 A7 A8 A9 A10
~/p/TEST $ do_test() {
> rm B*
> ../inotify-test.py &
> sleep 2
> touch B1 B2 B3 B4 B5
> sleep 5
> pkill -f inotify-test.py
> }
~/p/TEST $ do_test
[1] 26663
future_callback: run: A10 (dir list)
future_callback: run: A4 (dir list)
future_callback: run: A5 (dir list)
future_callback: run: A9 (dir list)
future_callback: run: A2 (dir list)
Submitted inotify for B1
Submitted inotify for B2
Submitted inotify for B3
Submitted inotify for B4
Submitted inotify for B5
future_callback: run: A3 (dir list)
future_callback: run: A8 (dir list)
future_callback: run: A1 (dir list)
future_callback: run: A7 (dir list)
future_callback: run: A6 (dir list)
future_callback: run: B1 (inotify)
future_callback: run: B2 (inotify)
future_callback: run: B3 (inotify)
future_callback: run: B4 (inotify)
future_callback: run: B5 (inotify)
~/p/TEST $
[1]+ Terminated ../inotify-test.py
~/p/TEST $
我有一个使用 inotify 模块和多线程(multi-threading)来处理文件的简单脚本:
import concurrent.futures
import inotify.adapters
def main():
    """Watch /data and hand every finished write to a worker thread.

    Each IN_CLOSE_WRITE event is submitted to a five-worker thread pool as
    ``process(name)`` with ``future_callback`` attached. ``process`` and
    ``future_callback`` are expected to be defined elsewhere. The watch is
    removed when the event loop exits for any reason.
    """
    # Imported locally because this snippet's import block has no ``os``.
    import os

    # Value of IN_CLOSE_WRITE in <sys/inotify.h>.
    in_close_write = 0x00000008

    i = inotify.adapters.Inotify()
    i.add_watch(b'/data')
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        try:
            for event in i.event_gen():
                # The generator can yield None; there is nothing to do then.
                if event is None:
                    continue
                (header, type_names, watch_path, filename) = event
                # Test the flag bit rather than comparing the whole mask to
                # a bare 8, so extra bits in the mask do not hide the event.
                if not header.mask & in_close_write:
                    continue
                # fsdecode uses the filesystem encoding and error handler,
                # so a name that is not valid UTF-8 does not abort the loop;
                # it also returns str input unchanged.
                name = os.fsdecode(filename)
                future = executor.submit(process, name)
                future.add_done_callback(future_callback)
        finally:
            i.remove_watch(b'/data')


if __name__ == '__main__':
    main()
我遇到的问题是,在脚本实际启动之前,监视目录可能有很多文件。
我考虑过类似于下面示例的做法,但这样在处理完所有现有文件之前,inotify 生成器不会开始产出("yielding")事件,并且还会错过在此期间产生的新事件:
import concurrent.futures
import inotify.adapters
def main():
    """Queue the files already in /data, then process new ones via inotify.

    The watch is registered before the directory is listed. Existing entries
    are submitted as ``run(name)`` and each IN_CLOSE_WRITE event as
    ``process(name)``; both report through ``future_callback``. ``run``,
    ``process`` and ``future_callback`` are expected to be defined elsewhere.
    The watch is removed when the event loop exits for any reason.
    """
    # The original called os.listdir without ever importing os, which fails
    # with NameError; import it here because the snippet's import block
    # does not provide it.
    import os

    # Value of IN_CLOSE_WRITE in <sys/inotify.h>.
    in_close_write = 0x00000008

    i = inotify.adapters.Inotify()
    i.add_watch(b'/data')
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # submit() only schedules the job and returns a Future, so this loop
        # does not wait for the existing files to be processed. An empty
        # listing simply skips the loop, so no separate emptiness check.
        for existing in os.listdir('/data'):
            future = executor.submit(run, existing)
            future.add_done_callback(future_callback)
        try:
            for event in i.event_gen():
                # The generator can yield None; there is nothing to do then.
                if event is None:
                    continue
                (header, type_names, watch_path, filename) = event
                # Test the flag bit rather than comparing the whole mask to
                # a bare 8, so extra bits in the mask do not hide the event.
                if not header.mask & in_close_write:
                    continue
                # fsdecode uses the filesystem encoding and error handler,
                # so a name that is not valid UTF-8 does not abort the loop;
                # it also returns str input unchanged.
                name = os.fsdecode(filename)
                future = executor.submit(process, name)
                future.add_done_callback(future_callback)
        finally:
            i.remove_watch(b'/data')


if __name__ == '__main__':
    main()
有没有办法手动发送 inotify 事件,或者将这些文件添加到 i.event_gen() 生成器中?
这是一个在其中一个工作线程(worker)中处理已有旧文件的示例,这样可以在处理旧文件的同时并行捕获新事件。顺便说明一下,即使使用您的线性代码,我也没有遇到丢失事件的问题。
此外,根据我所使用的 inotify 模块的说明,PyInotify 模块已经 "defunct and no longer available"。
#!/usr/bin/env python3
import concurrent.futures
import inotify.adapters
import time
import os
from functools import partial
# Directory that is listed at start-up and watched for new files.
DIRECTORY = '.'


def run(filename, suffix='', delay=1):
    """Simulate processing *filename* and return a report string.

    Args:
        filename: Name of the file being "processed".
        suffix: Text appended to the report, used to tag the job's origin.
        delay: Seconds to sleep as stand-in work. Defaults to 1, the value
            that was previously hard-coded.

    Returns:
        The string ``'run: ' + filename + suffix``.
    """
    time.sleep(delay)
    return 'run: ' + filename + suffix
def process(filename):
    """Run the job for a file reported by inotify.

    Delegates to ``run`` with the ``' (inotify)'`` suffix and returns
    whatever ``run`` returns.
    """
    report = run(filename, suffix=' (inotify)')
    return report
def future_callback(fut):
    """Print the outcome of a finished future.

    Args:
        fut: A completed ``concurrent.futures.Future``. On success its
            result must be a string, since it is concatenated into the
            printed message.

    A cancelled or failed future is reported explicitly. Calling
    ``fut.result()`` on such a future would re-raise inside the callback
    instead of printing anything.
    """
    if fut.cancelled():
        print('future_callback: cancelled')
        return
    error = fut.exception()
    if error is not None:
        print('future_callback: failed: ' + repr(error))
        return
    print('future_callback: ' + fut.result())
def do_directory(executor):
    """Queue a ``run`` job for every entry currently listed in DIRECTORY.

    Each job carries the ``' (dir list)'`` suffix and reports through
    ``future_callback`` when it finishes.

    Args:
        executor: The executor the jobs are submitted to.
    """
    for entry_name in os.listdir(DIRECTORY):
        # submit() forwards keyword arguments to the callable, so the
        # suffix is passed directly.
        pending = executor.submit(run, entry_name, suffix=' (dir list)')
        pending.add_done_callback(future_callback)
def main():
    """Process existing files and newly written ones concurrently.

    A watch on DIRECTORY is registered first. The directory listing is then
    handed to a pool worker (``do_directory``) so the inotify loop below
    starts right away. Every IN_CLOSE_WRITE event is submitted as
    ``process(name)`` with ``future_callback`` attached. Ctrl-C ends the
    loop quietly, and the watch is removed on any exit.
    """
    # Value of IN_CLOSE_WRITE in <sys/inotify.h>.
    in_close_write = 0x00000008
    watched = DIRECTORY.encode()

    i = inotify.adapters.Inotify()
    i.add_watch(watched)
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Process the directory in a thread or locally. Not sure if it
        # is safe to submit to the executor from within one of its workers.
        # Seems like it should be.
        # NOTE(review): the returned future is discarded, so an exception
        # raised inside do_directory is never reported.
        executor.submit(do_directory, executor)
        # Local alternative: call do_directory(executor) here instead.
        try:
            for event in i.event_gen():
                # The generator can yield None; there is nothing to do then.
                if event is None:
                    continue
                (header, type_names, watch_path, filename) = event
                # Test the flag bit rather than comparing the whole mask to
                # a bare 8, so extra bits in the mask do not hide the event.
                if not header.mask & in_close_write:
                    continue
                # Decode once. fsdecode uses the filesystem encoding and
                # error handler, so a name that is not valid UTF-8 does not
                # abort the loop; it also returns str input unchanged.
                name = os.fsdecode(filename)
                future = executor.submit(process, name)
                future.add_done_callback(future_callback)
                print('Submitted inotify for', name)
        except KeyboardInterrupt:
            # Ctrl-C is the intended way to stop the watcher.
            pass
        finally:
            i.remove_watch(watched)


if __name__ == '__main__':
    main()
测试:
从包含 10 个文件的目录开始。启动程序,等待2秒,然后新建5个文件。查找 "submit" 消息以查看事件已接收并排队,同时仍在处理初始文件以及最终处理新文件。
~/p/TEST $ touch A1 A2 A3 A4 A5 A6 A7 A8 A9 A10
~/p/TEST $ do_test() {
> rm B*
> ../inotify-test.py &
> sleep 2
> touch B1 B2 B3 B4 B5
> sleep 5
> pkill -f inotify-test.py
> }
~/p/TEST $ do_test
[1] 26663
future_callback: run: A10 (dir list)
future_callback: run: A4 (dir list)
future_callback: run: A5 (dir list)
future_callback: run: A9 (dir list)
future_callback: run: A2 (dir list)
Submitted inotify for B1
Submitted inotify for B2
Submitted inotify for B3
Submitted inotify for B4
Submitted inotify for B5
future_callback: run: A3 (dir list)
future_callback: run: A8 (dir list)
future_callback: run: A1 (dir list)
future_callback: run: A7 (dir list)
future_callback: run: A6 (dir list)
future_callback: run: B1 (inotify)
future_callback: run: B2 (inotify)
future_callback: run: B3 (inotify)
future_callback: run: B4 (inotify)
future_callback: run: B5 (inotify)
~/p/TEST $
[1]+ Terminated ../inotify-test.py
~/p/TEST $