
java - How to use requestShutdown and shutdown for a graceful shutdown with the KCL Java library for AWS Kinesis

Reposted. Author: 塔克拉玛干. Updated: 2023-11-02 19:51:05

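I'm trying to use a new feature of the KCL library in Java for AWS Kinesis to shut down gracefully: register a shutdown hook that stops all the record processors and then stops the worker gracefully. The new library provides a new interface that record processors need to implement. But how does it get invoked?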

I tried invoking worker.requestShutdown() first and then worker.shutdown(), and it works. But is that the intended way to use them? What is the point of using both, and what is the benefit?

Best answer

Starting the consumer

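As you may know, when you create a Worker, it

1) creates the consumer offset table in DynamoDB

2) creates leases and schedules the lease taker and the lease renewer at a configured interval of time

If you have two partitions (shards), there will be two records in the same DynamoDB table, i.e. each partition needs its own lease.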

For example:

{
"checkpoint": "TRIM_HORIZON",
"checkpointSubSequenceNumber": 0,
"leaseCounter": 38,
"leaseKey": "shardId-000000000000",
"leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
"ownerSwitchesSinceCheckpoint": 0
}

{
"checkpoint": "49570828493343584144205257440727957974505808096533676050",
"checkpointSubSequenceNumber": 0,
"leaseCounter": 40,
"leaseKey": "shardId-000000000001",
"leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
"ownerSwitchesSinceCheckpoint": 0
}
  • The lease coordinator's ScheduledExecutorService (called leaseCoordinatorThreadPool) is responsible for scheduling lease taking and lease renewal.

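3) Then, for each partition in the stream, the Worker creates an internal PartitionConsumer (the KCL class is ShardConsumer), which actually fetches the events and sends them to your RecordProcessor#processRecords. See ProcessTask#call.

4) As for your question: you must register your IRecordProcessorFactory implementation with the worker, and the worker uses it to give each PartitionConsumer its own record processor.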
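The lease bookkeeping in steps 1) and 2) can be illustrated with a small in-memory model: one lease row per shard (like the two rows above), a lease taker that claims unowned leases, and a lease renewer that bumps `leaseCounter`, both scheduled on one `ScheduledExecutorService`. This is a simplified sketch, not KCL code: the real library keeps these rows in DynamoDB with conditional writes and lease expiry, and `ModelLease` / `ModelLeaseCoordinator` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of one row of the lease table.
class ModelLease {
    final String leaseKey;   // e.g. "shardId-000000000000"
    String leaseOwner;       // worker id, or null while unowned
    long leaseCounter;       // bumped on every take/renewal

    ModelLease(String leaseKey) { this.leaseKey = leaseKey; }
}

// Hypothetical in-memory stand-in for the lease table plus the
// lease coordinator thread pool that runs the taker and the renewer.
class ModelLeaseCoordinator {
    private final Map<String, ModelLease> leases = new LinkedHashMap<>();
    private final String workerId;
    private final ScheduledExecutorService leaseCoordinatorThreadPool =
            Executors.newSingleThreadScheduledExecutor();

    // One lease per shard: N shards -> N rows.
    ModelLeaseCoordinator(String workerId, String... shardIds) {
        this.workerId = workerId;
        for (String shardId : shardIds) {
            leases.put(shardId, new ModelLease(shardId));
        }
    }

    // Lease taker: claim every lease that currently has no owner.
    synchronized void takeLeases() {
        for (ModelLease lease : leases.values()) {
            if (lease.leaseOwner == null) {
                lease.leaseOwner = workerId;
                lease.leaseCounter++;
            }
        }
    }

    // Lease renewer: bump the counter of every lease this worker owns.
    synchronized void renewLeases() {
        for (ModelLease lease : leases.values()) {
            if (workerId.equals(lease.leaseOwner)) {
                lease.leaseCounter++;
            }
        }
    }

    // Run taker and renewer at a configured interval.
    void start(long intervalMillis) {
        leaseCoordinatorThreadPool.scheduleAtFixedRate(
                () -> { takeLeases(); renewLeases(); }, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    // Stop scheduling new runs and wait for an in-flight run to finish.
    boolean stop(long timeoutMillis) throws InterruptedException {
        leaseCoordinatorThreadPool.shutdown();
        return leaseCoordinatorThreadPool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    synchronized ModelLease lease(String shardId) { return leases.get(shardId); }

    synchronized int leaseCount() { return leases.size(); }
}
```

Stopping the pool with `shutdown()` plus `awaitTermination` cancels the periodic taker/renewer, which mirrors what the answer later says `worker.shutdown` does to `leaseCoordinatorThreadPool`.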

For example (this might be helpful):

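import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

// getAuthProfileCredentials() (an AWSCredentialsProvider) and getHttpConfiguration()
// (a ClientConfiguration) are the author's own helpers, not shown here.
KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration(
        "consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId")
        .withKinesisClientConfig(getHttpConfiguration())
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); // TRIM_HORIZON = start from the oldest record still in the stream (LATEST = the tip)

Worker consumerWorker = new Worker.Builder()
        .recordProcessorFactory(new DavidsEventProcessorFactory())
        .config(streamConfig)
        // the builder expects the low-level AmazonDynamoDB client, not the document-API DynamoDB wrapper
        .dynamoDBClient(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration()))
        .build();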


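import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import java.nio.charset.StandardCharsets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class DavidsEventProcessorFactory implements IRecordProcessorFactory {

    private static final Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class);

    @Override
    public IRecordProcessor createProcessor() {
        logger.info("Creating an EventProcessor.");
        return new DavidsEventPartitionProcessor();
    }
}

class DavidsEventPartitionProcessor implements IRecordProcessor {

    private static final Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);

    // TODO add consumer name?

    // TERMINATE: the shard has been closed (e.g. by resharding) and must be checkpointed
    private static final ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE;

    private String partitionId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.partitionId = initializationInput.getShardId();
        logger.info("Initialised partition {} for streaming.", partitionId);
    }

    @Override
    public void processRecords(ProcessRecordsInput recordsInput) {
        for (Record nativeEvent : recordsInput.getRecords()) {
            // decode only the readable bytes; ByteBuffer#array() can expose the whole backing array
            String eventPayload = StandardCharsets.UTF_8.decode(nativeEvent.getData().duplicate()).toString();
            logger.info("Processing an event {} : {}", nativeEvent.getSequenceNumber(), eventPayload);
        }
        // checkpoint once per batch (at the last record of this call) instead of once per record
        checkpoint(recordsInput.getCheckpointer());
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        logger.debug("Shutting down event processor for {} ({})", partitionId, shutdownInput.getShutdownReason());
        // only checkpoint on TERMINATE; on ZOMBIE another worker may already own the lease
        if (shutdownInput.getShutdownReason() == RE_PARTITIONING) {
            checkpoint(shutdownInput.getCheckpointer());
        }
    }

    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
            logger.debug("Persisted the consumer offset for partition {}", partitionId);
        } catch (InvalidStateException e) {
            logger.error("Cannot update consumer offset to the DynamoDB table.", e);
        } catch (ShutdownException e) {
            logger.error("Consumer shutting down", e);
        } catch (ThrottlingException e) {
            logger.error("Checkpoint was throttled; it can be retried later.", e);
        }
    }
}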

// then start the consumer; run() blocks the calling thread until the worker shuts down

consumerWorker.run();
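The wiring in the example above (worker, then factory, then one processor per shard, then `initialize`, `processRecords` per batch, and `shutdown`) can be modeled without the KCL dependency. `ModelRecordProcessor`, `ModelWorker` and `RecordingModelProcessor` are hypothetical stand-ins, a `Supplier` plays the role of `IRecordProcessorFactory`, and shards are consumed one after another here, whereas the real worker drives its per-shard consumers from a thread pool.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified mirror of the three record processor callbacks.
interface ModelRecordProcessor {
    void initialize(String shardId);
    void processRecords(List<String> records);
    void shutdown(String reason);
}

// Hypothetical model of the worker's wiring: for every shard it asks the
// factory for a new processor and drives it through its lifecycle.
class ModelWorker {
    private final Supplier<ModelRecordProcessor> processorFactory;

    ModelWorker(Supplier<ModelRecordProcessor> processorFactory) {
        this.processorFactory = processorFactory;
    }

    void consume(Map<String, List<List<String>>> batchesByShard) {
        for (Map.Entry<String, List<List<String>>> shard : batchesByShard.entrySet()) {
            ModelRecordProcessor processor = processorFactory.get(); // one processor per shard
            processor.initialize(shard.getKey());
            for (List<String> batch : shard.getValue()) {
                processor.processRecords(batch);
            }
            processor.shutdown("TERMINATE"); // modeled as: the shard has ended
        }
    }
}

// Records every callback so the call sequence can be inspected.
class RecordingModelProcessor implements ModelRecordProcessor {
    final List<String> calls;

    RecordingModelProcessor(List<String> calls) { this.calls = calls; }

    @Override public void initialize(String shardId) { calls.add("initialize:" + shardId); }
    @Override public void processRecords(List<String> records) { calls.add("process:" + records.size()); }
    @Override public void shutdown(String reason) { calls.add("shutdown:" + reason); }
}
```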

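Stopping the consumer

Now, when you want to stop your consumer instance (the Worker), you don't need to do much with each PartitionConsumer; the Worker takes care of them once you ask it to shut down.

  • shutdown asks the leaseCoordinatorThreadPool, which is responsible for renewing and taking leases, to stop, and waits for it to terminate.

  • requestShutdown, on the other hand, cancels the lease taker AND notifies the PartitionConsumers to shut down.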

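requestShutdown matters more if you want your RecordProcessor to be notified: for that, also implement IShutdownNotificationAware. That way, if there is a race condition where your RecordProcessor is still processing events while the worker is about to shut down, you should still be able to commit your offset and then shut down.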
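The difference between the two calls can be modeled in plain Java. This is a deliberately simplified sketch, not the KCL implementation: `ModelNotifiableProcessor` and `ModelStopPaths` are hypothetical, and "checkpointing" is reduced to remembering the last processed sequence number. It only shows the ordering described above: requestShutdown stops taking leases and notifies processors before the regular shutdown, while shutdown alone stops the lease coordinator without that notification step.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical processor that opts in to shutdown notifications
// (modeled on IShutdownNotificationAware#shutdownRequested).
class ModelNotifiableProcessor {
    private String lastProcessed;   // last sequence number handed to this processor
    private String checkpointed;    // last sequence number "persisted" as the offset

    void processRecord(String sequenceNumber) { lastProcessed = sequenceNumber; }

    // Commit the progress made so far before the worker stops.
    void shutdownRequested() { checkpointed = lastProcessed; }

    String checkpointed() { return checkpointed; }
}

// Hypothetical model of the two stop paths.
class ModelStopPaths {
    boolean leaseTakerRunning = true;
    boolean leaseCoordinatorRunning = true;
    final List<ModelNotifiableProcessor> processors = new ArrayList<>();

    // shutdown(): stop the lease coordinator; processors get no
    // notification in which to commit their offsets first.
    void shutdown() {
        leaseTakerRunning = false;
        leaseCoordinatorRunning = false;
    }

    // requestShutdown(): stop taking new leases, notify every processor so it
    // can checkpoint, and only then run the regular shutdown.
    void requestShutdown() {
        leaseTakerRunning = false;
        for (ModelNotifiableProcessor processor : processors) {
            processor.shutdownRequested();
        }
        shutdown();
    }
}
```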

requestShutdown returns a ShutdownFuture, which in turn calls back worker.shutdown.
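The idea of a future that completes the shutdown only after the processors have been notified can be sketched with a `CountDownLatch` and `CompletableFuture`. `ShutdownFutureModel` and its methods are hypothetical; the sketch assumes one acknowledgement per processor and does not reproduce how the KCL's internal ShutdownFuture is actually implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

// Hypothetical model: shutdown() runs only after every processor has
// acknowledged the shutdown notification.
class ShutdownFutureModel {
    private final CountDownLatch notificationsComplete;
    volatile boolean workerShutdownCalled;

    ShutdownFutureModel(int processorCount) {
        notificationsComplete = new CountDownLatch(processorCount);
    }

    // Each processor calls this once its shutdownRequested callback returns.
    void processorNotified() { notificationsComplete.countDown(); }

    // The returned future first waits for all acknowledgements, then shuts down.
    Future<Void> requestShutdown() {
        return CompletableFuture.runAsync(() -> {
            try {
                notificationsComplete.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            shutdown();
        });
    }

    void shutdown() { workerShutdownCalled = true; }
}
```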

You must implement the following method on your RecordProcessor to be notified on requestShutdown:

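import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;

class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware {

    private String partitionId;

    // logger, initialize, processRecords and shutdown as in the processor above

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        logger.debug("Shutdown requested for {}", partitionId);
        try {
            checkpointer.checkpoint(); // commit progress before the worker stops
        } catch (InvalidStateException | ShutdownException | ThrottlingException e) {
            logger.error("Could not checkpoint partition {} on shutdown request", partitionId, e);
        }
    }
}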

But if you lose the lease before the notification arrives, it may not be called.

Answering your questions

The new library provides a new interface which record processors needs to be implemented. But how does it get invoked?

  • Implement IRecordProcessorFactory and IRecordProcessor
  • Then wire your RecordProcessorFactory into your Worker

Tried invoking first the worker.requestShutdown() then worker.shutdown() and it works. But is it any intended way to use it?

For a graceful shutdown you should use requestShutdown(), which handles the race condition. It was introduced in kinesis-client-1.7.1.
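Since the question is about registering a shutdown hook, here is a sketch of how that might look. To keep it self-contained, the worker is hidden behind a hypothetical `GracefulWorker` interface whose `requestShutdown()` returns a `Future<Void>`, matching the answer's statement that requestShutdown returns a future; in real code you would call the method on your KCL `Worker` instead (check its return type in your KCL version). `GracefulShutdownHooks` and the thread name are also illustrative.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical abstraction over the worker's graceful-shutdown entry point.
interface GracefulWorker {
    Future<Void> requestShutdown();
}

final class GracefulShutdownHooks {
    private GracefulShutdownHooks() {}

    // Ask the worker to shut down gracefully and wait, bounded, for it to finish.
    static boolean stopGracefully(GracefulWorker worker, long timeout, TimeUnit unit) {
        Future<Void> done = worker.requestShutdown();
        try {
            done.get(timeout, unit);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    // Register a JVM shutdown hook (run on e.g. SIGTERM) that stops the worker.
    static Thread register(GracefulWorker worker, long timeout, TimeUnit unit) {
        Thread hook = new Thread(() -> stopGracefully(worker, timeout, unit), "graceful-worker-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

Bounding the wait with a timeout keeps the hook from delaying JVM exit indefinitely if a processor never acknowledges the notification.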

For the original question (java - How to use requestShutdown and shutdown for a graceful shutdown with the KCL Java library for AWS Kinesis), see the similar question on Stack Overflow: https://stackoverflow.com/questions/42381253/
