Dapr PubSub 与 dotnet SDK

Dapr PubSub with dotnet SDK

我正在尝试 运行 使用 dotnet 的基本 Dapr 设置。我遵循了文档和示例项目,但现在运气不好。

我使用 net5.0 创建了一个简单的 dotnet web API 应用程序。 API 有一个控制器和三对 get/post 端点。每对都针对特定的发布-订阅提供商(nats、rabbit、Redis)。

using System.Runtime.Serialization;
using System.Threading.Tasks;
using Dapr;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Request.Body.Peeker;
namespace live
{
    [ApiController]
    [Route("/")]
    public class HomeController : ControllerBase
    {
        private readonly ILogger<HomeController> logger;
        private readonly DaprClient dapr;
        public HomeController(ILogger<HomeController> logger, DaprClient dapr)
        {
            this.dapr = dapr;
            this.logger = logger;

        }
        [HttpGet]
        public async Task<ActionResult> Produce()
        {
            var message = new Message() { Payload = "Nats Jetstream poruka" };
            await this.dapr.PublishEventAsync<Message>("nats-pubsub", "orders.new", message);

            return Ok("Sent!");
        }


        [HttpPost("nats/subscribe")]        
        [Topic("nats-pubsub", "orders.new")]
        public async Task<ActionResult> SubscribeAsync(Message message)
        {          
            
            this.logger.LogInformation("Message received: " + JsonConvert.SerializeObject(message));
            return Ok("Received!");
        }


        [HttpGet("rabbit")]
        public async Task<ActionResult> ProduceRabbit()
        {
            var message = new Message() { Payload = "Rabbit MQ poruka" };
            await this.dapr.PublishEventAsync<Message>("rabbit-pubsub", "orders.new", message);

            return Ok("Sent!");
        }


        //[HttpPost("rabbit/subscribe")]
        [Route("rabbit/subscribe")]
        [HttpPost()]
        [Topic("rabbit-pubsub", "orders.new")]
        public async Task<ActionResult> SubscribeRabbitAsync(Message message)
        {            
            this.logger.LogInformation("ssage received: " + JsonConvert.SerializeObject(message));
            return Ok("Received!");
        }


        [HttpGet("redis")]
        public async Task<ActionResult> ProduceRedis()
        {
            var message = new Message() { Payload = "Redis poruka" };
            await this.dapr.PublishEventAsync<Message>("redis-pubsub", "orders.new", message);

            return Ok("Sent!");
        }


        [HttpPost("redis/subscribe")]
        [Topic("redis-pubsub", "orders.new")]
        public async Task<ActionResult> SubscribeRedisAsync(Message message)
        {
            this.logger.LogInformation("Message received: " + JsonConvert.SerializeObject(message));
            return Ok("Received!");
        }
    }

    
    public class Message
    {
        public string Payload { get; set; }
    }
}

Startup.cs 应用看起来像

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Live
{
    public class Startup
    {        
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers()
                .AddNewtonsoftJson()
                .AddDapr();
            services.AddHttpClient();
            services.AddDaprClient(); //Really no need for this
        }
                
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();
            //app.UseCloudEvents();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapSubscribeHandler();
                endpoints.MapControllers();
            });
        }
    }
}

Dapr 配置

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6380
  - name: redisPassword
    value: ""

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: nats-pubsub
  namespace: default
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: name
    value: "alan"
  - name: durableName
    value: "conversation-durable"
  - name: queueGroupName
    value: "conversation-group"
  # - name: startSequence
  #   value: 1
  # - name: startTime # in Unix format
  #   value: 1630349391
  # - name: deliverAll
  #   value: false
  # - name: flowControl
  #   value: false

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rabbit-pubsub
  namespace: default
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://localhost:5672"

我的docker-撰写文件

version: '3.4'

services: 
  nats:
    container_name: "Nats"
    image: nats
    command: [ "-js", "-m", "8222", "-D", "-V" ]
    ports:
      - "4222:4222"
      - "8222:8222"
      - "6222:6222"

  rabbitmq:
    image: rabbitmq:3-management-alpine
    ports:
      - "5672:5672"
      - "15672:15672"

  postgres:
    container_name: "PostgreSQL"
    image: postgres
    environment:
      - POSTGRES_PASSWORD=rotring123
      - PGDATA=/var/lib/postgresql/data/pgdata
    # volumes:
    #   - .\docker-volumes\postgreSQL:/var/lib/postgresql/data
    ports:
      - "8081:8080"
      - "5432:5432"

  redis:
    container_name: Redis
    image: redis
    ports:
      - "6380:6379"
    # volumes:
    #   - .\docker-volumes\redis:/usr/local/etc/redis

  # dapr-placement:
  #   container_name: Dapr-service-descovery
  #   image: "daprio/dapr:1.0.0"
  #   command: ["./placement", "-port", "50000", "-log-level", "debug"]
  #   ports:
  #     - "50000:50000"

  # zipkin:
  #   image: openzipkin/zipkin-slim
  #   ports:
  #     - "5411:9411"

我正在使用命令从 CLI 启动应用程序 dapr run -a live -p 5226 dotnet run

应用程序已启动,当我去获取端点消息时已发送。我可以确认消息已发送到消息代理并且负载正常。此外,Dapr 调用我的 post 端点(每个人的兔子、nats 和 redis),但在方法参数中我收到 MessagePayload 属性 的 null 值class.

我遵循了 TrafficControll 示例,在我看来一切都设置正确。

Dapr 运行时间版本:1.4.3 这是日志的屏幕截图:https://prnt.sc/1xa8s14

非常感谢任何帮助!

[FromBody] 属性添加到操作方法参数。

例如:

public async Task<ActionResult> SubscribeAsync([FromBody] Message message)