Properly passing *args and **kwargs through RPyC to underlying modules

Background

I'm trying to implement, for my task functions, the RPyC proof-of-concept example provided in the APScheduler GitHub repository so that I can deploy my application with gunicorn using more than one worker (an issue pointed out in APScheduler's FAQ section). Moreover, I'm trying to do so using Flask-APScheduler so that I can easily work within app contexts.

Problem

I can't seem to supply arguments correctly when calling the scheduler service exposed via RPyC. More specifically, the problem appears when passing the literal args and kwargs arguments, as well as the values captured by the exposed RPyC function's *args and **kwargs.

Basically, the arguments I would normally use when calling scheduler.add_job() directly don't work once they are routed through RPyC; things break when the RPyC-exposed method tries to pass the arguments it received on to the underlying scheduler instance. How can I fix this?

Minimal working example

Run python app.py in one terminal and python scheduler.py in another.

# app.py
from flask import Flask, current_app, jsonify
from flask_apscheduler import APScheduler
import rpyc

scheduler = APScheduler()

def create_app():
    app = Flask(__name__)
    app.scheduler = rpyc.connect("localhost", 12345)
    app.scheduler = app.scheduler.root  # just so current_app.scheduler can be used like normal

    @app.route("/add_job/<report_id>", methods=["GET"])
    def add_job(report_id):
        """
        This works as expected when using 
        from app import scheduler
        scheduler.add_job(...)
        """
        current_app.scheduler.add_job(
            func="app.tasks:run_report",
            args=(report_id,),
            kwargs={"email_results": True},
            executor="threadpool",
            trigger="cron",
            day="*/1",
            id="reconcile_accounts"
        )
        return jsonify({"status": "scheduled"})

    return app

if __name__ == "__main__":
    app = create_app()
    app.run(debug=True)

# scheduler.py
from rpyc.utils.server import ThreadedServer
import rpyc

from app import create_app, scheduler

class SchedulerService(rpyc.Service):
    def __init__(self):
        self._app = None
        self._scheduler = None

    def on_connect(self, conn):
        # code that runs when a connection is created
        # (to init the service, if needed)
        self._app = create_app()
        self._scheduler = scheduler

    def exposed_add_job(self, func, *args, **kwargs):
        # Problem occurs below when sending *args and **kwargs to Flask-APScheduler, which sends them to APScheduler
        job_id = kwargs.pop("id", None)
        return self._scheduler.add_job(job_id, func, *args, **kwargs)

if __name__ == "__main__":
    server = ThreadedServer(SchedulerService, port=12345, protocol_config={"allow_public_attrs": True})
    try:
        server.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        scheduler.shutdown()

Traceback from self._scheduler.add_job(job_id, func, *args, **kwargs)

127.0.0.1 - - [08/Jul/2021 10:29:43] "GET /reports/2/run HTTP/1.1" 500 -
Traceback (most recent call last):
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 2088, in __call__
    return self.wsgi_app(environ, start_response)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 2073, in wsgi_app
    response = self.handle_exception(e)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 2070, in wsgi_app
    response = self.full_dispatch_request()
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 1515, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 1513, in full_dispatch_request
    rv = self.dispatch_request()
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 1499, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "C:\Users\mhill\PycharmProjects\reporting\app\views.py", line 189, in run_report
    kwargs={"config": json.dumps(report.serialize())}
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\netref.py", line 240, in __call__
    return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\netref.py", line 63, in syncreq
    return conn.sync_request(handler, proxy, *args)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py", line 473, in sync_request
    return self.async_request(handler, *args, timeout=timeout).value
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\async_.py", line 102, in value
    raise self._obj
_get_exception_class.<locals>.Derived: dictionary update sequence element #0 has length 6; 2 is required

========= Remote Traceback (1) =========
Traceback (most recent call last):
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py", line 324, in _dispatch_request
    res = self._HANDLERS[handler](self, *args)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py", line 592, in _handle_call
    return obj(*args, **dict(kwargs))
  File "C:/Users/mhill/PycharmProjects/reporting/scheduler.py", line 20, in exposed_add_job
    return self._scheduler.add_job(func, *args, **kwargs)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask_apscheduler\scheduler.py", line 168, in add_job
    return self._scheduler.add_job(**job_def)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\apscheduler\schedulers\base.py", line 429, in add_job
    'kwargs': dict(kwargs) if kwargs is not None else {},
ValueError: dictionary update sequence element #0 has length 6; 2 is required

According to this rpyc issue on GitHub, the problem with mapping dictionaries can be solved by enabling allow_public_attrs on both the server and the client. Because rpyc does not expose dict methods by default, **kwargs essentially cannot work, since the proxied kwargs has no accessible dict methods to support iteration.

In your case, you only need to change the client connection like this:

app.scheduler = rpyc.connect("localhost", 12345, config={ 'allow_public_attrs': True })
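
Even with that in place, the nested args and kwargs you pass to add_job are still netrefs pointing back at client-side objects. If you'd rather hand APScheduler plain local objects, one option is to copy them on the service side before scheduling. A minimal sketch, assuming allow_public_attrs is enabled on both ends; only exposed_add_job in your SchedulerService changes, the rest of scheduler.py stays the same:

    def exposed_add_job(self, func, *args, **kwargs):
        # The nested "args"/"kwargs" values arrive as RPyC netrefs that
        # reference client-side objects; copy them into local built-ins so
        # APScheduler can build the job definition without calling back
        # into the client connection.
        if "args" in kwargs:
            kwargs["args"] = tuple(kwargs["args"])
        if "kwargs" in kwargs:
            kwargs["kwargs"] = dict(kwargs["kwargs"])
        job_id = kwargs.pop("id", None)
        return self._scheduler.add_job(job_id, func, *args, **kwargs)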