Python: 在脚本中实现多线程标志选项
Python: implementing multi-thread flag option in a script
我正在编写一个简单的脚本,它将两个 TSV 文件(file_a.tsv
和 file_b.tsv
)作为输入并解析 file_a.tsv
以检查它们是否包含在 file_b.tsv
中指定的范围内。这是脚本:
import os
import sys
import argparse
import pandas as pd
# Defining the main function:
def myfunc() -> tuple:
ap = argparse.ArgumentParser()
ap.add_argument("-a", "--file_a", help="path to file_a")
ap.add_argument("-b", "--file_b", help="path to file_b")
return ap.parse_args()
args = myfunc()
file_a = args.file_a
file_b = args.file_b
# Initialising of file_a and file_b as dataframes
with open(file_a, 'r') as a:
file_a_df = pd.read_table(a, header=None)
with open(file_b, 'r') as b:
file_b_df = pd.read_table(b, header=None)
file_b_df.columns = ["seqname", "source", "feature", "start", "end", "score", "strand", "frame", "attribute"]
# Here's two list to be used
contained = []
not_contained = []
# Defining a function for the file_a parsing
def fileparser(val):
for index, row in file_b_df.iterrows():
if val >= row['start'] and val <= row['end']:
contained.append(val)
return True
not_contained.append(val)
# Apply fileparser
file_a.iloc[:, 1].apply(fileparser)
print("Contained: ", len(contained))
print("Not contained: ", len(not_contained)
运行 它从终端看起来像这样:
python my_script.py -a "path/to/file_a" -b "path/to/file_b"
问题来了:file_a
有将近 700 万个值,file_b
有数千个范围需要检查,所以要用主线程完成这是一个相当大的过程。
我想像下面这样向我的管道添加一个 -p
标志,以实现多线程选项并加快进程:
python my_script.py -p 8 -a "path/to/file_a" -b "path/to/file_b"
我知道可以导入 threading
库,如何将它添加到我的脚本中?谢谢。
如果您有数百万行代码,最好在并行化之前先优化算法的复杂性 (big-o)。在这里,您将所有 a
项目与所有 b
范围进行比较,并且使用慢速方法 .apply()
并手动迭代行 - 让我们也将所有内容替换为 pandas/numpy基元。
免费的好处是 您不再需要自己处理并行性,numpy 会在后台为您完成。例如,参见“使用并行基元”部分下的 this guide。
我假设出于优化目的,您在 file_a 和 M
不同的间隔中有 N
个值。
1。我们可以首先将所有 b
间隔合并为最小数量的不同间隔
>>> starts = df['start'].value_counts().sort_index()
>>> ends = df['end'].value_counts().sort_index()
>>> ends.index += 1
>>> idx = ends.index.join(starts.index, how='outer')
>>> depth = (starts.reindex(idx, fill_value=0) - ends.reindex(idx, fill_value=0)).cumsum()
>>> depth[depth.eq(0) | depth.eq(0).shift(fill_value=True)]
0 1
36 0
38 1
40 0
41 1
86 0
87 1
103 0
dtype: int64
>>> min_intervals = pd.DataFrame({
... 'start': idx[depth.eq(0).shift(fill_value=True)],
... 'end': idx[depth.eq(0)] - 1
... })
>>> min_intervals
start end
0 0 35
1 38 39
2 41 85
3 87 102
这里的主要微妙之处是 +1
和 -1
到结束边界,因为边界是包容性的,我们想要正确计算连续间隔的并集,即 [3, 5]
[6, 8]
应该是 [3, 8]
当然,如果您知道您的间隔已经分开,那么您可以跳过所有这些并执行 min_intervals = file_b_df[['start', 'end']].sort_values('start')
。
2。将值与排序边界进行比较,O(N×M) -> O(N×log(M))
请注意,现在 min_intervals
已排序,如果我们将其堆叠,我们将按排序顺序获得边界。现在让我们加回 +1
,我们可以使用 pd.Series.searchsorted()
来找出在这个边界序列中应该插入一个数字以保持顺序的位置:
>>> bounds = min_intervals.stack()
>>> bounds.loc[slice(None), ['end']] += 1
>>> bounds
0 start 0
end 36
1 start 38
end 40
2 start 41
end 86
3 start 87
end 103
dtype: int64
>>> bounds.searchsorted([0, 1, 35, 36, 100, 86], 'right')
array([1, 1, 1, 2, 7, 6])
如您所见,区间内的数字([0, 35] 中的 0、1 和 35,[87, 103] 中的 100)return 是奇数,非奇数在一个区间 (36, 86) return 一个偶数。
>>> file_a = pd.DataFrame({'val': [0, 1, 35, 36, 100, 86]})
>>> contained = pd.Series(bounds.searchsorted(file_a['val'], 'right'), index=file_a.index).mod(2).eq(1)
>>> file_a[contained]
val
0 0
1 1
2 35
4 100
>>> file_a[~contained]
val
3 36
5 86
如果您不想修改 file_a
,这很有效,当然如果您愿意排序 file_a
,您可以获得更快的结果(假设 N > M)。
3。将排序后的值与区间边界进行比较,O(N×log(M)) -> O(log(N)×M)
>>> file_a.sort_values('val', inplace=True)
从那里我们现在可以在 file_a
上使用 searchsorted
,例如计算包含的值行:
>>> rows = pd.Series(file_a['val'].searchsorted(bounds, 'left'), index=bounds.index)
>>> rows
0 start 0
end 3
1 start 4
end 4
2 start 4
end 4
3 start 5
end 6
dtype: int64
>>> pd.concat([file_a.iloc[slice(*v)] for k, v in rows.groupby(level=0)])
val
0 0
1 1
2 35
4 100
或者我们可以用与间隔交集相同的方式构造布尔索引器:
>>> contained = rows.xs('start', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained -= rows.xs('end', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained = contained.cumsum().reindex(np.arange(len(file_a))).ffill().eq(1)
>>> contained.index = file_a.index # Index was row numbers, use file_a’s index
>>> contained
0 True
1 True
2 True
3 False
5 False
4 True
dtype: bool
>>> file_a[contained]
val
0 0
1 1
2 35
4 100
>>> file_a[~contained]
val
3 36
5 86
现在这个策略的另一个好处是我们编写的代码中没有循环。所以除了允许 numpy 使用它之外,我们不需要关心并行化。如果您真的想自己添加并行优化,this other question 会很有趣。
我正在编写一个简单的脚本,它将两个 TSV 文件(file_a.tsv
和 file_b.tsv
)作为输入并解析 file_a.tsv
以检查它们是否包含在 file_b.tsv
中指定的范围内。这是脚本:
import os
import sys
import argparse
import pandas as pd
# Defining the main function:
def myfunc() -> tuple:
ap = argparse.ArgumentParser()
ap.add_argument("-a", "--file_a", help="path to file_a")
ap.add_argument("-b", "--file_b", help="path to file_b")
return ap.parse_args()
args = myfunc()
file_a = args.file_a
file_b = args.file_b
# Initialising of file_a and file_b as dataframes
with open(file_a, 'r') as a:
file_a_df = pd.read_table(a, header=None)
with open(file_b, 'r') as b:
file_b_df = pd.read_table(b, header=None)
file_b_df.columns = ["seqname", "source", "feature", "start", "end", "score", "strand", "frame", "attribute"]
# Here's two list to be used
contained = []
not_contained = []
# Defining a function for the file_a parsing
def fileparser(val):
for index, row in file_b_df.iterrows():
if val >= row['start'] and val <= row['end']:
contained.append(val)
return True
not_contained.append(val)
# Apply fileparser
file_a.iloc[:, 1].apply(fileparser)
print("Contained: ", len(contained))
print("Not contained: ", len(not_contained)
运行 它从终端看起来像这样:
python my_script.py -a "path/to/file_a" -b "path/to/file_b"
问题来了:file_a
有将近 700 万个值,file_b
有数千个范围需要检查,所以要用主线程完成这是一个相当大的过程。
我想像下面这样向我的管道添加一个 -p
标志,以实现多线程选项并加快进程:
python my_script.py -p 8 -a "path/to/file_a" -b "path/to/file_b"
我知道可以导入 threading
库,如何将它添加到我的脚本中?谢谢。
如果您有数百万行代码,最好在并行化之前先优化算法的复杂性 (big-o)。在这里,您将所有 a
项目与所有 b
范围进行比较,并且使用慢速方法 .apply()
并手动迭代行 - 让我们也将所有内容替换为 pandas/numpy基元。
免费的好处是 您不再需要自己处理并行性,numpy 会在后台为您完成。例如,参见“使用并行基元”部分下的 this guide。
我假设出于优化目的,您在 file_a 和 M
不同的间隔中有 N
个值。
1。我们可以首先将所有 b
间隔合并为最小数量的不同间隔
>>> starts = df['start'].value_counts().sort_index()
>>> ends = df['end'].value_counts().sort_index()
>>> ends.index += 1
>>> idx = ends.index.join(starts.index, how='outer')
>>> depth = (starts.reindex(idx, fill_value=0) - ends.reindex(idx, fill_value=0)).cumsum()
>>> depth[depth.eq(0) | depth.eq(0).shift(fill_value=True)]
0 1
36 0
38 1
40 0
41 1
86 0
87 1
103 0
dtype: int64
>>> min_intervals = pd.DataFrame({
... 'start': idx[depth.eq(0).shift(fill_value=True)],
... 'end': idx[depth.eq(0)] - 1
... })
>>> min_intervals
start end
0 0 35
1 38 39
2 41 85
3 87 102
这里的主要微妙之处是 +1
和 -1
到结束边界,因为边界是包容性的,我们想要正确计算连续间隔的并集,即 [3, 5]
[6, 8]
应该是 [3, 8]
当然,如果您知道您的间隔已经分开,那么您可以跳过所有这些并执行 min_intervals = file_b_df[['start', 'end']].sort_values('start')
。
2。将值与排序边界进行比较,O(N×M) -> O(N×log(M))
请注意,现在 min_intervals
已排序,如果我们将其堆叠,我们将按排序顺序获得边界。现在让我们加回 +1
,我们可以使用 pd.Series.searchsorted()
来找出在这个边界序列中应该插入一个数字以保持顺序的位置:
>>> bounds = min_intervals.stack()
>>> bounds.loc[slice(None), ['end']] += 1
>>> bounds
0 start 0
end 36
1 start 38
end 40
2 start 41
end 86
3 start 87
end 103
dtype: int64
>>> bounds.searchsorted([0, 1, 35, 36, 100, 86], 'right')
array([1, 1, 1, 2, 7, 6])
如您所见,区间内的数字([0, 35] 中的 0、1 和 35,[87, 103] 中的 100)return 是奇数,非奇数在一个区间 (36, 86) return 一个偶数。
>>> file_a = pd.DataFrame({'val': [0, 1, 35, 36, 100, 86]})
>>> contained = pd.Series(bounds.searchsorted(file_a['val'], 'right'), index=file_a.index).mod(2).eq(1)
>>> file_a[contained]
val
0 0
1 1
2 35
4 100
>>> file_a[~contained]
val
3 36
5 86
如果您不想修改 file_a
,这很有效,当然如果您愿意排序 file_a
,您可以获得更快的结果(假设 N > M)。
3。将排序后的值与区间边界进行比较,O(N×log(M)) -> O(log(N)×M)
>>> file_a.sort_values('val', inplace=True)
从那里我们现在可以在 file_a
上使用 searchsorted
,例如计算包含的值行:
>>> rows = pd.Series(file_a['val'].searchsorted(bounds, 'left'), index=bounds.index)
>>> rows
0 start 0
end 3
1 start 4
end 4
2 start 4
end 4
3 start 5
end 6
dtype: int64
>>> pd.concat([file_a.iloc[slice(*v)] for k, v in rows.groupby(level=0)])
val
0 0
1 1
2 35
4 100
或者我们可以用与间隔交集相同的方式构造布尔索引器:
>>> contained = rows.xs('start', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained -= rows.xs('end', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained = contained.cumsum().reindex(np.arange(len(file_a))).ffill().eq(1)
>>> contained.index = file_a.index # Index was row numbers, use file_a’s index
>>> contained
0 True
1 True
2 True
3 False
5 False
4 True
dtype: bool
>>> file_a[contained]
val
0 0
1 1
2 35
4 100
>>> file_a[~contained]
val
3 36
5 86
现在这个策略的另一个好处是我们编写的代码中没有循环。所以除了允许 numpy 使用它之外,我们不需要关心并行化。如果您真的想自己添加并行优化,this other question 会很有趣。