Python 订阅反应源的网络服务在对象中产生奇怪的行为
Python web service subscribed to reactive source produces strange behavior in object
我已经使用 Falcon 实现了 Web 服务。此服务存储一个 状态机 (pytransitions),该状态机将传递给构造函数中的服务资源。该服务使用 gunicorn.
运行
Web 服务在开始使用 RxPy 时启动一个进程。 on_next(event)
中返回的事件用于触发状态机中的转换。
错误
我希望状态机在服务和资源中都具有一致的状态,但似乎在资源中状态永远不会改变。
我们有一个尝试重现此行为的测试,但令人惊讶的是该测试有效
class TochoLevel(object):
def __init__(self, tochine):
self.tochine = tochine
def on_get(self, req, res):
res.status = falcon.HTTP_200
res.body = self.tochine.state
def get_machine():
states = ["low", "medium", "high"]
transitions = [
{'trigger': 'to_medium', 'source': ['low', 'medium', 'high'], 'dest': 'medium'},
{'trigger': 'to_high', 'source': ['low', 'medium', 'high'], 'dest': 'high'},
{'trigger': 'to_low', 'source': ['low', 'medium', 'high'], 'dest': 'low'}
]
locked_factory = MachineFactory.get_predefined(locked=True)
return locked_factory(
states=states,
transitions=transitions,
initial='low',
auto_transitions=False,
queued=False
)
def _level_observable(observer):
for i in range(1, 21):
sleep(0.1)
next_val = 'to_low'
if 8 <= i <= 15:
next_val = 'to_medium'
elif i > 15:
next_val = 'to_high'
observer.on_next(next_val)
observer.on_completed()
def get_level_observable():
return Observable.create(_level_observable)
class NotBlockingService(falcon.API):
def __init__(self):
super(NotBlockingService, self).__init__()
self.tochine = get_machine()
self.add_route('/tochez', TochoLevel(self.tochine))
def _run_machine(self, val):
self.tochine.trigger(val)
print('machine exec: {}, state: {}'.format(val, self.tochine.state))
return self.tochine.state
def start(self):
source = get_level_observable()
(source.subscribe_on(ThreadPoolScheduler(2))
.subscribe(self._run_machine))
def test_can_query_falcon_service_while_being_susbcribed_as_observer():
svc = NotBlockingService()
client = testing.TestClient(svc)
assert client.simulate_get('/tochez').text == 'low'
start = time()
svc.start()
sleep(1.2)
assert client.simulate_get('/tochez').text == 'medium'
end = time()
sleep(1.2)
assert client.simulate_get('/tochez').text == 'high'
assert (end - start) < 2
问题
当我使用 gunicorn 启动服务并传播 on_next
中的状态时,为什么状态机不更改资源 TochoLevel
中的状态rxpy?
的方法
当然,当你在开发模式下执行你的服务时,你只使用了一个fork(一个执行进程)。当您使用像 Gunicorn 这样的软件时,您正在使用预分叉策略在生产环境中提供可靠的服务。
Preforking 策略生成许多子进程来解析请求并且逻辑是独立的,在不同请求之间以独立模式工作每个 fork。
Gunicorn,多亏了 Python 中 WSGI 的标准化 App 方案(Python2_PEP-333 & Python3_PEP-3333), receives an APP object. Gunicorn launches as many instances (preforks) as indicated in its configuration. Gunicorn calls such forks workers and by default it uses 1 worker。每个 worker 将使用其状态工作,也许 Gunicorn 还会为每个请求创建新的 App 对象实例...
这就是你的状态机没有持久化的原因。
提示:首先尝试用 1 个 worker 启动 Gunicorn 并检查状态机的状态持久性。如果你实现了状态机的持久化,那么要解决的第二个问题就是所有 worker 的状态机同步。
我已经使用 Falcon 实现了 Web 服务。此服务存储一个 状态机 (pytransitions),该状态机将传递给构造函数中的服务资源。该服务使用 gunicorn.
运行Web 服务在开始使用 RxPy 时启动一个进程。 on_next(event)
中返回的事件用于触发状态机中的转换。
错误
我希望状态机在服务和资源中都具有一致的状态,但似乎在资源中状态永远不会改变。
我们有一个尝试重现此行为的测试,但令人惊讶的是该测试有效
class TochoLevel(object):
def __init__(self, tochine):
self.tochine = tochine
def on_get(self, req, res):
res.status = falcon.HTTP_200
res.body = self.tochine.state
def get_machine():
states = ["low", "medium", "high"]
transitions = [
{'trigger': 'to_medium', 'source': ['low', 'medium', 'high'], 'dest': 'medium'},
{'trigger': 'to_high', 'source': ['low', 'medium', 'high'], 'dest': 'high'},
{'trigger': 'to_low', 'source': ['low', 'medium', 'high'], 'dest': 'low'}
]
locked_factory = MachineFactory.get_predefined(locked=True)
return locked_factory(
states=states,
transitions=transitions,
initial='low',
auto_transitions=False,
queued=False
)
def _level_observable(observer):
for i in range(1, 21):
sleep(0.1)
next_val = 'to_low'
if 8 <= i <= 15:
next_val = 'to_medium'
elif i > 15:
next_val = 'to_high'
observer.on_next(next_val)
observer.on_completed()
def get_level_observable():
return Observable.create(_level_observable)
class NotBlockingService(falcon.API):
def __init__(self):
super(NotBlockingService, self).__init__()
self.tochine = get_machine()
self.add_route('/tochez', TochoLevel(self.tochine))
def _run_machine(self, val):
self.tochine.trigger(val)
print('machine exec: {}, state: {}'.format(val, self.tochine.state))
return self.tochine.state
def start(self):
source = get_level_observable()
(source.subscribe_on(ThreadPoolScheduler(2))
.subscribe(self._run_machine))
def test_can_query_falcon_service_while_being_susbcribed_as_observer():
svc = NotBlockingService()
client = testing.TestClient(svc)
assert client.simulate_get('/tochez').text == 'low'
start = time()
svc.start()
sleep(1.2)
assert client.simulate_get('/tochez').text == 'medium'
end = time()
sleep(1.2)
assert client.simulate_get('/tochez').text == 'high'
assert (end - start) < 2
问题
当我使用 gunicorn 启动服务并传播 on_next
中的状态时,为什么状态机不更改资源 TochoLevel
中的状态rxpy?
当然,当你在开发模式下执行你的服务时,你只使用了一个fork(一个执行进程)。当您使用像 Gunicorn 这样的软件时,您正在使用预分叉策略在生产环境中提供可靠的服务。
Preforking 策略生成许多子进程来解析请求并且逻辑是独立的,在不同请求之间以独立模式工作每个 fork。
Gunicorn,多亏了 Python 中 WSGI 的标准化 App 方案(Python2_PEP-333 & Python3_PEP-3333), receives an APP object. Gunicorn launches as many instances (preforks) as indicated in its configuration. Gunicorn calls such forks workers and by default it uses 1 worker。每个 worker 将使用其状态工作,也许 Gunicorn 还会为每个请求创建新的 App 对象实例...
这就是你的状态机没有持久化的原因。
提示:首先尝试用 1 个 worker 启动 Gunicorn 并检查状态机的状态持久性。如果你实现了状态机的持久化,那么要解决的第二个问题就是所有 worker 的状态机同步。