如何使用 python 以编程方式计算存档中的文件数
How to programmatically count the number of files in an archive using python
在我维护的程序中,它是这样完成的:
# Count the files in the archive.
# NOTE(review): this counts every line that `7z l -slt` prints, not the
# archive members themselves - confirm that is really the intended number.
length = 0
# Build one quoted command string: <7z.exe> l -slt <archive>.
command = ur'"%s" l -slt "%s"' % (u'path/to/7z.exe', srcFile)
# Run 7z and capture its whole stdout; stderr is not piped, so `err` is None.
ins, err = Popen(command, stdout=PIPE, stdin=PIPE,
                 startupinfo=startupinfo).communicate()
# Wrap the captured output so it can be iterated line by line.
ins = StringIO.StringIO(ins)
for line in ins: length += 1
ins.close()
- 真的只有这一种方法吗?我似乎找不到其他命令,但不能直接查询文件数量似乎有点奇怪。
错误检查呢?将其修改为下面这样是否足够:
# Keep the process handle (instead of calling communicate() directly) so the
# exit status can be inspected after the output has been consumed.
proc = Popen(command, stdout=PIPE, stdin=PIPE,
             startupinfo=startupinfo)
out = proc.stdout
# ... count
# A non-zero exit status is treated as a failed listing.
returncode = proc.wait()
if returncode:
    raise Exception(u'Failed reading number of files from ' + srcFile)
或者我应该实际解析 Popen 的输出?
编辑:对 7z、rar、zip 档案感兴趣(7z.exe 都支持)——不过作为起步,先支持 7z 和 zip 就足够了
计算 Python 中 zip 存档中的存档成员数:
#!/usr/bin/env python
"""Print the number of members in the zip archive named on the command line."""
import sys
from contextlib import closing
from zipfile import ZipFile


def count_zip_members(path):
    """Return the number of members stored in the zip archive at *path*.

    Every entry listed in the archive's central directory is counted,
    i.e. the length of ``ZipFile.infolist()``.

    Raises whatever ``ZipFile`` raises for a missing or invalid archive.
    """
    # closing() guarantees the archive is closed even on interpreters where
    # ZipFile is not itself usable as a context manager.
    with closing(ZipFile(path)) as archive:
        return len(archive.infolist())


if __name__ == "__main__":
    # Fail with a usage message instead of an IndexError when no path is given.
    if len(sys.argv) != 2:
        sys.exit("usage: %s ARCHIVE.zip" % sys.argv[0])
    print(count_zip_members(sys.argv[1]))
它可能会使用 zlib、bz2、lzma 模块(如果可用)来解压缩存档。
要计算 tar 存档中常规文件的数量:
#!/usr/bin/env python
"""Print the number of regular files in the tar archive named on the command line."""
import sys
import tarfile


def count_tar_regular_files(path):
    """Return how many members of the tar archive at *path* are regular files.

    Members for which ``TarInfo.isreg()`` is false (directories, links,
    devices, ...) are not counted.

    Raises whatever ``tarfile.open`` raises for a missing or invalid archive.
    """
    with tarfile.open(path) as archive:
        # Iterating the archive yields one TarInfo per member without
        # building the full member list first.
        return sum(1 for member in archive if member.isreg())


if __name__ == "__main__":
    # Fail with a usage message instead of an IndexError when no path is given.
    if len(sys.argv) != 2:
        sys.exit("usage: %s ARCHIVE.tar" % sys.argv[0])
    print(count_tar_regular_files(sys.argv[1]))
它可能支持 gzip、bz2 和 lzma 压缩,具体取决于 Python 的版本。
您可以找到可为 7z 存档提供类似功能的第 3 方模块。
要使用 7z 实用程序获取存档中的文件数:
import os
import re
import subprocess


def count_files_7z(archive):
    """Return the file count that ``7z l`` reports for *archive*.

    The number is parsed from the summary that ends the listing, e.g.
    ``3534900  325332  75 files, 29 folders``.

    Raises:
        subprocess.CalledProcessError: 7z exited with a non-zero status.
        ValueError: the output does not end with a files/folders summary.
    """
    # LC_ALL=C asks for untranslated output so the English summary can be
    # matched.
    output = subprocess.check_output(
        ["7z", "l", archive], env=dict(os.environ, LC_ALL="C"))
    # ``\s*\Z`` (instead of ``$``) still anchors the match at the end of the
    # output but tolerates a CRLF line ending or trailing blank lines.
    match = re.search(br'(\d+)\s+files,\s+\d+\s+folders\s*\Z', output)
    if match is None:
        raise ValueError(
            "no files/folders summary in 7z output for %r" % (archive,))
    return int(match.group(1))
如果存档中有很多文件,以下版本可能会占用更少的内存:
import os
import re
from subprocess import Popen, PIPE, CalledProcessError
def count_files_7z(archive):
    """Return the file count that ``7z l`` reports for *archive*.

    The listing is read line by line, so only one line is held in memory
    at a time.  The count is parsed from the last non-blank line, which is
    expected to be the summary, e.g. ``3534900  325332  75 files, 29 folders``.

    Raises:
        CalledProcessError: a line starting with ``Error:`` was printed (the
            rest of the output is attached as ``output``), or 7z exited with
            a non-zero status.
        ValueError: the last non-blank line is not a files/folders summary.
    """
    command = ["7z", "l", archive]
    p = Popen(command, stdout=PIPE, bufsize=1, env=dict(os.environ, LC_ALL="C"))
    last_line = b""  # stays empty when 7z prints nothing at all
    with p.stdout:
        for line in p.stdout:
            if line.startswith(b'Error:'):  # found error
                error = line + b"".join(p.stdout)
                raise CalledProcessError(p.wait(), command, error)
            if line.strip():
                # Remember only non-blank lines so a trailing empty line
                # does not hide the summary.
                last_line = line
    returncode = p.wait()
    # An explicit check: ``assert`` would be stripped under ``python -O``.
    if returncode != 0:
        raise CalledProcessError(returncode, command, last_line)
    match = re.search(br'(\d+)\s+files,\s+\d+\s+folders', last_line)
    if match is None:
        raise ValueError(
            "no files/folders summary in 7z output for %r" % (archive,))
    return int(match.group(1))
示例:
import sys
try:
    total = count_files_7z(sys.argv[1])
    print(total)
except CalledProcessError as exc:
    # exc.output is bytes: write it to the binary layer of stderr when there
    # is one, otherwise to sys.stderr itself.
    err_stream = getattr(sys.stderr, 'buffer', sys.stderr)
    err_stream.write(exc.output)
    # Exit with the same status that 7z reported.
    sys.exit(exc.returncode)
计算通用子进程输出中的行数:
from functools import partial
from subprocess import Popen, PIPE, CalledProcessError
# `command` must already be defined by the surrounding code.
# bufsize=-1 selects the default (fully buffered) I/O buffer size.
p = Popen(command, stdout=PIPE, bufsize=-1)
with p.stdout:
    # Each call reads at most 1 << 15 (32768) bytes from the pipe.
    read_chunk = partial(p.stdout.read, 1 << 15)
    # iter(callable, sentinel) keeps calling read_chunk() until it returns
    # b'' (end of output); newlines are counted chunk by chunk, so the whole
    # output is never held in memory.
    count = sum(chunk.count(b'\n') for chunk in iter(read_chunk, b''))
# The pipe is drained and closed at this point; now check the exit status.
if p.wait() != 0:
    raise CalledProcessError(p.returncode, command)
print(count)
支持无限输出
Could you explain why bufsize=-1 (as opposed to bufsize=1 in your previous answer: stackoverflow.com/a/30984882/281545)?
bufsize=-1 表示使用默认的 I/O 缓冲区大小,而不是 Python 2 上的 bufsize=0(无缓冲)。它在 Python 2 上能提升性能;在较新的 Python 3 版本中它已是默认值。在某些较早的 Python 3 版本中,如果没有把 bufsize 改为 bufsize=-1,您可能会遇到短读(丢失数据)。
这个答案按块读取,因此流采用全缓冲以提高效率。您链接的那个解决方案是面向行的;bufsize=1 表示“行缓冲”(line buffered)。除此之外,它与 bufsize=-1 的区别很小。
and also what the read_chunk = partial(p.stdout.read, 1 << 15) buys us ?
它等同于 read_chunk = lambda: p.stdout.read(1<<15),但通常能提供更多的内省信息。它被用来在 Python 中高效地实现 wc -l。
因为我已经 7z.exe 与应用程序捆绑在一起,我当然想避免使用第三方库,而我确实需要解析 rar 和 7z 档案,我想我会选择:
# Matches output lines that start with "Error:".
regErrMatch = re.compile(u'Error:', re.U).match # needs more testing
r"""7z list command output is of the form:
Date Time Attr Size Compressed Name
------------------- ----- ------------ ------------ ------------------------
2015-06-29 21:14:04 ....A <size> <filename>
where ....A is the attribute value for normal files, ....D for directories
"""
# Matches a listing row whose attribute column is "....A" (a normal file).
# Written with escaped backslashes instead of a ur'' literal so the pattern
# is the same string on Python 2 and still parses on Python 3.
reFileMatch = re.compile(u'(\\d|:|-|\\s)*\\.\\.\\.\\.A', re.U).match


def countFilesInArchive(srcArch, listFilePath=None):
    """Count all regular files in srcArch (or only the subset in
    listFilePath).

    Runs ``7z l`` with UTF-8 output and counts the rows accepted by
    reFileMatch.  Once a line accepted by regErrMatch is seen, that line and
    every later line are collected as error text instead of being counted.

    srcArch -- archive path, interpolated into the command string.
    listFilePath -- optional list file, passed to 7z as ``@"<path>"``.

    Raises StateError when 7z exits with a non-zero status or any error line
    was collected.  Popen, PIPE, startupinfo and StateError are expected to
    be provided by the enclosing module.
    """
    command = u'"%s" l -scsUTF-8 -sccUTF-8 "%s"' % ('compiled/7z.exe', srcArch)
    if listFilePath: command += u' @"%s"' % listFilePath
    proc = Popen(command, stdout=PIPE, startupinfo=startupinfo, bufsize=-1)
    length, errorLine = 0, []
    with proc.stdout as out:
        for line in iter(out.readline, b''):
            line = line.decode('utf8')
            if errorLine or regErrMatch(line):
                errorLine.append(line)
            elif reFileMatch(line):
                length += 1
    returncode = proc.wait()
    if returncode or errorLine:
        # The archive name is now actually formatted into the message; the
        # old code concatenated the pieces and left a literal "%s" in it.
        raise StateError(
            u'%s: Listing failed\n' % srcArch +
            u'7z.exe return value: ' + str(returncode) +
            u'\n' + u'\n'.join([x.strip() for x in errorLine if x.strip()]))
    return length
@JFSebastien
中的错误检查
我的最终(差不多)版本基于已接受的答案——可能不需要 unicode,暂时保留它,因为我在其他地方都在使用它。还保留了正则表达式(我可能会扩展它,我见过像 re.compile(u'^(Error:.+|.+ Data Error?|Sub items Errors:.+)',re.U) 这样的东西)。还得研究一下 check_output 和 CalledProcessError。
def countFilesInArchive(srcArch, listFilePath=None):
    """Count all regular files in srcArch (or only the subset in
    listFilePath).

    Runs ``7z l`` with UTF-8 output and parses the count from the last line
    of the listing, e.g. ``3534900  325332  75 files, 29 folders``.

    srcArch -- archive to list; its ``.s`` attribute is used in messages
        (presumably the path as a string - confirm against the path type).
    listFilePath -- optional list file, passed to 7z as ``@<path>``.

    Raises StateError when 7z exits with a non-zero status, prints a line
    accepted by regErrMatch, prints nothing, or prints no files/folders
    summary.  exe7z, startupinfo, regErrMatch and StateError are expected to
    be provided by the enclosing module.
    """
    command = [exe7z, u'l', u'-scsUTF-8', u'-sccUTF-8', srcArch]
    if listFilePath: command += [u'@%s' % listFilePath]
    proc = Popen(command, stdout=PIPE, stdin=PIPE, # stdin needed if listFilePath
                 startupinfo=startupinfo, bufsize=1)
    errorLine = line = u''
    with proc.stdout as out:
        for line in iter(out.readline, b''): # consider io.TextIOWrapper
            line = line.decode('utf8')
            if regErrMatch(line):
                # Decode the remaining output explicitly: joining the raw
                # byte lines onto a unicode string relied on an implicit
                # ASCII decode that fails on non-ASCII text.
                errorLine = line + out.read().decode('utf8')
                break
    returncode = proc.wait()
    msg = u'%s: Listing failed\n' % srcArch.s
    if returncode or errorLine:
        msg += u'7z.exe return value: ' + str(returncode) + u'\n' + errorLine
    elif not line: # should not happen
        msg += u'Empty output'
    else: msg = u''
    if msg: raise StateError(msg) # consider using CalledProcessError
    # number of files is reported in the last line - example:
    #                                3534900       325332  75 files, 29 folders
    match = re.search(u'(\\d+)\\s+files,\\s+\\d+\\s+folders', line)
    if match is None:
        # Report a missing summary the same way as every other failure
        # instead of letting an AttributeError escape.
        raise StateError(u'%s: Listing failed\n' % srcArch.s +
                         u'No file count in 7z output')
    return int(match.group(1))
将根据我的发现对其进行编辑。
在我维护的程序中,它是这样完成的:
# Count the files in the archive.
# NOTE(review): this counts every line that `7z l -slt` prints, not the
# archive members themselves - confirm that is really the intended number.
length = 0
# Build one quoted command string: <7z.exe> l -slt <archive>.
command = ur'"%s" l -slt "%s"' % (u'path/to/7z.exe', srcFile)
# Run 7z and capture its whole stdout; stderr is not piped, so `err` is None.
ins, err = Popen(command, stdout=PIPE, stdin=PIPE,
                 startupinfo=startupinfo).communicate()
# Wrap the captured output so it can be iterated line by line.
ins = StringIO.StringIO(ins)
for line in ins: length += 1
ins.close()
- 真的只有这一种方法吗?我似乎找不到其他命令,但不能直接查询文件数量似乎有点奇怪。
错误检查呢?将其修改为下面这样是否足够:
proc = Popen(command, stdout=PIPE, stdin=PIPE,
             startupinfo=startupinfo)
out = proc.stdout
# ... count
returncode = proc.wait()
if returncode:
    raise Exception(u'Failed reading number of files from ' + srcFile)
或者我应该实际解析 Popen 的输出?
编辑:对 7z、rar、zip 档案感兴趣(7z.exe 都支持)——不过作为起步,先支持 7z 和 zip 就足够了
计算 Python 中 zip 存档中的存档成员数:
#!/usr/bin/env python
"""Print the number of members in the zip archive named on the command line."""
import sys
from contextlib import closing
from zipfile import ZipFile


def count_zip_members(path):
    """Return the number of members stored in the zip archive at *path*.

    Every entry listed in the archive's central directory is counted,
    i.e. the length of ``ZipFile.infolist()``.

    Raises whatever ``ZipFile`` raises for a missing or invalid archive.
    """
    # closing() guarantees the archive is closed even on interpreters where
    # ZipFile is not itself usable as a context manager.
    with closing(ZipFile(path)) as archive:
        return len(archive.infolist())


if __name__ == "__main__":
    # Fail with a usage message instead of an IndexError when no path is given.
    if len(sys.argv) != 2:
        sys.exit("usage: %s ARCHIVE.zip" % sys.argv[0])
    print(count_zip_members(sys.argv[1]))
它可能会使用 zlib、bz2、lzma 模块(如果可用)来解压缩存档。
要计算 tar 存档中常规文件的数量:
#!/usr/bin/env python
"""Print the number of regular files in the tar archive named on the command line."""
import sys
import tarfile


def count_tar_regular_files(path):
    """Return how many members of the tar archive at *path* are regular files.

    Members for which ``TarInfo.isreg()`` is false (directories, links,
    devices, ...) are not counted.

    Raises whatever ``tarfile.open`` raises for a missing or invalid archive.
    """
    with tarfile.open(path) as archive:
        # Iterating the archive yields one TarInfo per member without
        # building the full member list first.
        return sum(1 for member in archive if member.isreg())


if __name__ == "__main__":
    # Fail with a usage message instead of an IndexError when no path is given.
    if len(sys.argv) != 2:
        sys.exit("usage: %s ARCHIVE.tar" % sys.argv[0])
    print(count_tar_regular_files(sys.argv[1]))
它可能支持 gzip、bz2 和 lzma 压缩,具体取决于 Python 的版本。
您可以找到可为 7z 存档提供类似功能的第 3 方模块。
要使用 7z 实用程序获取存档中的文件数:
import os
import re
import subprocess


def count_files_7z(archive):
    """Return the file count that ``7z l`` reports for *archive*.

    The number is parsed from the summary that ends the listing, e.g.
    ``3534900  325332  75 files, 29 folders``.

    Raises:
        subprocess.CalledProcessError: 7z exited with a non-zero status.
        ValueError: the output does not end with a files/folders summary.
    """
    # LC_ALL=C asks for untranslated output so the English summary can be
    # matched.
    output = subprocess.check_output(
        ["7z", "l", archive], env=dict(os.environ, LC_ALL="C"))
    # ``\s*\Z`` (instead of ``$``) still anchors the match at the end of the
    # output but tolerates a CRLF line ending or trailing blank lines.
    match = re.search(br'(\d+)\s+files,\s+\d+\s+folders\s*\Z', output)
    if match is None:
        raise ValueError(
            "no files/folders summary in 7z output for %r" % (archive,))
    return int(match.group(1))
如果存档中有很多文件,以下版本可能会占用更少的内存:
import os
import re
from subprocess import Popen, PIPE, CalledProcessError
def count_files_7z(archive):
    """Return the file count that ``7z l`` reports for *archive*.

    The listing is read line by line, so only one line is held in memory
    at a time.  The count is parsed from the last non-blank line, which is
    expected to be the summary, e.g. ``3534900  325332  75 files, 29 folders``.

    Raises:
        CalledProcessError: a line starting with ``Error:`` was printed (the
            rest of the output is attached as ``output``), or 7z exited with
            a non-zero status.
        ValueError: the last non-blank line is not a files/folders summary.
    """
    command = ["7z", "l", archive]
    p = Popen(command, stdout=PIPE, bufsize=1, env=dict(os.environ, LC_ALL="C"))
    last_line = b""  # stays empty when 7z prints nothing at all
    with p.stdout:
        for line in p.stdout:
            if line.startswith(b'Error:'):  # found error
                error = line + b"".join(p.stdout)
                raise CalledProcessError(p.wait(), command, error)
            if line.strip():
                # Remember only non-blank lines so a trailing empty line
                # does not hide the summary.
                last_line = line
    returncode = p.wait()
    # An explicit check: ``assert`` would be stripped under ``python -O``.
    if returncode != 0:
        raise CalledProcessError(returncode, command, last_line)
    match = re.search(br'(\d+)\s+files,\s+\d+\s+folders', last_line)
    if match is None:
        raise ValueError(
            "no files/folders summary in 7z output for %r" % (archive,))
    return int(match.group(1))
示例:
import sys
try:
    total = count_files_7z(sys.argv[1])
    print(total)
except CalledProcessError as exc:
    # exc.output is bytes: write it to the binary layer of stderr when there
    # is one, otherwise to sys.stderr itself.
    err_stream = getattr(sys.stderr, 'buffer', sys.stderr)
    err_stream.write(exc.output)
    # Exit with the same status that 7z reported.
    sys.exit(exc.returncode)
计算通用子进程输出中的行数:
from functools import partial
from subprocess import Popen, PIPE, CalledProcessError
# `command` must already be defined by the surrounding code.
# bufsize=-1 selects the default (fully buffered) I/O buffer size.
p = Popen(command, stdout=PIPE, bufsize=-1)
with p.stdout:
    # Each call reads at most 1 << 15 (32768) bytes from the pipe.
    read_chunk = partial(p.stdout.read, 1 << 15)
    # iter(callable, sentinel) keeps calling read_chunk() until it returns
    # b'' (end of output); newlines are counted chunk by chunk, so the whole
    # output is never held in memory.
    count = sum(chunk.count(b'\n') for chunk in iter(read_chunk, b''))
# The pipe is drained and closed at this point; now check the exit status.
if p.wait() != 0:
    raise CalledProcessError(p.returncode, command)
print(count)
支持无限输出
Could you explain why bufsize=-1 (as opposed to bufsize=1 in your previous answer: stackoverflow.com/a/30984882/281545)?
bufsize=-1 表示使用默认的 I/O 缓冲区大小,而不是 Python 2 上的 bufsize=0(无缓冲)。它在 Python 2 上能提升性能;在较新的 Python 3 版本中它已是默认值。在某些较早的 Python 3 版本中,如果没有把 bufsize 改为 bufsize=-1,您可能会遇到短读(丢失数据)。
这个答案按块读取,因此流采用全缓冲以提高效率。您链接的那个解决方案是面向行的;bufsize=1 表示“行缓冲”(line buffered)。除此之外,它与 bufsize=-1 的区别很小。
and also what the read_chunk = partial(p.stdout.read, 1 << 15) buys us ?
它等同于 read_chunk = lambda: p.stdout.read(1<<15),但通常能提供更多的内省信息。它被用来在 Python 中高效地实现 wc -l。
因为我已经 7z.exe 与应用程序捆绑在一起,我当然想避免使用第三方库,而我确实需要解析 rar 和 7z 档案,我想我会选择:
# Matches output lines that start with "Error:".
regErrMatch = re.compile(u'Error:', re.U).match # needs more testing
r"""7z list command output is of the form:
Date Time Attr Size Compressed Name
------------------- ----- ------------ ------------ ------------------------
2015-06-29 21:14:04 ....A <size> <filename>
where ....A is the attribute value for normal files, ....D for directories
"""
# Matches a listing row whose attribute column is "....A" (a normal file).
# Written with escaped backslashes instead of a ur'' literal so the pattern
# is the same string on Python 2 and still parses on Python 3.
reFileMatch = re.compile(u'(\\d|:|-|\\s)*\\.\\.\\.\\.A', re.U).match


def countFilesInArchive(srcArch, listFilePath=None):
    """Count all regular files in srcArch (or only the subset in
    listFilePath).

    Runs ``7z l`` with UTF-8 output and counts the rows accepted by
    reFileMatch.  Once a line accepted by regErrMatch is seen, that line and
    every later line are collected as error text instead of being counted.

    srcArch -- archive path, interpolated into the command string.
    listFilePath -- optional list file, passed to 7z as ``@"<path>"``.

    Raises StateError when 7z exits with a non-zero status or any error line
    was collected.  Popen, PIPE, startupinfo and StateError are expected to
    be provided by the enclosing module.
    """
    command = u'"%s" l -scsUTF-8 -sccUTF-8 "%s"' % ('compiled/7z.exe', srcArch)
    if listFilePath: command += u' @"%s"' % listFilePath
    proc = Popen(command, stdout=PIPE, startupinfo=startupinfo, bufsize=-1)
    length, errorLine = 0, []
    with proc.stdout as out:
        for line in iter(out.readline, b''):
            line = line.decode('utf8')
            if errorLine or regErrMatch(line):
                errorLine.append(line)
            elif reFileMatch(line):
                length += 1
    returncode = proc.wait()
    if returncode or errorLine:
        # The archive name is now actually formatted into the message; the
        # old code concatenated the pieces and left a literal "%s" in it.
        raise StateError(
            u'%s: Listing failed\n' % srcArch +
            u'7z.exe return value: ' + str(returncode) +
            u'\n' + u'\n'.join([x.strip() for x in errorLine if x.strip()]))
    return length
@JFSebastien
我的最终(差不多)版本基于已接受的答案——可能不需要 unicode,暂时保留它,因为我在其他地方都在使用它。还保留了正则表达式(我可能会扩展它,我见过像 re.compile(u'^(Error:.+|.+ Data Error?|Sub items Errors:.+)',re.U) 这样的东西)。还得研究一下 check_output 和 CalledProcessError。
def countFilesInArchive(srcArch, listFilePath=None):
    """Count all regular files in srcArch (or only the subset in
    listFilePath).

    Runs ``7z l`` with UTF-8 output and parses the count from the last line
    of the listing, e.g. ``3534900  325332  75 files, 29 folders``.

    srcArch -- archive to list; its ``.s`` attribute is used in messages
        (presumably the path as a string - confirm against the path type).
    listFilePath -- optional list file, passed to 7z as ``@<path>``.

    Raises StateError when 7z exits with a non-zero status, prints a line
    accepted by regErrMatch, prints nothing, or prints no files/folders
    summary.  exe7z, startupinfo, regErrMatch and StateError are expected to
    be provided by the enclosing module.
    """
    command = [exe7z, u'l', u'-scsUTF-8', u'-sccUTF-8', srcArch]
    if listFilePath: command += [u'@%s' % listFilePath]
    proc = Popen(command, stdout=PIPE, stdin=PIPE, # stdin needed if listFilePath
                 startupinfo=startupinfo, bufsize=1)
    errorLine = line = u''
    with proc.stdout as out:
        for line in iter(out.readline, b''): # consider io.TextIOWrapper
            line = line.decode('utf8')
            if regErrMatch(line):
                # Decode the remaining output explicitly: joining the raw
                # byte lines onto a unicode string relied on an implicit
                # ASCII decode that fails on non-ASCII text.
                errorLine = line + out.read().decode('utf8')
                break
    returncode = proc.wait()
    msg = u'%s: Listing failed\n' % srcArch.s
    if returncode or errorLine:
        msg += u'7z.exe return value: ' + str(returncode) + u'\n' + errorLine
    elif not line: # should not happen
        msg += u'Empty output'
    else: msg = u''
    if msg: raise StateError(msg) # consider using CalledProcessError
    # number of files is reported in the last line - example:
    #                                3534900       325332  75 files, 29 folders
    match = re.search(u'(\\d+)\\s+files,\\s+\\d+\\s+folders', line)
    if match is None:
        # Report a missing summary the same way as every other failure
        # instead of letting an AttributeError escape.
        raise StateError(u'%s: Listing failed\n' % srcArch.s +
                         u'No file count in 7z output')
    return int(match.group(1))
将根据我的发现对其进行编辑。