无法使用带有 python 的 grpc 订阅 dapr 中的主题

Not able to subscribe to a topic in dapr using grpc with python

我很难找到 python 的 gRPC 发布-订阅订阅者模板。

我正在尝试的是这个,但似乎没有成功。

class DaprClientServicer(daprclient_services.DaprClientServicer):
    def OnTopicEvent(self, request, context):
        if request.topic=="TOPIC_A":
            print("Do something")
            response = "some response"
        return response

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
daprclient_services.add_DaprClientServicer_to_server(DaprClientServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()

try:
    while True:
        time.sleep(86400)
except KeyboardInterrupt:
    server.stop(0)

我的发布声明如下所示:

client.PublishEvent(dapr_messages.PublishEventEnvelope(topic='TOPIC_A', data=data))

旧答案(Dapr 的 Python-SDK API 在此之后发生了显着变化)

经过一些研究后,我发现我跳过了这一步。 那么订阅者的工作方式是这样的:

  1. 订阅主题。 (缺少步骤)

  2. 处理发布到订阅主题的消息。

这样做对我有用:

# Our server methods
class DaprClientServicer(daprclient_services.DaprClientServicer):
    def GetTopicSubscriptions(self, request, context):
        # Dapr will call this method to get the list of topics the app
        # wants to subscribe to. In this example, we are telling Dapr
        # To subscribe to a topic named TOPIC_A
        return daprclient_messages.GetTopicSubscriptionsEnvelope(topics=['TOPIC_A'])

    def OnTopicEvent(self, request, context):
        logging.info("Event received!!")
        return empty_pb2.Empty()


# Create a gRPC server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
daprclient_services.add_DaprClientServicer_to_server(
    DaprClientServicer(), server)

# Start the gRPC server
print('Starting server. Listening on port 50051.')
server.add_insecure_port('[::]:50051')
server.start()

# Since server.start() doesn't block, we need to do a sleep loop
try:
    while True:
        time.sleep(86400)
except KeyboardInterrupt:
    server.stop(0)