Flink 有状态函数,使用 python SDK 进行异步调用

Flink stateful functions, async calls with the python SDK

我正在使用 Python SDK 试用 Stateful Functions 2.1 API,但我看不出如何对外部进行异步调用的明确方法 api'不会阻止应用程序。

这可能吗,或者有人可以让我走上正确的道路吗?

从 StateFun 2.2.0 开始,您可以使用 AsyncRequestReplyHandler。

来自docs

The Python SDK ships with an additional handler, AsyncRequestReplyHandler, that supports Python’s awaitable functions (coroutines). This handler can be used with asynchronous Python frameworks, for example aiohttp.

@functions.bind("example/hello")
async def hello(context, message):
   response = await compute_greeting(message)
   context.reply(response)

from aiohttp import web

handler = AsyncRequestReplyHandler(functions)

async def handle(request):
   req = await request.read()
   res = await handler(req)
   return web.Response(body=res, content_type="application/octet-stream")

app = web.Application()
app.add_routes([web.post('/statefun', handle)])

if __name__ == '__main__':
  web.run_app(app, port=5000)

你可以看到here一个完整的例子。