抽象 Spring Cloud Stream 生产者和消费者代码
Abstracting Spring Cloud Stream Producer and Consumer code
我有一项服务正在生成和使用来自不同 Spring 云流通道(绑定到 EventHub/Kafka 主题)的消息。有几个这样的服务,它们的设置相似。
配置如下所示
public interface MessageStreams {
String WORKSPACE = "workspace";
String UPLOADNOTIFICATION = "uploadnotification";
String BLOBNOTIFICATION = "blobnotification";
String INGESTIONSTATUS = "ingestionstatusproducer";
@Input(WORKSPACE)
SubscribableChannel workspaceChannel();
@Output(UPLOADNOTIFICATION)
MessageChannel uploadNotificationChannel();
@Input(BLOBNOTIFICATION)
SubscribableChannel blobNotificationChannel();
@Output(INGESTIONSTATUS)
MessageChannel ingestionStatusChannel();
}
@EnableBinding(MessageStreams.class)
public class EventHubStreamsConfiguration {
}
Producer/Publisher 代码如下所示
@Service
@Slf4j
public class IngestionStatusEventPublisher {
private final MessageStreams messageStreams;
public IngestionStatusEventPublisher(MessageStreams messageStreams) {
this.messageStreams = messageStreams;
}
public void sendIngestionStatusEvent() {
log.info("Sending ingestion status event");
System.out.println("Sending ingestion status event");
MessageChannel messageChannel = messageStreams.ingestionStatusChannel();
boolean messageSent = messageChannel.send(MessageBuilder
.withPayload(IngestionStatusMessage.builder()
.correlationId("some-correlation-id")
.status("done")
.source("some-source")
.eventTime(OffsetDateTime.now())
.build())
.setHeader("tenant-id", "some-tenant")
.build());
log.info("Ingestion status event sent successfully {}", messageSent);
}
}
同样,我还有多个发布到不同事件的其他发布者 Hubs/Topics。请注意,每个已发布的消息都设置了 tenant-id header。这是我的 multi-tenant 应用程序特有的东西,用于跟踪租户上下文。另请注意,我在发送消息时正在获取要发布到的频道。
我的消费者代码如下所示
@Component
@Slf4j
public class IngestionStatusEventHandler {
private AtomicInteger eventCount = new AtomicInteger();
@StreamListener(TestMessageStreams.INGESTIONSTATUS)
public void handleEvent(@Payload IngestionStatusMessage message, @Header(name = "tenant-id") String tenantId) throws Exception {
log.info("New ingestion status event received: {} in Consumer: {}", message, Thread.currentThread().getName());
// set the tenant context as thread local from the header.
}
同样,我有几个这样的消费者,并且每个消费者都有一个基于发布者发送的传入 tenant-id header 设置的租户上下文。
我的问题是
如何摆脱在 Publisher 中设置 tenant-id header 和在 Consumer 中设置租户上下文的样板代码,方法是将其抽象到一个可以包含在所有库中的库中我拥有的不同服务。
另外,有没有一种方法可以根据发布的消息类型动态识别频道。 for ex IngestionStatusMessage.class in the given scenario
要在公共代码中设置 tenant-id
header 并避免在每个微服务中使用 copy/pasting,您可以使用 ChannelInterceptor
并将其设为全局代码@GlobalChannelInterceptor
及其 patterns
选项。
在 Spring 中查看更多信息集成:https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/core.html#channel-interceptors
您不能通过有效负载类型进行频道选择,因为有效负载类型实际上是由 @StreamListener
方法签名确定的。
您可以尝试使用具有 Message<?>
期望的通用 @Router
,然后 return 根据该请求消息上下文路由的特定频道名称。
我有一项服务正在生成和使用来自不同 Spring 云流通道(绑定到 EventHub/Kafka 主题)的消息。有几个这样的服务,它们的设置相似。
配置如下所示
public interface MessageStreams {
String WORKSPACE = "workspace";
String UPLOADNOTIFICATION = "uploadnotification";
String BLOBNOTIFICATION = "blobnotification";
String INGESTIONSTATUS = "ingestionstatusproducer";
@Input(WORKSPACE)
SubscribableChannel workspaceChannel();
@Output(UPLOADNOTIFICATION)
MessageChannel uploadNotificationChannel();
@Input(BLOBNOTIFICATION)
SubscribableChannel blobNotificationChannel();
@Output(INGESTIONSTATUS)
MessageChannel ingestionStatusChannel();
}
@EnableBinding(MessageStreams.class)
public class EventHubStreamsConfiguration {
}
Producer/Publisher 代码如下所示
@Service
@Slf4j
public class IngestionStatusEventPublisher {
private final MessageStreams messageStreams;
public IngestionStatusEventPublisher(MessageStreams messageStreams) {
this.messageStreams = messageStreams;
}
public void sendIngestionStatusEvent() {
log.info("Sending ingestion status event");
System.out.println("Sending ingestion status event");
MessageChannel messageChannel = messageStreams.ingestionStatusChannel();
boolean messageSent = messageChannel.send(MessageBuilder
.withPayload(IngestionStatusMessage.builder()
.correlationId("some-correlation-id")
.status("done")
.source("some-source")
.eventTime(OffsetDateTime.now())
.build())
.setHeader("tenant-id", "some-tenant")
.build());
log.info("Ingestion status event sent successfully {}", messageSent);
}
}
同样,我还有多个发布到不同事件的其他发布者 Hubs/Topics。请注意,每个已发布的消息都设置了 tenant-id header。这是我的 multi-tenant 应用程序特有的东西,用于跟踪租户上下文。另请注意,我在发送消息时正在获取要发布到的频道。
我的消费者代码如下所示
@Component
@Slf4j
public class IngestionStatusEventHandler {
private AtomicInteger eventCount = new AtomicInteger();
@StreamListener(TestMessageStreams.INGESTIONSTATUS)
public void handleEvent(@Payload IngestionStatusMessage message, @Header(name = "tenant-id") String tenantId) throws Exception {
log.info("New ingestion status event received: {} in Consumer: {}", message, Thread.currentThread().getName());
// set the tenant context as thread local from the header.
}
同样,我有几个这样的消费者,并且每个消费者都有一个基于发布者发送的传入 tenant-id header 设置的租户上下文。
我的问题是
如何摆脱在 Publisher 中设置 tenant-id header 和在 Consumer 中设置租户上下文的样板代码,方法是将其抽象到一个可以包含在所有库中的库中我拥有的不同服务。
另外,有没有一种方法可以根据发布的消息类型动态识别频道。 for ex IngestionStatusMessage.class in the given scenario
要在公共代码中设置 tenant-id
header 并避免在每个微服务中使用 copy/pasting,您可以使用 ChannelInterceptor
并将其设为全局代码@GlobalChannelInterceptor
及其 patterns
选项。
在 Spring 中查看更多信息集成:https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/core.html#channel-interceptors
您不能通过有效负载类型进行频道选择,因为有效负载类型实际上是由 @StreamListener
方法签名确定的。
您可以尝试使用具有 Message<?>
期望的通用 @Router
,然后 return 根据该请求消息上下文路由的特定频道名称。