Spring Cloud DataFlow http 轮询和去重
Spring Cloud DataFlow http polling and deduplication
我阅读了很多 Spring Cloud DataFlow 和相关文档,以便生成一个数据摄取解决方案,该解决方案将 运行 我组织的 Cloud Foundry 部署。目标是轮询 HTTP 服务以获取数据,为了便于讨论,可能每天三次,然后 insert/update 将数据存储在 PostgreSQL 数据库中。 HTTP 服务似乎每天提供数十万条记录。
到目前为止,一个混淆点是在 DataFlow 管道上下文中对轮询记录进行重复数据删除的最佳实践。源数据没有时间戳字段来帮助跟踪轮询,只有粗略的日级日期字段。我也不保证记录不会被追溯更新。这些记录似乎有一个唯一的 ID,所以我可以用这种方式删除记录,但我只是不确定根据文档如何最好地在 DataFlow 中实现该逻辑。据我所知,Spring Cloud Stream starters do not provide for this out-of-the-box. I was reading about Spring Integration's smart polling,但我不确定这是否能解决我的担忧。
我的直觉是在 DataFlow Stream 中创建一个自定义处理器 Java 组件,该组件执行数据库查询以确定是否已插入轮询记录,然后将适当的记录插入目标数据库,或者通过他们在下游。在 Stream 应用程序中可以接受在中间步骤中查询目标数据库吗?或者,我可以在 Spring Cloud Task 中将这一切实现为基于某个时间表触发的批处理操作。
继续使用 DataFlow 应用程序的最佳方法是什么?如我在 DataFlow/Stream/Task/Integration 应用程序中所述,实现重复数据删除的 common/best 做法是什么?我应该复制入门应用程序的设置还是从头开始,因为我相当确定我需要编写自定义代码?我什至需要 Spring Cloud DataFlow,因为我不确定我是否会使用它的 DSL?很抱歉提出所有问题,但作为 Cloud Foundry 和所有这些 Spring 项目的新手,将它们拼凑在一起是一项艰巨的任务。
在此先感谢您的帮助。
您走在正确的轨道上,鉴于您的要求,您很可能需要创建自定义处理器。您需要跟踪已插入的内容以避免重复。
没有什么可以阻止您在流应用程序中编写此类处理器,但是性能可能会受到影响,因为您将针对每条记录发出一个数据库查询。
如果顺序不重要,您可以并行化查询,以便处理多个并发消息,但最终您的数据库仍要付出代价。
另一种方法是使用 bloomfilter,它可以极大地帮助加快检查插入记录的速度。
您可以从克隆入门应用程序开始,您可以让一个轮询器触发一个 http 客户端处理器来获取您的数据,然后通过您的自定义代码处理器,最后到达 jdbc-sink。类似于 stream create time --triger.cron=<CRON_EXPRESSION> | httpclient --httpclient.url-expression=<remote_endpoint> | customProcessor | jdbc
使用 SCDF 的优势之一是您可以通过 deployer.customProcessor.count=8
等部署属性独立扩展自定义处理器
Spring Cloud Data Flow 基于 Spring Cloud Stream 构建数据集成流,而 Cloud Stream 又完全基于 Spring Integration。所有原则都存在于 Spring 集成可以在 SCDF 级别的任何地方应用。
这确实可能是您无法避免一些编码的情况,但是您需要的是在 EIP 中调用的 Idempotent Receiver. And Spring Integration 为我们提供了一个:
@ServiceActivator(inputChannel = "processChannel")
@IdempotentReceiver("idempotentReceiverInterceptor")
public void handle(Message<?> message)
我阅读了很多 Spring Cloud DataFlow 和相关文档,以便生成一个数据摄取解决方案,该解决方案将 运行 我组织的 Cloud Foundry 部署。目标是轮询 HTTP 服务以获取数据,为了便于讨论,可能每天三次,然后 insert/update 将数据存储在 PostgreSQL 数据库中。 HTTP 服务似乎每天提供数十万条记录。
到目前为止,一个混淆点是在 DataFlow 管道上下文中对轮询记录进行重复数据删除的最佳实践。源数据没有时间戳字段来帮助跟踪轮询,只有粗略的日级日期字段。我也不保证记录不会被追溯更新。这些记录似乎有一个唯一的 ID,所以我可以用这种方式删除记录,但我只是不确定根据文档如何最好地在 DataFlow 中实现该逻辑。据我所知,Spring Cloud Stream starters do not provide for this out-of-the-box. I was reading about Spring Integration's smart polling,但我不确定这是否能解决我的担忧。
我的直觉是在 DataFlow Stream 中创建一个自定义处理器 Java 组件,该组件执行数据库查询以确定是否已插入轮询记录,然后将适当的记录插入目标数据库,或者通过他们在下游。在 Stream 应用程序中可以接受在中间步骤中查询目标数据库吗?或者,我可以在 Spring Cloud Task 中将这一切实现为基于某个时间表触发的批处理操作。
继续使用 DataFlow 应用程序的最佳方法是什么?如我在 DataFlow/Stream/Task/Integration 应用程序中所述,实现重复数据删除的 common/best 做法是什么?我应该复制入门应用程序的设置还是从头开始,因为我相当确定我需要编写自定义代码?我什至需要 Spring Cloud DataFlow,因为我不确定我是否会使用它的 DSL?很抱歉提出所有问题,但作为 Cloud Foundry 和所有这些 Spring 项目的新手,将它们拼凑在一起是一项艰巨的任务。
在此先感谢您的帮助。
您走在正确的轨道上,鉴于您的要求,您很可能需要创建自定义处理器。您需要跟踪已插入的内容以避免重复。
没有什么可以阻止您在流应用程序中编写此类处理器,但是性能可能会受到影响,因为您将针对每条记录发出一个数据库查询。
如果顺序不重要,您可以并行化查询,以便处理多个并发消息,但最终您的数据库仍要付出代价。
另一种方法是使用 bloomfilter,它可以极大地帮助加快检查插入记录的速度。
您可以从克隆入门应用程序开始,您可以让一个轮询器触发一个 http 客户端处理器来获取您的数据,然后通过您的自定义代码处理器,最后到达 jdbc-sink。类似于 stream create time --triger.cron=<CRON_EXPRESSION> | httpclient --httpclient.url-expression=<remote_endpoint> | customProcessor | jdbc
使用 SCDF 的优势之一是您可以通过 deployer.customProcessor.count=8
Spring Cloud Data Flow 基于 Spring Cloud Stream 构建数据集成流,而 Cloud Stream 又完全基于 Spring Integration。所有原则都存在于 Spring 集成可以在 SCDF 级别的任何地方应用。
这确实可能是您无法避免一些编码的情况,但是您需要的是在 EIP 中调用的 Idempotent Receiver. And Spring Integration 为我们提供了一个:
@ServiceActivator(inputChannel = "processChannel")
@IdempotentReceiver("idempotentReceiverInterceptor")
public void handle(Message<?> message)