如何从一个应用程序调用另一个 Sanic 应用程序
How to call another Sanic Application from One
我在微服务架构中分别部署了两个 Python Web 框架 (Sanic)。
如何在框架 B 中调用框架 A 的端点?
我正在寻找不需要提供完整 URL 的东西。
类似
@app.route(/data_from_framework_a)
def foo(request):
response = request.app.ctx.framework_a_client.get("/get_data")
其中 framework_a_client
已经通过身份验证可以使用来自框架 A 的服务并且知道配置的主机和端口。
注意:两个框架都是独立的,托管在不同的机器上,并且需要身份验证才能访问它们
这是一个没有单一答案的宽泛问题。应用程序实例彼此完全分开。您当然可以构建一个服务,实例化它,并提供所有必要的细节来说明如何与其他应用程序交互。但是,这完全取决于您的应用程序需求。
话虽如此,这是一个准系统服务,您可以使用它来遵循我过去使用过的模式。
class ClientA:
def __init__(self, base_url, authentication_details):
self.base_url = base_url
self._store_authentication_details(authentication_details)
def _prepare_authentication_headers(self):
return {
"authorization": "..."
}
async def get(self, path: str) -> httpx.Response:
async with httpx.AsyncClient() as client:
return await client.get(
"/".join([self.base_url, path]),
headers=self._prepare_authentication_headers()
)
实现如下:
@app.before_server_start
async def start_client_a(app):
app.ctx.framework_a_client = ClientA(...)
@app.route(/data_from_framework_a)
def foo(request):
response = await request.app.ctx.framework_a_client.get("/get_data")
或者使用 Sanic Extensions...
@app.before_server_start
async def start_client_a(app):
app.ext.dependency(ClientA(...))
@app.route(/data_from_framework_a)
def foo(request, client_a: ClientA): # Benefit of this is we now have a fully typed object
response = await client_a.get("/get_data")
我在微服务架构中分别部署了两个 Python Web 框架 (Sanic)。
如何在框架 B 中调用框架 A 的端点?
我正在寻找不需要提供完整 URL 的东西。
类似
@app.route(/data_from_framework_a)
def foo(request):
response = request.app.ctx.framework_a_client.get("/get_data")
其中 framework_a_client
已经通过身份验证可以使用来自框架 A 的服务并且知道配置的主机和端口。
注意:两个框架都是独立的,托管在不同的机器上,并且需要身份验证才能访问它们
这是一个没有单一答案的宽泛问题。应用程序实例彼此完全分开。您当然可以构建一个服务,实例化它,并提供所有必要的细节来说明如何与其他应用程序交互。但是,这完全取决于您的应用程序需求。
话虽如此,这是一个准系统服务,您可以使用它来遵循我过去使用过的模式。
class ClientA:
def __init__(self, base_url, authentication_details):
self.base_url = base_url
self._store_authentication_details(authentication_details)
def _prepare_authentication_headers(self):
return {
"authorization": "..."
}
async def get(self, path: str) -> httpx.Response:
async with httpx.AsyncClient() as client:
return await client.get(
"/".join([self.base_url, path]),
headers=self._prepare_authentication_headers()
)
实现如下:
@app.before_server_start
async def start_client_a(app):
app.ctx.framework_a_client = ClientA(...)
@app.route(/data_from_framework_a)
def foo(request):
response = await request.app.ctx.framework_a_client.get("/get_data")
或者使用 Sanic Extensions...
@app.before_server_start
async def start_client_a(app):
app.ext.dependency(ClientA(...))
@app.route(/data_from_framework_a)
def foo(request, client_a: ClientA): # Benefit of this is we now have a fully typed object
response = await client_a.get("/get_data")