从大文件中检索行的更有效方法

More efficient way to retrieve lines from a huge file

我有一个 ID 为 1,786,916 条记录的数据文件,我想从另一个包含大约 480 万条记录(在本例中为 DNA 序列,但基本上只是纯文本)的文件中检索相应的记录。我写了一个 python 脚本来执行此操作,但是 运行 需要很长时间(第 3 天,它只完成了 12%)。由于我是 python 的相对新手,我想知道是否有人可以提出更快的建议。

这是一个带有 ID 的数据文件示例(示例中的 ID 是 ANICH889-10):

ANICH889-10 k__Animalia; p__Arthropoda; c__Insecta; o__Lepidoptera; f__Psychidae; g__Ardiosteres; s__Ardiosteres sp. ANIC9
ARONW984-15 k__Animalia; p__Arthropoda; c__Arachnida; o__Araneae; f__Clubionidae; g__Clubiona; s__Clubiona abboti

这是包含记录的第二个文件的示例:

>ASHYE2081-10|Creagrura nigripesDHJ01|COI-5P|HM420985
ATTTTATACTTTTTATTAGGAATATGATCAGGAATAATTGGTCTTTCAATAAGAATCATTATCCGTATTGAATTAAGAAATCCAGGATCTATTATTAATAATGACCAAATTTATAATTCATTAATTACTATACACGCACTATTAATAATTTTTTTTTTAGTTATACCTGTAATAATTGGAGGATTTGGAAATTGATTAATTCCTATTATAATTGGAGCCCCAGATATAGCATTTCCACGAATAAACAATCTTAGATTTTGATTATTAATCCCATCAATTTTCATATTAATATTAAGATCAATTACTAATCAAGGTGTAGGAACAGGATGAACAATATATCCCCCATTATCATTAAATATAAATCAAGAAGGAATATCAATAGATATATCAATTTTTTCTTTACATTTAGCAGGAATATCCTCAATTTTAGGATCAATTAATTTCATTTCAACTATTTTAAATATAAAATTTATTAATTCTAATTATGATCAATTAACTTTATTTTCATGATCAATTCTAATTACTACTATTTTATTATTACTAGCAGTCCCTGTATTAGCAGGAGCAATTACTATAATTTTAACTGATCGAAATTTAAATACTTCTTTTTTTGATCCTAGAGGAGGAGGAGATCCAATTT-----------------
>BCISA145-10|Hemiptera|COI-5P
AACTCTATACTTTTTACTAGGATCCTGGGCAGGAATAGTAGGAACATCATTAAGATGAATAATCCGAATTGAACTAGGACAACCTGGATCTTTTATTGGAGATGACCAAACTTATAATGTAATTGTAACTGCCCACGCATTTGTAATAATTTTCTTTATAGTTATACCAATTATAATTGGAGGATTTGGAAATTGATTAATTCCCTTAATAATTGGAGCACCCGATATAGCATTCCCACGAATGAATAACATAAGATTTTGATTGCTACCACCGTCCCTAACACTTCTAATCATAAGTAGAATTACAGAAAGAGGAGCAGGAACAGGATGAACAGTATACCCTCCATTATCCAGAAACATCGCCCATAGAGGAGCATCTGTAGATTTAGCAATCTTTTCCCTACATCTAGCAGGAGTATCATCAATTTTAGGAGCAGTTAACTTCATTTCAACAATTATTAATATACGACCAGCAGGAATAACCCCAGAACGAATCCCATTATTTGTATGATCTGTAGGAATTACAGCACTACTACTCCTACTTTCATTACCCGTACTAGCAGGAGCCATTACCATACTCTTAACTGACCGAAACTTCAATACTTCTTTTTTTGACCCTGCTGGAGGAGGAGATCCCATCCTATATCAACATCTATTC

然而在第二个文件中,DNA 序列被分成几行,而不是一行,而且它们的长度并不总是相同。

编辑

这是我想要的输出:

>ANICH889-10
GGGATTTGGTAATTGATTAGTTCCTTTAATA---TTGGGGGCCCCTGACATAGCTTTTCCTCGTATAAATAATATAAGATTTTGATTATTACCTCCCTCTCTTACATTATTAATTTCAAGAAGAATTGTAGAAAATGGAGCTGGGACTGGATGAACTGTTTACCCTCCTTTATCTTCTAATATCGCCCATAGAGGAAGCTCTGTAGATTTA---GCAATTTTCTCTTTACATTTAGCAGGAATTTCTTCTATTTTAGGAGCAATTAATTTTATTACAACAATTATTAATATACGTTTAAATAATTTATCTTTCGATCAAATACCTTTATTTGTTTGAGCAGTAGGAATTACAGCATTTTTACTATTACTTTCTTTACCTGTATTAGCTGGA---GCTATTACTATATTATTAACT---------------------------------------------------------------------------
>ARONW984-15
TGGTAACTGATTAGTTCCATTAATACTAGGAGCCCCTGATATAGCCTTCCCCCGAATAAATAATATAAGATTTTGACTTTTACCTCCTTCTCTAATTCTTCTTTTATCAAGGTCTATTATNGAAAATGGAGCA---------GGAACTGGCTGAACAGTTTACCCTCCCCTTTCTTNTAATATTTCCCATGCTGGAGCTTCTGTAGATCTTGCAATCTTTTCCCTACACCTAGCAGGTATTTCCTCAATCCTAGGGGCAGTTAAT------TTTATCACAACCGTAATTAACATACGCTCTAGAGGAATTACATTTGATCGAATGCCTTTATTTGTATGATCTGTATTAATTACAGCTATTCTTCTACTACTCTCCCTCCCAGTATTAGCAGGGGCTATTACAATACTACTCACAGACCGAAATTTAAAT-----------------------------------

这是我为此编写的 python 脚本:

from Bio import SeqIO
from Bio.Seq import Seq
import csv
import sys

#Name of the datafile
Taxonomyfile = "02_Arthropoda_specimen_data_less.txt"

#Name of the original sequence file
OrigTaxonSeqsfile = "00_Arthropoda_specimen.fasta"

#Name of the output sequence file
f4 = open("02_Arthropoda_specimen_less.fasta", 'w')

#Reading the datafile and extracting record IDs   
TaxaKeep = []
with open(Taxonomyfile, 'r') as f1:
    datareader = csv.reader(f1, delimiter='\t')
    for item in datareader:
        TaxaKeep.append(item[0])
    print(len(TaxaKeep))    

#Filtering sequence file to keep only those sequences with the desired IDs
datareader = SeqIO.parse(OrigTaxonSeqsfile, "fasta")
for seq in datareader:
    for item in TaxaKeep:
        if item in seq.id:
            f4.write('>' + str(item) + '\n')
            f4.write(str(seq.seq) + '\n')

我认为这里的问题在于我正在为 480 万条记录中的每条记录遍历 170 万条记录名称的列表。我想过为这 480 万条记录制作一本字典什么的,但我想不出怎么做。有什么建议(包括非python建议)吗?

谢谢!

您的推理是正确的,即使用两个嵌套的 for 循环,您将花费时间来执行 4.8 million * 1.7 million 次重复的单个操作。

这就是为什么我们将使用 SQLite 数据库来存储 OrigTaxonSeqsfile 中包含的所有信息。为什么选择 SQLite?因为

  • SQLite 内置 Python
  • SQLite 支持索引

我无法开始解释 CS 理论,但在像您这样的情况下搜索数据时,索引是上帝派来的。

一旦数据被索引,您只需在数据库中查找来自 Taxonomyfile 的每个记录 ID,并将其写入您的 f4 最终输出文件。

以下代码应该可以如您所愿地工作,它具有以下优点:

  • 显示您在处理的行数方面取得的进步
  • 只需要Python3,生物文库不是严格需要
  • 使用生成器,因此不必一次将所有文件读入内存
  • 不依赖于 list/set/dict,因为(在这种情况下)它们可能会消耗太多 RAM

这是代码

import sqlite3
from itertools import groupby
from contextlib import contextmanager

Taxonomyfile = "02_Arthropoda_specimen_data_less.txt"
OrigTaxonSeqsfile = "00_Arthropoda_specimen.fasta"

@contextmanager
def create_db(file_name):
    """ create SQLite db, works as context manager so file is closed safely"""
    conn = sqlite.connect(file_name, isolation_level="IMMEDIATE")
    cur = conn.connect()
    cur.execute("""
        CREATE TABLE taxonomy
        ( _id INTEGER PRIMARY KEY AUTOINCREMENT
        , record_id TEXT NOT NULL
        , record_extras TEXT
        , dna_sequence TEXT
        );
        CREATE INDEX idx_taxn_recID ON taxonomy (record_id);
    """)
    yield cur
    conn.commit()
    conn.close()
    return

def parse_fasta(file_like):
    """ generate that yields tuple containing record id, extra info
    in tail of header and the DNA sequence with newline characters
    """
    # inspiration = https://www.biostars.org/p/710/
    try:
        from Bio import SeqIO
    except ImportError:
        fa_iter = (x[1] for x in groupby(file_like, lambda line: line[0] == ">"))
        for header in fa_iter:
            # remove the >
            info = header.__next__()[1:].strip()
            # seprate record id from rest of the seqn info
            x = info.split('|')
            recID, recExtras = x[0], x[1:]
            # build the DNA seq using generator
            sequence = "".join(s.strip() for s in fa_iter.__next__())
            yield recID, recExtras, sequence
    else:
        fasta_sequences = SeqIO.parse(file_like, 'fasta')
        for fasta in fasta_sequences:
            info, sequence = fasta.id, fasta.seq.tostring()
            # seprate record id from rest of the seqn info
            x = info.split('|')
            recID, recExtras = x[0], x[1:]
            yield recID, recExtras, sequence
    return

def prepare_data(txt_file, db_file):
    """ put data from txt_file into db_file building index on record id """
    i = 0
    src_gen = open(txt_file, mode='rt')
    fasta_gen = parse_fasta(src_gen)
    with create_db(db_file) as db:
        for recID, recExtras, dna_seq in fasta_gen:
            db.execute("""
                INSERT INTO taxonomy
                (record_id, record_extras, dna_sequence) VALUES (?,?,?)
                """,
                [recID, recExtras, dna_seq]
            )
            if i % 100 == 0:
                print(i, 'lines digested into sql database')
    src_gen.close()
    return

def get_DNA_seq_of(recordID, src):
    """ search for recordID in src database and return a formatted string """
    ans = ""
    exn = src.execute("SELECT * FROM taxonomy WHERE record_id=?", [recordID])
    for match in exn.fetchall():
        a, b, c, dna_seq = match
        ans += ">%s\n%s\n" % (recordID, dna_seq)
    return ans

def main():
    # first of all prepare an optimized database
    db_file = txt_file + ".sqlite"
    prepare_data(OrigTaxonSeqsfile)
    # now start searching and writing
    progress = 0
    db = sqlite3.connect(db_file)
    cur = db.cursor()
    out_file = open("02_Arthropoda_specimen_less.fasta", 'wt')
    taxa_file = open(Taxonomyfile, 'rt')
    with taxa_file, out_file:
        for line in taxa_file:
            question = line.split("\t")[0]
            answer = get_DNA_seq_of(question, cur)
            out_file.write(answer)
            if progress % 100 == 0:
                print(progress, 'lines processed')
    db.close()

if __name__ == '__main__':
    main()

如有任何疑问,请随时提出。
如果您收到任何错误或输出不符合预期,请向我发送 200 行示例,每个 TaxonomyfileOrigTaxonSeqsfile,我将更新代码。


速度提升

以下是粗略估计,只谈磁盘I/O,因为那是最慢的部分。

a = 4.8 millionb = 1.7 million.

在旧方法中,您必须执行磁盘 I/O a * b8160 亿 次。

在我的方法中,一旦你做了索引(即 2* 次),你必须搜索 170 万条记录。所以在我的方法中,总时间是 2 * (a + b) 即 1300 万个磁盘 I/O,这也不算小,但这种方法比 60 万倍快

为什么不 dict()

用多了会被老板和教授骂CPU/RAM。如果你拥有这个系统,一个更简单的基于字典的方法是:

from itertools import groupby

Taxonomyfile = "02_Arthropoda_specimen_data_less.txt"
OrigTaxonSeqsfile = "00_Arthropoda_specimen.fasta"

def parse_fasta(file_like):
    """ generate that yields tuple containing record id, extra info
    in tail of header and the DNA sequence with newline characters
    """
    from Bio import SeqIO
    fasta_sequences = SeqIO.parse(file_like, 'fasta')
    for fasta in fasta_sequences:
        info, sequence = fasta.id, fasta.seq.tostring()
        # seprate record id from rest of the seqn info
        x = info.split('|')
        recID, recExtras = x[0], x[1:]
        yield recID, recExtras, sequence
    return

def prepare_data(txt_file, db_file):
    """ put data from txt_file into dct """
    i = 0
    with open(txt_file, mode='rt') as src_gen:
        fasta_gen = parse_fasta(src_gen)
        for recID, recExtras, dna_seq in fasta_gen:
            dct[recID] = dna_seq
            if i % 100 == 0:
                print(i, 'lines digested into sql database')
    return

def get_DNA_seq_of(recordID, src):
    """ search for recordID in src database and return a formatted string """
    ans = ""
    dna_seq = src[recordID]
    ans += ">%s\n%s\n" % (recordID, dna_seq)
    return ans

def main():
    # first of all prepare an optimized database
    dct = dict()
    prepare_data(OrigTaxonSeqsfile, dct)
    # now start searching and writing
    progress = 0
    out_file = open("02_Arthropoda_specimen_less.fasta", 'wt')
    taxa_file = open(Taxonomyfile, 'rt')
    with taxa_file, out_file:
        for line in taxa_file:
            question = line.split("\t")[0]
            answer = get_DNA_seq_of(question, dct)
            out_file.write(answer)
            if progress % 100 == 0:
                print(progress, 'lines processed')
    return

if __name__ == '__main__':
    main()

我认为您可以通过改进 look-up 来大幅提高性能。

使用 set() 可以帮助您。集合旨在处理非常快速的数据 look-up 并且它们不存储重复值,这使它们成为过滤数据的理想选择。因此,让我们将输入文件中的所有分类 ID 存储在一个集合中。

from Bio import SeqIO
from Bio.Seq import Seq
import csv
import sys

taxonomy_file = "02_Arthropoda_specimen_data_less.txt"
orig_taxon_sequence_file = "00_Arthropoda_specimen.fasta"
output_sequence_file = "02_Arthropoda_specimen_less.fasta"

# build a set for fast look-up of IDs
with open(taxonomy_file, 'r', newline='') as fp:
    datareader = csv.reader(fp, delimiter='\t')
    first_column = (row[0] for row in datareader)
    taxonomy_ids = set(first_column)

# use the set to speed up filtering the input FASTA file
with open(output_sequence_file, 'w') as fp:
    for seq in SeqIO.parse(orig_taxon_sequence_file, "fasta"):
        if seq.id in taxonomy_ids: 
            fp.write('>')
            fp.write(seq.id)
            fp.write(seq.seq)
            fp.write('\n')
  • 我已经重命名了您的一些变量。命名一个变量 f4 只是为了在上面的注释中写上“#Name of the output sequence file”是完全没有意义的。为什么不去掉注释,直接把变量命名为output_sequence_file
  • (row[0] for row in datareader) 是一个 生成器理解 。生成器是一个可迭代对象,这意味着它还不计算 ID 列表——它只知道要做什么。这通过 而不是 构建临时列表来节省时间和内存。一行之后,接受迭代的 set() 构造函数将从第一列中的所有 ID 构建一个集合。
  • 在第二个块中,我们使用if seq.id in taxonomy_ids 来检查是否应输出序列ID。 in 在场景上非常快。
  • 我调用 .write() 四次,而不是用四个项目构建一个临时字符串。我假设 seq.idseq.seq 已经是字符串,所以对它们调用 str() 并不是真正必要的。
  • 我不太了解 FASTA 文件格式,但快速浏览 the BioPython documentation 表明使用 SeqIO.write() 将是创建格式的更好方法。

我已经在你的问题下的评论中要求澄清,但现在你没有回应(无意批评),所以我会在我必须离开之前尝试回答你的问题,我的代码基于以下假设。

  1. 在第二个数据文件中,每条记录占两行,第一行是header排序,第二行是ACGT序列。
  2. 在header行我们有一个前缀">",然后一些字段被"|"分隔,这些字段中的第一个是整体的ID,two-lines记录。

在上述假设下

# If possible, no hardcoded filenames, use sys.argv and the command line
import sys

# command line sanity check
if len(sys.argv) != 4:
     print('A descriptive error message')
     sys.exit(1)

# Names of the input and output files
fn1, fn2, fn3 = sys.argv[1:]

# Use a set comprehension to load the IDs from the first file
IDs = {line.split()[0] for line in open(fn1)} # a set

# Operate on the second file
with open(fn2) as f2:

    # It is possible to use `for line in f2: ...` but here we have(?)
    # two line records, so it's a bit different
    while True:

        # Try to read two lines from file
        try:
            header = f2.next()
            payload = f2.next()
        # no more lines? break out from the while loop...
        except StopIteration:
            break

        # Sanity check on the header line
        if header[0] != ">":
            print('Incorrect header line: "%s".'%header)
            sys.exit(1)

        # Split the header line on "|", find the current ID
        ID = header[1:].split("|")[0]

        # Check if the current ID was mentioned in the first file
        if ID in IDs:
            # your code

因为没有内部循环,这应该快 6 个数量级...它是否满足您的需要还有待观察:-)