如何使用 Faust Python 包将 kafka 主题与 Web 端点连接起来?
How to connect kafka topic with web endpoint using Faust Python package?
我有一个简单的应用程序,其中有两个函数:一个用于监听 Kafka 主题,另一个用于 Web 端点。我想创建服务器发送事件 (Server-Sent Events, SSE) 流,即 text/event-stream,这样在客户端就可以使用 EventSource 来监听它。
我现在有以下代码,其中每个函数各自完成自己的工作:
import faust
from faust.web import Response
app = faust.App(
    "app1",
    broker="kafka://localhost:29092",
    value_serializer="raw",
)
test_topic = app.topic("test")


@app.agent(test_topic)
async def test_topic_agent(stream):
    """Print every value received on the ``test`` topic and re-yield it."""
    async for message in stream:
        print(f"test_topic_agent RECEIVED -- {message!r}")
        yield message


@app.page("/")
async def index(self, request):
    """Answer requests for ``/`` with the plain text ``yey``."""
    return self.text("yey")
现在,我想在 index 页面中实现与下面这段代码类似的功能,但要使用 faust:
import asyncio
from aiohttp import web
from aiohttp.web import Response
from aiohttp_sse import sse_response
from datetime import datetime
async def hello(request):
    """Stream the current server time to the client once per second as SSE.

    The loop has no exit condition, so the handler keeps sending until an
    exception (for example a cancellation) propagates out of it.
    """
    async with sse_response(request) as resp:
        while True:
            data = 'Server Time : {}'.format(datetime.now())
            print(data)
            await resp.send(data)
            # asyncio.sleep() no longer accepts a ``loop`` argument
            # (deprecated in Python 3.8, removed in 3.10).
            await asyncio.sleep(1)
    return resp


async def index(request):
    """Serve the HTML page whose script subscribes to ``/hello`` via EventSource."""
    d = """
<html>
<body>
<script>
var evtSource = new EventSource("/hello");
evtSource.onmessage = function(e) {
document.getElementById('response').innerText = e.data
}
</script>
<h1>Response from server:</h1>
<div id="response"></div>
</body>
</html>
"""
    return Response(text=d, content_type='text/html')


app = web.Application()
app.router.add_route('GET', '/hello', hello)
app.router.add_route('GET', '/', index)
web.run_app(app, host='127.0.0.1', port=8080)
我试过这个:
import faust
from faust.web import Response
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
test_topic = app.topic("test")
# @app.agent(test_topic)
# async def test_topic_agent(stream):
# async for value in stream:
# print(f"test_topic_agent RECEIVED -- {value!r}")
# yield value
# Attempt 1: decorators are applied bottom-up, so @app.agent wraps the
# function first and @app.page then receives the resulting Agent object
# instead of a plain function. The traceback that follows shows the page
# decorator reading ``fun.__name__``, which the Agent object does not have,
# hence the AttributeError while the module is being imported.
@app.page("/", name="t1")
@app.agent(test_topic, name="t")
async def index(self, request):
return self.text("yey")
但它给我以下错误:
Traceback (most recent call last):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 299, in find_app
val = symbol_by_name(app, imp=imp)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 262, in symbol_by_name
module = imp( # type: ignore
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 376, in import_from_cwd
return imp(module, package=package)
File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/Users/maverick/company/demo1/baiohttp-demo/app1.py", line 18, in <module>
async def index(self, request):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/app/base.py", line 1231, in _decorator
view = view_base.from_handler(cast(ViewHandlerFun, fun))
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/web/views.py", line 50, in from_handler
return type(fun.__name__, (cls,), {
AttributeError: 'Agent' object has no attribute '__name__'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/maverick/.pyenv/versions/faust_demo/bin/faust", line 8, in <module>
sys.exit(cli())
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/click/core.py", line 781, in main
with self.make_context(prog_name, args, **extra) as ctx:
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 407, in make_context
self._maybe_import_app()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 372, in _maybe_import_app
find_app(appstr)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 303, in find_app
val = imp(app)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 376, in import_from_cwd
return imp(module, package=package)
File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/Users/maverick/company/demo1/baiohttp-demo/app1.py", line 18, in <module>
async def index(self, request):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/app/base.py", line 1231, in _decorator
view = view_base.from_handler(cast(ViewHandlerFun, fun))
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/web/views.py", line 50, in from_handler
return type(fun.__name__, (cls,), {
AttributeError: 'Agent' object has no attribute '__name__'
我试过这个:
import faust
from faust.web import Response
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
test_topic = app.topic("test")
# @app.agent(test_topic)
# async def test_topic_agent(stream):
# async for value in stream:
# print(f"test_topic_agent RECEIVED -- {value!r}")
# yield value
# Attempt 2: decorator order reversed, so @app.page wraps the function first
# and @app.agent is applied to whatever @app.page returned. The module now
# imports, but the log that follows shows the agent failing at startup: its
# call ``self.fun(actual_stream)`` raises TypeError "__init__() missing 1
# required positional argument: 'web'" -- presumably because the wrapped
# object is a view class whose constructor does not take a stream.
@app.agent(test_topic, name="t")
@app.page("/", name="t1")
async def index(self, request):
return self.text("yey")
但是我得到以下错误:
[2020-03-28 10:32:50,676] [29976] [INFO] [^--Producer]: Creating topic 'app1-__assignor-__leader'
[2020-03-28 10:32:50,695] [29976] [INFO] [^--ReplyConsumer]: Starting...
[2020-03-28 10:32:50,695] [29976] [INFO] [^--AgentManager]: Starting...
[2020-03-28 10:32:50,695] [29976] [INFO] [^---Agent: app1.index]: Starting...
[2020-03-28 10:32:50,696] [29976] [ERROR] [^Worker]: Error: TypeError("__init__() missing 1 required positional argument: 'web'")
Traceback (most recent call last):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/worker.py", line 273, in execute_from_commandline
self.loop.run_until_complete(self._starting_fut)
File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py", line 612, in run_until_complete
return future.result()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
await child.maybe_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
await child.maybe_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
await self.on_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/manager.py", line 58, in on_start
await agent.maybe_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
await self.on_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 282, in on_start
await self._on_start_supervisor()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 312, in _on_start_supervisor
res = await self._start_one(
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 251, in _start_one
return await self._start_task(
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 617, in _start_task
actor = self(
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 525, in __call__
return self.actor_from_stream(stream,
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 552, in actor_from_stream
res = self.fun(actual_stream)
TypeError: __init__() missing 1 required positional argument: 'web'
[2020-03-28 10:32:50,703] [29976] [INFO] [^Worker]: Stopping...
[2020-03-28 10:32:50,703] [29976] [INFO] [^-App]: Stopping...
[2020-03-28 10:32:50,703] [29976] [INFO] [^-App]: Flush producer buffer...
[2020-03-28 10:32:50,703] [29976] [INFO] [^--TableManager]: Stopping...
有什么办法吗?
提前致谢!
阅读 faust 的 Web 文档后,它似乎无法直接处理 SSE。
@app.agent 在消费 Kafka 消息时被回调,而 @app.page 在处理 HTTP 请求时被回调,因此把二者叠加在同一个函数上很可能行不通。
另一种使用 faust.web 的方法是从 JavaScript 进行轮询,例如:
import faust
from faust.web import Response
app = faust.App("myapp", broker="kafka://kafka:9092", value_serializer="raw")
test_topic = app.topic("test")
# Most recent Kafka message, kept on the app object so the /msg page can serve it.
app.lastmsg = ""


@app.agent(test_topic)
async def test_topic_agent(stream):
    """Remember the latest value from the ``test`` topic and re-yield it."""
    async for value in stream:
        # With the "raw" serializer values arrive as bytes; str() on bytes
        # would store the repr ("b'...'"), so decode them instead.
        if isinstance(value, bytes):
            app.lastmsg = value.decode("utf-8", errors="replace")
        else:
            app.lastmsg = str(value)
        yield value


@app.page("/msg")
async def msg(self, request):
    """Return the last stored Kafka message as plain text."""
    return self.text(app.lastmsg)


@app.page("/")
async def index(self, request):
    """Serve the HTML page that polls ``/msg`` once per second."""
    body = """
<html>
<body>
<script>
setInterval(()=>{
let xhr = new XMLHttpRequest();
xhr.open('GET', '/msg');
xhr.send();
xhr.onload = function() {
if (xhr.status == 200) {
document.getElementById('response').innerText = xhr.response
}
}
},1000);
</script>
<h1>Response from server:</h1>
<div id="response"></div>
</body>
</html>
"""
    return self.html(body)
这个简单的实现将 Kafka 消息存储到 app.lastmsg 中,可以通过 API /msg 获取。
为了使用 SSE,您可以对 Web 部分使用 asyncio,对 Kafka 消费者使用 faust。
Faust worker 还将在每个实例上公开一个 Web 服务器,默认情况下在端口 6066 上运行。
服务器将使用 aiohttp HTTP 服务器库,您可以利用它创建一个服务器端事件流 (SSE),就像您的示例代码中那样。
您可以创建一个代理,它从 Kafka 主题 test 中读取消息,并用该主题的最新一条消息更新变量 last_message_from_topic,该变量在您的网页处理函数中同样可以访问。
在索引页 (@app.page('/')) 中,使用 EventSource 接口接收服务器发送的事件。它通过 HTTP 连接到服务器,并在不关闭连接的情况下从页面 /hello 接收 text/event-stream 格式的事件。
网页 /hello 每秒发送一条文本消息,其中包含 Kafka 主题 test 的最新一条消息以及服务器的当前时间。
这是我的文件 my_worker.py 的代码:
import asyncio
from datetime import datetime
import faust
from aiohttp.web import Response
from aiohttp_sse import sse_response
app = faust.App(
    "app1",
    broker='kafka://localhost:9092',
    value_serializer='json',
)
test_topic = app.topic("test")
# One-element list used as a mutable cell shared by the agent and the page.
last_message_from_topic = ['No messages yet']


@app.agent(test_topic)
async def greet(greetings):
    """Store each value read from the ``test`` topic as the latest message."""
    async for greeting in greetings:
        last_message_from_topic[0] = greeting


@app.page('/hello')
async def hello(self, request):
    """Send the latest topic message and the server time once per second as SSE.

    The loop has no exit condition, so the handler keeps sending until an
    exception (for example a cancellation) propagates out of it.
    """
    async with sse_response(request) as resp:
        while True:
            data = f'last message from topic_test: {last_message_from_topic[0]} | '
            data += f'Server Time : {datetime.now()}'
            print(data)
            await resp.send(data)
            # asyncio.sleep() no longer accepts a ``loop`` argument
            # (deprecated in Python 3.8, removed in 3.10).
            await asyncio.sleep(1)
    return resp


@app.page('/')
async def index(self, request):
    """Serve the HTML page whose script subscribes to ``/hello`` via EventSource."""
    d = """
<html>
<body>
<script>
var evtSource = new EventSource("/hello");
evtSource.onmessage = function(e) {
document.getElementById('response').innerText = e.data
}
</script>
<h1>Response from server:</h1>
<div id="response"></div>
</body>
</html>
"""
    return Response(text=d, content_type='text/html')
现在您必须使用以下命令启动 Faust worker:
faust -A my_worker worker -l info
在您的网络浏览器中,您可以访问 http://localhost:6066/。
下面是向 Kafka 主题 test 发送消息的代码(位于另一个 Python 文件中):
import time
import json
from kafka import KafkaProducer
# Producer that JSON-encodes each value before sending it to Kafka.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
)
# Publish one test message per second to the 'test' topic.
for i in range(220):
    time.sleep(1)
    producer.send('test', value=f'Some message from kafka id {i}')
# send() only queues the record; flush so the final messages are actually
# delivered before the script exits.
producer.flush()
我成功地让 Faust 的服务、代理和 Web 端点相互协作。整体分为三个部分:一个处理应用程序生命周期的服务类;把消息转发给业务类的 Kafka 代理和门户 (Web) 端点;以及一个业务类,它执行一些功能,并通过服务类把结果发回 Kafka。
注意:这是伪代码,您需要自行补充一些基本内容。
import MyBusinessClass
# This is my class handling the business functions, events are routed to it
# from kafka or the web portal and it takes an action by sending a message
# back on kafka via the Service class
# NOTE(review): MyBusinessClass is used here but defined further down; in
# real code it must be defined or imported before this line runs.
actor = MyBusinessClass()

# Kafka topic to receive messages and hand over to business class
topic = app.topic('mytopic', value_serializer='raw')


# NOTE(review): the original indentation was lost. The agent and the page are
# assumed to live inside MyService because sendOrder, which follows them, is
# called as a method of the service -- confirm against the original answer.
@app.service
class MyService:
    """Pseudo-code service linking the app lifecycle, Kafka agent and web page."""

    async def on_start(self):
        # Give a reference of your self to Business class
        actor.setService(self)
        ...  # do startup stuff

    async def on_stop(self):
        ...  # do shutdown stuff

    # Kafka consumption end point
    @app.agent(topic)
    async def process_kafka(stream):
        async for payload in stream:
            obj = from_dict(data_class=MsgPojo, data=loads(payload))
            # hand over the message to the business class
            await actor.onKafkaMsg(obj)

    # Web portal endpoint
    @app.page('/main')
    async def portal_main(self, request):
        ...  # do something
        # Send portal actions over the internal channel to the business class
        await actor.onPortalMsg(request)

    # This function is called by the Business class to send messages over
    # kafka, in response to an action on the web portal or a message from
    # kafka
    async def sendOrder(self, order):
        await topic.send(value=order)


class MyBusinessClass:
    """Pseudo-code business logic; sends its results back through the service."""

    # gets a handle to the service class for network communication
    def setService(self, service: MyService):
        self.service = service

    # Declared async because the body awaits and callers await this method.
    async def onKafkaMsg(self, obj):
        ...  # do something with obj
        # ``result`` is a placeholder for whatever the step above produces.
        await self.service.sendOrder(result)

    # Declared async because the body awaits and callers await this method.
    async def onPortalMsg(self, portalreq):
        ...  # do something with portalreq
        # ``result`` is a placeholder for whatever the step above produces.
        await self.service.sendOrder(result)
我有一个简单的应用程序,其中有两个函数:一个用于监听 Kafka 主题,另一个用于 Web 端点。我想创建服务器发送事件 (Server-Sent Events, SSE) 流,即 text/event-stream,这样在客户端就可以使用 EventSource 来监听它。
我现在有以下代码,其中每个函数各自完成自己的工作:
import faust
from faust.web import Response
app = faust.App(
    "app1",
    broker="kafka://localhost:29092",
    value_serializer="raw",
)
test_topic = app.topic("test")


@app.agent(test_topic)
async def test_topic_agent(stream):
    """Print every value received on the ``test`` topic and re-yield it."""
    async for message in stream:
        print(f"test_topic_agent RECEIVED -- {message!r}")
        yield message


@app.page("/")
async def index(self, request):
    """Answer requests for ``/`` with the plain text ``yey``."""
    return self.text("yey")
现在,我想在 index 页面中实现与下面这段代码类似的功能,但要使用 faust:
import asyncio
from aiohttp import web
from aiohttp.web import Response
from aiohttp_sse import sse_response
from datetime import datetime
async def hello(request):
    """Stream the current server time to the client once per second as SSE.

    The loop has no exit condition, so the handler keeps sending until an
    exception (for example a cancellation) propagates out of it.
    """
    async with sse_response(request) as resp:
        while True:
            data = 'Server Time : {}'.format(datetime.now())
            print(data)
            await resp.send(data)
            # asyncio.sleep() no longer accepts a ``loop`` argument
            # (deprecated in Python 3.8, removed in 3.10).
            await asyncio.sleep(1)
    return resp


async def index(request):
    """Serve the HTML page whose script subscribes to ``/hello`` via EventSource."""
    d = """
<html>
<body>
<script>
var evtSource = new EventSource("/hello");
evtSource.onmessage = function(e) {
document.getElementById('response').innerText = e.data
}
</script>
<h1>Response from server:</h1>
<div id="response"></div>
</body>
</html>
"""
    return Response(text=d, content_type='text/html')


app = web.Application()
app.router.add_route('GET', '/hello', hello)
app.router.add_route('GET', '/', index)
web.run_app(app, host='127.0.0.1', port=8080)
我试过这个:
import faust
from faust.web import Response
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
test_topic = app.topic("test")
# @app.agent(test_topic)
# async def test_topic_agent(stream):
# async for value in stream:
# print(f"test_topic_agent RECEIVED -- {value!r}")
# yield value
# Attempt 1: decorators are applied bottom-up, so @app.agent wraps the
# function first and @app.page then receives the resulting Agent object
# instead of a plain function. The traceback that follows shows the page
# decorator reading ``fun.__name__``, which the Agent object does not have,
# hence the AttributeError while the module is being imported.
@app.page("/", name="t1")
@app.agent(test_topic, name="t")
async def index(self, request):
return self.text("yey")
但它给我以下错误:
Traceback (most recent call last):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 299, in find_app
val = symbol_by_name(app, imp=imp)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 262, in symbol_by_name
module = imp( # type: ignore
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 376, in import_from_cwd
return imp(module, package=package)
File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/Users/maverick/company/demo1/baiohttp-demo/app1.py", line 18, in <module>
async def index(self, request):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/app/base.py", line 1231, in _decorator
view = view_base.from_handler(cast(ViewHandlerFun, fun))
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/web/views.py", line 50, in from_handler
return type(fun.__name__, (cls,), {
AttributeError: 'Agent' object has no attribute '__name__'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/maverick/.pyenv/versions/faust_demo/bin/faust", line 8, in <module>
sys.exit(cli())
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/click/core.py", line 781, in main
with self.make_context(prog_name, args, **extra) as ctx:
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 407, in make_context
self._maybe_import_app()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 372, in _maybe_import_app
find_app(appstr)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 303, in find_app
val = imp(app)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 376, in import_from_cwd
return imp(module, package=package)
File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/Users/maverick/company/demo1/baiohttp-demo/app1.py", line 18, in <module>
async def index(self, request):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/app/base.py", line 1231, in _decorator
view = view_base.from_handler(cast(ViewHandlerFun, fun))
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/web/views.py", line 50, in from_handler
return type(fun.__name__, (cls,), {
AttributeError: 'Agent' object has no attribute '__name__'
我试过这个:
import faust
from faust.web import Response
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
test_topic = app.topic("test")
# @app.agent(test_topic)
# async def test_topic_agent(stream):
# async for value in stream:
# print(f"test_topic_agent RECEIVED -- {value!r}")
# yield value
# Attempt 2: decorator order reversed, so @app.page wraps the function first
# and @app.agent is applied to whatever @app.page returned. The module now
# imports, but the log that follows shows the agent failing at startup: its
# call ``self.fun(actual_stream)`` raises TypeError "__init__() missing 1
# required positional argument: 'web'" -- presumably because the wrapped
# object is a view class whose constructor does not take a stream.
@app.agent(test_topic, name="t")
@app.page("/", name="t1")
async def index(self, request):
return self.text("yey")
但是我得到以下错误:
[2020-03-28 10:32:50,676] [29976] [INFO] [^--Producer]: Creating topic 'app1-__assignor-__leader'
[2020-03-28 10:32:50,695] [29976] [INFO] [^--ReplyConsumer]: Starting...
[2020-03-28 10:32:50,695] [29976] [INFO] [^--AgentManager]: Starting...
[2020-03-28 10:32:50,695] [29976] [INFO] [^---Agent: app1.index]: Starting...
[2020-03-28 10:32:50,696] [29976] [ERROR] [^Worker]: Error: TypeError("__init__() missing 1 required positional argument: 'web'")
Traceback (most recent call last):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/worker.py", line 273, in execute_from_commandline
self.loop.run_until_complete(self._starting_fut)
File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py", line 612, in run_until_complete
return future.result()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
await child.maybe_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
await child.maybe_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
await self.on_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/manager.py", line 58, in on_start
await agent.maybe_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
await self.on_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 282, in on_start
await self._on_start_supervisor()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 312, in _on_start_supervisor
res = await self._start_one(
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 251, in _start_one
return await self._start_task(
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 617, in _start_task
actor = self(
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 525, in __call__
return self.actor_from_stream(stream,
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 552, in actor_from_stream
res = self.fun(actual_stream)
TypeError: __init__() missing 1 required positional argument: 'web'
[2020-03-28 10:32:50,703] [29976] [INFO] [^Worker]: Stopping...
[2020-03-28 10:32:50,703] [29976] [INFO] [^-App]: Stopping...
[2020-03-28 10:32:50,703] [29976] [INFO] [^-App]: Flush producer buffer...
[2020-03-28 10:32:50,703] [29976] [INFO] [^--TableManager]: Stopping...
有什么办法吗? 提前致谢!
阅读 faust 的 Web 文档后,它似乎无法直接处理 SSE。
@app.agent 在消费 Kafka 消息时被回调,而 @app.page 在处理 HTTP 请求时被回调,因此把二者叠加在同一个函数上很可能行不通。
另一种使用 faust.web 的方法是从 JavaScript 进行轮询,例如:
import faust
from faust.web import Response
app = faust.App("myapp", broker="kafka://kafka:9092", value_serializer="raw")
test_topic = app.topic("test")
# Most recent Kafka message, kept on the app object so the /msg page can serve it.
app.lastmsg = ""


@app.agent(test_topic)
async def test_topic_agent(stream):
    """Remember the latest value from the ``test`` topic and re-yield it."""
    async for value in stream:
        # With the "raw" serializer values arrive as bytes; str() on bytes
        # would store the repr ("b'...'"), so decode them instead.
        if isinstance(value, bytes):
            app.lastmsg = value.decode("utf-8", errors="replace")
        else:
            app.lastmsg = str(value)
        yield value


@app.page("/msg")
async def msg(self, request):
    """Return the last stored Kafka message as plain text."""
    return self.text(app.lastmsg)


@app.page("/")
async def index(self, request):
    """Serve the HTML page that polls ``/msg`` once per second."""
    body = """
<html>
<body>
<script>
setInterval(()=>{
let xhr = new XMLHttpRequest();
xhr.open('GET', '/msg');
xhr.send();
xhr.onload = function() {
if (xhr.status == 200) {
document.getElementById('response').innerText = xhr.response
}
}
},1000);
</script>
<h1>Response from server:</h1>
<div id="response"></div>
</body>
</html>
"""
    return self.html(body)
这个简单的实现将 Kafka 消息存储到 app.lastmsg 中,可以通过 API /msg 获取。
为了使用 SSE,您可以对 Web 部分使用 asyncio,对 Kafka 消费者使用 faust。
Faust worker 还将在每个实例上公开一个 Web 服务器,默认情况下在端口 6066 上运行。
服务器将使用 aiohttp HTTP 服务器库,您可以利用它创建一个服务器端事件流 (SSE),就像您的示例代码中那样。
您可以创建一个代理,它从 Kafka 主题 test 中读取消息,并用该主题的最新一条消息更新变量 last_message_from_topic,该变量在您的网页处理函数中同样可以访问。
在索引页 (@app.page('/')) 中,使用 EventSource 接口接收服务器发送的事件。它通过 HTTP 连接到服务器,并在不关闭连接的情况下从页面 /hello 接收 text/event-stream 格式的事件。
网页 /hello 每秒发送一条文本消息,其中包含 Kafka 主题 test 的最新一条消息以及服务器的当前时间。
这是我的文件 my_worker.py 的代码:
import asyncio
from datetime import datetime
import faust
from aiohttp.web import Response
from aiohttp_sse import sse_response
app = faust.App(
    "app1",
    broker='kafka://localhost:9092',
    value_serializer='json',
)
test_topic = app.topic("test")
# One-element list used as a mutable cell shared by the agent and the page.
last_message_from_topic = ['No messages yet']


@app.agent(test_topic)
async def greet(greetings):
    """Store each value read from the ``test`` topic as the latest message."""
    async for greeting in greetings:
        last_message_from_topic[0] = greeting


@app.page('/hello')
async def hello(self, request):
    """Send the latest topic message and the server time once per second as SSE.

    The loop has no exit condition, so the handler keeps sending until an
    exception (for example a cancellation) propagates out of it.
    """
    async with sse_response(request) as resp:
        while True:
            data = f'last message from topic_test: {last_message_from_topic[0]} | '
            data += f'Server Time : {datetime.now()}'
            print(data)
            await resp.send(data)
            # asyncio.sleep() no longer accepts a ``loop`` argument
            # (deprecated in Python 3.8, removed in 3.10).
            await asyncio.sleep(1)
    return resp


@app.page('/')
async def index(self, request):
    """Serve the HTML page whose script subscribes to ``/hello`` via EventSource."""
    d = """
<html>
<body>
<script>
var evtSource = new EventSource("/hello");
evtSource.onmessage = function(e) {
document.getElementById('response').innerText = e.data
}
</script>
<h1>Response from server:</h1>
<div id="response"></div>
</body>
</html>
"""
    return Response(text=d, content_type='text/html')
现在您必须使用以下命令启动 Faust worker:
faust -A my_worker worker -l info
在您的网络浏览器中,您可以访问 http://localhost:6066/。
下面是向 Kafka 主题 test 发送消息的代码(位于另一个 Python 文件中):
import time
import json
from kafka import KafkaProducer
# Producer that JSON-encodes each value before sending it to Kafka.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
)
# Publish one test message per second to the 'test' topic.
for i in range(220):
    time.sleep(1)
    producer.send('test', value=f'Some message from kafka id {i}')
# send() only queues the record; flush so the final messages are actually
# delivered before the script exits.
producer.flush()
我成功地让 Faust 的服务、代理和 Web 端点相互协作。整体分为三个部分:一个处理应用程序生命周期的服务类;把消息转发给业务类的 Kafka 代理和门户 (Web) 端点;以及一个业务类,它执行一些功能,并通过服务类把结果发回 Kafka。
注意:这是伪代码,您需要自行补充一些基本内容。
import MyBusinessClass
# This is my class handling the business functions, events are routed to it
# from kafka or the web portal and it takes an action by sending a message
# back on kafka via the Service class
# NOTE(review): MyBusinessClass is used here but defined further down; in
# real code it must be defined or imported before this line runs.
actor = MyBusinessClass()

# Kafka topic to receive messages and hand over to business class
topic = app.topic('mytopic', value_serializer='raw')


# NOTE(review): the original indentation was lost. The agent and the page are
# assumed to live inside MyService because sendOrder, which follows them, is
# called as a method of the service -- confirm against the original answer.
@app.service
class MyService:
    """Pseudo-code service linking the app lifecycle, Kafka agent and web page."""

    async def on_start(self):
        # Give a reference of your self to Business class
        actor.setService(self)
        ...  # do startup stuff

    async def on_stop(self):
        ...  # do shutdown stuff

    # Kafka consumption end point
    @app.agent(topic)
    async def process_kafka(stream):
        async for payload in stream:
            obj = from_dict(data_class=MsgPojo, data=loads(payload))
            # hand over the message to the business class
            await actor.onKafkaMsg(obj)

    # Web portal endpoint
    @app.page('/main')
    async def portal_main(self, request):
        ...  # do something
        # Send portal actions over the internal channel to the business class
        await actor.onPortalMsg(request)

    # This function is called by the Business class to send messages over
    # kafka, in response to an action on the web portal or a message from
    # kafka
    async def sendOrder(self, order):
        await topic.send(value=order)


class MyBusinessClass:
    """Pseudo-code business logic; sends its results back through the service."""

    # gets a handle to the service class for network communication
    def setService(self, service: MyService):
        self.service = service

    # Declared async because the body awaits and callers await this method.
    async def onKafkaMsg(self, obj):
        ...  # do something with obj
        # ``result`` is a placeholder for whatever the step above produces.
        await self.service.sendOrder(result)

    # Declared async because the body awaits and callers await this method.
    async def onPortalMsg(self, portalreq):
        ...  # do something with portalreq
        # ``result`` is a placeholder for whatever the step above produces.
        await self.service.sendOrder(result)