Flink 有状态函数,使用 python SDK 进行异步调用
Flink stateful functions, async calls with the python SDK
我正在使用 Python SDK 试用 Stateful Functions 2.1 API,但我看不出如何对外部进行异步调用的明确方法 api'不会阻止应用程序。
这可能吗,或者有人可以让我走上正确的道路吗?
从 StateFun 2.2.0 开始,您可以使用 AsyncRequestReplyHandler。
来自docs:
The Python SDK ships with an additional handler,
AsyncRequestReplyHandler, that supports Python’s awaitable functions
(coroutines). This handler can be used with asynchronous Python
frameworks, for example aiohttp.
@functions.bind("example/hello")
async def hello(context, message):
response = await compute_greeting(message)
context.reply(response)
from aiohttp import web
handler = AsyncRequestReplyHandler(functions)
async def handle(request):
req = await request.read()
res = await handler(req)
return web.Response(body=res, content_type="application/octet-stream")
app = web.Application()
app.add_routes([web.post('/statefun', handle)])
if __name__ == '__main__':
web.run_app(app, port=5000)
你可以看到here一个完整的例子。
我正在使用 Python SDK 试用 Stateful Functions 2.1 API,但我看不出如何对外部进行异步调用的明确方法 api'不会阻止应用程序。
这可能吗,或者有人可以让我走上正确的道路吗?
从 StateFun 2.2.0 开始,您可以使用 AsyncRequestReplyHandler。
来自docs:
The Python SDK ships with an additional handler, AsyncRequestReplyHandler, that supports Python’s awaitable functions (coroutines). This handler can be used with asynchronous Python frameworks, for example aiohttp.
@functions.bind("example/hello")
async def hello(context, message):
response = await compute_greeting(message)
context.reply(response)
from aiohttp import web
handler = AsyncRequestReplyHandler(functions)
async def handle(request):
req = await request.read()
res = await handler(req)
return web.Response(body=res, content_type="application/octet-stream")
app = web.Application()
app.add_routes([web.post('/statefun', handle)])
if __name__ == '__main__':
web.run_app(app, port=5000)
你可以看到here一个完整的例子。