Error when trying to send data from APIClient to a function using multiprocessing
Lightweight API: https://github.com/betcode-org/betfair
To use this module, you pass your credentials to the API client and log in:
trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()
To speed up the data collection process, I use multiprocessing:
import multiprocessing
import betfairlightweight
trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()
def main():
    matches_bf = ...  # DataFrame...
    try:
        max_process = multiprocessing.cpu_count()-1 or 1
        pool = multiprocessing.Pool(max_process)
        list_pool = pool.map(data_event, matches_bf.iterrows())
    finally:
        pool.close()
        pool.join()
        trading.logout()
def data_event(event_bf):
    _, event_bf = event_bf
    event_id = event_bf['event_id']
    filter_catalog_markets = betfairlightweight.filters.market_filter(
        event_ids=[event_id],
        market_type_codes=[
            'MATCH_ODDS'
        ]
    )
    catalog_markets = trading.betting.list_market_catalogue(
        filter=filter_catalog_markets,
        max_results='100',
        sort='FIRST_TO_START',
        market_projection=['RUNNER_METADATA']
    )
    ... # some more code
    ... # some more code
    ... # some more code
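This results in 12 logins, which is not an ideal way to access the API.
Why 12 logins?
When I start the code it logs in once, and when the multiprocessing pool is created it generates another 11 logins, one per process. If I put print(trading) right below trading.login(), one print appears in the terminal when the code starts running, and then another 11 appear at the same time when the pool is created.
So I need to find a way to do the same job using only ONE login.
I tried moving the login into main() and passing the client as an argument to the called function: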
import multiprocessing
from itertools import repeat
import betfairlightweight
def main():
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    trading.login()
    matches_bf = ...  # DataFrame...
    try:
        max_process = multiprocessing.cpu_count()-1 or 1
        pool = multiprocessing.Pool(max_process)
        list_pool = pool.map(data_event, zip(repeat(trading),matches_bf.iterrows()))
    finally:
        pool.close()
        pool.join()
        trading.logout()
def data_event(trading,event_bf):
    trading = trading
    _, event_bf = event_bf
    event_id = event_bf['event_id']
    filter_catalog_markets = betfairlightweight.filters.market_filter(
        event_ids=[event_id],
        market_type_codes=[
            'MATCH_ODDS'
        ]
    )
    catalog_markets = trading.betting.list_market_catalogue(
        filter=filter_catalog_markets,
        max_results='100',
        sort='FIRST_TO_START',
        market_projection=['RUNNER_METADATA']
    )
    ... # some more code
    ... # some more code
    ... # some more code
But I got this error:
TypeError: cannot pickle 'module' object
I also tried creating trading inside the data_event function:
import multiprocessing
import betfairlightweight
def main():
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    trading.login()
    matches_bf = ...  # DataFrame...
    try:
        max_process = multiprocessing.cpu_count()-1 or 1
        pool = multiprocessing.Pool(max_process)
        list_pool = pool.map(data_event, matches_bf.iterrows())
    finally:
        pool.close()
        pool.join()
        trading.logout()
def data_event(event_bf):
    # A new client is created here, but login() is never called on it
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    _, event_bf = event_bf
    event_id = event_bf['event_id']
    filter_catalog_markets = betfairlightweight.filters.market_filter(
        event_ids=[event_id],
        market_type_codes=[
            'MATCH_ODDS'
        ]
    )
    catalog_markets = trading.betting.list_market_catalogue(
        filter=filter_catalog_markets,
        max_results='100',
        sort='FIRST_TO_START',
        market_projection=['RUNNER_METADATA']
    )
    ... # some more code
    ... # some more code
    ... # some more code
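But I got this error:
errorCode': 'INVALID_SESSION_INFORMATION'
The reason is logical: the client created inside the worker processes is never logged in.
How should I proceed so that I use only one login and can still do everything I need, without being forced to work item by item (going row by row without multiprocessing takes far too long to be viable)?
Additional information:
- The betfairlightweight login, in case it helps to understand the case: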
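Each worker process is performing the login, so moving the login logic into a function that is only called under if __name__ == ... should solve the problem by shielding it from the workers.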
import multiprocessing
import betfairlightweight
def main():
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    trading.login()
    trading.keep_alive()
    matches_bf = ...  # DataFrame...
    try:
        max_process = multiprocessing.cpu_count()-1 or 1
        pool = multiprocessing.Pool(max_process)
        m = multiprocessing.Manager()
        queue = m.Queue()
        # Share the single logged-in client through a managed queue
        queue.put(trading)
        list_pool = pool.starmap(data_event, [(queue, row) for row in matches_bf.iterrows()])
    finally:
        pool.close()
        pool.join()
        trading.logout()
def data_event(queue, event_bf):
    # Take the client off the queue, use it, then put it back for the next task
    trading = queue.get()
    _, event_bf = event_bf
    ... # some more code
    queue.put(trading)
if __name__ == "__main__":
    main()
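Edit
The main problem here is that the trading object (basically the API client returned by the trading library) cannot be serialized, so multiprocessing cannot pickle it and send it to the worker processes. In my opinion there is no "direct" solution to your problem; however, you can try either of the following workarounds:
- Try pathos.multiprocessing instead of multiprocessing; it uses dill instead of pickle.
- You can use multiprocessing.pool.ThreadPool instead of multiprocessing.pool.Pool. Because its threads share memory with the main thread, the workers do not need to create a new trading object. Compared with Pool, though, it may come with a performance penalty.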
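The ThreadPool workaround can be sketched as follows. This is a minimal illustration, not the real client: StubClient and fetch_all are hypothetical names, and StubClient only imitates the "must be logged in" behaviour of betfairlightweight.APIClient so the example runs without credentials. Whether the real client is safe to call from several threads at once is not confirmed here.

```python
import threading
from multiprocessing.pool import ThreadPool


class StubClient:
    """Hypothetical stand-in for betfairlightweight.APIClient."""

    def __init__(self):
        self.login_count = 0
        self.logged_in = False
        self._lock = threading.Lock()

    def login(self):
        # Count logins so we can check that only one ever happens.
        with self._lock:
            self.login_count += 1
            self.logged_in = True

    def list_market_catalogue(self, event_id):
        # Imitates the API rejecting calls made without a session.
        if not self.logged_in:
            raise RuntimeError("INVALID_SESSION_INFORMATION")
        return {"event_id": event_id, "markets": ["MATCH_ODDS"]}


def fetch_all(client, event_ids, workers=4):
    # Threads share the caller's memory, so `client` is never pickled
    # and every worker reuses the same logged-in object.
    with ThreadPool(workers) as pool:
        return pool.map(client.list_market_catalogue, event_ids)


client = StubClient()
client.login()
results = fetch_all(client, [101, 102, 103])
```

After this runs, client.login_count is still 1 and results holds one entry per event id, in input order. Since the workload in the question is mostly waiting on API responses, threads are often adequate for it, but that should be measured rather than assumed.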
Make the betfairlightweight.APIClient instance picklable by passing session.
import requests
trading = betfairlightweight.APIClient(
    username,
    pw,
    app_key=app_key,
    cert_files=('blablabla.crt','blablabla.key'),
    session=requests.Session(), # Add this
)
Explanation
TypeError: cannot pickle 'module' object
APIClient (BaseClient) defaults self.session to the requests module, and a module object cannot be pickled.
class APIClient(BaseClient):
    ...
class BaseClient:
    ...
    def __init__(
        self,
        username: str,
        password: str = None,
        app_key: str = None,
        certs: str = None,
        locale: str = None,
        cert_files: Union[Tuple[str], str, None] = None,
        lightweight: bool = False,
        session: requests.Session = None,
    ):
        ...
        self.session = session if session else requests
        ...
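The effect of that default can be reproduced with the standard library alone. In this small sketch, types.SimpleNamespace stands in for the client and the json module stands in for requests (both are substitutions made for illustration, as is the helper name can_pickle); it only shows that an object holding a module in an attribute cannot be pickled, while the same object holding ordinary data can.

```python
import json  # any module would do; it stands in for `requests` here
import pickle
from types import SimpleNamespace


def can_pickle(obj):
    """Return True if pickle can serialize obj, False on a TypeError."""
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        return False


# Like a client whose session attribute defaulted to a module.
client_with_module = SimpleNamespace(session=json)
# Like a client that was given a regular object as its session.
client_with_object = SimpleNamespace(session={"token": "abc"})

module_ok = can_pickle(client_with_module)
object_ok = can_pickle(client_with_object)
```

Here module_ok ends up False and object_ok ends up True. multiprocessing.Pool hits the same failure because it pickles every argument before sending it to a worker.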