Spring 集成丰富拆分

Spring integration enrich with split

假设我有一个产品,我需要用下载的图像 ID 来丰富这个产品。

作为输入消息,我有 java pojo。为简单起见,将其表示为 json :

{
    "id" : "productId",
    "price" : 10000,
    "productPhotos" : ["http://url1", "http://url2", ...],
    "marketPhotos" : ["http://url1", "http://url2", ...]
}

我还有可轮询的频道,可以 = 下载图像并将其放在存储中的某个地方,return 下载照片 ID

@Bean
    public IntegrationFlow imageDownloadFlow() {
        return IntegrationFlows.from(inputChannel())
                .transform(Message.class, messageTransformer::transformToImageMassage, e -> e.poller(queuePoller()))
                .transform(imageDownloader::download)
                .transform(imageS3Uploader::upload)
                .channel(outputChannel())
                .get();
    }

因此,我的任务是并行 运行 "productPhotos" 和 "marketPhotos" 并使用下载的 ID 丰富产品消息。 例如

{
    "id" : "productId",
    "price" : 10000,
    "productPhotos" : ["id1", "id2", ...],
    "marketPhotos" : ["id3", "id4", ...]
}

是否可以在 IntegrationFlows 中进行丰富?

是的,在 requestChannel 和图像下载器下游的聚合器中使用 ContentEnricherPublishSubscribeChannel(带有任务执行器)。

使用 enrich() DSL 方法。

编辑

这是一个例子:

@SpringBootApplication
public class So57357544Application {

    public static void main(String[] args) {
        SpringApplication.run(So57357544Application.class, args);
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(() -> new Pojo("one", 42, Collections.singletonList("https://localhost/foo"),
                    Collections.singletonList("https://localhost/bar")),
                    e -> e.poller(Pollers.fixedRate(50000)))
                .enrich(enr -> enr.requestChannel("enricherFlow.input")
                        .<List<Pojo>>propertyFunction("productPhotos", msg -> {
                            List<String> photos = msg.getPayload().get(0).getProductPhotos();
                            photos.addAll(msg.getPayload().get(1).getProductPhotos());
                            return photos;
                        })
                        .<List<Pojo>>propertyFunction("marketPhotos", msg -> {
                            List<String> photos = msg.getPayload().get(0).getMarketPhotos();
                            photos.addAll(msg.getPayload().get(1).getMarketPhotos());
                            return photos;
                        }))
                .log()
                .get();
    }

    @Bean
    public IntegrationFlow enricherFlow() {
        return f -> f
                .<Pojo, Pojo> transform(pojo -> new Pojo(pojo.getId(), pojo.getPrice(),
                        pojo.getProductPhotos(),
                        pojo.getMarketPhotos()))
                    .publishSubscribeChannel(exec(), ps -> ps
                            .applySequence(true)
                            .subscribe(f1 -> f1.handle("urlToId", "product").channel("aggregator.input"))
                            .subscribe(f1 -> f1.handle("urlToId", "market").channel("aggregator.input")));
    }

    @Bean
    public IntegrationFlow aggregator() {
        return f -> f.aggregate();
    }

    @Bean
    public Executor exec() {
        ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
        exec.setCorePoolSize(2);
        return exec;
    }

}

@Component
class UrlToId {

    public Pojo product(Pojo pojo) {
        List<String> productPhotos = pojo.getProductPhotos().stream()
                .map(phot -> phot.substring(phot.lastIndexOf('/')))
                .collect(Collectors.toList());
        return new Pojo(pojo.getId(), pojo.getPrice(), productPhotos, Collections.emptyList());
    }

    public Pojo market(Pojo pojo) {
        List<String> marketPhotos = pojo.getMarketPhotos().stream()
                .map(phot -> phot.substring(phot.lastIndexOf('/')))
                .collect(Collectors.toList());
        return new Pojo(pojo.getId(), pojo.getPrice(), Collections.emptyList(), marketPhotos);
    }

}

class Pojo {

    private final String id;

    private final int price;

    private final List<String> productPhotos = new ArrayList<>();

    private final List<String> marketPhotos = new ArrayList<>();

    public Pojo(String id, int price, List<String> productPhotes, List<String> marketPhotos) {
        this.id = id;
        this.price = price;
        setProductPhotos(productPhotes);
        setMarketPhotos(marketPhotos);
    }

    public String getId() {
        return this.id;
    }

    public int getPrice() {
        return this.price;
    }

    public List<String> getProductPhotos() {
        return new ArrayList<>(this.productPhotos);
    }

    public List<String> getMarketPhotos() {
        return new ArrayList<>(this.marketPhotos);
    }

    public final void setProductPhotos(List<String> photos) {
        if (photos.size() > 0) {
            this.productPhotos.clear();
            this.productPhotos.addAll(photos);
        }
    }

    public final void setMarketPhotos(List<String> photos) {
        if (photos.size() > 0) {
            this.marketPhotos.clear();
            this.marketPhotos.addAll(photos);
        }
    }

    @Override
    public String toString() {
        return "Pojo [id=" + this.id + ", price=" + this.price
                + ", productPhotos=" + this.productPhotos
                + ", marketPhotos=" + this.marketPhotos + "]";
    }

}