Python 并行线性回归 - Scoop
Python Linear Regression in parallel - Scoop
我正在尝试 运行 使用 Python 的 Scoop 库对从正态分布随机生成的 10000000 个数据点(4 个特征,1 个目标变量)进行并行线性回归。这是代码:
import pandas as pd
import numpy as np
import random
from scoop import futures
import statsmodels.api as sm
from time import time
def linreg(vals):
global model
model = sm.OLS(y_vals,X_vals).fit()
return model
print(model.summary())
if __name__ == '__main__':
random.seed(42)
vals = pd.DataFrame(np.random.normal(loc = 3, scale = 100, size =(10000000,5)))
vals.columns = ['dep', 'ind1', 'ind2', 'ind3', 'ind4']
y_vals = vals['dep']
X_vals = vals[['ind1', 'ind2', 'ind3', 'ind4']]
bt = time()
model_vals = list(map(linreg, [1,2,3]))
mval = model_vals[0]
print(mval.summary())
serial_time = time() - bt
bt1 = time()
model_vals_1 = list(futures.map(linreg, [1,2,3]))
mval_1 = model_vals_1[0]
print(mval_1.summary())
parallel_time = time() - bt1
print(serial_time, parallel_time)`
然而,之后回归摘要确实是连续生成的 - 通过 Python 的标准映射函数 - 错误:
Traceback (most recent call last): File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py", line 193, in _run_module_as_main "main", mod_spec) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py", line 85, in _run_code exec(code, run_globals) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\bootstrap__main__.py", line 302, in b.main() File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\bootstrap__main__.py", line 92, in main self.run() File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\bootstrap__main__.py", line 290, in run futures_startup() File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\bootstrap__main__.py", line 271, in futures_startup run_name="main" File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\futures.py", line 64, in _startup result = _controller.switch(rootFuture, *args, **kargs) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop_control.py", line 253, in runController raise future.exceptionValue File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop_control.py", line 127, in runFuture future.resultValue = future.callable(*future.args, **future.kargs) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py", line 263, in run_path pkg_name=pkg_name, script_name=fname) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py", line 96, in _run_module_code mod_name, mod_spec, pkg_name, script_name) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py", line 85, in _run_code exec(code, run_globals) File "Scoop_map_linear_regression1.py", line 33, in model_vals_1 = list(futures.map(linreg, [1,2,3])) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\futures.py", line 102, in _mapGenerator for future in _waitAll(*futures): File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\futures.py", line 358, in _waitAll for f in _waitAny(future): File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\futures.py", line 335, in _waitAny raise childFuture.exceptionValue NameError: name 'y_vals' is not defined
是之后产生的。这意味着代码停止在 model_vals_1 = list(futures.map(linreg, [1,2,3]))
请注意,为了能够 运行 并行执行代码,必须从指定 -m scoop 参数的命令行启动它,如下所示:
python -m scoop Scoop_map_linear_regression1.py
确实,如果没有 -m scoop 参数启动它,它不会被并行化,实际上 运行,但只使用内置 Python 的 map 函数的两倍(因此,运行连续两次),就像您在警告中被报告的方式一样。也就是说,在启动时不指定 -m scoop 参数,futures.map 将被 map 替换,而目标实际上是使用 futures.map.[=16 并行 运行 它=]
这样做是为了避免人们回答说他们通过简单地启动没有 -m scoop 参数的代码就解决了问题,就像这里已经发生的那样:
Python Parallel Computing - Scoop
因此,该问题被错误地搁置为题外话,因为无法重现。
非常感谢,非常感谢和欢迎任何评论。
解决方案是作为 futures.map(但不一定是 map)的第二个参数,仅传递 [1]
.
确实,即使linreg函数不使用传递给map的第二个参数,它仍然决定了linreg函数的次数运行。例如,考虑以下基本示例:
def welcome(x):
print('Hello world!')
if __name__ == '__main__':
a = list(map(welcome, [1,2]))
函数 welcome 实际上不需要任何参数,但输出仍然是
Hello world!
Hello world!
重复两次,即作为第二个参数传递的列表的长度。
在这种特定情况下,这意味着线性回归将 运行 映射 3 次,尽管回归输出将只出现一次,因为在地图外调用摘要。
重点是,运行 多次线性回归 futures.map 是不可能的。问题是,显然,在第一个 运行 之后,它实际上删除了使用过的数据集,从中不可能继续第二个和第三个 运行,以及随后的
NameError: name 'y_vals' is not defined
在跟踪结束时抛出。这应该通过导航可见:scoop.futures source code
没有遍历所有内容,但我想问题应该与 greenlet 切换器有关。
我正在尝试 运行 使用 Python 的 Scoop 库对从正态分布随机生成的 10000000 个数据点(4 个特征,1 个目标变量)进行并行线性回归。这是代码:
import pandas as pd
import numpy as np
import random
from scoop import futures
import statsmodels.api as sm
from time import time
def linreg(vals):
global model
model = sm.OLS(y_vals,X_vals).fit()
return model
print(model.summary())
if __name__ == '__main__':
random.seed(42)
vals = pd.DataFrame(np.random.normal(loc = 3, scale = 100, size =(10000000,5)))
vals.columns = ['dep', 'ind1', 'ind2', 'ind3', 'ind4']
y_vals = vals['dep']
X_vals = vals[['ind1', 'ind2', 'ind3', 'ind4']]
bt = time()
model_vals = list(map(linreg, [1,2,3]))
mval = model_vals[0]
print(mval.summary())
serial_time = time() - bt
bt1 = time()
model_vals_1 = list(futures.map(linreg, [1,2,3]))
mval_1 = model_vals_1[0]
print(mval_1.summary())
parallel_time = time() - bt1
print(serial_time, parallel_time)`
然而,之后回归摘要确实是连续生成的 - 通过 Python 的标准映射函数 - 错误:
Traceback (most recent call last): File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py", line 193, in _run_module_as_main "main", mod_spec) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py", line 85, in _run_code exec(code, run_globals) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\bootstrap__main__.py", line 302, in b.main() File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\bootstrap__main__.py", line 92, in main self.run() File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\bootstrap__main__.py", line 290, in run futures_startup() File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\bootstrap__main__.py", line 271, in futures_startup run_name="main" File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\futures.py", line 64, in _startup result = _controller.switch(rootFuture, *args, **kargs) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop_control.py", line 253, in runController raise future.exceptionValue File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop_control.py", line 127, in runFuture future.resultValue = future.callable(*future.args, **future.kargs) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py", line 263, in run_path pkg_name=pkg_name, script_name=fname) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py", line 96, in _run_module_code mod_name, mod_spec, pkg_name, script_name) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py", line 85, in _run_code exec(code, run_globals) File "Scoop_map_linear_regression1.py", line 33, in model_vals_1 = list(futures.map(linreg, [1,2,3])) File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\futures.py", line 102, in _mapGenerator for future in _waitAll(*futures): File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\futures.py", line 358, in _waitAll for f in _waitAny(future): File "C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\futures.py", line 335, in _waitAny raise childFuture.exceptionValue NameError: name 'y_vals' is not defined
是之后产生的。这意味着代码停止在 model_vals_1 = list(futures.map(linreg, [1,2,3]))
请注意,为了能够 运行 并行执行代码,必须从指定 -m scoop 参数的命令行启动它,如下所示:
python -m scoop Scoop_map_linear_regression1.py
确实,如果没有 -m scoop 参数启动它,它不会被并行化,实际上 运行,但只使用内置 Python 的 map 函数的两倍(因此,运行连续两次),就像您在警告中被报告的方式一样。也就是说,在启动时不指定 -m scoop 参数,futures.map 将被 map 替换,而目标实际上是使用 futures.map.[=16 并行 运行 它=]
这样做是为了避免人们回答说他们通过简单地启动没有 -m scoop 参数的代码就解决了问题,就像这里已经发生的那样:
Python Parallel Computing - Scoop
因此,该问题被错误地搁置为题外话,因为无法重现。
非常感谢,非常感谢和欢迎任何评论。
解决方案是作为 futures.map(但不一定是 map)的第二个参数,仅传递 [1]
.
确实,即使linreg函数不使用传递给map的第二个参数,它仍然决定了linreg函数的次数运行。例如,考虑以下基本示例:
def welcome(x):
print('Hello world!')
if __name__ == '__main__':
a = list(map(welcome, [1,2]))
函数 welcome 实际上不需要任何参数,但输出仍然是
Hello world!
Hello world!
重复两次,即作为第二个参数传递的列表的长度。
在这种特定情况下,这意味着线性回归将 运行 映射 3 次,尽管回归输出将只出现一次,因为在地图外调用摘要。
重点是,运行 多次线性回归 futures.map 是不可能的。问题是,显然,在第一个 运行 之后,它实际上删除了使用过的数据集,从中不可能继续第二个和第三个 运行,以及随后的
NameError: name 'y_vals' is not defined
在跟踪结束时抛出。这应该通过导航可见:scoop.futures source code
没有遍历所有内容,但我想问题应该与 greenlet 切换器有关。