构建微服务事件总线和 REST api (python / flask)
Building Microservices Event Bus and REST api (python / flask)
背景
我正在使用微服务架构构建我的第一个应用程序。我将主要使用 Flask 在 Python 中工作。
我正在考虑实施 event/message 总线来协调服务之间的操作。我打算实现的一些服务是:Auth、Users、Posts 和 Chat。该应用程序有两个实体('User' 和 'Group'),几乎每个服务都使用它们。我为每个服务都有一个单独的数据库,每个数据库都有自己的 users
和 groups
table 来管理特定于该服务的 user/group 数据。现在,当我考虑像创建新用户这样的事件时,每个服务都需要在 users
table 中创建一个新条目,这就是我考虑使用事件总线的原因。
我阅读了 ,其中讨论了 CQRS 和使用 HTTP (REST) 进行服务之间的外部通信,同时使用事件总线进行内部通信。服务处理 (HTTP) 请求,并发出有关数据更改的事件(例如,Auth 服务创建新用户)。其他服务消耗可能触发其他进程(和更多事件)的事件。
问题
我被挂断的地方是如何实际实现(在 Python 中)监听 HTTP 请求和一组订阅频道中的新事件的服务。我知道您需要使用像 redis/rabbitMQ 这样的工具,但是是否可以在同一进程中处理两种类型的请求,或者您是否需要 运行 两台服务器(一个用于 REST 请求,另一个用于其他用于事件处理)?
此外,如果您对上述一般approach/architecture有任何意见,我洗耳恭听。
因此,在进行更多研究并构建原型之后,单个服务器可以侦听来自消息代理的 HTTP 请求和事件。但是,它需要 运行 两个独立的进程(一个 Web 服务器进程用于侦听 HTTP,一个事件进程用于侦听消息代理)。
这是我为原型开发的架构:
核心模块(由文件夹图标表示)代表服务的主体,这是实际更改数据的所有代码。 HTTP Server 和 Event Worker 都从核心模块调用方法。无论是 HTTP Server 还是 Event Worker 都不会产生事件,只有核心模块会产生事件。
这是一个文件结构:
Project
|-Foo
| |- foo.py
| |- web.py
| |- worker.py
| |- revent.py
|-Bar
| |- bar.py
| |- web.py
| |- worker.py
| |- revent.py
web.py
文件是简单的烧瓶应用程序:
# bar.py
from flask import Flask, request
from bar import Bar
app = Flask(__name__)
@app.route('/bar')
def bar():
return Bar.bar_action()
if __name__ == "__main__":
app.run(port=5001, debug=1)
对于事件工作者和核心模块,我使用了我创建的模块revent.py
(redis + 事件)。它由三个 类:
- 事件 -- 事件的抽象
- 生产者 -- 核心模块使用的 service/class 将事件产生到它们的事件流中。
- Worker -- 一个事件服务器,您可以将事件映射到函数(有点像 Flask 中的路由 HTTP 端点),它还 运行 事件循环来侦听事件。
在幕后,这个模块正在使用 redis streams。我将在下面粘贴 revent.py
的代码。
但首先,这是 bar.py
的示例示例,它由 http 服务器和 worker 调用来完成工作,并向 redis 中的“bar”流发出有关其正在执行的工作的事件。
# Bar/bar.py
from revent import Producer
import redis
class Bar():
ep = Producer("bar", host="localhost", port=6379, db=0)
@ep.event("update")
def bar_action(self, foo, **kwargs):
print("BAR ACTION")
#ep.send_event("update", {"test": str(True)})
return "BAR ACTION"
if __name__ == '__main__':
Bar().bar_action("test", test="True")
最后,这是一个示例 worker,它将侦听“bar”流上的事件 Foo/worker.py
。
# Foo/worker.py
from revent import Worker
worker = Worker()
@worker.on('bar', "update")
def test(foo, test=False):
if bool(test) == False:
print('test')
else:
print('tested')
if __name__ == "__main__":
worker.listen(host='127.0.0.1', port=6379, db=0)
正如所承诺的,这是我构建的 revent.py
模块的代码。向 pypl 添加一个更进一步开发的版本可能是值得的,但我只是使用 sym link 来保持我的两个版本同步。
# revent.py
import redis
from datetime import datetime
import functools
class Worker:
# streams = {
# "bar": {
# "update": Foo.foo_action
# },
# }
def __init__(self):
self._events = {}
def on(self, stream, action, **options):
"""
Wrapper to register a function to an event
"""
def decorator(func):
self.register_event(stream, action, func, **options)
return func
return decorator
def register_event(self, stream, action, func, **options):
"""
Map an event to a function
"""
if stream in self._events.keys():
self._events[stream][action] = func
else:
self._events[stream] = {action: func}
def listen(self, host, port, db):
"""
Main event loop
Establish redis connection from passed parameters
Wait for events from the specified streams
Dispatch to appropriate event handler
"""
self._r = redis.Redis(host=host, port=port, db=db)
streams = " ".join(self._events.keys())
while True:
event = self._r.xread({streams: "$"}, None, 0)
# Call function that is mapped to this event
self._dispatch(event)
def _dispatch(self, event):
"""
Call a function given an event
If the event has been registered, the registered function will be called with the passed params.
"""
e = Event(event=event)
if e.action in self._events[e.stream].keys():
func = self._events[e.stream][e.action]
print(f"{datetime.now()} - Stream: {e.stream} - {e.event_id}: {e.action} {e.data}")
return func(**e.data)
class Event():
"""
Abstraction for an event
"""
def __init__(self, stream="", action="", data={}, event=None):
self.stream = stream
self.action = action
self.data = data
self.event_id=None
if event:
self.parse_event(event)
def parse_event(self, event):
# event = [[b'bar', [(b'1594764770578-0', {b'action': b'update', b'test': b'True'})]]]
self.stream = event[0][0].decode('utf-8')
self.event_id = event[0][1][0][0].decode('utf-8')
self.data = event[0][1][0][1]
self.action = self.data.pop(b'action').decode('utf-8')
params = {}
for k, v in self.data.items():
params[k.decode('utf-8')] = v.decode('utf-8')
self.data = params
def publish(self, r):
body = {
"action": self.action
}
for k, v in self.data.items():
body[k] = v
r.xadd(self.stream, body)
class Producer:
"""
Abstraction for a service (module) that publishes events about itself
Manages stream information and can publish events
"""
# stream = None
# _r = redis.Redis(host="localhost", port=6379, db=0)
def __init__(self, stream_name, host, port, db):
self.stream = stream_name
self._r = redis.Redis(host="localhost", port=6379, db=0)
def send_event(self, action, data):
e = Event(stream=self.stream, action=action, data=data)
e.publish(self._r)
def event(self, action, data={}):
def decorator(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
result = func(*args, **kwargs)
arg_keys = func.__code__.co_varnames[1:-1]
for i in range(1, len(args)):
kwargs[arg_keys[i-1]] = args[i]
self.send_event(action, kwargs)
return result
return wrapped
return decorator
所以,把它们放在一起。 foo.py
和 bar.py
模块分别完成 Foo 和 Bar 服务的实际工作。它们的方法由 HTTP 服务器和事件工作者调用以处理 requests/events。在开展工作时,这两个模块会发出有关其状态更改的事件,以便其他感兴趣的服务可以相应地采取行动。 HTTP 服务器只是一个普通的网络应用程序,例如使用烧瓶。事件工作者在概念上类似于 Web 服务器,它在 Redis 中侦听事件而不是 http 请求。这两个进程(Web 服务器和事件工作者)都需要分别 运行。因此,如果您在本地开发,则需要 运行 在不同的终端 windows 或使用 container/process 编排器。
很多。希望对大家有所帮助,如有疑问请在评论中告诉我。
编辑
我将 revent.py 文件作为一个包上传到 pypi -- redisevents。我将在本周晚些时候添加更多关于如何 use/extend 它的文档。
背景
我正在使用微服务架构构建我的第一个应用程序。我将主要使用 Flask 在 Python 中工作。
我正在考虑实施 event/message 总线来协调服务之间的操作。我打算实现的一些服务是:Auth、Users、Posts 和 Chat。该应用程序有两个实体('User' 和 'Group'),几乎每个服务都使用它们。我为每个服务都有一个单独的数据库,每个数据库都有自己的 users
和 groups
table 来管理特定于该服务的 user/group 数据。现在,当我考虑像创建新用户这样的事件时,每个服务都需要在 users
table 中创建一个新条目,这就是我考虑使用事件总线的原因。
我阅读了
问题
我被挂断的地方是如何实际实现(在 Python 中)监听 HTTP 请求和一组订阅频道中的新事件的服务。我知道您需要使用像 redis/rabbitMQ 这样的工具,但是是否可以在同一进程中处理两种类型的请求,或者您是否需要 运行 两台服务器(一个用于 REST 请求,另一个用于其他用于事件处理)?
此外,如果您对上述一般approach/architecture有任何意见,我洗耳恭听。
因此,在进行更多研究并构建原型之后,单个服务器可以侦听来自消息代理的 HTTP 请求和事件。但是,它需要 运行 两个独立的进程(一个 Web 服务器进程用于侦听 HTTP,一个事件进程用于侦听消息代理)。
这是我为原型开发的架构:
核心模块(由文件夹图标表示)代表服务的主体,这是实际更改数据的所有代码。 HTTP Server 和 Event Worker 都从核心模块调用方法。无论是 HTTP Server 还是 Event Worker 都不会产生事件,只有核心模块会产生事件。
这是一个文件结构:
Project
|-Foo
| |- foo.py
| |- web.py
| |- worker.py
| |- revent.py
|-Bar
| |- bar.py
| |- web.py
| |- worker.py
| |- revent.py
web.py
文件是简单的烧瓶应用程序:
# bar.py
from flask import Flask, request
from bar import Bar
app = Flask(__name__)
@app.route('/bar')
def bar():
return Bar.bar_action()
if __name__ == "__main__":
app.run(port=5001, debug=1)
对于事件工作者和核心模块,我使用了我创建的模块revent.py
(redis + 事件)。它由三个 类:
- 事件 -- 事件的抽象
- 生产者 -- 核心模块使用的 service/class 将事件产生到它们的事件流中。
- Worker -- 一个事件服务器,您可以将事件映射到函数(有点像 Flask 中的路由 HTTP 端点),它还 运行 事件循环来侦听事件。
在幕后,这个模块正在使用 redis streams。我将在下面粘贴 revent.py
的代码。
但首先,这是 bar.py
的示例示例,它由 http 服务器和 worker 调用来完成工作,并向 redis 中的“bar”流发出有关其正在执行的工作的事件。
# Bar/bar.py
from revent import Producer
import redis
class Bar():
ep = Producer("bar", host="localhost", port=6379, db=0)
@ep.event("update")
def bar_action(self, foo, **kwargs):
print("BAR ACTION")
#ep.send_event("update", {"test": str(True)})
return "BAR ACTION"
if __name__ == '__main__':
Bar().bar_action("test", test="True")
最后,这是一个示例 worker,它将侦听“bar”流上的事件 Foo/worker.py
。
# Foo/worker.py
from revent import Worker
worker = Worker()
@worker.on('bar', "update")
def test(foo, test=False):
if bool(test) == False:
print('test')
else:
print('tested')
if __name__ == "__main__":
worker.listen(host='127.0.0.1', port=6379, db=0)
正如所承诺的,这是我构建的 revent.py
模块的代码。向 pypl 添加一个更进一步开发的版本可能是值得的,但我只是使用 sym link 来保持我的两个版本同步。
# revent.py
import redis
from datetime import datetime
import functools
class Worker:
# streams = {
# "bar": {
# "update": Foo.foo_action
# },
# }
def __init__(self):
self._events = {}
def on(self, stream, action, **options):
"""
Wrapper to register a function to an event
"""
def decorator(func):
self.register_event(stream, action, func, **options)
return func
return decorator
def register_event(self, stream, action, func, **options):
"""
Map an event to a function
"""
if stream in self._events.keys():
self._events[stream][action] = func
else:
self._events[stream] = {action: func}
def listen(self, host, port, db):
"""
Main event loop
Establish redis connection from passed parameters
Wait for events from the specified streams
Dispatch to appropriate event handler
"""
self._r = redis.Redis(host=host, port=port, db=db)
streams = " ".join(self._events.keys())
while True:
event = self._r.xread({streams: "$"}, None, 0)
# Call function that is mapped to this event
self._dispatch(event)
def _dispatch(self, event):
"""
Call a function given an event
If the event has been registered, the registered function will be called with the passed params.
"""
e = Event(event=event)
if e.action in self._events[e.stream].keys():
func = self._events[e.stream][e.action]
print(f"{datetime.now()} - Stream: {e.stream} - {e.event_id}: {e.action} {e.data}")
return func(**e.data)
class Event():
"""
Abstraction for an event
"""
def __init__(self, stream="", action="", data={}, event=None):
self.stream = stream
self.action = action
self.data = data
self.event_id=None
if event:
self.parse_event(event)
def parse_event(self, event):
# event = [[b'bar', [(b'1594764770578-0', {b'action': b'update', b'test': b'True'})]]]
self.stream = event[0][0].decode('utf-8')
self.event_id = event[0][1][0][0].decode('utf-8')
self.data = event[0][1][0][1]
self.action = self.data.pop(b'action').decode('utf-8')
params = {}
for k, v in self.data.items():
params[k.decode('utf-8')] = v.decode('utf-8')
self.data = params
def publish(self, r):
body = {
"action": self.action
}
for k, v in self.data.items():
body[k] = v
r.xadd(self.stream, body)
class Producer:
"""
Abstraction for a service (module) that publishes events about itself
Manages stream information and can publish events
"""
# stream = None
# _r = redis.Redis(host="localhost", port=6379, db=0)
def __init__(self, stream_name, host, port, db):
self.stream = stream_name
self._r = redis.Redis(host="localhost", port=6379, db=0)
def send_event(self, action, data):
e = Event(stream=self.stream, action=action, data=data)
e.publish(self._r)
def event(self, action, data={}):
def decorator(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
result = func(*args, **kwargs)
arg_keys = func.__code__.co_varnames[1:-1]
for i in range(1, len(args)):
kwargs[arg_keys[i-1]] = args[i]
self.send_event(action, kwargs)
return result
return wrapped
return decorator
所以,把它们放在一起。 foo.py
和 bar.py
模块分别完成 Foo 和 Bar 服务的实际工作。它们的方法由 HTTP 服务器和事件工作者调用以处理 requests/events。在开展工作时,这两个模块会发出有关其状态更改的事件,以便其他感兴趣的服务可以相应地采取行动。 HTTP 服务器只是一个普通的网络应用程序,例如使用烧瓶。事件工作者在概念上类似于 Web 服务器,它在 Redis 中侦听事件而不是 http 请求。这两个进程(Web 服务器和事件工作者)都需要分别 运行。因此,如果您在本地开发,则需要 运行 在不同的终端 windows 或使用 container/process 编排器。
很多。希望对大家有所帮助,如有疑问请在评论中告诉我。
编辑
我将 revent.py 文件作为一个包上传到 pypi -- redisevents。我将在本周晚些时候添加更多关于如何 use/extend 它的文档。