无法使用带有 python 的 grpc 订阅 dapr 中的主题
Not able to subscribe to a topic in dapr using grpc with python
我很难找到 python 的 gRPC 发布-订阅订阅者模板。
我正在尝试的是这个,但似乎没有成功。
class DaprClientServicer(daprclient_services.DaprClientServicer):
def OnTopicEvent(self, request, context):
if request.topic=="TOPIC_A":
print("Do something")
response = "some response"
return response
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
daprclient_services.add_DaprClientServicer_to_server(DaprClientServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
server.stop(0)
我的发布声明如下所示:
client.PublishEvent(dapr_messages.PublishEventEnvelope(topic='TOPIC_A', data=data))
旧答案(Dapr 的 Python-SDK API 在此之后发生了显着变化)
经过一些研究后,我发现我跳过了这一步。
那么订阅者的工作方式是这样的:
订阅主题。 (缺少步骤)
处理发布到订阅主题的消息。
这样做对我有用:
# Our server methods
class DaprClientServicer(daprclient_services.DaprClientServicer):
def GetTopicSubscriptions(self, request, context):
# Dapr will call this method to get the list of topics the app
# wants to subscribe to. In this example, we are telling Dapr
# To subscribe to a topic named TOPIC_A
return daprclient_messages.GetTopicSubscriptionsEnvelope(topics=['TOPIC_A'])
def OnTopicEvent(self, request, context):
logging.info("Event received!!")
return empty_pb2.Empty()
# Create a gRPC server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
daprclient_services.add_DaprClientServicer_to_server(
DaprClientServicer(), server)
# Start the gRPC server
print('Starting server. Listening on port 50051.')
server.add_insecure_port('[::]:50051')
server.start()
# Since server.start() doesn't block, we need to do a sleep loop
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
server.stop(0)
我很难找到 python 的 gRPC 发布-订阅订阅者模板。
我正在尝试的是这个,但似乎没有成功。
class DaprClientServicer(daprclient_services.DaprClientServicer):
def OnTopicEvent(self, request, context):
if request.topic=="TOPIC_A":
print("Do something")
response = "some response"
return response
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
daprclient_services.add_DaprClientServicer_to_server(DaprClientServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
server.stop(0)
我的发布声明如下所示:
client.PublishEvent(dapr_messages.PublishEventEnvelope(topic='TOPIC_A', data=data))
旧答案(Dapr 的 Python-SDK API 在此之后发生了显着变化)
经过一些研究后,我发现我跳过了这一步。 那么订阅者的工作方式是这样的:
订阅主题。 (缺少步骤)
处理发布到订阅主题的消息。
这样做对我有用:
# Our server methods
class DaprClientServicer(daprclient_services.DaprClientServicer):
def GetTopicSubscriptions(self, request, context):
# Dapr will call this method to get the list of topics the app
# wants to subscribe to. In this example, we are telling Dapr
# To subscribe to a topic named TOPIC_A
return daprclient_messages.GetTopicSubscriptionsEnvelope(topics=['TOPIC_A'])
def OnTopicEvent(self, request, context):
logging.info("Event received!!")
return empty_pb2.Empty()
# Create a gRPC server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
daprclient_services.add_DaprClientServicer_to_server(
DaprClientServicer(), server)
# Start the gRPC server
print('Starting server. Listening on port 50051.')
server.add_insecure_port('[::]:50051')
server.start()
# Since server.start() doesn't block, we need to do a sleep loop
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
server.stop(0)