拆分、丰富项目然后将每个项目发送到另一个渠道的正确方法?
Right way to split, enrich items then send each item to another channel?
这是拆分项目列表、丰富每个项目然后将每个丰富的项目发送到另一个渠道的正确方法吗?
似乎即使每个项目都在丰富,但只有最后一个被发送到输出通道...
这是我测试的截图,我从流程中看到只调用了 page2。
this.sitePackage = new Package();
this.sitePackage.add(page1);
this.sitePackage.add(page2);
this.sitePackage.add(page3);
//Publish using gateway
this.publishingService.publish(sitePackage);
如果我这样做...
this.sitePackage.add(page1);
this.sitePackage.add(page1);
this.sitePackage.add(page2);
this.sitePackage.add(page2);
this.sitePackage.add(page3);
this.sitePackage.add(page3);
我看到 所有 正在发布的页面,但最后一个是第 2 页而不是第 3 页(尽管从调试中我可以看到该实例具有第 3 页属性)。
似乎所有其他项目都被流量看到了...
我的流程是这样的...
从 PublishPackage 流程开始。这是主要的入口流程,旨在将项目从包中拆分出来,并在丰富有效负载后将每个项目发送到附加到 publishPackageItem 通道的流程...
@Bean
IntegrationFlow flowPublishPackage()
{
return flow -> flow
.channel(this.publishPackageChannel())
.<Package>handle((p, h) -> this.savePackage(p))
.split(Package.class, this::splitPackage)
.channel(this.publishPackageItemChannel());
}
@Bean
@PublishPackageChannel
MessageChannel publishPackageChannel()
{
return MessageChannels.direct().get();
}
@Bean
@PublishPackageItemChannel
MessageChannel publishPackageItemChannel()
{
return MessageChannels.direct().get();
}
@Splitter
List<PackageEntry> splitPackage(final Package bundle)
{
final List<PackageEntry> enrichedEntries = new ArrayList<>();
for (final PackageEntry entry : bundle.getItems())
{
enrichedEntries.add(entry);
}
return enrichedEntries;
}
@Bean
GatewayProxyFactoryBean publishingGateway()
{
final GatewayProxyFactoryBean proxy = new GatewayProxyFactoryBean(PublishingService.class);
proxy.setBeanFactory(this.beanFactory);
proxy.setDefaultRequestChannel(this.publishPackageChannel());
proxy.setDefaultReplyChannel(this.publishPackageChannel());
proxy.afterPropertiesSet();
return proxy;
}
接下来,CMS发布流附加到publishPackageItem通道,并根据拆分后的类型,路由到特定的元素通道进行处理。拆分页面后只有特定的元素类型可能有订阅流。
@Inject
public CmsPublishFlow(@PublishPackageItemChannel final MessageChannel channelPublishPackageItem)
{
this.channelPublishPackageItem = channelPublishPackageItem;
}
@Bean
@PublishPageChannel
MessageChannel channelPublishPage()
{
return MessageChannels.direct().get();
}
@Bean
IntegrationFlow flowPublishContent()
{
return flow -> flow
.channel(this.channelPublishPackageItem)
.filter(PackageEntry.class, p -> p.getEntry() instanceof Page)
.transform(PackageEntry.class, PackageEntry::getEntry)
.split(Page.class, this::traversePageElements)
.<Content, String>route(Content::getType, mapping -> mapping
.resolutionRequired(false)
.subFlowMapping(PAGE, sf -> sf.channel(channelPublishPage()))
.subFlowMapping(IMAGE, sf -> sf.channel(channelPublishAsset()))
.defaultOutputToParentFlow());
//.channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
}
最后,我的目标是订阅频道并相应地处理每个元素。我将此流订阅到 channelPublishPage。每个订阅者可能会以不同方式处理元素。
@Inject
@PublishPageChannel
private MessageChannel channelPublishPage;
@Bean
IntegrationFlow flowPublishPage()
{
return flow -> flow
.channel(this.channelPublishPage)
.publishSubscribeChannel(c -> c
.subscribe(s -> s
.<Page>handle((p, h) -> this
.generatePage(p))));
}
我怎么感觉问题出在这里:
proxy.setDefaultRequestChannel(this.publishPackageChannel());
proxy.setDefaultReplyChannel(this.publishPackageChannel());
考虑不要对请求和等待回复使用相同的渠道。这样你会带来一些循环和真正意想不到的行为。
这是拆分项目列表、丰富每个项目然后将每个丰富的项目发送到另一个渠道的正确方法吗?
似乎即使每个项目都在丰富,但只有最后一个被发送到输出通道...
这是我测试的截图,我从流程中看到只调用了 page2。
this.sitePackage = new Package();
this.sitePackage.add(page1);
this.sitePackage.add(page2);
this.sitePackage.add(page3);
//Publish using gateway
this.publishingService.publish(sitePackage);
如果我这样做...
this.sitePackage.add(page1);
this.sitePackage.add(page1);
this.sitePackage.add(page2);
this.sitePackage.add(page2);
this.sitePackage.add(page3);
this.sitePackage.add(page3);
我看到 所有 正在发布的页面,但最后一个是第 2 页而不是第 3 页(尽管从调试中我可以看到该实例具有第 3 页属性)。
似乎所有其他项目都被流量看到了...
我的流程是这样的...
从 PublishPackage 流程开始。这是主要的入口流程,旨在将项目从包中拆分出来,并在丰富有效负载后将每个项目发送到附加到 publishPackageItem 通道的流程...
@Bean
IntegrationFlow flowPublishPackage()
{
return flow -> flow
.channel(this.publishPackageChannel())
.<Package>handle((p, h) -> this.savePackage(p))
.split(Package.class, this::splitPackage)
.channel(this.publishPackageItemChannel());
}
@Bean
@PublishPackageChannel
MessageChannel publishPackageChannel()
{
return MessageChannels.direct().get();
}
@Bean
@PublishPackageItemChannel
MessageChannel publishPackageItemChannel()
{
return MessageChannels.direct().get();
}
@Splitter
List<PackageEntry> splitPackage(final Package bundle)
{
final List<PackageEntry> enrichedEntries = new ArrayList<>();
for (final PackageEntry entry : bundle.getItems())
{
enrichedEntries.add(entry);
}
return enrichedEntries;
}
@Bean
GatewayProxyFactoryBean publishingGateway()
{
final GatewayProxyFactoryBean proxy = new GatewayProxyFactoryBean(PublishingService.class);
proxy.setBeanFactory(this.beanFactory);
proxy.setDefaultRequestChannel(this.publishPackageChannel());
proxy.setDefaultReplyChannel(this.publishPackageChannel());
proxy.afterPropertiesSet();
return proxy;
}
接下来,CMS发布流附加到publishPackageItem通道,并根据拆分后的类型,路由到特定的元素通道进行处理。拆分页面后只有特定的元素类型可能有订阅流。
@Inject
public CmsPublishFlow(@PublishPackageItemChannel final MessageChannel channelPublishPackageItem)
{
this.channelPublishPackageItem = channelPublishPackageItem;
}
@Bean
@PublishPageChannel
MessageChannel channelPublishPage()
{
return MessageChannels.direct().get();
}
@Bean
IntegrationFlow flowPublishContent()
{
return flow -> flow
.channel(this.channelPublishPackageItem)
.filter(PackageEntry.class, p -> p.getEntry() instanceof Page)
.transform(PackageEntry.class, PackageEntry::getEntry)
.split(Page.class, this::traversePageElements)
.<Content, String>route(Content::getType, mapping -> mapping
.resolutionRequired(false)
.subFlowMapping(PAGE, sf -> sf.channel(channelPublishPage()))
.subFlowMapping(IMAGE, sf -> sf.channel(channelPublishAsset()))
.defaultOutputToParentFlow());
//.channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
}
最后,我的目标是订阅频道并相应地处理每个元素。我将此流订阅到 channelPublishPage。每个订阅者可能会以不同方式处理元素。
@Inject
@PublishPageChannel
private MessageChannel channelPublishPage;
@Bean
IntegrationFlow flowPublishPage()
{
return flow -> flow
.channel(this.channelPublishPage)
.publishSubscribeChannel(c -> c
.subscribe(s -> s
.<Page>handle((p, h) -> this
.generatePage(p))));
}
我怎么感觉问题出在这里:
proxy.setDefaultRequestChannel(this.publishPackageChannel());
proxy.setDefaultReplyChannel(this.publishPackageChannel());
考虑不要对请求和等待回复使用相同的渠道。这样你会带来一些循环和真正意想不到的行为。