Spring Integration:结合拆分(split)进行内容丰富(enrich)
Spring integration enrich with split
假设我有一个产品,我需要用下载的图像 ID 来丰富这个产品。
作为输入消息,我有一个 Java POJO。为简单起见,这里用 JSON 表示:
{
"id" : "productId",
"price" : 10000,
"productPhotos" : ["http://url1", "http://url2", ...],
"marketPhotos" : ["http://url1", "http://url2", ...]
}
我还有一个可轮询的通道(pollable channel),它可以下载图像、将其存放到存储中的某个位置,并返回已下载照片的 ID:
/**
 * Defines the image download flow as a Spring bean.
 *
 * <p>Messages taken from {@code inputChannel()} pass through three transformers in
 * order and the result is sent to {@code outputChannel()}.
 *
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow imageDownloadFlow() {
return IntegrationFlows.from(inputChannel())
// The first transformer's endpoint is configured with the poller returned by queuePoller().
.transform(Message.class, messageTransformer::transformToImageMassage, e -> e.poller(queuePoller()))
// NOTE(review): presumably downloads the image and then uploads it to S3, yielding
// the stored photo id; this is inferred from the method names only, so confirm.
.transform(imageDownloader::download)
.transform(imageS3Uploader::upload)
.channel(outputChannel())
.get();
}
因此,我的任务是并行处理 "productPhotos" 和 "marketPhotos",并用下载得到的 ID 来丰富产品消息。
例如
{
"id" : "productId",
"price" : 10000,
"productPhotos" : ["id1", "id2", ...],
"marketPhotos" : ["id3", "id4", ...]
}
是否可以在 IntegrationFlows 中进行丰富?
是的:使用 ContentEnricher,在其 requestChannel 上接一个带任务执行器的 PublishSubscribeChannel,并在图像下载器的下游放置一个聚合器。
使用 enrich() DSL 方法。
编辑
这是一个例子:
/**
 * Demo application: a polled source emits a sample {@link Pojo}, which is enriched
 * with the photo lists produced by two subscribers running on a task executor and
 * collected by an aggregator.
 */
@SpringBootApplication
public class So57357544Application {

    public static void main(String[] args) {
        SpringApplication.run(So57357544Application.class, args);
    }

    /**
     * Main flow: emits a sample {@link Pojo} at a fixed rate, sends it to the
     * {@code enricherFlow.input} channel for enrichment, and logs the result.
     *
     * <p>The enricher reply payload is read as a {@code List<Pojo>}; its first two
     * elements are merged into the {@code productPhotos} and {@code marketPhotos}
     * properties.
     */
    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(
                        () -> new Pojo("one", 42,
                                Collections.singletonList("https://localhost/foo"),
                                Collections.singletonList("https://localhost/bar")),
                        source -> source.poller(Pollers.fixedRate(50000)))
                .enrich(enricher -> enricher.requestChannel("enricherFlow.input")
                        .<List<Pojo>>propertyFunction("productPhotos",
                                reply -> combinedProductPhotos(reply.getPayload()))
                        .<List<Pojo>>propertyFunction("marketPhotos",
                                reply -> combinedMarketPhotos(reply.getPayload())))
                .log()
                .get();
    }

    /**
     * Request flow of the enricher: copies the incoming {@link Pojo}, then publishes
     * the copy (with sequence details applied) to two subscribers on the {@code exec()}
     * executor. Each subscriber invokes one method of the {@code urlToId} bean and
     * forwards its result to {@code aggregator.input}.
     */
    @Bean
    public IntegrationFlow enricherFlow() {
        return definition -> definition
                .<Pojo, Pojo> transform(source -> new Pojo(
                        source.getId(),
                        source.getPrice(),
                        source.getProductPhotos(),
                        source.getMarketPhotos()))
                .publishSubscribeChannel(exec(), pubSub -> pubSub
                        .applySequence(true)
                        .subscribe(productBranch -> productBranch
                                .handle("urlToId", "product")
                                .channel("aggregator.input"))
                        .subscribe(marketBranch -> marketBranch
                                .handle("urlToId", "market")
                                .channel("aggregator.input")));
    }

    /** Flow consisting of a single aggregator with default settings. */
    @Bean
    public IntegrationFlow aggregator() {
        return definition -> definition.aggregate();
    }

    /** Task executor with a core pool size of two, used by the publish-subscribe channel. */
    @Bean
    public Executor exec() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(2);
        return taskExecutor;
    }

    /** Concatenates the product photos of the first two aggregated results. */
    private static List<String> combinedProductPhotos(List<Pojo> parts) {
        List<String> combined = parts.get(0).getProductPhotos();
        combined.addAll(parts.get(1).getProductPhotos());
        return combined;
    }

    /** Concatenates the market photos of the first two aggregated results. */
    private static List<String> combinedMarketPhotos(List<Pojo> parts) {
        List<String> combined = parts.get(0).getMarketPhotos();
        combined.addAll(parts.get(1).getMarketPhotos());
        return combined;
    }
}
/**
 * Derives photo ids from photo URLs. The id of a URL is the text after its last
 * {@code '/'}; a value without any {@code '/'} is used as the id unchanged.
 */
@Component
class UrlToId {

    /**
     * Converts the product photo URLs of the given pojo to ids.
     *
     * @param pojo the source pojo
     * @return a new pojo with the same id and price, the converted product photos,
     *         and an empty market photo list
     */
    public Pojo product(Pojo pojo) {
        return new Pojo(pojo.getId(), pojo.getPrice(),
                toIds(pojo.getProductPhotos()), Collections.emptyList());
    }

    /**
     * Converts the market photo URLs of the given pojo to ids.
     *
     * @param pojo the source pojo
     * @return a new pojo with the same id and price, an empty product photo list,
     *         and the converted market photos
     */
    public Pojo market(Pojo pojo) {
        return new Pojo(pojo.getId(), pojo.getPrice(),
                Collections.emptyList(), toIds(pojo.getMarketPhotos()));
    }

    /** Maps every URL in the list to its id, preserving order. */
    private static List<String> toIds(List<String> urls) {
        return urls.stream()
                .map(UrlToId::lastPathSegment)
                .collect(Collectors.toList());
    }

    /** Returns the part of {@code url} that follows its last {@code '/'}. */
    private static String lastPathSegment(String url) {
        // lastIndexOf returns -1 when there is no '/', so "+ 1" both skips the
        // separator and makes the no-separator case return the whole string
        // instead of throwing StringIndexOutOfBoundsException.
        return url.substring(url.lastIndexOf('/') + 1);
    }
}
/**
 * Product message payload: an id, a price and two lists of photo references.
 *
 * <p>The lists are owned by this object: getters hand out copies and setters copy
 * the supplied elements in.
 */
class Pojo {

    private final String id;

    private final int price;

    private final List<String> productPhotos = new ArrayList<>();

    private final List<String> marketPhotos = new ArrayList<>();

    /**
     * Creates a pojo; both photo lists are applied through the corresponding setters.
     *
     * @param id the product id
     * @param price the product price
     * @param productPhotes initial product photos (must not be {@code null})
     * @param marketPhotos initial market photos (must not be {@code null})
     */
    public Pojo(String id, int price, List<String> productPhotes, List<String> marketPhotos) {
        this.id = id;
        this.price = price;
        setProductPhotos(productPhotes);
        setMarketPhotos(marketPhotos);
    }

    public String getId() {
        return this.id;
    }

    public int getPrice() {
        return this.price;
    }

    /** @return a fresh mutable copy of the product photos */
    public List<String> getProductPhotos() {
        List<String> copy = new ArrayList<>(this.productPhotos);
        return copy;
    }

    /** @return a fresh mutable copy of the market photos */
    public List<String> getMarketPhotos() {
        List<String> copy = new ArrayList<>(this.marketPhotos);
        return copy;
    }

    /**
     * Replaces the product photos with the given elements. An empty list is ignored
     * and leaves the current contents untouched.
     *
     * @param photos the new product photos (must not be {@code null})
     */
    public final void setProductPhotos(List<String> photos) {
        replaceUnlessEmpty(this.productPhotos, photos);
    }

    /**
     * Replaces the market photos with the given elements. An empty list is ignored
     * and leaves the current contents untouched.
     *
     * @param photos the new market photos (must not be {@code null})
     */
    public final void setMarketPhotos(List<String> photos) {
        replaceUnlessEmpty(this.marketPhotos, photos);
    }

    /** Overwrites {@code target} with {@code replacement} unless the latter is empty. */
    private static void replaceUnlessEmpty(List<String> target, List<String> replacement) {
        if (replacement.isEmpty()) {
            return;
        }
        target.clear();
        target.addAll(replacement);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Pojo [id=");
        text.append(this.id);
        text.append(", price=").append(this.price);
        text.append(", productPhotos=").append(this.productPhotos);
        text.append(", marketPhotos=").append(this.marketPhotos);
        text.append("]");
        return text.toString();
    }
}
假设我有一个产品,我需要用下载的图像 ID 来丰富这个产品。
作为输入消息,我有一个 Java POJO。为简单起见,这里用 JSON 表示:
{
"id" : "productId",
"price" : 10000,
"productPhotos" : ["http://url1", "http://url2", ...],
"marketPhotos" : ["http://url1", "http://url2", ...]
}
我还有一个可轮询的通道(pollable channel),它可以下载图像、将其存放到存储中的某个位置,并返回已下载照片的 ID:
/**
 * Defines the image download flow as a Spring bean.
 *
 * <p>Messages taken from {@code inputChannel()} pass through three transformers in
 * order and the result is sent to {@code outputChannel()}.
 *
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow imageDownloadFlow() {
return IntegrationFlows.from(inputChannel())
// The first transformer's endpoint is configured with the poller returned by queuePoller().
.transform(Message.class, messageTransformer::transformToImageMassage, e -> e.poller(queuePoller()))
// NOTE(review): presumably downloads the image and then uploads it to S3, yielding
// the stored photo id; this is inferred from the method names only, so confirm.
.transform(imageDownloader::download)
.transform(imageS3Uploader::upload)
.channel(outputChannel())
.get();
}
因此,我的任务是并行处理 "productPhotos" 和 "marketPhotos",并用下载得到的 ID 来丰富产品消息。例如:
{
"id" : "productId",
"price" : 10000,
"productPhotos" : ["id1", "id2", ...],
"marketPhotos" : ["id3", "id4", ...]
}
是否可以在 IntegrationFlows 中进行丰富?
是的:使用 ContentEnricher,在其 requestChannel 上接一个带任务执行器的 PublishSubscribeChannel,并在图像下载器的下游放置一个聚合器。
使用 enrich() DSL 方法。
编辑
这是一个例子:
/**
 * Demo application: a polled source emits a sample {@link Pojo}, which is enriched
 * with the photo lists produced by two subscribers running on a task executor and
 * collected by an aggregator.
 */
@SpringBootApplication
public class So57357544Application {

    public static void main(String[] args) {
        SpringApplication.run(So57357544Application.class, args);
    }

    /**
     * Main flow: emits a sample {@link Pojo} at a fixed rate, sends it to the
     * {@code enricherFlow.input} channel for enrichment, and logs the result.
     *
     * <p>The enricher reply payload is read as a {@code List<Pojo>}; its first two
     * elements are merged into the {@code productPhotos} and {@code marketPhotos}
     * properties.
     */
    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(
                        () -> new Pojo("one", 42,
                                Collections.singletonList("https://localhost/foo"),
                                Collections.singletonList("https://localhost/bar")),
                        source -> source.poller(Pollers.fixedRate(50000)))
                .enrich(enricher -> enricher.requestChannel("enricherFlow.input")
                        .<List<Pojo>>propertyFunction("productPhotos",
                                reply -> combinedProductPhotos(reply.getPayload()))
                        .<List<Pojo>>propertyFunction("marketPhotos",
                                reply -> combinedMarketPhotos(reply.getPayload())))
                .log()
                .get();
    }

    /**
     * Request flow of the enricher: copies the incoming {@link Pojo}, then publishes
     * the copy (with sequence details applied) to two subscribers on the {@code exec()}
     * executor. Each subscriber invokes one method of the {@code urlToId} bean and
     * forwards its result to {@code aggregator.input}.
     */
    @Bean
    public IntegrationFlow enricherFlow() {
        return definition -> definition
                .<Pojo, Pojo> transform(source -> new Pojo(
                        source.getId(),
                        source.getPrice(),
                        source.getProductPhotos(),
                        source.getMarketPhotos()))
                .publishSubscribeChannel(exec(), pubSub -> pubSub
                        .applySequence(true)
                        .subscribe(productBranch -> productBranch
                                .handle("urlToId", "product")
                                .channel("aggregator.input"))
                        .subscribe(marketBranch -> marketBranch
                                .handle("urlToId", "market")
                                .channel("aggregator.input")));
    }

    /** Flow consisting of a single aggregator with default settings. */
    @Bean
    public IntegrationFlow aggregator() {
        return definition -> definition.aggregate();
    }

    /** Task executor with a core pool size of two, used by the publish-subscribe channel. */
    @Bean
    public Executor exec() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(2);
        return taskExecutor;
    }

    /** Concatenates the product photos of the first two aggregated results. */
    private static List<String> combinedProductPhotos(List<Pojo> parts) {
        List<String> combined = parts.get(0).getProductPhotos();
        combined.addAll(parts.get(1).getProductPhotos());
        return combined;
    }

    /** Concatenates the market photos of the first two aggregated results. */
    private static List<String> combinedMarketPhotos(List<Pojo> parts) {
        List<String> combined = parts.get(0).getMarketPhotos();
        combined.addAll(parts.get(1).getMarketPhotos());
        return combined;
    }
}
/**
 * Derives photo ids from photo URLs. The id of a URL is the text after its last
 * {@code '/'}; a value without any {@code '/'} is used as the id unchanged.
 */
@Component
class UrlToId {

    /**
     * Converts the product photo URLs of the given pojo to ids.
     *
     * @param pojo the source pojo
     * @return a new pojo with the same id and price, the converted product photos,
     *         and an empty market photo list
     */
    public Pojo product(Pojo pojo) {
        return new Pojo(pojo.getId(), pojo.getPrice(),
                toIds(pojo.getProductPhotos()), Collections.emptyList());
    }

    /**
     * Converts the market photo URLs of the given pojo to ids.
     *
     * @param pojo the source pojo
     * @return a new pojo with the same id and price, an empty product photo list,
     *         and the converted market photos
     */
    public Pojo market(Pojo pojo) {
        return new Pojo(pojo.getId(), pojo.getPrice(),
                Collections.emptyList(), toIds(pojo.getMarketPhotos()));
    }

    /** Maps every URL in the list to its id, preserving order. */
    private static List<String> toIds(List<String> urls) {
        return urls.stream()
                .map(UrlToId::lastPathSegment)
                .collect(Collectors.toList());
    }

    /** Returns the part of {@code url} that follows its last {@code '/'}. */
    private static String lastPathSegment(String url) {
        // lastIndexOf returns -1 when there is no '/', so "+ 1" both skips the
        // separator and makes the no-separator case return the whole string
        // instead of throwing StringIndexOutOfBoundsException.
        return url.substring(url.lastIndexOf('/') + 1);
    }
}
/**
 * Product message payload: an id, a price and two lists of photo references.
 *
 * <p>The lists are owned by this object: getters hand out copies and setters copy
 * the supplied elements in.
 */
class Pojo {

    private final String id;

    private final int price;

    private final List<String> productPhotos = new ArrayList<>();

    private final List<String> marketPhotos = new ArrayList<>();

    /**
     * Creates a pojo; both photo lists are applied through the corresponding setters.
     *
     * @param id the product id
     * @param price the product price
     * @param productPhotes initial product photos (must not be {@code null})
     * @param marketPhotos initial market photos (must not be {@code null})
     */
    public Pojo(String id, int price, List<String> productPhotes, List<String> marketPhotos) {
        this.id = id;
        this.price = price;
        setProductPhotos(productPhotes);
        setMarketPhotos(marketPhotos);
    }

    public String getId() {
        return this.id;
    }

    public int getPrice() {
        return this.price;
    }

    /** @return a fresh mutable copy of the product photos */
    public List<String> getProductPhotos() {
        List<String> copy = new ArrayList<>(this.productPhotos);
        return copy;
    }

    /** @return a fresh mutable copy of the market photos */
    public List<String> getMarketPhotos() {
        List<String> copy = new ArrayList<>(this.marketPhotos);
        return copy;
    }

    /**
     * Replaces the product photos with the given elements. An empty list is ignored
     * and leaves the current contents untouched.
     *
     * @param photos the new product photos (must not be {@code null})
     */
    public final void setProductPhotos(List<String> photos) {
        replaceUnlessEmpty(this.productPhotos, photos);
    }

    /**
     * Replaces the market photos with the given elements. An empty list is ignored
     * and leaves the current contents untouched.
     *
     * @param photos the new market photos (must not be {@code null})
     */
    public final void setMarketPhotos(List<String> photos) {
        replaceUnlessEmpty(this.marketPhotos, photos);
    }

    /** Overwrites {@code target} with {@code replacement} unless the latter is empty. */
    private static void replaceUnlessEmpty(List<String> target, List<String> replacement) {
        if (replacement.isEmpty()) {
            return;
        }
        target.clear();
        target.addAll(replacement);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Pojo [id=");
        text.append(this.id);
        text.append(", price=").append(this.price);
        text.append(", productPhotos=").append(this.productPhotos);
        text.append(", marketPhotos=").append(this.marketPhotos);
        text.append("]");
        return text.toString();
    }
}