Processing large files in chunks: inconsistent seek with readline
I'm trying to read and process a large file in chunks with Python. I'm following this blog, which proposes a very fast way to read and process large chunks of data spread over multiple processes. I have only slightly updated the existing code, i.e. using stat(fin).st_size instead of os.path.getsize. In the example below I also haven't implemented multiprocessing, because the issue shows up in a single process as well, which makes debugging easier.
The issue I have with this code is that it returns broken sentences. That makes sense: the pointers do not take line endings into account and just return some given byte size. In practice, one would assume that you could fix this by leaving out the last item of a fetched batch of lines, since that is most likely the broken line. Unfortunately, that does not work reliably either.
from os import stat
def chunkify(pfin, buf_size=1024):
file_end = stat(pfin).st_size
with open(pfin, 'rb') as f:
chunk_end = f.tell()
while True:
chunk_start = chunk_end
f.seek(buf_size, 1)
f.readline()
chunk_end = f.tell()
yield chunk_start, chunk_end - chunk_start
if chunk_end > file_end:
break
def process_batch(pfin, chunk_start, chunk_size):
with open(pfin, 'r', encoding='utf-8') as f:
f.seek(chunk_start)
batch = f.read(chunk_size).splitlines()
# changing this to batch[:-1] will result in 26 lines total
return batch
if __name__ == '__main__':
fin = r'data/tiny.txt'
lines_n = 0
for start, size in chunkify(fin):
lines = process_batch(fin, start, size)
# Uncomment to see broken lines
# for line in lines:
# print(line)
# print('\n')
lines_n += len(lines)
print(lines_n)
# 29
The code above will print 29 as the total number of processed lines. When you do not return the last item of a batch, naively assuming it is a broken line, you get 26. The actual number of lines is 27. The test data can be found below.
She returned bearing mixed lessons from a society where the tools of democracy still worked.
If you think you can sense a "but" approaching, you are right.
Elsewhere, Germany take on Brazil and Argentina face Spain, possibly without Lionel Messi.
What sort of things do YOU remember best?'
Less than three weeks after taking over from Lotz at Wolfsburg.
The buildings include the Dr. John Micallef Memorial Library.
For women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for breast cancer.
In one interview he claimed it was from the name of the Cornish language ("Kernewek").
8 Goldschmidt was out of office between 16 and 19 July 1970.
Last year a new law allowed police to shut any bar based on security concerns.
But, Frum explains: "Glenn Beck takes it into his head that this guy is bad news."
Carrying on the Romantic tradition of landscape painting.
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
Dietler also said Abu El Haj was being opposed because she is of Palestinian descent.
The auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disorder.
GAAP operating profit was .2 million and .1 million in the second quarter of 2008 and 2007, respectively.
Doc, Ira, and Rene are sent home as part of the seventh bond tour.
only I am sick of always hearing him called the Just.
Also there is Meghna River in the west of Brahmanbaria.
The explosives were the equivalent of more than three kilograms of dynamite - equal to 30 grenades," explained security advisor Markiyan Lubkivsky to reporters gathered for a news conference in Kyiv.
Her mother first took her daughter swimming at the age of three to help her with her cerebal palsy.
A U.S. aircraft carrier, the USS "Ticonderoga", was also stationed nearby.
Louis shocked fans when he unexpectedly confirmed he was expecting a child in summer 2015.
99, pp.
Sep 19: Eibar (h) WON 6-1
If you print out the created lines, you will see that broken sentences do indeed occur. I find this odd: shouldn't f.readline() ensure that the file is read up to the next line? In the output below, an empty line separates two batches. That means you cannot check a line against the next line in a batch and remove it if it is a substring of that next line: the broken sentence belongs to a different batch than the full sentence.
...
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, r
In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
...
Is there a way to get rid of these broken sentences, without removing too much?
You can download a larger test file (100,000 lines) here.
After a lot of digging, it seems that some inaccessible buffer is actually responsible for the inconsistent behaviour of seek, as discussed here and here. I tried the proposed solution of using iter(f.readline, '') together with seek, but that still gives me inconsistent results. I have updated my code to return the file pointer after every batch of 1,500 lines, but in reality the returned batches overlap.
from os import stat
from functools import partial
def chunkify(pfin, max_lines=1500):
file_end = stat(pfin).st_size
with open(pfin, 'r', encoding='utf-8') as f:
chunk_end = f.tell()
for idx, l in enumerate(iter(f.readline, '')):
if idx % max_lines == 0:
chunk_start = chunk_end
chunk_end = f.tell()
# yield start position and size
yield chunk_start, chunk_end - chunk_start
chunk_start = chunk_end
yield chunk_start, file_end
def process_batch(pfin, chunk_start, chunk_size):
with open(pfin, 'r', encoding='utf-8') as f:
f.seek(chunk_start)
chunk = f.read(chunk_size).splitlines()
batch = list(filter(None, chunk))
return batch
if __name__ == '__main__':
fin = r'data/100000-ep+gutenberg+news+wiki.txt'
process_func = partial(process_batch, fin)
lines_n = 0
prev_last = ''
for start, size in chunkify(fin):
lines = process_func(start, size)
if not lines:
continue
# print first and last ten sentences of batch
for line in lines[:10]:
print(line)
print('...')
for line in lines[-10:]:
print(line)
print('\n')
lines_n += len(lines)
print(lines_n)
Below is an example of overlapping batches: the first two and a half sentences of the last batch are duplicates of the final sentences of the batch before it. I don't know how to explain or fix this.
...
The EC ordered the SFA to conduct probes by June 30 and to have them confirmed by a certifying authority or it would deduct a part of the funding or the entire sum from upcoming EU subsidy payments.
Dinner for two, with wine, 250 lari.
It lies a few kilometres north of the slightly higher Weissmies and also close to the slightly lower Fletschhorn on the north.
For the rest we reached agreement and it was never by chance.
Chicago Blackhawks defeat Columbus Blue Jackets for 50th win
The only drawback in a personality that large is that no one els
For the rest we reached agreement and it was never by chance.
Chicago Blackhawks defeat Columbus Blue Jackets for 50th win
The only drawback in a personality that large is that no one else, whatever their insights or artistic pedigree, is quite as interesting.
Sajid Nadiadwala's reboot version of his cult classic "Judwaa", once again directed by David Dhawan titled "Judwaa 2" broke the dry spell running at the box office in 2017.
They warned that there will be a breaking point, although it is not clear what that would be.
...
In addition to that, I also tried removing the readline from the original code and keeping track of a remaining, incomplete chunk, which is then passed to the next chunk and prepended to it. The problem I run into now is that, because the text is read in byte chunks, a chunk can end without completely finishing a character's bytes. This leads to decoding errors.
from os import stat
def chunkify(pfin, buf_size=1024):
file_end = stat(pfin).st_size
with open(pfin, 'rb') as f:
chunk_end = f.tell()
while True:
chunk_start = chunk_end
f.seek(buf_size, 1)
chunk_end = f.tell()
is_last = chunk_end >= file_end
# yield start position, size, and is_last
yield chunk_start, chunk_end - chunk_start, is_last
if is_last:
break
def process_batch(pfin, chunk_start, chunk_size, is_last, leftover):
with open(pfin, 'r', encoding='utf-8') as f:
f.seek(chunk_start)
chunk = f.read(chunk_size)
# Add previous leftover to current chunk
chunk = leftover + chunk
batch = chunk.splitlines()
batch = list(filter(None, batch))
# If this chunk is not the last one,
# pop the last item as that will be an incomplete sentence
# We return this leftover to use in the next chunk
if not is_last:
leftover = batch.pop(-1)
return batch, leftover
if __name__ == '__main__':
fin = r'ep+gutenberg+news+wiki.txt'
lines_n = 0
left = ''
for start, size, last in chunkify(fin):
lines, left = process_batch(fin, start, size, last, left)
if not lines:
continue
for line in lines:
print(line)
print('\n')
numberlines = len(lines)
lines_n += numberlines
print(lines_n)
Running the code above will inevitably produce a UnicodeDecodeError.
Traceback (most recent call last):
File "chunk_tester.py", line 46, in <module>
lines, left = process_batch(fin, start, size, last, left)
File "chunk_tester.py", line 24, in process_batch
chunk = f.read(chunk_size)
File "lib\codecs.py", line 322, in decode
(result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa9 in position 0: invalid start byte
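For reference, the byte 0xa9 in the traceback is the telltale sign: it is a UTF-8 continuation byte that ended up at the start of a chunk. A minimal illustration (constructed for this post, not taken from the test data) of what happens when a multibyte character is split across a chunk boundary:
data = '©'.encode('utf-8')  # b'\xc2\xa9': the copyright sign is two bytes in UTF-8
print(data[1:])             # b'\xa9', a bare continuation byte
data[1:].decode('utf-8')    # UnicodeDecodeError: invalid start byte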
You were so close! A relatively simple change to your final code (reading the data as bytes and not as str) makes it all (almost) work.
The main issue is that reading from binary files counts bytes, but reading from text files counts text: your first count was in bytes and your second in characters, so your assumptions about which data had already been read were wrong. This has nothing to do with an internal, hidden buffer.
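A minimal sketch of that difference, using a hypothetical file demo.txt (not part of the question's data):
# read(n) returns n *bytes* in binary mode but n *characters* in text mode
with open('demo.txt', 'wb') as f:
    f.write('café\n'.encode('utf-8'))  # 5 characters, but 6 bytes on disk

with open('demo.txt', 'rb') as f:
    print(f.read(4))   # b'caf\xc3' -- four bytes, cutting the 'é' in half

with open('demo.txt', 'r', encoding='utf-8') as f:
    print(f.read(4))   # 'café' -- four characters, spanning five bytes on disk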
Other changes:
- The code needs to split on b'\n' instead of using bytes.splitlines(), and only strip out blank lines after the relevant detection code.
- Unless the size of the file changes (in which case your existing code would break anyway), chunkify can be replaced by a simpler, faster loop that does exactly the same without keeping the file open.
This gives the final code:
from os import stat
def chunkify(pfin, buf_size=1024**2):
file_end = stat(pfin).st_size
i = -buf_size
for i in range(0, file_end - buf_size, buf_size):
yield i, buf_size, False
leftover = file_end % buf_size
if leftover == 0: # if the last section is buf_size in size
leftover = buf_size
yield i + buf_size, leftover, True
def process_batch(pfin, chunk_start, chunk_size, is_last, leftover):
with open(pfin, 'rb') as f:
f.seek(chunk_start)
chunk = f.read(chunk_size)
# Add previous leftover to current chunk
chunk = leftover + chunk
batch = chunk.split(b'\n')
# If this chunk is not the last one,
# pop the last item as that will be an incomplete sentence
# We return this leftover to use in the next chunk
if not is_last:
leftover = batch.pop(-1)
return [s.decode('utf-8') for s in filter(None, batch)], leftover
if __name__ == '__main__':
fin = r'ep+gutenberg+news+wiki.txt'
lines_n = 0
left = b''
for start, size, last in chunkify(fin):
lines, left = process_batch(fin, start, size, last, left)
if not lines:
continue
for line in lines:
print(line)
print('\n')
numberlines = len(lines)
lines_n += numberlines
print(lines_n)
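As a quick sanity check (my own sketch, not part of the original answer), you can verify that the (start, size) pairs from the new chunkify tile the file exactly, with is_last set only on the final chunk:
import os
import tempfile

# Create a throwaway file whose size is deliberately not a multiple of buf_size
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b'x' * 2500)

chunks = list(chunkify(tmp.name, buf_size=1024))
assert sum(size for _, size, _ in chunks) == 2500             # sizes cover the whole file
assert [last for *_, last in chunks] == [False, False, True]  # is_last only on the last chunk
os.unlink(tmp.name)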
You have an interesting problem here. You have n processes, each of which is given the location of a chunk of data to process, but you cannot provide the exact location of the chunks, because you are dealing with lines while your locations are in bytes. Even if you split the file into lines to get the precise positions of the chunks, you run into some issues.
Here is a suboptimal solution (I assume you do not want to process the lines sequentially, which seems obvious):
- cut the file into chunks, as in your first attempt;
- for each chunk, find the first and the last line feed. The chunk format is B\nM\nA, where B (before) and A (after) contain no line feed, but M may contain line feeds;
- process the lines in M and put B\nA in a list at the current chunk index;
- finally, process all the B\nA elements.
This is suboptimal because, once you have processed every M, you still have to process all the B\nA elements, and that last piece of work has to wait until the other processes are complete.
Here is the code:
def chunkify(file_end, buf_size=1024):
"""Yield chunks of `buf_size` bytes"""
for chunk_start in range(0, file_end, buf_size):
yield chunk_start, min(buf_size, file_end - chunk_start)
def process_batch(remainders, i, f, chunk_start, chunk_size):
"""Process a chunk"""
f.seek(chunk_start)
chunk = f.read(chunk_size)
chunk, remainders[i] = normalize(chunk)
# process chunk here if chunk is not None
return chunk
def normalize(chunk):
"""Return `M, B\nA`
The chunk format is `B\nM\nA` where `B` (before) and `A` (after) do not contain any line feed,
but `M` may contain line feeds"""
i = chunk.find(b"\n")
j = chunk.rfind(b"\n")
if i == -1 or i == j:
return None, chunk
else:
return chunk[i+1:j], chunk[:i]+chunk[j:]
Note that if a chunk has no middle (no M part), then we return None as the chunk and everything is sent to remainders.
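A tiny illustration (inputs made up for this example) of what normalize returns for the B\nM\nA format:
print(normalize(b"end of prev\nline 1\nline 2\nstart of next"))
# (b'line 1\nline 2', b'end of prev\nstart of next')
print(normalize(b"no line feed at all"))
# (None, b'no line feed at all')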
Some tests:
text = """She returned bearing mixed lessons from a society where the tools of democracy still worked.
If you think you can sense a "but" approaching, you are right.
Elsewhere, Germany take on Brazil and Argentina face Spain, possibly without Lionel Messi.
What sort of things do YOU remember best?'
Less than three weeks after taking over from Lotz at Wolfsburg.
The buildings include the Dr. John Micallef Memorial Library.
For women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for breast cancer.
In one interview he claimed it was from the name of the Cornish language ("Kernewek").
8 Goldschmidt was out of office between 16 and 19 July 1970.
Last year a new law allowed police to shut any bar based on security concerns.
But, Frum explains: "Glenn Beck takes it into his head that this guy is bad news."
Carrying on the Romantic tradition of landscape painting.
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
Dietler also said Abu El Haj was being opposed because she is of Palestinian descent.
The auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disorder.
GAAP operating profit was .2 million and .1 million in the second quarter of 2008 and 2007, respectively.
Doc, Ira, and Rene are sent home as part of the seventh bond tour.
only I am sick of always hearing him called the Just.
Also there is Meghna River in the west of Brahmanbaria.
The explosives were the equivalent of more than three kilograms of dynamite - equal to 30 grenades," explained security advisor Markiyan Lubkivsky to reporters gathered for a news conference in Kyiv.
Her mother first took her daughter swimming at the age of three to help her with her cerebal palsy.
A U.S. aircraft carrier, the USS "Ticonderoga", was also stationed nearby.
Louis shocked fans when he unexpectedly confirmed he was expecting a child in summer 2015.
99, pp.
Sep 19: Eibar (h) WON 6-1"""
import io, os
def get_line_count(chunk):
return 0 if chunk is None else len(chunk.split(b"\n"))
def process(f, buf_size):
f.seek(0, os.SEEK_END)
file_end = f.tell()
remainders = [b""]*(file_end//buf_size + 1)
L = 0
for i, (start, n) in enumerate(chunkify(file_end, buf_size)):
chunk = process_batch(remainders, i, f, start, n)
L += get_line_count(chunk)
print("first pass: lines processed", L)
print("remainders", remainders)
last_chunk = b"".join(remainders)
print("size of last chunk {} bytes, {} lines".format(len(last_chunk), get_line_count(last_chunk)))
L += get_line_count(last_chunk)
print("second pass: lines processed", L)
process(io.BytesIO(bytes(text, "utf-8")), 256)
process(io.BytesIO(bytes(text, "utf-8")), 512)
with open("/home/jferard/prog/stackoverlfow/ep+gutenberg+news+wiki.txt", 'rb') as f:
process(f, 4096)
with open("/home/jferard/prog/stackoverlfow/ep+gutenberg+news+wiki.txt", 'rb') as f:
process(f, 16384)
Output:
first pass: lines processed 18
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked.\nWhat sort', b" of things do YOU remember best?'\nFor women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for br", b'east cancer.\nBut, Frum explai', b'ns: "Glenn Beck takes it into his head that this guy is bad news."\nThe EAC was created in 2002 to help avoid a repeat of the dispu', b'ted 2000 presidential election.\nThe auction hig', b"hlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disor", b'der.\nAlso there is Meghn', b'a River in the west of Brahmanbaria.\nHer mother first to', b'ok her daughter swimming at the age of three to help her with her cerebal palsy.\nS', b'ep 19: Eibar (h) WON 6-1']
size of last chunk 880 bytes, 9 lines
second pass: lines processed 27
first pass: lines processed 21
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked.\nFor women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for br', b'east cancer.\nThe EAC was created in 2002 to help avoid a repeat of the dispu', b"ted 2000 presidential election.\nThe auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disor", b'der.\nHer mother first to', b'ok her daughter swimming at the age of three to help her with her cerebal palsy.\nSep 19: Eibar (h) WON 6-1']
size of last chunk 698 bytes, 6 lines
second pass: lines processed 27
first pass: lines processed 96963
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked, but where the native Dutch were often less than warm to her and her fellow exiles.\nOne of the Ffarquhar ', ..., b'the old device, Apple will give customers a gift card that can be applied toward the purchase of the new iPhone.']
size of last chunk 517905 bytes, 3037 lines
second pass: lines processed 100000
first pass: lines processed 99240
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked, but where the native Dutch were often less than warm to her and her fellow exiles.\nSoon Carroll was in push-up position walking her hands tow', b'ard the mirror at one side of the room while her feet were dragged along by the casual dinnerware.\nThe track "Getaway" was inspired by and allud', ..., b'the old device, Apple will give customers a gift card that can be applied toward the purchase of the new iPhone.']
size of last chunk 130259 bytes, 760 lines
second pass: lines processed 100000
The last example shows that you can process 99,240 of the 100,000 lines in parallel, but that you have to process the last 760 lines (130 KiB) after all the other processing is complete.
A note on concurrency: every subprocess owns a fixed cell of the remainders list, so there should be no memory corruption. It might be cleaner to store each remainder in its own process object (a wrapper around the real subprocess) and to join all the remainders once the processes are finished.
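Here is a rough sketch (my own, not from the answer above) of how that might look with multiprocessing.Pool: each worker returns the remainder for its chunk instead of writing into a shared list, and the parent joins the remainders for the second pass. It reuses the chunkify and normalize functions defined above.
from functools import partial
from multiprocessing import Pool
import os

def worker(path, task):
    """Read one chunk, process its complete middle lines, return its remainder."""
    i, (chunk_start, chunk_size) = task
    with open(path, 'rb') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size)
    middle, remainder = normalize(chunk)
    # ... process the complete lines in `middle` here (if it is not None) ...
    return i, remainder

def parallel_process(path, buf_size=1024**2, workers=4):
    file_end = os.stat(path).st_size
    tasks = list(enumerate(chunkify(file_end, buf_size)))
    with Pool(workers) as pool:
        results = pool.map(partial(worker, path), tasks)
    # Second pass: stitch the remainders back together in chunk order
    last_chunk = b"".join(remainder for _, remainder in sorted(results))
    # ... process the lines in `last_chunk` here ...
As usual with multiprocessing, parallel_process should be called under an if __name__ == '__main__': guard.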
When the file is opened in text mode (your second code sample), read treats its size argument as a number of characters (not bytes), but seek and tell relate to the current position in the file for an "empty buffer", so:
- you can compute the chunk size (for later use by read) from len(l);
- using file_end = stat(pfin).st_size to compute the size of the last chunk is incorrect (because with the utf-8 encoding, the number of characters may differ from the number of bytes used, for non-Latin alphabets);
- f.tell() still cannot be used to compute the chunk size, but it does give a correct result for chunk_start. I think this is somehow related to TextIOWrapper's buffering: tell reports the buffer + decoder state, not the actual position in the text stream. You can look at the reference implementation (def _read_chunk, def tell) and see that it is all complicated and that no one should trust deltas computed from different tell/seek calls ("# Grab all the decoded text (we will rewind any extra bits later)." hints again at the reason for the "incorrect" positions).
Seek/tell work correctly for "seeking", but cannot be used to count the characters between two tell results (and even the byte count would not be correct). To get correct byte deltas, the file should be opened in binary unbuffered mode (with open(path, 'rb', buffering=0) as f: ...), but in that case the developer has to make sure that every read returns "full characters" (in "utf-8", different characters have different byte lengths).
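A small sketch (hypothetical file demo.txt, my own example) of why tell() deltas are unreliable as character counts; the cookie is opaque, although on CPython it typically reflects byte positions for a plain sequential UTF-8 read:
with open('demo.txt', 'w', encoding='utf-8') as f:
    f.write('héllo\nwörld\n')

with open('demo.txt', 'r', encoding='utf-8') as f:
    start = f.tell()
    line = f.readline()
    end = f.tell()
    print(len(line))     # 6 characters, including the '\n'
    print(end - start)   # typically 7: 'é' takes two bytes in UTF-8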
But simply using chunk_size += len(l) fixes everything, so you can keep opening the file in text mode! The following modified version of your code seems to work as expected:
from functools import partial
def chunkify(pfin, max_lines=1500):
with open(pfin, 'r', encoding='utf-8') as f:
chunk_start = f.tell()
chunk_size = 0
done = True
for idx, l in enumerate(iter(f.readline, '')):
chunk_size += len(l)
done = False
if idx != 0 and idx % max_lines == 0:
yield chunk_start, chunk_size
done = True
chunk_start = f.tell()
chunk_size = 0
if not done:
yield chunk_start, chunk_size
def process_batch(pfin, chunk_start, chunk_size):
with open(pfin, 'r', encoding='utf-8') as f:
f.seek(chunk_start)
chunk = f.read(chunk_size).splitlines()
batch = list(filter(None, chunk))
return batch
if __name__ == '__main__':
fin = r'data/100000-ep+gutenberg+news+wiki.txt'
process_func = partial(process_batch, fin)
lines_n = 0
prev_last = ''
for start, size in chunkify(fin):
lines = process_func(start, size)
if not lines:
continue
# print first and last ten sentences of batch
for line in lines[:10]:
print(line)
print('...')
for line in lines[-10:]:
print(line)
print('\n')
lines_n += len(lines)
print(lines_n)
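Since the (start, size) pairs yielded by this chunkify are independent, they can finally be handed to a process pool, which was the original goal. A minimal sketch (my own addition, assuming process_batch has no side effects):
from functools import partial
from multiprocessing import Pool

if __name__ == '__main__':
    fin = r'data/100000-ep+gutenberg+news+wiki.txt'
    with Pool() as pool:
        batches = pool.starmap(partial(process_batch, fin), chunkify(fin))
    print(sum(len(batch) for batch in batches))  # total number of lines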