在带有 Spring Boot 的 Apache Camel 中,如何将许多外部回调(使用 "from().to()")加入到单个响应中?
In Apache Camel with Spring Boot, how to Join many external callbacks (using "from().to()") into a single response?
已解决!向下滚动到解决方案。
我的实体 Person 在 table A 上有一些基本数据,在 tables B、C、D 等上有更具体的数据(例如地址)。
PersonResponseDTO(汇总):
{
"id": 1,
"name": "Test"
}
AddressResponseDTO(汇总):
{
"person_id": 1,
"street": "Test St."
}
这些数据来自使用 from("direct:getPersonById").to(getPersonUrl)
和 from("direct:getAddressByPersonId").to(getAddressUrl)
(汇总)调用的外部 API。
我创建了第三个对象 AggregatedPersonResponseDTO:
{
"person": {
"id": 1,
"name": "Test"
},
"address": {
"person_id": 1,
"street": "Test St."
}
}
是否有一种简单的方法可以在一个请求中加入两个响应,返回一个 AggregatedPersonResponseDTO 类型的对象,只使用 Camel API?我想使用两个响应对象来构建第三个。我将来会有超过两个“连接”的用例。
解法说明
不需要将 streamCaching 设置为 true 或 false。
不需要设置HTTP_PATH。
骆驼路线中的代码:
from("direct:getFullPersonByIdService")
.toD("http:{{endpoints.get-person-by-id}}?bridgeEndpoint=true")
.pollEnrich(
simple("http:{{endpoints.get-address-by-person-id}}?bridgeEndpoint=true"),
5000,
new PersonAggregationStrategy(),
false
)
.unmarshal(new JacksonDataFormat(GetAggregatedPersonResponseDTO.class))
双花括号之间的内容是从application.yml或application.properties读取的。
- 整个PersonAggregationStrategy class:
@Log4j2
public class PersonAggregationStrategy implements AggregationStrategy {
@SneakyThrows
@Override
public Exchange aggregate(final Exchange exchangePerson,
final Exchange exchangeAddress) {
log.info("Aggregating Person and Address...");
ObjectMapper objectMapper = new ObjectMapper();
final GetAggregatedPersonResponseDTO aggregatedPerson = new GetAggregatedPersonResponseDTO();
aggregatedPerson.setPerson(objectMapper.readValue(exchangePerson.getIn().getBody(String.class), GetPersonResponseDTO.class));
aggregatedPerson.setAddress(objectMapper.readValue(exchangeAddress.getIn().getBody(String.class), GetAddressResponseDTO.class));
exchangePerson.getIn().setBody(objectMapper.writeValueAsString(aggregatedPerson));
log.info("Aggregated object => {}", objectMapper.writeValueAsString(aggregatedPerson));
return exchangePerson;
}
}
- 我还必须为聚合的结果对象实现 TypeConverters 接口:
@Component
public class AggregatedPersonConverter implements TypeConverters {
private final ObjectMapper mapper;
@Autowired
public AggregatedPersonConverter(ObjectMapper mapper) {
this.mapper = mapper;
}
@Converter
public InputStream getAggregatedPersonResponseDTOToInputStream(GetAggregatedPersonResponseDTO source) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(source);
oos.flush();
oos.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
return new ByteArrayInputStream(baos.toByteArray());
}
}
- 我不知道它是否适用于两个以上的回调。也许它需要 AggregationStrategy 的其他实现。有一天我会测试这个用例。
您必须建立一个路由模型,用第二个 Web 服务结果丰富第一个 Web 服务结果。必须在 AggregationStrategy
实例中指定合并两个响应的方式。
查看丰富的EIP:
https://camel.apache.org/components/3.14.x/eips/enrich-eip.html
已解决!向下滚动到解决方案。
我的实体 Person 在 table A 上有一些基本数据,在 tables B、C、D 等上有更具体的数据(例如地址)。
PersonResponseDTO(汇总):
{
"id": 1,
"name": "Test"
}
AddressResponseDTO(汇总):
{
"person_id": 1,
"street": "Test St."
}
这些数据来自使用 from("direct:getPersonById").to(getPersonUrl)
和 from("direct:getAddressByPersonId").to(getAddressUrl)
(汇总)调用的外部 API。
我创建了第三个对象 AggregatedPersonResponseDTO:
{
"person": {
"id": 1,
"name": "Test"
},
"address": {
"person_id": 1,
"street": "Test St."
}
}
是否有一种简单的方法可以在一个请求中加入两个响应,返回一个 AggregatedPersonResponseDTO 类型的对象,只使用 Camel API?我想使用两个响应对象来构建第三个。我将来会有超过两个“连接”的用例。
解法说明
不需要将 streamCaching 设置为 true 或 false。
不需要设置HTTP_PATH。
骆驼路线中的代码:
from("direct:getFullPersonByIdService")
.toD("http:{{endpoints.get-person-by-id}}?bridgeEndpoint=true")
.pollEnrich(
simple("http:{{endpoints.get-address-by-person-id}}?bridgeEndpoint=true"),
5000,
new PersonAggregationStrategy(),
false
)
.unmarshal(new JacksonDataFormat(GetAggregatedPersonResponseDTO.class))
双花括号之间的内容是从application.yml或application.properties读取的。
- 整个PersonAggregationStrategy class:
@Log4j2
public class PersonAggregationStrategy implements AggregationStrategy {
@SneakyThrows
@Override
public Exchange aggregate(final Exchange exchangePerson,
final Exchange exchangeAddress) {
log.info("Aggregating Person and Address...");
ObjectMapper objectMapper = new ObjectMapper();
final GetAggregatedPersonResponseDTO aggregatedPerson = new GetAggregatedPersonResponseDTO();
aggregatedPerson.setPerson(objectMapper.readValue(exchangePerson.getIn().getBody(String.class), GetPersonResponseDTO.class));
aggregatedPerson.setAddress(objectMapper.readValue(exchangeAddress.getIn().getBody(String.class), GetAddressResponseDTO.class));
exchangePerson.getIn().setBody(objectMapper.writeValueAsString(aggregatedPerson));
log.info("Aggregated object => {}", objectMapper.writeValueAsString(aggregatedPerson));
return exchangePerson;
}
}
- 我还必须为聚合的结果对象实现 TypeConverters 接口:
@Component
public class AggregatedPersonConverter implements TypeConverters {
private final ObjectMapper mapper;
@Autowired
public AggregatedPersonConverter(ObjectMapper mapper) {
this.mapper = mapper;
}
@Converter
public InputStream getAggregatedPersonResponseDTOToInputStream(GetAggregatedPersonResponseDTO source) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(source);
oos.flush();
oos.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
return new ByteArrayInputStream(baos.toByteArray());
}
}
- 我不知道它是否适用于两个以上的回调。也许它需要 AggregationStrategy 的其他实现。有一天我会测试这个用例。
您必须建立一个路由模型,用第二个 Web 服务结果丰富第一个 Web 服务结果。必须在 AggregationStrategy
实例中指定合并两个响应的方式。
查看丰富的EIP: https://camel.apache.org/components/3.14.x/eips/enrich-eip.html