How do I use requestShutdown and shutdown to do a graceful shutdown with the KCL Java library for AWS Kinesis?

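I am trying to use the new feature of the KCL library in Java to shut down an AWS Kinesis consumer gracefully, by registering a shutdown hook that stops all record processors and then stops the worker gracefully. The new library provides a new interface that record processors need to implement. But how does it get invoked?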

I tried calling worker.requestShutdown() first and then worker.shutdown(), and it works. But is this the intended way to use them? And what is the point of having both?

Starting the consumer

As you may know, when you create and run a Worker, it

1) creates the consumer offset table in DynamoDB

2) creates the leases, and schedules the lease taker and lease renewer to run at a configured interval

If you have two partitions (shards), there will be two records in the same DynamoDB table, because each partition needs its own lease.

For example:

{
  "checkpoint": "TRIM_HORIZON",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 38,
  "leaseKey": "shardId-000000000000",
  "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
  "ownerSwitchesSinceCheckpoint": 0
}

{
  "checkpoint": "49570828493343584144205257440727957974505808096533676050",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 40,
  "leaseKey": "shardId-000000000001",
  "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
  "ownerSwitchesSinceCheckpoint": 0
}
  • The lease coordinator's ScheduledExecutorService (called leaseCoordinatorThreadPool) is responsible for scheduling lease taking and lease renewal.
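The lease bookkeeping from steps 1) and 2) can be pictured with a small, stdlib-only toy model. Everything here (Lease, ToyLeaseCoordinator, the two-thread pool) is hypothetical and much simpler than the real KCL, which keeps these rows in the DynamoDB table shown above; it only illustrates one lease per shard, a taker and a renewer scheduled on a ScheduledExecutorService, and a stop that shuts that pool down and waits for it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for one row of the lease table.
final class Lease {
    final String leaseKey;      // e.g. "shardId-000000000000"
    volatile String leaseOwner; // worker id currently holding the lease
    volatile long leaseCounter; // bumped on every renewal (only the renewer task writes it)

    Lease(String leaseKey) {
        this.leaseKey = leaseKey;
    }
}

// Toy lease coordinator: one lease per shard, taken and renewed on a schedule. NOT the KCL class.
final class ToyLeaseCoordinator {
    final Map<String, Lease> leaseTable = new ConcurrentHashMap<>();
    private final ScheduledExecutorService leaseCoordinatorThreadPool =
            Executors.newScheduledThreadPool(2);
    private final String workerId;

    ToyLeaseCoordinator(String workerId, String... shardIds) {
        this.workerId = workerId;
        for (String shardId : shardIds) {
            leaseTable.put(shardId, new Lease(shardId)); // one lease row per shard
        }
    }

    void start(long intervalMillis) {
        // "lease taker": claim every lease that has no owner yet
        leaseCoordinatorThreadPool.scheduleAtFixedRate(
                () -> leaseTable.values().forEach(l -> {
                    if (l.leaseOwner == null) {
                        l.leaseOwner = workerId;
                    }
                }),
                0, intervalMillis, TimeUnit.MILLISECONDS);
        // "lease renewer": bump the counter of every lease this worker owns
        leaseCoordinatorThreadPool.scheduleAtFixedRate(
                () -> leaseTable.values().stream()
                        .filter(l -> workerId.equals(l.leaseOwner))
                        .forEach(l -> l.leaseCounter++),
                intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    // Stop both periodic tasks and wait for the pool to terminate.
    boolean stop() throws InterruptedException {
        leaseCoordinatorThreadPool.shutdown();
        return leaseCoordinatorThreadPool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```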

3) Then, for each partition in the stream, the Worker creates an internal PartitionConsumer (ShardConsumer in the KCL source), which actually fetches the events and dispatches them to your RecordProcessor#processRecords (see ProcessTask#call).

4) Regarding your question: you must register your IRecordProcessorFactory implementation with the Worker, which uses it to create a separate record processor for each PartitionConsumer.

For example (this might be helpful):

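import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import java.nio.charset.StandardCharsets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// getAuthProfileCredentials() (an AWSCredentialsProvider) and getHttpConfiguration() (a ClientConfiguration)
// are the author's own helpers and are not shown here.
KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration(
        "consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId")
        .withKinesisClientConfig(getHttpConfiguration())
        // TRIM_HORIZON = start from the oldest record still retained in the stream (LATEST = the tip)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

Worker consumerWorker = new Worker.Builder()
        .recordProcessorFactory(new DavidsEventProcessorFactory())
        .config(streamConfig)
        // the builder takes the low-level AmazonDynamoDB client, not the document-API DynamoDB wrapper
        .dynamoDBClient(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration()))
        .build();


public class DavidsEventProcessorFactory implements IRecordProcessorFactory {

    private static final Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class);

    // called by the Worker once per partition (shard) it starts consuming
    @Override
    public IRecordProcessor createProcessor() {
        logger.info("Creating an EventProcessor.");
        return new DavidsEventPartitionProcessor();
    }
}

class DavidsEventPartitionProcessor implements IRecordProcessor {

    private static final Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);

    //TODO add consumername ?

    private String partitionId;

    // TERMINATE means the shard has been closed (e.g. by resharding)
    private static final ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE;

    public DavidsEventPartitionProcessor() {
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.partitionId = initializationInput.getShardId();
        logger.info("Initialised partition {} for streaming.", partitionId);
    }

    @Override
    public void processRecords(ProcessRecordsInput recordsInput) {
        recordsInput.getRecords().forEach(nativeEvent -> {
            // decode only the readable bytes of the buffer, not its whole backing array
            String eventPayload = StandardCharsets.UTF_8.decode(nativeEvent.getData()).toString();
            logger.info("Processing an event {} : {}", nativeEvent.getSequenceNumber(), eventPayload);
        });

        // checkpoint() without arguments records the last record of this batch, so one call per batch is enough
        checkpoint(recordsInput.getCheckpointer());
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        logger.debug("Shutting down event processor for {}", partitionId);

        // at the end of a closed shard we must checkpoint so that its child shards can be processed;
        // on ZOMBIE the lease was lost, so a checkpoint would fail anyway
        if (shutdownInput.getShutdownReason() == RE_PARTITIONING) {
            checkpoint(shutdownInput.getCheckpointer());
        }
    }

    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
            logger.debug("Persisted the consumer offset for partition {}", partitionId);
        } catch (ShutdownException e) {
            logger.warn("Consumer shutting down, skipping checkpoint for partition {}", partitionId, e);
        } catch (InvalidStateException | ThrottlingException | KinesisClientLibDependencyException e) {
            logger.error("Cannot update consumer offset to the DynamoDB table.", e);
        }
    }

}

// then start the consumer (run() blocks until the worker shuts down)

consumerWorker.run();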
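In the KCL, Worker implements Runnable and run() does not return until the worker has shut down, so calling consumerWorker.run() on the main thread blocks it. A common arrangement is to run the worker on its own thread. The sketch below uses a hypothetical BlockingToyWorker in place of the real Worker so that it runs without AWS; it is an illustration, not KCL code.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the KCL Worker: run() blocks until shutdown() is called.
final class BlockingToyWorker implements Runnable {
    final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch stopRequested = new CountDownLatch(1);
    volatile boolean running;

    @Override
    public void run() {
        running = true;
        started.countDown();
        try {
            stopRequested.await(); // stands in for the worker's record-fetching loop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            running = false;
        }
    }

    void shutdown() {
        stopRequested.countDown(); // lets run() return
    }
}
```

With the real library, Executors.newSingleThreadExecutor().submit(consumerWorker) plays the same role, and the main thread stays free to trigger a shutdown later.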

Stopping the consumer

Now, when you want to stop your consumer instance (Worker), you don't need to do much for each PartitionConsumer: the Worker takes care of them once you ask it to shut down.

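  • shutdown asks the leaseCoordinatorThreadPool, which is responsible for taking and renewing leases, to stop, and waits for it to terminate.

  • requestShutdown, on the other hand, cancels the lease taker AND notifies the PartitionConsumers to shut down.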

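requestShutdown matters most if you want your RecordProcessor to be notified: you can additionally implement IShutdownNotificationAware. That way, if there is a race condition where your RecordProcessor is still processing events while the worker is about to shut down, you should still be able to commit the offset and then shut down.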

requestShutdown returns a ShutdownFuture, which in turn calls worker.shutdown().
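To make the difference between the two calls concrete, here is a toy, stdlib-only model of the behaviour described above. ToyWorker and ToyShutdownAware are hypothetical names, not KCL types, and the model ignores DynamoDB and Kinesis entirely: shutdown() only stops the lease threads, while requestShutdown() first stops taking leases, then notifies every processor and calls shutdown(), all behind the returned Future.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;

// Hypothetical stand-in for IShutdownNotificationAware#shutdownRequested.
interface ToyShutdownAware {
    void shutdownRequested();
}

// Toy model of the two shutdown paths; NOT the KCL Worker.
final class ToyWorker {
    final List<String> events = new CopyOnWriteArrayList<>();
    private final List<ToyShutdownAware> processors;
    private volatile boolean shutDown;

    ToyWorker(List<ToyShutdownAware> processors) {
        this.processors = processors;
    }

    // Hard stop: only the lease threads are stopped; processors are not told anything.
    void shutdown() {
        shutDown = true;
        events.add("lease threads stopped");
    }

    // Graceful stop: stop taking leases, notify every processor, then fall through to shutdown().
    Future<Void> requestShutdown() {
        events.add("lease taker stopped");
        return CompletableFuture.runAsync(() -> {
            processors.forEach(ToyShutdownAware::shutdownRequested);
            events.add("processors notified");
            shutdown();
        });
    }

    boolean isShutDown() {
        return shutDown;
    }
}
```

In this model, calling shutdown() directly skips the notification step, which is exactly the window in which a processor could lose its last checkpoint.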

You must implement the following method on your RecordProcessor to be notified on requestShutdown():

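import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;

class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware {

    private static final Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);

    private String partitionId;

    // initialize, processRecords and shutdown as in the example above

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        logger.debug("Shutdown requested for {}", partitionId);
        try {
            checkpointer.checkpoint(); // last chance to commit the offset of what was processed
        } catch (ShutdownException | InvalidStateException | ThrottlingException
                | KinesisClientLibDependencyException e) {
            logger.error("Could not checkpoint partition {} before shutdown", partitionId, e);
        }
    }

}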

However, if the lease is released before the notification arrives, shutdownRequested may not be called.

Summary of the questions

The new library provides a new interface that record processors need to implement. But how does it get invoked?

  • Implement IRecordProcessorFactory and IRecordProcessor
  • Then wire your RecordProcessorFactory into your Worker

I tried invoking worker.requestShutdown() first and then worker.shutdown(), and it works. But is this the intended way to use it?

You should use requestShutdown() for a graceful shutdown, since it takes care of the race condition. It was introduced in kinesis-client 1.7.1.
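For the shutdown hook mentioned in the question, a minimal sketch of that order is: call requestShutdown(), wait on its future for a bounded time, and fall back to shutdown() only if the graceful path does not finish. GracefullyStoppable and ConsumerShutdownHook are hypothetical names introduced here, and the sketch assumes, as the answer describes, that requestShutdown() returns a Future.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical abstraction over the two Worker methods discussed above.
interface GracefullyStoppable {
    Future<Void> requestShutdown();
    void shutdown();
}

final class ConsumerShutdownHook {
    // Builds a thread suitable for Runtime.getRuntime().addShutdownHook(...).
    static Thread create(GracefullyStoppable worker, long timeoutSeconds) {
        return new Thread(() -> {
            Future<Void> graceful = worker.requestShutdown();
            try {
                // wait until processors were notified and the worker stopped
                graceful.get(timeoutSeconds, TimeUnit.SECONDS);
            } catch (TimeoutException | ExecutionException e) {
                worker.shutdown(); // graceful path failed or took too long: hard stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                worker.shutdown();
            }
        }, "kinesis-consumer-shutdown");
    }
}
```

With the real library you would wrap consumerWorker in such an adapter (delegating to its requestShutdown() and shutdown()) and register it with Runtime.getRuntime().addShutdownHook(ConsumerShutdownHook.create(adapter, 20)); the 20-second timeout is an arbitrary example value.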