Dapr PubSub 与 dotnet SDK
Dapr PubSub with dotnet SDK
我正在尝试 运行 使用 dotnet 的基本 Dapr 设置。我遵循了文档和示例项目,但现在运气不好。
我使用 net5.0 创建了一个简单的 dotnet web API 应用程序。 API 有一个控制器和三对 get/post 端点。每对都针对特定的发布-订阅提供商(nats、rabbit、Redis)。
using System.Runtime.Serialization;
using System.Threading.Tasks;
using Dapr;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Request.Body.Peeker;
namespace live
{
[ApiController]
[Route("/")]
public class HomeController : ControllerBase
{
private readonly ILogger<HomeController> logger;
private readonly DaprClient dapr;
public HomeController(ILogger<HomeController> logger, DaprClient dapr)
{
this.dapr = dapr;
this.logger = logger;
}
[HttpGet]
public async Task<ActionResult> Produce()
{
var message = new Message() { Payload = "Nats Jetstream poruka" };
await this.dapr.PublishEventAsync<Message>("nats-pubsub", "orders.new", message);
return Ok("Sent!");
}
[HttpPost("nats/subscribe")]
[Topic("nats-pubsub", "orders.new")]
public async Task<ActionResult> SubscribeAsync(Message message)
{
this.logger.LogInformation("Message received: " + JsonConvert.SerializeObject(message));
return Ok("Received!");
}
[HttpGet("rabbit")]
public async Task<ActionResult> ProduceRabbit()
{
var message = new Message() { Payload = "Rabbit MQ poruka" };
await this.dapr.PublishEventAsync<Message>("rabbit-pubsub", "orders.new", message);
return Ok("Sent!");
}
//[HttpPost("rabbit/subscribe")]
[Route("rabbit/subscribe")]
[HttpPost()]
[Topic("rabbit-pubsub", "orders.new")]
public async Task<ActionResult> SubscribeRabbitAsync(Message message)
{
this.logger.LogInformation("ssage received: " + JsonConvert.SerializeObject(message));
return Ok("Received!");
}
[HttpGet("redis")]
public async Task<ActionResult> ProduceRedis()
{
var message = new Message() { Payload = "Redis poruka" };
await this.dapr.PublishEventAsync<Message>("redis-pubsub", "orders.new", message);
return Ok("Sent!");
}
[HttpPost("redis/subscribe")]
[Topic("redis-pubsub", "orders.new")]
public async Task<ActionResult> SubscribeRedisAsync(Message message)
{
this.logger.LogInformation("Message received: " + JsonConvert.SerializeObject(message));
return Ok("Received!");
}
}
public class Message
{
public string Payload { get; set; }
}
}
Startup.cs 应用看起来像
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace Live
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers()
.AddNewtonsoftJson()
.AddDapr();
services.AddHttpClient();
services.AddDaprClient(); //Really no need for this
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
//app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{
endpoints.MapSubscribeHandler();
endpoints.MapControllers();
});
}
}
}
Dapr 配置
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: redis-pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6380
- name: redisPassword
value: ""
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: nats-pubsub
namespace: default
spec:
type: pubsub.jetstream
version: v1
metadata:
- name: natsURL
value: "nats://localhost:4222"
- name: name
value: "alan"
- name: durableName
value: "conversation-durable"
- name: queueGroupName
value: "conversation-group"
# - name: startSequence
# value: 1
# - name: startTime # in Unix format
# value: 1630349391
# - name: deliverAll
# value: false
# - name: flowControl
# value: false
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: rabbit-pubsub
namespace: default
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: host
value: "amqp://localhost:5672"
我的docker-撰写文件
version: '3.4'
services:
nats:
container_name: "Nats"
image: nats
command: [ "-js", "-m", "8222", "-D", "-V" ]
ports:
- "4222:4222"
- "8222:8222"
- "6222:6222"
rabbitmq:
image: rabbitmq:3-management-alpine
ports:
- "5672:5672"
- "15672:15672"
postgres:
container_name: "PostgreSQL"
image: postgres
environment:
- POSTGRES_PASSWORD=rotring123
- PGDATA=/var/lib/postgresql/data/pgdata
# volumes:
# - .\docker-volumes\postgreSQL:/var/lib/postgresql/data
ports:
- "8081:8080"
- "5432:5432"
redis:
container_name: Redis
image: redis
ports:
- "6380:6379"
# volumes:
# - .\docker-volumes\redis:/usr/local/etc/redis
# dapr-placement:
# container_name: Dapr-service-descovery
# image: "daprio/dapr:1.0.0"
# command: ["./placement", "-port", "50000", "-log-level", "debug"]
# ports:
# - "50000:50000"
# zipkin:
# image: openzipkin/zipkin-slim
# ports:
# - "5411:9411"
我正在使用命令从 CLI 启动应用程序 dapr run -a live -p 5226 dotnet run
应用程序已启动,当我去获取端点消息时已发送。我可以确认消息已发送到消息代理并且负载正常。此外,Dapr 调用我的 post 端点(每个人的兔子、nats 和 redis),但在方法参数中我收到 Message
的 Payload
属性 的 null
值class.
我遵循了 TrafficControll 示例,在我看来一切都设置正确。
Dapr 运行时间版本:1.4.3
这是日志的屏幕截图:https://prnt.sc/1xa8s14
非常感谢任何帮助!
将 [FromBody]
属性添加到操作方法参数。
例如:
public async Task<ActionResult> SubscribeAsync([FromBody] Message message)
我正在尝试 运行 使用 dotnet 的基本 Dapr 设置。我遵循了文档和示例项目,但现在运气不好。
我使用 net5.0 创建了一个简单的 dotnet web API 应用程序。 API 有一个控制器和三对 get/post 端点。每对都针对特定的发布-订阅提供商(nats、rabbit、Redis)。
using System.Runtime.Serialization;
using System.Threading.Tasks;
using Dapr;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Request.Body.Peeker;
namespace live
{
[ApiController]
[Route("/")]
public class HomeController : ControllerBase
{
private readonly ILogger<HomeController> logger;
private readonly DaprClient dapr;
public HomeController(ILogger<HomeController> logger, DaprClient dapr)
{
this.dapr = dapr;
this.logger = logger;
}
[HttpGet]
public async Task<ActionResult> Produce()
{
var message = new Message() { Payload = "Nats Jetstream poruka" };
await this.dapr.PublishEventAsync<Message>("nats-pubsub", "orders.new", message);
return Ok("Sent!");
}
[HttpPost("nats/subscribe")]
[Topic("nats-pubsub", "orders.new")]
public async Task<ActionResult> SubscribeAsync(Message message)
{
this.logger.LogInformation("Message received: " + JsonConvert.SerializeObject(message));
return Ok("Received!");
}
[HttpGet("rabbit")]
public async Task<ActionResult> ProduceRabbit()
{
var message = new Message() { Payload = "Rabbit MQ poruka" };
await this.dapr.PublishEventAsync<Message>("rabbit-pubsub", "orders.new", message);
return Ok("Sent!");
}
//[HttpPost("rabbit/subscribe")]
[Route("rabbit/subscribe")]
[HttpPost()]
[Topic("rabbit-pubsub", "orders.new")]
public async Task<ActionResult> SubscribeRabbitAsync(Message message)
{
this.logger.LogInformation("ssage received: " + JsonConvert.SerializeObject(message));
return Ok("Received!");
}
[HttpGet("redis")]
public async Task<ActionResult> ProduceRedis()
{
var message = new Message() { Payload = "Redis poruka" };
await this.dapr.PublishEventAsync<Message>("redis-pubsub", "orders.new", message);
return Ok("Sent!");
}
[HttpPost("redis/subscribe")]
[Topic("redis-pubsub", "orders.new")]
public async Task<ActionResult> SubscribeRedisAsync(Message message)
{
this.logger.LogInformation("Message received: " + JsonConvert.SerializeObject(message));
return Ok("Received!");
}
}
public class Message
{
public string Payload { get; set; }
}
}
Startup.cs 应用看起来像
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace Live
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers()
.AddNewtonsoftJson()
.AddDapr();
services.AddHttpClient();
services.AddDaprClient(); //Really no need for this
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
//app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{
endpoints.MapSubscribeHandler();
endpoints.MapControllers();
});
}
}
}
Dapr 配置
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: redis-pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6380
- name: redisPassword
value: ""
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: nats-pubsub
namespace: default
spec:
type: pubsub.jetstream
version: v1
metadata:
- name: natsURL
value: "nats://localhost:4222"
- name: name
value: "alan"
- name: durableName
value: "conversation-durable"
- name: queueGroupName
value: "conversation-group"
# - name: startSequence
# value: 1
# - name: startTime # in Unix format
# value: 1630349391
# - name: deliverAll
# value: false
# - name: flowControl
# value: false
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: rabbit-pubsub
namespace: default
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: host
value: "amqp://localhost:5672"
我的docker-撰写文件
version: '3.4'
services:
nats:
container_name: "Nats"
image: nats
command: [ "-js", "-m", "8222", "-D", "-V" ]
ports:
- "4222:4222"
- "8222:8222"
- "6222:6222"
rabbitmq:
image: rabbitmq:3-management-alpine
ports:
- "5672:5672"
- "15672:15672"
postgres:
container_name: "PostgreSQL"
image: postgres
environment:
- POSTGRES_PASSWORD=rotring123
- PGDATA=/var/lib/postgresql/data/pgdata
# volumes:
# - .\docker-volumes\postgreSQL:/var/lib/postgresql/data
ports:
- "8081:8080"
- "5432:5432"
redis:
container_name: Redis
image: redis
ports:
- "6380:6379"
# volumes:
# - .\docker-volumes\redis:/usr/local/etc/redis
# dapr-placement:
# container_name: Dapr-service-descovery
# image: "daprio/dapr:1.0.0"
# command: ["./placement", "-port", "50000", "-log-level", "debug"]
# ports:
# - "50000:50000"
# zipkin:
# image: openzipkin/zipkin-slim
# ports:
# - "5411:9411"
我正在使用命令从 CLI 启动应用程序 dapr run -a live -p 5226 dotnet run
应用程序已启动,当我去获取端点消息时已发送。我可以确认消息已发送到消息代理并且负载正常。此外,Dapr 调用我的 post 端点(每个人的兔子、nats 和 redis),但在方法参数中我收到 Message
的 Payload
属性 的 null
值class.
我遵循了 TrafficControll 示例,在我看来一切都设置正确。
Dapr 运行时间版本:1.4.3
这是日志的屏幕截图:https://prnt.sc/1xa8s14
非常感谢任何帮助!
将 [FromBody]
属性添加到操作方法参数。
例如:
public async Task<ActionResult> SubscribeAsync([FromBody] Message message)