使用 Spring 批处理和 Spring 云数据流构建文件 Polling/Ingest 任务

Building a File Polling/Ingest Task with Spring Batch and Spring Cloud Data Flow

我们计划创建一个新的处理机制,包括监听几个目录 e.g: /opt/dir1, /opt/dirN 并且对于在这些目录中创建的每个文档,启动一个例程来处理,将其注册表保存在数据库中(通过REST 调用现有的 CRUD API) 并生成到另一个目录的协议文件。

出于测试目的,我没有使用任何现代的(甚至体面的)framework/approach,只是一个常规的 Spring带有 WatchService 实现的启动应用程序,它监听这些目录并轮询文件创建后立即处理。它有效,但很明显,当我转向生产并开始接收数十个要并行处理的文件时,我肯定会在某个时候对性能产生一些影响,这在我的示例中不是现实。

经过一些研究和一些同事的提示,我发现 Spring Batch + Spring Cloud Data Flow 是满足我需求的最佳组合。但是,我以前从未处理过 Batch 或 Data Flow,我有点困惑我应该构建这些块的内容和方式,以便以最简单和高效的方式运行此例程。我有几个关于它的附加值和架构的问题,非常希望听到您的想法!




我设法通过 SCDF REST API 启动了我的任务,因此我可以保留我原来的 SpringBoot 应用程序,使用 WatchService 通过 Feign 或 XXX 启动新任务。我仍然知道这远不是我应该在这里做的。经过更多研究后,我认为使用文件源和接收器创建流将是我的方式,除非有人有任何其他意见,但我无法将入站通道适配器设置为从多个目录轮询,我不能多个流,因为这个平台应该扩展到我们有成千上万的参与者(或从中轮询文件的目录)的程度。


I managed to create and run a sample batch file ingest task based on this section of Spring Docs. How can I launch a task every time a file is created in a directory? Do I need a Stream for that?

如果您必须在上游事件(例如:新文件)发生时自动启动它,是的,您可以通过流执行此操作(请参阅 example). If the events are coming off of a message-broker, you can directly consume them in the batch-job, too (eg: AmqpItemReader)。

If I do, How can I create a stream application that launches my task programmaticaly for each new file passing it's path as argument? Should I use RabbitMQ for this purpose?

希望上面的例子能说明问题。如果你想以编程方式启动任务(不是通过 DSL/REST/UI),你可以使用新的 Java DSL 支持来实现,它是在 1.3 中添加的。

How can I keep some variables externalized for my task e.g directories path? Can I have these streams and tasks read an application.yml somewhere else than inside it's jar?

推荐的方法是使用配置服务器。根据编排的平台,您必须向任务及其 sub-tasks 提供 config-server 凭据,包括 batch-jobs。在 Cloud Foundry 中,我们只需将 config-server 服务实例绑定到每个任务,并在运行时自动解析外部化属性。

Why should I use Spring Cloud Data Flow alongside Spring Batch and not only a batch application? Just because it spans parallel tasks for each file or do I get any other benefit?

作为 Spring Batch Admin 的替代品,SCDF 为 Tasks/Batch-Jobs 提供监控和管理。执行、步骤、step-progress 和错误时的堆栈跟踪将被保留并可从仪表板进行探索。您也可以直接使用 SCDF 的 REST 端点来检查此信息。

Talking purely about performance, how would this solution compare to my WatchService + plain processing implementation if you think only about the sequential processing scenario, where I'd receive only 1 file per hour or so?

这是特定于实现的。我们没有任何基准可以分享。但是,如果需要性能,您可以在 Spring Batch 中探索 remote-partitioning 支持。您可以使用 "n" 数量的工作人员来划分摄取或数据处理任务,这样您就可以实现并行性。