Python: 在脚本中实现多线程标志选项

Python: implementing multi-thread flag option in a script

我正在编写一个简单的脚本,它将两个 TSV 文件(file_a.tsvfile_b.tsv)作为输入并解析 file_a.tsv 以检查它们是否包含在 file_b.tsv 中指定的范围内。这是脚本:

import os
import sys
import argparse
import pandas as pd

# Defining the main function:
def myfunc() -> tuple:
    ap = argparse.ArgumentParser()
    ap.add_argument("-a", "--file_a", help="path to file_a")
    ap.add_argument("-b", "--file_b", help="path to file_b")
    return ap.parse_args()

args = myfunc()

file_a = args.file_a
file_b = args.file_b

# Initialising of file_a and file_b as dataframes
with open(file_a, 'r') as a:
    file_a_df = pd.read_table(a, header=None)

with open(file_b, 'r') as b:
    file_b_df = pd.read_table(b, header=None)
    file_b_df.columns = ["seqname", "source", "feature", "start", "end", "score", "strand", "frame", "attribute"]

# Here's two list to be used 
contained = []
not_contained = []

# Defining a function for the file_a parsing
def fileparser(val):
    for index, row in file_b_df.iterrows():
        if val >= row['start'] and val <= row['end']:
            contained.append(val)
            return True
    not_contained.append(val)

# Apply fileparser
file_a.iloc[:, 1].apply(fileparser)

print("Contained: ", len(contained))
print("Not contained: ", len(not_contained)

运行 它从终端看起来像这样:

python my_script.py -a "path/to/file_a" -b "path/to/file_b"

问题来了:file_a 有将近 700 万个值,file_b 有数千个范围需要检查,所以要用主线程完成这是一个相当大的过程。

我想像下面这样向我的管道添加一个 -p 标志,以实现多线程选项并加快进程:

python my_script.py -p 8 -a "path/to/file_a" -b "path/to/file_b"

我知道可以导入 threading 库,如何将它添加到我的脚本中?谢谢。

如果您有数百万行代码,最好在并行化之前先优化算法的复杂性 (big-o)。在这里,您将所有 a 项目与所有 b 范围进行比较,并且使用慢速方法 .apply() 并手动迭代行 - 让我们也将所有内容替换为 pandas/numpy基元。

免费的好处是 您不再需要自己处理并行性,numpy 会在后台为您完成。例如,参见“使用并行基元”部分下的 this guide

我假设出于优化目的,您在 file_a 和 M 不同的间隔中有 N 个值。

1。我们可以首先将所有 b 间隔合并为最小数量的不同间隔

>>> starts = df['start'].value_counts().sort_index()
>>> ends = df['end'].value_counts().sort_index()
>>> ends.index += 1
>>> idx = ends.index.join(starts.index, how='outer')
>>> depth = (starts.reindex(idx, fill_value=0) - ends.reindex(idx, fill_value=0)).cumsum()
>>> depth[depth.eq(0) | depth.eq(0).shift(fill_value=True)]
0      1
36     0
38     1
40     0
41     1
86     0
87     1
103    0
dtype: int64
>>> min_intervals = pd.DataFrame({
...     'start': idx[depth.eq(0).shift(fill_value=True)],
...     'end': idx[depth.eq(0)] - 1
... })
>>> min_intervals
   start  end
0      0   35
1     38   39
2     41   85
3     87  102

这里的主要微妙之处是 +1-1 到结束边界,因为边界是包容性的,我们想要正确计算连续间隔的并集,即 [3, 5] [6, 8] 应该是 [3, 8]

当然,如果您知道您的间隔已经分开,那么您可以跳过所有这些并执行 min_intervals = file_b_df[['start', 'end']].sort_values('start')

2。将值与排序边界进行比较,O(N×M) -> O(N×log(M))

请注意,现在 min_intervals 已排序,如果我们将其堆叠,我们将按排序顺序获得边界。现在让我们加回 +1,我们可以使用 pd.Series.searchsorted() 来找出在这个边界序列中应该插入一个数字以保持顺序的位置:

>>> bounds = min_intervals.stack()
>>> bounds.loc[slice(None), ['end']] += 1
>>> bounds
0  start      0
   end       36
1  start     38
   end       40
2  start     41
   end       86
3  start     87
   end      103
dtype: int64
>>> bounds.searchsorted([0, 1, 35, 36, 100, 86], 'right')
array([1, 1, 1, 2, 7, 6])

如您所见,区间内的数字([0, 35] 中的 0、1 和 35,[87, 103] 中的 100)return 是奇数,非奇数在一个区间 (36, 86) return 一个偶数。

>>> file_a = pd.DataFrame({'val': [0, 1, 35, 36, 100, 86]})
>>> contained = pd.Series(bounds.searchsorted(file_a['val'], 'right'), index=file_a.index).mod(2).eq(1)
>>> file_a[contained]
   val
0    0
1    1
2   35
4  100
>>> file_a[~contained]
   val
3   36
5   86

如果您不想修改 file_a,这很有效,当然如果您愿意排序 file_a,您可以获得更快的结果(假设 N > M)。

3。将排序后的值与区间边界进行比较,O(N×log(M)) -> O(log(N)×M)

>>> file_a.sort_values('val', inplace=True)

从那里我们现在可以在 file_a 上使用 searchsorted,例如计算包含的值行:

>>> rows = pd.Series(file_a['val'].searchsorted(bounds, 'left'), index=bounds.index)
>>> rows
0  start    0
   end      3
1  start    4
   end      4
2  start    4
   end      4
3  start    5
   end      6
dtype: int64
>>> pd.concat([file_a.iloc[slice(*v)] for k, v in rows.groupby(level=0)])
   val
0    0
1    1
2   35
4  100

或者我们可以用与间隔交集相同的方式构造布尔索引器:

>>> contained = rows.xs('start', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained -= rows.xs('end', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained = contained.cumsum().reindex(np.arange(len(file_a))).ffill().eq(1)
>>> contained.index = file_a.index  # Index was row numbers, use file_a’s index
>>> contained
0     True
1     True
2     True
3    False
5    False
4     True
dtype: bool
>>> file_a[contained]
   val
0    0
1    1
2   35
4  100
>>> file_a[~contained]
   val
3   36
5   86

现在这个策略的另一个好处是我们编写的代码中没有循环。所以除了允许 numpy 使用它之外,我们不需要关心并行化。如果您真的想自己添加并行优化,this other question 会很有趣。