在带有 Spring Boot 的 Apache Camel 中,如何将许多外部回调(使用 "from().to()")加入到单个响应中?

In Apache Camel with Spring Boot, how to Join many external callbacks (using "from().to()") into a single response?

已解决!向下滚动到解决方案。

我的实体 Person 在 table A 上有一些基本数据,在 tables B、C、D 等上有更具体的数据(例如地址)。

PersonResponseDTO(汇总):

{
    "id": 1,
    "name": "Test"
}

AddressResponseDTO(汇总):

{
    "person_id": 1,
    "street": "Test St."
}

这些数据来自使用 from("direct:getPersonById").to(getPersonUrl)from("direct:getAddressByPersonId").to(getAddressUrl)(汇总)调用的外部 API。

我创建了第三个对象 AggregatedPersonResponseDTO:

{
    "person": {
        "id": 1,
        "name": "Test"
    },
    "address": {
        "person_id": 1,
        "street": "Test St."
    }
}

是否有一种简单的方法可以在一个请求中加入两个响应,返回一个 AggregatedPersonResponseDTO 类型的对象,只使用 Camel API?我想使用两个响应对象来构建第三个。我将来会有超过两个“连接”的用例。

解法说明

  1. 不需要将 streamCaching 设置为 true 或 false。

  2. 不需要设置HTTP_PATH。

  3. 骆驼路线中的代码:

from("direct:getFullPersonByIdService")
    .toD("http:{{endpoints.get-person-by-id}}?bridgeEndpoint=true")
    .pollEnrich(
        simple("http:{{endpoints.get-address-by-person-id}}?bridgeEndpoint=true"),
        5000,
        new PersonAggregationStrategy(),
        false
    )
    .unmarshal(new JacksonDataFormat(GetAggregatedPersonResponseDTO.class))

双花括号之间的内容是从application.yml或application.properties读取的。

  1. 整个PersonAggregationStrategy class:
@Log4j2
public class PersonAggregationStrategy implements AggregationStrategy {

    @SneakyThrows
    @Override
    public Exchange aggregate(final Exchange exchangePerson,
                              final Exchange exchangeAddress) {
        log.info("Aggregating Person and Address...");

        ObjectMapper objectMapper = new ObjectMapper();

        final GetAggregatedPersonResponseDTO aggregatedPerson = new GetAggregatedPersonResponseDTO();
        aggregatedPerson.setPerson(objectMapper.readValue(exchangePerson.getIn().getBody(String.class), GetPersonResponseDTO.class));
        aggregatedPerson.setAddress(objectMapper.readValue(exchangeAddress.getIn().getBody(String.class), GetAddressResponseDTO.class));

        exchangePerson.getIn().setBody(objectMapper.writeValueAsString(aggregatedPerson));
        log.info("Aggregated object => {}", objectMapper.writeValueAsString(aggregatedPerson));

        return exchangePerson;
    }

}
  1. 我还必须为聚合的结果对象实现 TypeConverters 接口:
@Component
public class AggregatedPersonConverter implements TypeConverters {

    private final ObjectMapper mapper;

    @Autowired
    public AggregatedPersonConverter(ObjectMapper mapper) {
        this.mapper = mapper;
    }

    @Converter
    public InputStream getAggregatedPersonResponseDTOToInputStream(GetAggregatedPersonResponseDTO source) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();

        try {
            ObjectOutputStream oos = new ObjectOutputStream(baos);

            oos.writeObject(source);

            oos.flush();
            oos.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        return new ByteArrayInputStream(baos.toByteArray());
    }

}
  1. 我不知道它是否适用于两个以上的回调。也许它需要 AggregationStrategy 的其他实现。有一天我会测试这个用例。

您必须建立一个路由模型,用第二个 Web 服务结果丰富第一个 Web 服务结果。必须在 AggregationStrategy 实例中指定合并两个响应的方式。

查看丰富的EIP: https://camel.apache.org/components/3.14.x/eips/enrich-eip.html