Appropriate use of Spring Integration Java DSL plus AmazonS3InboundSynchronizationMessageSource

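I'm using AmazonS3InboundSynchronizationMessageSource to read millions of files scattered across the subdirectories of an S3 bucket organized as type >> year >> month >> day >> hour >> {filename}-{uniqueid}.gz. Ideally, I'd like to poll and write, and have the synchronizer remember the last position I read so that subsequent polls retrieve the subsequent batches. However, that is not how the MessageSource above is designed.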

Anyway, I can work around that by picking a range and reading its contents.
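To make the "pick a range" idea concrete, here is a minimal sketch that expands a time window into one key prefix per hour. The class name `HourlyPrefixes`, the `/` separator, and the zero-padded `yyyy/MM/dd/HH` layout are assumptions for illustration; the real bucket layout may differ.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Integration or the AWS SDK.
final class HourlyPrefixes {

    // Assumed layout: type/year/month/day/hour/, zero-padded.
    private static final DateTimeFormatter HOUR_PATH =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH");

    // Returns one prefix per hour in [from, toExclusive).
    static List<String> between(String type, LocalDateTime from, LocalDateTime toExclusive) {
        List<String> prefixes = new ArrayList<>();
        LocalDateTime hour = from.truncatedTo(ChronoUnit.HOURS);
        while (hour.isBefore(toExclusive)) {
            prefixes.add(type + "/" + hour.format(HOUR_PATH) + "/");
            hour = hour.plusHours(1);
        }
        return prefixes;
    }
}
```

Each resulting prefix could then be supplied as the remote directory for a single run, so that one run handles one hour of data.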

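Beyond that, if I take a simple approach and read the files from a single directory on the first poll, I'd like to shut down (System.exit) after that (actually, after some processing, as described in the comments below).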

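So, similar to the question here:

I just want to poll once and exit after the first poll. (Maybe there is a different way to go about this? I'm open to suggestions.)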

App bootstrap

@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class DataMigrationApp extends SpringBootServletInitializer {

@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
    return application.sources(DataMigrationApp.class);
}

public static void main(String[] args) {
    SpringApplication.run(DataMigrationApp.class, args);
}

}

Updated (2015-09-06)

Code sample

@Configuration
public class DataMigrationModule {

private final Logger log = LoggerFactory.getLogger(getClass());

@Value("${cloud.aws.credentials.accessKey}")
private String accessKey;

@Value("${cloud.aws.credentials.secretKey}")
private String secretKey;

@Value("${cloud.aws.s3.bucket}")
private String bucket;

@Value("${cloud.aws.s3.max-objects-per-batch:1024}")
private int maxObjectsPerBatch;

@Value("${cloud.aws.s3.accept-subfolders:false}")
private String acceptSubFolders;

@Value("${cloud.aws.s3.remote-directory}")
private String remoteDirectory;

@Value("${cloud.aws.s3.local-directory:target/s3-dump}")
private String localDirectory;

@Value("${cloud.aws.s3.filename-wildcard:}")
private String fileNameWildcard;

@Value("${app.persistent-type:}")
private String persistentType;

@Value("${app.repository-type:}")
private String repositoryType;

@Value("${app.persistence-batch-size:2500}")
private int persistenceBatchSize;

@Autowired
private ListableBeanFactory beanFactory;

private final AtomicBoolean invoked = new AtomicBoolean();

public Date nextExecutionTime(TriggerContext triggerContext) {
    return this.invoked.getAndSet(true) ? null : new Date();
}

private FileToInputStreamTransformer unzipTransformer() {
    FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
    transformer.setDeleteFiles(true);
    return transformer;
}

private Class<?> repositoryType() {
    try {
        return Class.forName(repositoryType);
    } catch (ClassNotFoundException cnfe) {
        log.error("DataMigrationModule.failure -- (Unknown repository implementation!)", cnfe);
        System.exit(0);
    }
    return null;
}

private Class<?> persistentType() {
    try {
        return Class.forName(persistentType);
    } catch (ClassNotFoundException cnfe) {
        log.error("DataMigrationModule.failure -- (Unsupported type!)", cnfe);
        System.exit(0);
    }
    return null;
}

@Bean
public MessageSource<?> amazonS3InboundSynchronizationMessageSource() {
    AWSCredentials credentials = new BasicAWSCredentials(this.accessKey, this.secretKey);
    AmazonS3InboundSynchronizationMessageSource messageSource = new AmazonS3InboundSynchronizationMessageSource();
    messageSource.setCredentials(credentials);
    messageSource.setBucket(bucket);
    messageSource.setMaxObjectsPerBatch(maxObjectsPerBatch);
    messageSource.setAcceptSubFolders(Boolean.valueOf(acceptSubFolders));
    messageSource.setRemoteDirectory(remoteDirectory);
    if (!fileNameWildcard.isEmpty()) {
        messageSource.setFileNameWildcard(fileNameWildcard);
    }
    String directory = System.getProperty("java.io.tmpdir");
    if (!localDirectory.startsWith("/")) {
        localDirectory = "/" + localDirectory;
    }
    if (!localDirectory.endsWith("/")) {
        localDirectory = localDirectory + "/";
    }
    directory = directory + localDirectory;
    FileUtils.mkdir(directory);
    messageSource.setDirectory(new LiteralExpression(directory));
    return messageSource;
}

@Bean
DirectChannel inputChannel() {
    return new DirectChannel();
}

@Bean 
JdbcRepositoryHandler jdbcRepositoryHandler() {
    return new JdbcRepositoryHandler(repositoryType(), beanFactory);
}

@Bean
public IntegrationFlow flow() {
    // formatter:off
    return IntegrationFlows
            .from(
                    this.amazonS3InboundSynchronizationMessageSource(),
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime))
            )
            .transform(unzipTransformer())
            // TODO add advised PollableChannel to deal with possible decompression issues

            .split(f -> new FileSplitter())
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .transform(Transformers.fromJson(persistentType()))
            // TODO add advised PollableChannel to deal with possible transform issues

            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(a -> 
                            a.releaseStrategy(g -> g.size() == persistenceBatchSize)
                            .expireGroupsUponCompletion(true)
                            .sendPartialResultOnExpiry(true)
                            .groupTimeoutExpression("size() ge 2 ? 10000 : -1")
                            , null
            )
            .handle(jdbcRepositoryHandler())
            // TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
            .get();
    // formatter:on
}

public class JdbcRepositoryHandler extends AbstractReplyProducingMessageHandler {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @SuppressWarnings("rawtypes")
    private Insertable repository;

    public JdbcRepositoryHandler(Class<?> repositoryClass, ListableBeanFactory beanFactory) {
        repository = (Insertable<?>) beanFactory.getBean(repositoryClass);
    }

    @Override
    protected Object handleRequestMessage(Message<?> message) {
        List<?> result = null;
        try {
            result = repository.insert((List<?>) message.getPayload());
        } catch (TransactionSystemException | DataAccessException e) {
            // TODO Quite a bit more work to add retry capability for records that didn't cause failure
            // Pass the exception itself so the logger prints the stack trace
            log.error("DataMigrationModule.failure -- (Could not persist batch!)", e);
        }
        return result;
    }

}

public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

    @Override
    protected InputStream transformFile(File payload) throws Exception {
        return new GZIPInputStream(new FileInputStream(payload));
    }
}

}

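Actually, I'm not sure what your question is.

That said, you are on the right track.

For the OnlyOnceTrigger, you can use something like this from my test case: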

    private final AtomicBoolean invoked = new AtomicBoolean();

    public Date nextExecutionTime(TriggerContext triggerContext) {
        return this.invoked.getAndSet(true) ? null : new Date();
    }

...

    e -> e.poller(p -> p.trigger(this::nextExecutionTime))
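As a quick way to see why this polls exactly once, the same logic can be exercised without Spring. This is a minimal sketch: the class name `OneShotSchedule` is hypothetical, and the unused `TriggerContext` parameter is dropped so the snippet compiles with the JDK alone. It relies on the `Trigger` contract that a `null` return means no further executions are scheduled.

```java
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the trigger method used in the flow.
final class OneShotSchedule {

    private final AtomicBoolean invoked = new AtomicBoolean();

    // First call: flag was false, so return "now" and flip the flag.
    // Every later call: flag is already true, so return null (no more runs).
    Date nextExecutionTime() {
        return this.invoked.getAndSet(true) ? null : new Date();
    }
}
```

Because `getAndSet` is atomic, only one caller can ever observe `false`, even if the scheduler asked from several threads.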

To decompress the file, you should do this:

.<File, InputStream>transform(p -> new GZIPInputStream(new FileInputStream(p)))

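You have to do it this way because there is an out-of-the-box FileSplitter component that reads a file line by line and emits a message for each line. It supports an InputStream as the payload, which lets you avoid loading the entire file into memory.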
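The memory argument can be illustrated with plain JDK classes. The sketch below is not how FileSplitter is implemented; it only shows the same streaming idea, where lines are decoded from the gzip stream one at a time and handed to a callback. The class `GzipLines` and both method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper that streams a .gz file line by line.
final class GzipLines {

    // Only the current line (plus stream buffers) is held in memory.
    static void forEachLine(Path gzFile, Consumer<String> handler) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(gzFile)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                handler.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes a small gzip file to the temp directory for trying things out.
    static Path writeSample(String... lines) {
        try {
            Path gzFile = Files.createTempFile("sample-", ".gz");
            try (Writer writer = new OutputStreamWriter(
                    new GZIPOutputStream(Files.newOutputStream(gzFile)), StandardCharsets.UTF_8)) {
                for (String line : lines) {
                    writer.write(line);
                    writer.write('\n');
                }
            }
            return gzFile;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, `GzipLines.forEachLine(GzipLines.writeSample("{\"id\":1}", "{\"id\":2}"), System.out::println)` would print each JSON line in turn.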

So the next EIP method in your IntegrationFlow is:

.split(new FileSplitter())

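After that, I'm not sure you need to aggregate the domain objects into a list for a later batch insert, because you can distribute them one by one through an ExecutorChannel.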
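A rough picture of one-by-one distribution, using a plain `ExecutorService` rather than Spring's ExecutorChannel, is sketched below. The class name `OneByOneDispatch`, the pool size of 4, and the one-minute timeout are all assumptions chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch: hand each record to a worker thread, then wait for all of them.
final class OneByOneDispatch {

    // Returns the number of records that were handled.
    static int dispatch(List<String> records, Consumer<String> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed pool size
        AtomicInteger handled = new AtomicInteger();
        for (String record : records) {
            pool.execute(() -> {
                handler.accept(record);
                handled.incrementAndGet();
            });
        }
        pool.shutdown(); // accept no new tasks, let queued ones finish
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled.get();
    }
}
```

Waiting for termination this way also gives a natural point at which a run-once application could exit, since all records have been handled by then.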

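As you can see, there is no reason for a "delete unpacked file" step.

The same goes for the final "delete all *.gz files" step, simply because you can rely on the AcceptOnceFileListFilter to avoid re-reading the same file on the next poll task.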
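The accept-once idea can be sketched in a few lines of plain Java. This is not Spring Integration's implementation, only an illustration of the behaviour being relied on; the class name `AcceptOnceSketch` is hypothetical, and file names stand in for files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an "accept each name only once" filter.
final class AcceptOnceSketch {

    // Names that have already been passed through.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns only the names that were not accepted by an earlier call.
    List<String> filter(List<String> names) {
        List<String> fresh = new ArrayList<>();
        for (String name : names) {
            if (seen.add(name)) { // add() returns false for a name seen before
                fresh.add(name);
            }
        }
        return fresh;
    }
}
```

In this sketch the set lives only in memory, so a restarted process would start with an empty history. For a run-once job that exits after the first poll, that limitation does not come into play.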

Let me know if I have missed anything.