Issue properly passing *args and **kwargs through RPyC to underlying modules
Background
I'm trying to implement the RPyC proof-of-concept example provided in the APScheduler GitHub repository so that I can deploy my application with gunicorn using more than one worker (a problem pointed out in APScheduler's FAQ section). In addition, I'm doing this with Flask-APScheduler so that I can easily work within app contexts.
Problem
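When calling the scheduler service exposed through RPyC, I can't seem to pass the arguments correctly. More specifically, the problem appears when passing the args and kwargs parameters (the literal keyword arguments of add_job) together with the other values collected in the exposed RPyC function's *args and **kwargs.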
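Basically, the arguments I normally use when calling scheduler.add_job() directly stop working when they are routed through RPyC, specifically at the point where the RPyC exposed method passes the arguments it received on to the underlying scheduler instance. How can I fix this?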
Minimal working example
Run python app.py in one terminal and python scheduler.py in another.
# app.py
from flask import Flask, current_app, jsonify
from flask_apscheduler import APScheduler
import rpyc

scheduler = APScheduler()


def create_app():
    app = Flask(__name__)
    app.scheduler = rpyc.connect("localhost", 12345)
    app.scheduler = app.scheduler.root  # just so current_app.scheduler can be used like normal

    @app.route("/add_job/<report_id>", methods=["GET"])
    def add_job(report_id):
        """
        This works as expected when using
            from app import scheduler
            scheduler.add_job(...)
        """
        current_app.scheduler.add_job(
            func="app.tasks:run_report",
            args=(report_id,),
            kwargs={"email_results": True},
            executor="threadpool",
            trigger="cron",
            day="*/1",
            id="reconcile_accounts"
        )
        return jsonify({"status": "scheduled"})

    return app


if __name__ == "__main__":
    app = create_app()
    app.run(debug=True)
# scheduler.py
from rpyc.utils.server import ThreadedServer
import rpyc
from app import create_app, scheduler


class SchedulerService(rpyc.Service):
    def __init__(self):
        self._app = None
        self._scheduler = None

    def on_connect(self, conn):
        # code that runs when a connection is created
        # (to init the service, if needed)
        self._app = create_app()
        self._scheduler = scheduler

    def exposed_add_job(self, func, *args, **kwargs):
        # Problem occurs below when sending *args and **kwargs to Flask-APScheduler, which sends them to APScheduler
        job_id = kwargs.pop("id", None)
        return self._scheduler.add_job(job_id, func, *args, **kwargs)


if __name__ == "__main__":
    server = ThreadedServer(SchedulerService, port=12345, protocol_config={"allow_public_attrs": True})
    try:
        server.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        scheduler.shutdown()
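Traceback from self._scheduler.add_job(job_id, func, *args, **kwargs) (captured from the full application, so the route and file names differ from the minimal example above):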
127.0.0.1 - - [08/Jul/2021 10:29:43] "GET /reports/2/run HTTP/1.1" 500 -
Traceback (most recent call last):
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 2088, in __call__
    return self.wsgi_app(environ, start_response)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 2073, in wsgi_app
    response = self.handle_exception(e)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 2070, in wsgi_app
    response = self.full_dispatch_request()
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 1515, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 1513, in full_dispatch_request
    rv = self.dispatch_request()
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 1499, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "C:\Users\mhill\PycharmProjects\reporting\app\views.py", line 189, in run_report
    kwargs={"config": json.dumps(report.serialize())}
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\netref.py", line 240, in __call__
    return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\netref.py", line 63, in syncreq
    return conn.sync_request(handler, proxy, *args)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py", line 473, in sync_request
    return self.async_request(handler, *args, timeout=timeout).value
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\async_.py", line 102, in value
    raise self._obj
_get_exception_class.<locals>.Derived: dictionary update sequence element #0 has length 6; 2 is required

========= Remote Traceback (1) =========
Traceback (most recent call last):
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py", line 324, in _dispatch_request
    res = self._HANDLERS[handler](self, *args)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py", line 592, in _handle_call
    return obj(*args, **dict(kwargs))
  File "C:/Users/mhill/PycharmProjects/reporting/scheduler.py", line 20, in exposed_add_job
    return self._scheduler.add_job(func, *args, **kwargs)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask_apscheduler\scheduler.py", line 168, in add_job
    return self._scheduler.add_job(**job_def)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\apscheduler\schedulers\base.py", line 429, in add_job
    'kwargs': dict(kwargs) if kwargs is not None else {},
ValueError: dictionary update sequence element #0 has length 6; 2 is required
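According to this rpyc issue on GitHub, the problem with mapping dicts can be solved by enabling allow_public_attrs on both the server and the client. By default, RPyC does not expose a dict's public methods (such as keys()) through a netref, only a set of "safe" special methods such as __iter__. The kwargs dict you build in the Flask view lives on the client and reaches the server as a netref, so when APScheduler calls dict(kwargs) on it, it cannot find keys() and falls back to iterating the object, which yields only the keys. That is where "element #0 has length 6" comes from: the first key, "config", is a six-character string rather than a (key, value) pair.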
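The fallback described above can be reproduced without RPyC at all. The sketch below uses two hypothetical stand-in classes: KeysOnlyIterable behaves like a netref with allow_public_attrs off in the one way that matters here (it can be iterated but has no keys attribute), and WithKeys adds keys. They illustrate how dict() behaves; they are not RPyC's actual netref classes.

```python
class KeysOnlyIterable:
    """Hypothetical stand-in for a netref to a client-side dict when
    allow_public_attrs is off: iteration and item access work, but there is
    no `keys` attribute."""

    def __init__(self, data):
        self._data = data

    def __iter__(self):
        # Iterating a dict yields only its keys, e.g. "config".
        return iter(self._data)

    def __getitem__(self, key):
        return self._data[key]


class WithKeys(KeysOnlyIterable):
    """Same object with `keys` reachable (roughly what enabling
    allow_public_attrs on the client lets the server see)."""

    def keys(self):
        return self._data.keys()


data = {"config": "{}"}

# dict() treats its argument as a mapping only if it has a `keys` attribute;
# otherwise it expects an iterable of (key, value) pairs. Here each item is the
# bare key "config", a 6-character string, so dict() rejects it.
try:
    dict(KeysOnlyIterable(data))
except ValueError as exc:
    message = str(exc)
print(message)  # dictionary update sequence element #0 has length 6; 2 is required

# With `keys` available, dict() calls keys() and __getitem__ and copies the pairs.
print(dict(WithKeys(data)))  # {'config': '{}'}
```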
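Your server already passes protocol_config={"allow_public_attrs": True}, so in your case you only need to change the client connection like this: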
app.scheduler = rpyc.connect("localhost", 12345, config={ 'allow_public_attrs': True })
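If you would rather not enable allow_public_attrs on the client (it lets the server read any public attribute of the objects you pass to it), one possible server-side alternative is to copy the proxied dict into a local one before handing it to Flask-APScheduler. The copy uses only iteration (which the traceback shows already works over the netref) and item access. This is a sketch that has not been checked against a live RPyC connection: as_local_dict is a hypothetical helper, each key lookup is a separate round trip, and any values that RPyC itself passes by reference would still arrive as netrefs.

```python
def as_local_dict(mapping_proxy):
    """Hypothetical helper: copy a dict-like object into a plain local dict
    using only __iter__ and __getitem__, so no `keys` attribute is needed."""
    if mapping_proxy is None:
        return None
    return {key: mapping_proxy[key] for key in mapping_proxy}


# Hypothetical use inside SchedulerService.exposed_add_job, before forwarding:
#     if "kwargs" in kwargs:
#         kwargs["kwargs"] = as_local_dict(kwargs["kwargs"])
#     return self._scheduler.add_job(job_id, func, *args, **kwargs)
```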