IPython-IPyparallel导入错误
IPython-IPyparallel import error
我已将所有代码复制到我所有引擎机器上的工作目录中。我的代码是:
my_test.py
my_strategy.py
main.py
因此,main.py
将在 Client Machine
上运行,main.py
中的代码为:
from ipyparallel import Client
import my_test
import my_strategy as strategy
class Beck_Test_Parallel(object):
    """Hold an ipyparallel client and a view over all of its engines.

    Both attributes start as None and are filled in by ``start_client``.
    """

    def __init__(self):
        self.rc = None     # Client instance, set by start_client()
        self.dview = None  # self.rc[:], set by start_client()

    def start_client(self, path):
        """Create a ``Client`` from *path* and keep a view over all engines.

        *path* is passed straight to ``Client``; the script in this file
        passes the location of ``ipcontroller-client.json``.
        """
        self.rc = Client(path)
        self.dview = self.rc[:]
        #self.dview.push(dict(
        #        Account=my_test.Account,
        #        dataImport=my_test.dataImport
        #    ))

    def parallel_map(self, deal_function, accounts):
        """Return ``self.dview.map_sync(deal_function, accounts)``.

        ``start_client`` must have been called first, otherwise
        ``self.dview`` is still None.
        """
        # This import statement runs in the process that calls parallel_map
        # (the client); it does not by itself import anything on the engines.
        import my_test
        return self.dview.map_sync(deal_function, accounts)
def create_accounts(time_list, account):
    """Build one new ``my_test.Account`` for every entry of *time_list*.

    Each new account is constructed from the settings on ``strategy``,
    handed to ``account.share_data``, initialised with ``iniData2`` and
    ``iniData3``, and then stamped with its time and its 1-based position
    in *time_list*.

    Returns the new accounts as a list, in the order of *time_list*.
    """
    accounts = []
    for day_count, deal_time in enumerate(time_list, 1):
        acc = my_test.Account(
            strategy.start,
            strategy.end,
            strategy.freq,
            strategy.universe_code,
            strategy.capital_base,
            strategy.short_capital,
            strategy.benchmark,
            strategy.self_defined
        )
        account.share_data(acc)
        acc.iniData2()
        acc.iniData3()
        acc.current_time = deal_time
        acc.days_counts = day_count
        acc.dynamic_record['capital'] = acc.capital_base
        # NOTE(review): presumably removed because the connection object
        # cannot be serialised when the account is sent to an engine - confirm.
        del acc.connect
        accounts.append(acc)
    return accounts
def let_us_deal(account):
    """Run ``strategy.handle_data`` on *account*, print its time, return it.

    The returned object is whatever ``strategy.handle_data`` gives back.
    """
    account = strategy.handle_data(account)
    # Single-argument print(...) emits the same text on Python 2 and 3.
    print(' >>> {0}'.format(account.current_time))
    return account
if __name__ == '__main__':
    # Template account; create_accounts() passes every per-day account to
    # its share_data() method.
    account = my_test.Account(
        strategy.start,
        strategy.end,
        strategy.freq,
        strategy.universe_code,
        strategy.capital_base,
        strategy.short_capital,
        strategy.benchmark,
        strategy.self_defined
    )
    account.iniData()
    account.iniData2()
    account.iniData3()

    time_list = my_test.get_deal_time_list(account)
    # create_accounts and Beck_Test_Parallel are defined in this file; no
    # module named ``parallel`` is imported here.
    accounts = create_accounts(time_list, account)

    back_test_parallel = Beck_Test_Parallel()
    back_test_parallel.start_client(
        '/home/fit/.ipython/profile_default/security/ipcontroller-client.json')
    # Import the modules on the engines as well.
    back_test_parallel.dview.execute('import my_test')
    back_test_parallel.dview.execute('import my_strategy as strategy')

    # get the result
    result = back_test_parallel.parallel_map(let_us_deal, accounts)
    # map_sync hands back the results directly, whereas a non-blocking map
    # returns an AsyncMapResult that needs .get(); accept either.
    finished = result.get() if hasattr(result, 'get') else result
    for acc in finished:
        print('{0} {1}'.format(acc.reselected_stocks, acc.current_time))
并且我在 Class Beck_Test_Parallel
中的 parallel_map()
函数中导入了 my_test
模块,并且我还在 back_test_parallel.dview.execute('import my_test')
中导入了 my_test
模块.
并且对应的模块在引擎机器的工作目录下。我已将 ipcontroller-client.json
和 ipcontroller-engine.json
复制到 engine machine
上的工作目录。
但是当它运行时,仍然报错 ImportError: No module named my_test
,可是模块 my_test.py
明明已经在工作目录中了。这真的让我感到沮丧!
---------------------------------------------------------------------------
CompositeError Traceback (most recent call last)
/home/fit/log/1027/back_test/main.py in <module>()
119 import ipdb
120 ipdb.set_trace()
--> 121 for acc in result.get():
122 print acc.reselected_stocks, acc.current_time
123
/usr/local/lib/python2.7/dist-packages/ipyparallel/client/asyncresult.pyc in get(self, timeout)
102 return self._result
103 else:
--> 104 raise self._exception
105 else:
106 raise error.TimeoutError("Result not ready.")
CompositeError: one or more exceptions from call to method: let_us_deal
[0:apply]: ImportError: No module named my_test
[1:apply]: ImportError: No module named my_test
关于 result
的事情:
In [2]: result
Out[2]: <AsyncMapResult: finished>
In [3]: type(result)
Out[3]: ipyparallel.client.asyncresult.AsyncMapResult
请注意,使用ipcluster start -n 8
在单机上运行时,它工作正常,没有任何错误。
提前致谢
我认为原因是我的 CWD 不在正确的目录中。你可以这样检查各个引擎的 CWD(apply_sync 会直接返回每个引擎的结果列表,不需要再调用 .get()):
>>> import os
>>> print(dview.apply_sync(os.getcwd))
如果引擎处在错误的目录下,可以在并行计算之前先设置正确的 CWD,确保 ipyparallel 的引擎都在正确的工作目录中:
>>> import os
>>> dview.map(os.chdir, ['/path/to/my/project/on/engine']*number_of_engines)
>>> print(dview.apply_sync(os.getcwd))
您还可以通过
查看引擎名称
>>> import socket
>>> print(dview.apply_sync(socket.gethostname))
而且效果很好!
pip install ipyparallel --upgrade
我已将所有代码复制到我所有引擎机器上的工作目录中。我的代码是:
my_test.py
my_strategy.py
main.py
因此,main.py
将在 Client Machine
上运行,main.py
中的代码为:
from ipyparallel import Client
import my_test
import my_strategy as strategy
class Beck_Test_Parallel(object):
    """Hold an ipyparallel client and a view over all of its engines.

    Both attributes start as None and are filled in by ``start_client``.
    """

    def __init__(self):
        self.rc = None     # Client instance, set by start_client()
        self.dview = None  # self.rc[:], set by start_client()

    def start_client(self, path):
        """Create a ``Client`` from *path* and keep a view over all engines.

        *path* is passed straight to ``Client``; the script in this file
        passes the location of ``ipcontroller-client.json``.
        """
        self.rc = Client(path)
        self.dview = self.rc[:]
        #self.dview.push(dict(
        #        Account=my_test.Account,
        #        dataImport=my_test.dataImport
        #    ))

    def parallel_map(self, deal_function, accounts):
        """Return ``self.dview.map_sync(deal_function, accounts)``.

        ``start_client`` must have been called first, otherwise
        ``self.dview`` is still None.
        """
        # This import statement runs in the process that calls parallel_map
        # (the client); it does not by itself import anything on the engines.
        import my_test
        return self.dview.map_sync(deal_function, accounts)
def create_accounts(time_list, account):
    """Build one new ``my_test.Account`` for every entry of *time_list*.

    Each new account is constructed from the settings on ``strategy``,
    handed to ``account.share_data``, initialised with ``iniData2`` and
    ``iniData3``, and then stamped with its time and its 1-based position
    in *time_list*.

    Returns the new accounts as a list, in the order of *time_list*.
    """
    accounts = []
    for day_count, deal_time in enumerate(time_list, 1):
        acc = my_test.Account(
            strategy.start,
            strategy.end,
            strategy.freq,
            strategy.universe_code,
            strategy.capital_base,
            strategy.short_capital,
            strategy.benchmark,
            strategy.self_defined
        )
        account.share_data(acc)
        acc.iniData2()
        acc.iniData3()
        acc.current_time = deal_time
        acc.days_counts = day_count
        acc.dynamic_record['capital'] = acc.capital_base
        # NOTE(review): presumably removed because the connection object
        # cannot be serialised when the account is sent to an engine - confirm.
        del acc.connect
        accounts.append(acc)
    return accounts
def let_us_deal(account):
    """Run ``strategy.handle_data`` on *account*, print its time, return it.

    The returned object is whatever ``strategy.handle_data`` gives back.
    """
    account = strategy.handle_data(account)
    # Single-argument print(...) emits the same text on Python 2 and 3.
    print(' >>> {0}'.format(account.current_time))
    return account
if __name__ == '__main__':
    # Template account; create_accounts() passes every per-day account to
    # its share_data() method.
    account = my_test.Account(
        strategy.start,
        strategy.end,
        strategy.freq,
        strategy.universe_code,
        strategy.capital_base,
        strategy.short_capital,
        strategy.benchmark,
        strategy.self_defined
    )
    account.iniData()
    account.iniData2()
    account.iniData3()

    time_list = my_test.get_deal_time_list(account)
    # create_accounts and Beck_Test_Parallel are defined in this file; no
    # module named ``parallel`` is imported here.
    accounts = create_accounts(time_list, account)

    back_test_parallel = Beck_Test_Parallel()
    back_test_parallel.start_client(
        '/home/fit/.ipython/profile_default/security/ipcontroller-client.json')
    # Import the modules on the engines as well.
    back_test_parallel.dview.execute('import my_test')
    back_test_parallel.dview.execute('import my_strategy as strategy')

    # get the result
    result = back_test_parallel.parallel_map(let_us_deal, accounts)
    # map_sync hands back the results directly, whereas a non-blocking map
    # returns an AsyncMapResult that needs .get(); accept either.
    finished = result.get() if hasattr(result, 'get') else result
    for acc in finished:
        print('{0} {1}'.format(acc.reselected_stocks, acc.current_time))
并且我在 Class Beck_Test_Parallel
中的 parallel_map()
函数中导入了 my_test
模块,并且我还在 back_test_parallel.dview.execute('import my_test')
中导入了 my_test
模块.
并且对应的模块在引擎机器的工作目录下。我已将 ipcontroller-client.json
和 ipcontroller-engine.json
复制到 engine machine
上的工作目录。
但是当它运行时,仍然报错 ImportError: No module named my_test
,可是模块 my_test.py
明明已经在工作目录中了。这真的让我感到沮丧!
---------------------------------------------------------------------------
CompositeError Traceback (most recent call last)
/home/fit/log/1027/back_test/main.py in <module>()
119 import ipdb
120 ipdb.set_trace()
--> 121 for acc in result.get():
122 print acc.reselected_stocks, acc.current_time
123
/usr/local/lib/python2.7/dist-packages/ipyparallel/client/asyncresult.pyc in get(self, timeout)
102 return self._result
103 else:
--> 104 raise self._exception
105 else:
106 raise error.TimeoutError("Result not ready.")
CompositeError: one or more exceptions from call to method: let_us_deal
[0:apply]: ImportError: No module named my_test
[1:apply]: ImportError: No module named my_test
关于 result
的事情:
In [2]: result
Out[2]: <AsyncMapResult: finished>
In [3]: type(result)
Out[3]: ipyparallel.client.asyncresult.AsyncMapResult
请注意,使用ipcluster start -n 8
在单机上运行时,它工作正常,没有任何错误。
提前致谢
我认为原因是我的 CWD 不在正确的目录中。你可以这样检查各个引擎的 CWD(apply_sync 会直接返回每个引擎的结果列表,不需要再调用 .get()):
>>> import os
>>> print(dview.apply_sync(os.getcwd))
如果引擎处在错误的目录下,可以在并行计算之前先设置正确的 CWD,确保 ipyparallel 的引擎都在正确的工作目录中:
>>> import os
>>> dview.map(os.chdir, ['/path/to/my/project/on/engine']*number_of_engines)
>>> print(dview.apply_sync(os.getcwd))
您还可以通过
查看引擎名称
>>> import socket
>>> print(dview.apply_sync(socket.gethostname))
而且效果很好!
pip install ipyparallel --upgrade