How do I use requestShutdown and shutdown to do a graceful shutdown with the KCL Java library for AWS Kinesis?
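I'm trying to use the new feature of the KCL library in Java to shut down an AWS Kinesis consumer gracefully. I register a shutdown hook that stops all record processors and then stops the worker gracefully. The new library provides a new interface that record processors need to implement. But how does it get invoked?
I tried invoking worker.requestShutdown() first and then worker.shutdown(), and it works. But is that the intended way to use them? And what is the point of having both?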
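Starting the consumer
As you may know, when you create a Worker, it
1) creates the consumer offset table in DynamoDB;
2) creates leases, and schedules the lease taker and lease renewer to run at a configured interval of time.
If you have two partitions (shards), there will be two records in the same DynamoDB table, meaning each partition needs its own lease.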
For example:
{
"checkpoint": "TRIM_HORIZON",
"checkpointSubSequenceNumber": 0,
"leaseCounter": 38,
"leaseKey": "shardId-000000000000",
"leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
"ownerSwitchesSinceCheckpoint": 0
}
{
"checkpoint": "49570828493343584144205257440727957974505808096533676050",
"checkpointSubSequenceNumber": 0,
"leaseCounter": 40,
"leaseKey": "shardId-000000000001",
"leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
"ownerSwitchesSinceCheckpoint": 0
}
- The lease coordinator's ScheduledExecutorService (called leaseCoordinatorThreadPool) is responsible for scheduling lease taking and lease renewal.
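3) Then, for each partition in the stream, the Worker creates an internal PartitionConsumer (the ShardConsumer class in the KCL source), which actually fetches the events and dispatches them to your RecordProcessor#processRecords. See ProcessTask#call.
4) Regarding your question, you have to register your IRecordProcessorFactory implementation with the worker, which uses it to create a record processor for each PartitionConsumer.
For example (see the example here, which might be helpful):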
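// Imports assume kinesis-client 1.x (1.7.1 or later), the AWS SDK for Java 1.x and Log4j 2.
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// getAuthProfileCredentials() and getHttpConfiguration() are the author's own helpers (not shown)
KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration(
        "consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId")
        .withKinesisClientConfig(getHttpConfiguration())
        // TRIM_HORIZON = start from the oldest record still in the stream; LATEST = start from the tip
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
Worker consumerWorker = new Worker.Builder()
        .recordProcessorFactory(new DavidsEventProcessorFactory())
        .config(streamConfig)
        // the builder takes the low-level AmazonDynamoDB client, not the document-API DynamoDB wrapper
        .dynamoDBClient(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration()))
        .build();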
public class DavidsEventProcessorFactory implements IRecordProcessorFactory {
    private static final Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class);

    @Override
    public IRecordProcessor createProcessor() {
        // the Worker calls this once for every shard it starts consuming
        logger.info("Creating an EventProcessor.");
        return new DavidsEventPartitionProcessor();
    }
}

class DavidsEventPartitionProcessor implements IRecordProcessor {
    private static final Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);
    // TERMINATE: the shard has been closed (e.g. by resharding) and must be checkpointed
    private static final ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE;
    //TODO add consumername ?
    private String partitionId;

    // the constructor must carry the class name (it was KinesisEventPartitionProcessor)
    public DavidsEventPartitionProcessor() {
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.partitionId = initializationInput.getShardId();
        logger.info("Initialised partition {} for streaming.", partitionId);
    }

    @Override
    public void processRecords(ProcessRecordsInput recordsInput) {
        recordsInput.getRecords().forEach(nativeEvent -> {
            // decode only the buffer's remaining bytes; assumes UTF-8 payloads
            String eventPayload = StandardCharsets.UTF_8.decode(nativeEvent.getData()).toString();
            logger.info("Processing an event {} : {}", nativeEvent.getSequenceNumber(), eventPayload);
            try {
                // checkpoint at this record; the no-arg checkpoint() would jump to the end of the batch
                recordsInput.getCheckpointer().checkpoint(nativeEvent);
                logger.debug("Persisted the consumer offset to {} for partition {}",
                        nativeEvent.getSequenceNumber(), partitionId);
            } catch (InvalidStateException e) {
                logger.error("Cannot update consumer offset to the DynamoDB table.", e);
            } catch (ShutdownException e) {
                logger.error("Consumer Shutting down", e);
            }
        });
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        logger.debug("Shutting down event processor for {}", partitionId);
        // only a closed shard (TERMINATE) is checkpointed here; on ZOMBIE the lease is already lost
        if (shutdownInput.getShutdownReason() == RE_PARTITIONING) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (InvalidStateException e) {
                logger.error("Cannot update consumer offset to the DynamoDB table.", e);
            } catch (ShutdownException e) {
                logger.error("Consumer Shutting down", e);
            }
        }
    }
}

// then start the consumer; run() blocks the calling thread until the worker shuts down
consumerWorker.run();
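The way the factory gets invoked can be modelled without the KCL on the classpath. In this stdlib-only sketch, ToyRecordProcessor and ToyRecordProcessorFactory are hypothetical, simplified mirrors of IRecordProcessor and IRecordProcessorFactory, and ToyWorker stands in for the Worker: it calls createProcessor() once per shard, so every PartitionConsumer ends up with its own processor instance, and it routes each fetched batch to the processor that owns that shard. Records are plain strings here, which is a simplification.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified mirror of IRecordProcessor.
interface ToyRecordProcessor {
    void initialize(String shardId);
    void processRecords(List<String> records);
}

// Hypothetical, simplified mirror of IRecordProcessorFactory.
interface ToyRecordProcessorFactory {
    ToyRecordProcessor createProcessor();
}

// Example processor that only counts the records it was given.
class CountingProcessor implements ToyRecordProcessor {
    String shardId;
    int processed;

    @Override
    public void initialize(String shardId) {
        this.shardId = shardId;
    }

    @Override
    public void processRecords(List<String> records) {
        processed += records.size();
    }
}

// Stand-in for the Worker: one processor per shard, created through the factory.
class ToyWorker {
    final Map<String, ToyRecordProcessor> processorsByShard = new LinkedHashMap<>();

    ToyWorker(ToyRecordProcessorFactory factory, List<String> shardIds) {
        for (String shardId : shardIds) {
            ToyRecordProcessor processor = factory.createProcessor();
            processor.initialize(shardId);
            processorsByShard.put(shardId, processor);
        }
    }

    // Route a fetched batch to the processor that owns the shard.
    void dispatch(String shardId, List<String> batch) {
        processorsByShard.get(shardId).processRecords(batch);
    }
}
```

Passing CountingProcessor::new as the factory has the same shape as registering DavidsEventProcessorFactory with Worker.Builder: the worker, not your code, decides when createProcessor() is called.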
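Stopping the consumer
Now, when you want to stop your Consumer instance (Worker), you don't need to do much with each PartitionConsumer; the Worker takes care of that once you ask it to shut down.
With shutdown, the Worker asks leaseCoordinatorThreadPool, which is responsible for taking and renewing leases, to stop, and then waits for it to terminate.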
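The lease renewal that shutdown stops can be pictured with a small stdlib-only model. This is not KCL code: ToyLease and ToyLeaseCoordinator are hypothetical names, each shard gets one lease row keyed by its shard ID (as in the DynamoDB table shown earlier), and a single-threaded ScheduledExecutorService plays the role of leaseCoordinatorThreadPool, periodically "renewing" the leases this worker owns by incrementing leaseCounter. Taking leases from other workers and persisting anything to DynamoDB are left out.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory stand-in for one row of the DynamoDB lease table.
class ToyLease {
    final String leaseKey;    // e.g. "shardId-000000000000"
    final String leaseOwner;  // worker ID holding the lease
    final AtomicLong leaseCounter = new AtomicLong();

    ToyLease(String leaseKey, String leaseOwner) {
        this.leaseKey = leaseKey;
        this.leaseOwner = leaseOwner;
    }
}

// Hypothetical model of a lease coordinator: one lease per shard, renewed
// on a fixed schedule by a ScheduledExecutorService.
class ToyLeaseCoordinator {
    private final String workerId;
    private final Map<String, ToyLease> leaseTable = new ConcurrentHashMap<>();
    private final ScheduledExecutorService leaseCoordinatorThreadPool =
            Executors.newSingleThreadScheduledExecutor();

    ToyLeaseCoordinator(String workerId, List<String> shardIds) {
        this.workerId = workerId;
        // For simplicity this worker owns every shard's lease from the start.
        for (String shardId : shardIds) {
            leaseTable.put(shardId, new ToyLease(shardId, workerId));
        }
    }

    // Schedule the renewer at a configured interval.
    void start(long intervalMillis) {
        leaseCoordinatorThreadPool.scheduleAtFixedRate(
                this::renewOwnedLeases, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    // "Renewing" here only bumps the counter of each lease this worker owns.
    private void renewOwnedLeases() {
        for (ToyLease lease : leaseTable.values()) {
            if (workerId.equals(lease.leaseOwner)) {
                lease.leaseCounter.incrementAndGet();
            }
        }
    }

    // Stop scheduling renewals and wait for the pool to terminate.
    boolean stop(long timeoutMillis) {
        leaseCoordinatorThreadPool.shutdown();
        try {
            return leaseCoordinatorThreadPool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    Map<String, ToyLease> leases() {
        return leaseTable;
    }
}
```

stop() does what the paragraph above describes for shutdown: it stops the pool that renews leases and waits for it to terminate. Nothing in it tells the record processors anything, which is the gap requestShutdown fills.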
requestShutdown, on the other hand, stops the lease taker AND notifies each PartitionConsumer to shut down.
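More importantly, with requestShutdown you can also be notified in your RecordProcessor by implementing IShutdownNotificationAware. That way, if there is a race condition in which your RecordProcessor is still processing events while the worker is about to shut down, you should still be able to commit the offset (checkpoint) and then shut down.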
requestShutdown returns a ShutdownFuture, which then calls back into worker.shutdown.
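The order of events in a graceful shutdown (stop taking leases, notify every processor, then run the regular shutdown) can be sketched with CompletableFuture. This is a hypothetical model, not the KCL's ShutdownFuture: ShutdownAwareProcessor loosely combines IShutdownNotificationAware#shutdownRequested and IRecordProcessor#shutdown, ToyGracefulWorker is an invented name, and the sketch does not try to reproduce exactly when the real future triggers worker.shutdown.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical processor contract: a notification first, a final shutdown later.
interface ShutdownAwareProcessor {
    void shutdownRequested();  // last chance to checkpoint in-flight work
    void shutdown();           // the worker is actually stopping
}

class ToyGracefulWorker {
    private final List<ShutdownAwareProcessor> processors;
    private final ExecutorService notifier = Executors.newCachedThreadPool();
    final List<String> log = Collections.synchronizedList(new ArrayList<>());

    ToyGracefulWorker(List<ShutdownAwareProcessor> processors) {
        this.processors = processors;
    }

    // 1) stop taking new leases, 2) notify every processor in parallel,
    // 3) once every notification has returned, run the regular shutdown.
    CompletableFuture<Void> requestShutdown() {
        log.add("stop lease taker");
        CompletableFuture<?>[] notifications = processors.stream()
                .map(p -> CompletableFuture.runAsync(p::shutdownRequested, notifier))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(notifications).thenRun(this::shutdown);
    }

    // Regular shutdown: tell each processor to stop, then release the threads.
    void shutdown() {
        log.add("shutdown");
        processors.forEach(ShutdownAwareProcessor::shutdown);
        notifier.shutdown();
    }
}
```

Because the final shutdown only runs after allOf completes, every processor sees shutdownRequested before shutdown, which is the race the answer says requestShutdown takes care of.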
To be notified on requestShutdown, you must implement the following method on your RecordProcessor:
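import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;

class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware {
    private String partitionId;
    // logger, initialize, processRecords and shutdown as in the processor above

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        logger.debug("Shutdown requested for {}", partitionId);
        // last chance to commit progress, e.g. via checkpointer.checkpoint()
    }
}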
However, if the lease is released before the notification is delivered, shutdownRequested may not be called.
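What a processor typically does inside shutdownRequested is record how far it got. Below is a stdlib-only sketch of that idea; ToyCheckpointer and CheckpointOnRequestProcessor are hypothetical names, and tracking the last processed sequence number yourself is an assumption about how you keep progress. With the real library you would use the IRecordProcessorCheckpointer passed to shutdownRequested instead.

```java
import java.util.List;

// Hypothetical stand-in for the checkpointer handed to shutdownRequested.
interface ToyCheckpointer {
    void checkpoint(String sequenceNumber);
}

// Remembers the last record it fully processed and checkpoints exactly
// that position when a graceful shutdown is requested.
class CheckpointOnRequestProcessor {
    private String lastProcessedSequence;  // null until something is processed

    void processRecords(List<String> sequenceNumbers) {
        for (String sequenceNumber : sequenceNumbers) {
            // ... handle the record here ...
            lastProcessedSequence = sequenceNumber;
        }
    }

    void shutdownRequested(ToyCheckpointer checkpointer) {
        // Nothing processed yet means there is nothing new to commit.
        if (lastProcessedSequence != null) {
            checkpointer.checkpoint(lastProcessedSequence);
        }
    }
}
```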
Summary of the questions
"The new library provides a new interface which record processors needs to be implemented. But how does it get invoked?"
- Implement IRecordProcessorFactory and IRecordProcessor.
- Then wire your RecordProcessorFactory into your Worker.
"Tried invoking first the worker.requestShutdown() then worker.shutdown() and it works. But is it any intended way to use it?"
You should use requestShutdown() for a graceful shutdown, since it takes care of the race condition. It was introduced in kinesis-client-1.7.1.
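Since the question registers a shutdown hook, here is one hedged way to wire requestShutdown into it. To keep the sketch compilable without the KCL, GracefulWorker is a hypothetical one-method interface standing in for the Worker; in kinesis-client 1.7.1+ Worker#requestShutdown returns a Future, so a lambda such as () -> consumerWorker.requestShutdown() should fit it, but check that against your KCL version. The 30-second timeout in the usage line is an arbitrary assumption. Bounding the wait matters because the JVM does not finish exiting until its shutdown hooks return.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical abstraction over "something that can shut down gracefully".
interface GracefulWorker {
    Future<?> requestShutdown();
}

final class ShutdownHooks {
    private ShutdownHooks() {}

    // Build (but do not register) a hook that requests a graceful shutdown
    // and waits at most timeoutSeconds for it to finish.
    static Thread gracefulShutdownHook(GracefulWorker worker, long timeoutSeconds) {
        return new Thread(() -> {
            try {
                worker.requestShutdown().get(timeoutSeconds, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException | TimeoutException e) {
                // Give up waiting; the JVM is exiting anyway.
                System.err.println("Graceful shutdown did not complete: " + e);
            }
        }, "graceful-shutdown-hook");
    }
}
```

Usage, assuming the consumerWorker from the answer: Runtime.getRuntime().addShutdownHook(ShutdownHooks.gracefulShutdownHook(() -> consumerWorker.requestShutdown(), 30));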