Spring 集成:使用 header 转换和路由

Spring Integration: Transform and route with header

我正在构建一个 Spring-based 库,它应该在将消息转换为正确的类型后使用消息并将其传递到配置的通道。我的图书馆可通过成对列表配置 "streamToConsume: FinalChannelDestination"

streams:
    source: destinationChannel

我想要一个像下面这样的 IntegrationFlow:

IntegrationFlows
        .from(kinesisInboundChannelAdapter(amazonKinesis(), streamNames))
        .transform(new IssuanceTransformer())
        .route(router())
        .get();

public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter(AwsHeaders.STREAM);
    consumerClientProperties.getKinesis().getStreams().forEach((k, v) ->
        router.setChannelMapping(k, v)
    );
    return router;
  }

转换事件,然后将它们传送到配置中映射到流的通道。如何在转换后保留事件 header 以便能够将其发送到正确的频道?

谢谢

我相信您担心在 IssuanceTransformer 之后不再需要 AwsHeaders.STREAM header。当您开发自定义转换器时,您需要确保将所有 header 从请求消息传输到回复消息:与许多其他组件不同,转换器不会修改来自 POJO 的回复消息。

为此,您可以使用类似的东西:

MessageBuilder.withPayload(myPayload).copyHeadersIfAbsent(requestMessage.getHeaders()).build();

注意:您可以使用 AwsHeaders.RECEIVED_STREAM,因为正是这个 KinesisMessageDrivenChannelAdapter:

private void performSend(AbstractIntegrationMessageBuilder<?> messageBuilder, Object rawRecord) {
        messageBuilder.setHeader(AwsHeaders.RECEIVED_STREAM, this.shardOffset.getStream())
                .setHeader(AwsHeaders.SHARD, this.shardOffset.getShard());

        if (CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
            messageBuilder.setHeader(AwsHeaders.CHECKPOINTER, this.checkpointer);
        }