How to process sub-batches of asynchronous jobs?
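I have a set of asynchronous jobs (about 100) that I want to run five at a time, using subprocess.Popen for each job. My plan is:
- Start the first five jobs in the job list
- Poll the active jobs every minute or so (each job takes a few minutes to run)
- Whenever a job finishes, start the next one, so that five jobs are always running at a time
- Continue until the whole job list is done
Is there a known pattern in Python for doing this?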
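The plan above can be sketched directly with Popen and its poll() method. This is a minimal sketch under stated assumptions, not a library API: run_in_batches, limit and poll_interval are hypothetical names, and each job is assumed to be an argument list that can be passed straight to subprocess.Popen.

```python
import subprocess
import sys
import time


def run_in_batches(commands, limit=5, poll_interval=60.0):
    """Run commands with at most `limit` processes alive at once.

    Hypothetical helper: starts `limit` jobs, polls them every
    `poll_interval` seconds and starts a new job whenever one finishes.
    Returns a list of (command, returncode) pairs in completion order.
    """
    pending = list(commands)
    running = []  # (command, Popen) pairs that have not finished yet
    results = []
    while pending or running:
        # Top up the set of running jobs until the limit is reached.
        while pending and len(running) < limit:
            cmd = pending.pop(0)
            running.append((cmd, subprocess.Popen(cmd)))
        # poll() returns None while the child is still running.
        still_running = []
        for cmd, proc in running:
            rc = proc.poll()
            if rc is None:
                still_running.append((cmd, proc))
            else:
                results.append((cmd, rc))
        running = still_running
        if running:
            time.sleep(poll_interval)
    return results


if __name__ == "__main__":
    # Hypothetical stand-in jobs: each one just sleeps briefly.
    jobs = [[sys.executable, "-c", "import time; time.sleep(0.2)"]
            for _ in range(12)]
    for cmd, rc in run_in_batches(jobs, limit=5, poll_interval=0.1):
        print(rc, cmd)
```

With the default poll_interval of 60 seconds this matches the "poll every minute or so" idea; a shorter interval only makes the loop notice finished jobs sooner.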
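In Python 2, I used a combination of multiprocessing.Pool and subprocess for this. But that does incur extra overhead in the form of the pool processes.
So in Python 3 I use a concurrent.futures.ThreadPoolExecutor instead of a multiprocessing.Pool. Threads are sufficient here because the actual work is done in the child processes; each worker thread just waits for its subprocess to finish.
The code snippet below shows how to use a ThreadPoolExecutor: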
    import concurrent.futures as cf
    import logging
    import os

    # `data` is the album dictionary (keys 'artist', 'title', 'tracks',
    # 'year', 'genre') built elsewhere; runflac is defined below.
    errmsg = 'conversion of track {} failed, return code {}'
    okmsg = 'finished track {}, "{}"'
    num = len(data['tracks'])
    # One worker thread per CPU; use max_workers=5 to run exactly five jobs at a time.
    with cf.ThreadPoolExecutor(max_workers=os.cpu_count()) as tp:
        fl = [tp.submit(runflac, t, data) for t in range(num)]
        for fut in cf.as_completed(fl):
            idx, rv = fut.result()
            if rv == 0:
                logging.info(okmsg.format(idx+1, data['tracks'][idx]))
            else:
                logging.error(errmsg.format(idx+1, rv))
The runflac function calls flac(1) via subprocess to convert a music file:
    import subprocess


    def runflac(idx, data):
        """Use the flac(1) program to convert a music file to FLAC format.

        Arguments:
            idx: track index (starts from 0)
            data: album data dictionary

        Returns:
            A tuple containing the track index and return value of flac.
        """
        num = idx + 1
        ifn = 'track{:02d}.cdda.wav'.format(num)
        args = ['flac', '--best', '--totally-silent',
                '-TARTIST=' + data['artist'], '-TALBUM=' + data['title'],
                '-TTITLE=' + data['tracks'][idx],
                '-TDATE={}'.format(data['year']),
                '-TGENRE={}'.format(data['genre']),
                '-TTRACKNUM={:02d}'.format(num), '-o',
                'track{:02d}.flac'.format(num), ifn]
        # Blocks this worker thread until flac exits.
        rv = subprocess.call(args, stdout=subprocess.DEVNULL,
                             stderr=subprocess.DEVNULL)
        return (idx, rv)
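Stripped of the flac specifics, the same ThreadPoolExecutor pattern might look like the sketch below. It assumes each job is an argument list; run_command and run_all are hypothetical helper names, and max_workers=5 is chosen to match the limit of five concurrent jobs from the question. The executor itself takes care of starting the next job whenever a worker thread becomes free.

```python
import concurrent.futures as cf
import subprocess
import sys


def run_command(idx, cmd):
    """Run one job to completion in a worker thread; return (idx, returncode)."""
    # The thread only waits here; the real work happens in the child process.
    rv = subprocess.run(cmd, stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL).returncode
    return idx, rv


def run_all(commands, max_workers=5):
    """Run all commands, at most max_workers at a time; return {idx: returncode}."""
    results = {}
    with cf.ThreadPoolExecutor(max_workers=max_workers) as tp:
        futures = [tp.submit(run_command, i, c) for i, c in enumerate(commands)]
        # as_completed yields futures in the order they finish.
        for fut in cf.as_completed(futures):
            idx, rv = fut.result()
            results[idx] = rv
    return results
```

For example, run_all([[sys.executable, "-c", "pass"]] * 10) would run ten trivial Python processes, five at a time.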
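Update:
Another technique, which also works in Python 2.7, is slightly more involved but avoids the overhead of using a multiprocessing pool.
Its basic form is: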
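    import functools
    from multiprocessing import cpu_count

    # `args` is an argparse namespace with crf, preset and files attributes.
    starter = functools.partial(startencoder, crf=args.crf, preset=args.preset)
    procs = []
    maxprocs = cpu_count()
    for ifile in args.files:
        # Wait until a slot is free before starting the next conversion.
        while len(procs) == maxprocs:
            manageprocs(procs)
        procs.append(starter(ifile))
    # Wait for the remaining conversions to finish.
    while len(procs) > 0:
        manageprocs(procs)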
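(Using functools.partial is just a way to preset arguments for a function; it is not essential to the principle.) The startencoder function is basically a wrapper around subprocess.Popen, but it returns some extra information besides the Popen instance: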
    import logging
    import os
    import subprocess


    def startencoder(fname, crf, preset):
        """
        Use ffmpeg to convert a video file to H.264/AAC streams in an MP4
        container.

        Arguments:
            fname: Name of the file to convert.
            crf: Constant rate factor. See ffmpeg docs.
            preset: Encoding preset. See ffmpeg docs.

        Returns:
            A 3-tuple of a Popen instance (or None), input path and output path.
        """
        basename, ext = os.path.splitext(fname)
        known = ['.mp4', '.avi', '.wmv', '.flv', '.mpg', '.mpeg', '.mov', '.ogv',
                 '.mkv', '.webm']
        if ext.lower() not in known:
            ls = "File {} has unknown extension, ignoring it.".format(fname)
            logging.warning(ls)
            return (None, fname, None)
        ofn = basename + '.mp4'
        args = ['ffmpeg', '-i', fname, '-c:v', 'libx264', '-crf', str(crf),
                '-preset', preset, '-flags', '+aic+mv4', '-c:a', 'libfaac',
                '-sn', '-y', ofn]
        try:
            # subprocess.DEVNULL needs Python 3.3+; on 2.7 use open(os.devnull, 'w').
            p = subprocess.Popen(args, stdout=subprocess.DEVNULL,
                                 stderr=subprocess.DEVNULL)
            logging.info("Conversion of {} to {} started.".format(fname, ofn))
        except OSError:
            # Popen failed (e.g. ffmpeg not found); manageprocs drops None entries.
            logging.error("Starting conversion of {} failed.".format(fname))
            p = None
        return (p, fname, ofn)
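Because startencoder takes crf and preset as separate arguments, functools.partial can bind them once so that the main loop only has to pass a file name. A toy illustration, with a hypothetical encode function standing in for startencoder and made-up crf/preset values:

```python
import functools


def encode(fname, crf, preset):
    # Hypothetical stand-in for startencoder: just report its arguments.
    return "{} crf={} preset={}".format(fname, crf, preset)


# Pre-bind the keyword arguments; the result is called with a file name only.
starter = functools.partial(encode, crf=23, preset="medium")
print(starter("movie.avi"))  # movie.avi crf=23 preset=medium
```

Keywords given at call time override the pre-bound ones, so starter("movie.avi", crf=18) would still work.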
The important part is the manageprocs function:
    import logging
    import sys
    from time import sleep


    def manageprocs(proclist):
        """
        Check a list of subprocess tuples for processes that have ended and
        remove them from the list.

        Arguments:
            proclist: a list of (process, input filename, output filename)
                tuples.
        """
        nr = '# of conversions running: {}\r'.format(len(proclist))
        logging.info(nr)
        sys.stdout.flush()
        # Iterate over a copy: removing items from the list being iterated
        # over would skip the entry after each removed one.
        for p in proclist[:]:
            pr, ifn, ofn = p
            if pr is None:
                proclist.remove(p)
            elif pr.poll() is not None:
                logging.info('Conversion of {} to {} finished.'.format(ifn, ofn))
                proclist.remove(p)
        # Polling interval; raise it for jobs that run for minutes.
        sleep(0.5)
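manageprocs removes finished entries from proclist while scanning it. In Python, removing items from the same list you are iterating over skips the element that follows each removed one, so it is safer to iterate over a copy (proclist[:]) or to rebuild the list. A small illustration with plain integers; remove_evens_buggy and remove_evens_safe are hypothetical names:

```python
def remove_evens_buggy(items):
    # Removes from the same list it is iterating over: elements get skipped.
    for x in items:
        if x % 2 == 0:
            items.remove(x)
    return items


def remove_evens_safe(items):
    # Iterates over a shallow copy, so removals do not shift the iteration.
    for x in items[:]:
        if x % 2 == 0:
            items.remove(x)
    return items


print(remove_evens_buggy([2, 4, 6, 7]))  # [4, 7]  (4 was skipped)
print(remove_evens_safe([2, 4, 6, 7]))   # [7]
```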