AWS Kinesis KCL 跳过启动前添加的记录

AWS Kinesis KCL skips records added before startup

我开始同时使用 KPLKCL 在服务之间交换数据。但是每当 consumer service 离线时, KPL 发送的所有数据将永远丢失。所以我只得到那些在 consumer service 启动并且它的 shardConsumer 准备就绪时发送的数据块。我需要从最后消耗的点或以其他方式处理留下的数据

开始

这是我的 ShardProcessor 代码:

@Override
    public void initialize(InitializationInput initializationInput) {

    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.records()
                .forEach(record -> {
                    //my logic
                });
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {

    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            LOG.error("Kinesis error on Shard Ended", e);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            LOG.error("Kinesis error on Shutdown Requested", e);

        }
    }

和配置代码:

public void configure(String streamName, ShardRecordProcessorFactory factory) {

        Region region = Region.of(awsRegion);

        KinesisAsyncClient kinesisAsyncClient =
                KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder =
                new ConfigsBuilder(streamName, appName, kinesisAsyncClient, dynamoClient, cloudWatchClient,
                        UUID.randomUUID().toString(), factory);

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
                        .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

有两种方法可以解决这个问题。一、问题

默认情况下,KCL 配置为从 LATEST 开始读取流。此设置告诉流 reader 在“当前”时间戳获取流。

在您的情况下,您在该流中拥有“现在”之前放置在那里的数据。为了读取该数据,您可能需要考虑读取流中最早的数据。如果您设置默认流,该流将存储数据 24 小时。

要从该流的“开头”读取数据,或者在启动 KCL 应用程序前 24 小时,您需要将流 reader 设置为 TRIM_HORIZON。此设置称为 initialPositionInStream。你可以阅读它 here. There are three different settings documented in the API.

要解决您的问题,如第一个 link 中所述,首选方法是向属性文件添加一个条目。如果您不使用属性文件,您只需将其添加到您的 Scheduler ctor:

Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordinatorConfig(),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
        .initialPositionInStreamExtended(InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON))
        .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);

此设置要记住的一件事是当流中有数据并且从 TRIM_HORIZON 开始时的启动功能。在这种情况下,RecordProcessor 将尽可能快地遍历记录。这可能会在 Kinesis API 甚至下游系统(无论您在 RecordProcessor 拥有数据后将数据发送到何处)造成性能问题,