Error when trying to send data from APIClient to a function using multiprocessing

Library (betfairlightweight): https://github.com/betcode-org/betfair

To use this module, you create an API client with your credentials and log in:

trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()

To speed up data collection, I used multiprocessing:

import multiprocessing

import betfairlightweight

# Logs in at import time, outside main()
trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()

def main():
    matches_bf = ...  # DataFrame...
    max_process = multiprocessing.cpu_count() - 1 or 1
    pool = multiprocessing.Pool(max_process)
    try:
        list_pool = pool.map(data_event, matches_bf.iterrows())
    finally:
        pool.close()
        pool.join()

    trading.logout()

def data_event(event_bf):
    _, event_bf = event_bf  # iterrows() yields (index, row) pairs
    event_id = event_bf['event_id']
    filter_catalog_markets = betfairlightweight.filters.market_filter(
        event_ids=[event_id],
        market_type_codes=['MATCH_ODDS'],
    )

    catalog_markets = trading.betting.list_market_catalogue(
        filter=filter_catalog_markets,
        max_results='100',
        sort='FIRST_TO_START',
        market_projection=['RUNNER_METADATA'],
    )

    ...  # some more code

if __name__ == '__main__':
    main()

This results in 12 logins, which is not an ideal way to access the API.

Why does it log in 12 times?

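When I run the code, it logs in once, and when the multiprocessing pool is created it logs in 11 more times, once per process. If I put print(trading) directly below trading.login(), one print appears in the terminal when the code starts running, and another 11 appear simultaneously when the pool is created.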

So I need to find a way to do the same work using only ONE login.

I tried moving the login into main() and passing trading as an argument to the called function:

import multiprocessing
from itertools import repeat

import betfairlightweight

def main():
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    trading.login()

    matches_bf = ...  # DataFrame...
    max_process = multiprocessing.cpu_count() - 1 or 1
    pool = multiprocessing.Pool(max_process)
    try:
        # starmap unpacks each (trading, (index, row)) pair into data_event's two parameters;
        # sending each task to a worker process requires pickling `trading`
        list_pool = pool.starmap(data_event, zip(repeat(trading), matches_bf.iterrows()))
    finally:
        pool.close()
        pool.join()

    trading.logout()

def data_event(trading, event_bf):
    _, event_bf = event_bf  # iterrows() yields (index, row) pairs
    event_id = event_bf['event_id']
    filter_catalog_markets = betfairlightweight.filters.market_filter(
        event_ids=[event_id],
        market_type_codes=['MATCH_ODDS'],
    )

    catalog_markets = trading.betting.list_market_catalogue(
        filter=filter_catalog_markets,
        max_results='100',
        sort='FIRST_TO_START',
        market_projection=['RUNNER_METADATA'],
    )

    ...  # some more code

if __name__ == '__main__':
    main()

But I get this error:

TypeError: cannot pickle 'module' object

I then tried creating trading inside the data_event function:

import multiprocessing

import betfairlightweight

def main():
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    trading.login()

    matches_bf = ...  # DataFrame...
    max_process = multiprocessing.cpu_count() - 1 or 1
    pool = multiprocessing.Pool(max_process)
    try:
        list_pool = pool.map(data_event, matches_bf.iterrows())
    finally:
        pool.close()
        pool.join()

    trading.logout()

def data_event(event_bf):
    # A fresh client for each task; login() is never called on it
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    _, event_bf = event_bf  # iterrows() yields (index, row) pairs
    event_id = event_bf['event_id']
    filter_catalog_markets = betfairlightweight.filters.market_filter(
        event_ids=[event_id],
        market_type_codes=['MATCH_ODDS'],
    )

    catalog_markets = trading.betting.list_market_catalogue(
        filter=filter_catalog_markets,
        max_results='100',
        sort='FIRST_TO_START',
        market_projection=['RUNNER_METADATA'],
    )

    ...  # some more code

if __name__ == '__main__':
    main()

But I get this error:

'errorCode': 'INVALID_SESSION_INFORMATION'

The reason makes sense: the client created inside each worker process never logged in.

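How should I proceed so that I use only one login and can still do everything I need, without being forced to process the rows one by one (going row by row without multiprocessing takes far too long to be feasible)?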

Additional information:

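Each worker process is performing the login: with the spawn start method (the default on Windows and macOS), every worker re-executes the main module's top-level code, including a module-level trading.login(). Moving the login logic into a function that is only called under if __name__ == "__main__": should therefore solve the problem, because that block does not run in the workers.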

import multiprocessing

import betfairlightweight

def main():
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    trading.login()
    trading.keep_alive()

    matches_bf = ...  # DataFrame...
    max_process = multiprocessing.cpu_count() - 1 or 1
    pool = multiprocessing.Pool(max_process)
    try:
        m = multiprocessing.Manager()
        queue = m.Queue()
        queue.put(trading)  # the Manager pickles trading, so the client itself must be picklable
        list_pool = pool.starmap(data_event, [(queue, row) for row in matches_bf.iterrows()])
    finally:
        pool.close()
        pool.join()

    trading.logout()

def data_event(queue, event_bf):
    trading = queue.get()  # blocks until another worker hands the client back
    try:
        _, event_bf = event_bf
        ...  # some more code
    finally:
        queue.put(trading)  # always return the client, even if the work above raises

if __name__ == "__main__":
    main()
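The effect of the guard can be imitated without starting any processes. To my understanding, the spawn start method re-runs the parent's main script in each worker through runpy under the name `__mp_main__`; the sketch below assumes that and runs a tiny hypothetical script both ways. The `login_calls` list is an illustrative stand-in for trading.login(): top-level code runs in both cases, while the guarded main() runs only under `__main__`.

```python
import os
import runpy
import tempfile
import textwrap

# A tiny hypothetical script; appending to login_calls stands in for trading.login()
SCRIPT = textwrap.dedent("""\
    login_calls = ["module level"]  # top-level code: runs on every execution

    def main():
        login_calls.append("main()")  # guarded: runs only when executed as __main__

    if __name__ == "__main__":
        main()
""")


def run_as(run_name):
    """Execute SCRIPT from a temporary file under the given __name__ and return login_calls."""
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
        f.write(SCRIPT)
        path = f.name
    try:
        return runpy.run_path(path, run_name=run_name)["login_calls"]
    finally:
        os.remove(path)


print(run_as("__main__"))     # the parent process running the script
print(run_as("__mp_main__"))  # approximately what a spawned worker does
```

A login placed at module level would therefore run once in the parent and once more in every spawned worker, which matches the 1 + 11 logins described in the question.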

Edit

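The main problem here is that the trading object (essentially the API client returned by the trading library) cannot be serialized, so multiprocessing cannot pickle it and send it to the worker processes. In my view there is no "direct" solution to your problem; however, you can try either of the following workarounds: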

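  1. Try pathos.multiprocessing instead of multiprocessing; it serializes with dill instead of pickle.
  2. Use multiprocessing.pool.ThreadPool instead of multiprocessing.pool.Pool. Because the worker threads share memory with the main thread, none of them needs to create a new trading object; they can all use the one logged-in client. Compared with Pool it may carry a performance penalty, mainly for CPU-bound work, since Python threads are limited by the GIL; network-bound API calls are usually much less affected.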
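A minimal sketch of the ThreadPool workaround, assuming the API calls are network-bound and that one client can be shared across threads. FakeTradingClient is a hypothetical stand-in for betfairlightweight.APIClient so the sketch runs without credentials, and its list_market_catalogue takes a plain event id rather than the real filter arguments.

```python
from multiprocessing.pool import ThreadPool


class FakeTradingClient:
    """Hypothetical stand-in for betfairlightweight.APIClient (no network access)."""

    def __init__(self):
        self.login_count = 0

    def login(self):
        self.login_count += 1

    def list_market_catalogue(self, event_id):
        # Placeholder for trading.betting.list_market_catalogue(filter=..., ...)
        return f"catalogue-for-{event_id}"


def fetch_catalogues(trading, event_ids, workers=4):
    """Fetch one catalogue per event, sharing a single logged-in client across threads."""

    def data_event(event_id):
        # Threads share the parent's memory, so `trading` is used directly and never pickled
        return trading.list_market_catalogue(event_id)

    with ThreadPool(workers) as pool:
        return pool.map(data_event, event_ids)  # results come back in input order


trading = FakeTradingClient()
trading.login()  # the only login
results = fetch_catalogues(trading, ["e1", "e2", "e3"])
print(results)
```

In real code, data_event would build the market_filter and call trading.betting.list_market_catalogue(...) as in the question, with trading being the actual logged-in client.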

Make the betfairlightweight.APIClient instance picklable by passing it a session:

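import betfairlightweight
import requests

trading = betfairlightweight.APIClient(
    username,
    pw,
    app_key=app_key,
    cert_files=('blablabla.crt','blablabla.key'),
    session=requests.Session(),  # Add this: a Session instance can be pickled, the requests module cannot
)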

Explanation

TypeError: cannot pickle 'module' object

APIClient (BaseClient) sets self.session to the requests module by default, and a module object cannot be pickled:

class APIClient(BaseClient):
    ...
class BaseClient:
    ...

    def __init__(
        self,
        username: str,
        password: str = None,
        app_key: str = None,
        certs: str = None,
        locale: str = None,
        cert_files: Union[Tuple[str], str, None] = None,
        lightweight: bool = False,
        session: requests.Session = None,
    ):
        ...

        self.session = session if session else requests
        ...
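The failure can be reproduced with the standard library alone. In this sketch, make_client is a hypothetical stand-in that copies BaseClient's `session if session else requests` default, using the json module in place of requests (to pickle, both are just module objects), and a SimpleNamespace plays the role of a picklable Session.

```python
import json
import pickle
from types import SimpleNamespace


def make_client(session=None):
    """Hypothetical stand-in for BaseClient's session handling.

    Mirrors `self.session = session if session else requests`, with the json
    module standing in for the requests module.
    """
    return SimpleNamespace(session=session if session else json)


def can_pickle(obj):
    """Return True if obj survives a pickle round trip, False if pickling raises TypeError."""
    try:
        pickle.loads(pickle.dumps(obj))
    except TypeError:
        return False
    return True


default_client = make_client()  # session is a module object
session_client = make_client(session=SimpleNamespace(headers={"User-Agent": "demo"}))

print(can_pickle(default_client), can_pickle(session_client))
```

A requests.Session, by contrast, is an ordinary object rather than a module, which is why the session=requests.Session() change above lets multiprocessing pickle the client.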