- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我正在尝试使用适用于 AWS Kinesis 的 Java 中 KCL 库的新功能,通过注册关闭钩子(Hook)来正常关闭所有记录处理器,然后优雅地停止工作人员。新库提供了一个新的接口(interface),需要实现记录处理器。但是它是如何被调用的呢?
尝试先调用 worker.requestShutdown() 然后调用 worker.shutdown() 并且它有效。但这是使用它的任何预期方式吗?那么同时使用这两者有什么用,它有什么好处?
最佳答案
您可能知道,当您创建一个 Worker
时,它会
1) 创建 consumer offset table在 dynamodb 中
2) 在 configured interval of time 创建租约,安排 租约接受者和续租者
如果你有两个分区,那么在同一个 dynamodb 表中会有两条记录,这意味着分区需要租约。
例如。
{
"checkpoint": "TRIM_HORIZON",
"checkpointSubSequenceNumber": 0,
"leaseCounter": 38,
"leaseKey": "shardId-000000000000",
"leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
"ownerSwitchesSinceCheckpoint": 0
}
{
"checkpoint": "49570828493343584144205257440727957974505808096533676050",
"checkpointSubSequenceNumber": 0,
"leaseCounter": 40,
"leaseKey": "shardId-000000000001",
"leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
"ownerSwitchesSinceCheckpoint": 0
}
leaseCoordinatorThreadPool
)负责安排和续订租赁3) 然后对于流中的每个分区,Worker
创建一个内部 PartitionConsumer ,实际上 fetches the events ,并发送到您的 RecordProcessor#processRecords
。见ProcessTask#call
4) 关于您的问题,您必须将您的IRecordProcessorFactory
impl 注册到worker
,这将提供一个ProcessorFactoryImpl
到每个 PartitionConsumer
。
例如。 see example here, which might be helpful
KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration(
"consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId")
.withKinesisClientConfig(getHttpConfiguration())
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); // "TRIM_HORIZON" = from the tip of the stream
Worker consumerWorker = new Worker.Builder()
.recordProcessorFactory(new DavidsEventProcessorFactory())
.config(streamConfig)
.dynamoDBClient(new DynamoDB(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration())))
.build();
public class DavidsEventProcessorFactory implements IRecordProcessorFactory {
private Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class);
@Override
public IRecordProcessor createProcessor() {
logger.info("Creating an EventProcessor.");
return new DavidsEventPartitionProcessor();
}
}
class DavidsEventPartitionProcessor implements IRecordProcessor {
private Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);
//TODO add consumername ?
private String partitionId;
private ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE;
public KinesisEventPartitionProcessor() {
}
@Override
public void initialize(InitializationInput initializationInput) {
this.partitionId = initializationInput.getShardId();
logger.info("Initialised partition {} for streaming.", partitionId);
}
@Override
public void processRecords(ProcessRecordsInput recordsInput) {
recordsInput.getRecords().forEach(nativeEvent -> {
String eventPayload = new String(nativeEvent.getData().array());
logger.info("Processing an event {} : {}" , nativeEvent.getSequenceNumber(), eventPayload);
//update offset after configured amount of retries
try {
recordsInput.getCheckpointer().checkpoint();
logger.debug("Persisted the consumer offset to {} for partition {}",
nativeEvent.getSequenceNumber(), partitionId);
} catch (InvalidStateException e) {
logger.error("Cannot update consumer offset to the DynamoDB table.", e);
e.printStackTrace();
} catch (ShutdownException e) {
logger.error("Consumer Shutting down", e);
e.printStackTrace();
}
});
}
@Override
public void shutdown(ShutdownInput shutdownReason) {
logger.debug("Shutting down event processor for {}", partitionId);
if(shutdownReason.getShutdownReason() == RE_PARTITIONING) {
try {
shutdownReason.getCheckpointer().checkpoint();
} catch (InvalidStateException e) {
logger.error("Cannot update consumer offset to the DynamoDB table.", e);
e.printStackTrace();
} catch (ShutdownException e) {
logger.error("Consumer Shutting down", e);
e.printStackTrace();
}
}
}
}
//然后启动一个消费者
consumerWorker.run();
现在,当您想要停止您的 Consumer 实例(Worker
)时,您不需要对每个 PartitionConsumer
做太多处理,PartitionConsumer
将由 Worker
一旦你要求它关闭。
通过shutdown
,它要求 leaseCoordinatorThreadPool
停止,它负责更新和获取租约,并等待终止.
requestShutdown
取消租约接受者,AND 通知 PartitionConsumer
关闭。
requestShutdown
更重要的是,如果您想在 RecordProcessor
上收到通知,那么您也可以实现 IShutdownNotificationAware
。这样,当您的 RecordProcessor
正在处理事件但 worker 即将关闭时出现竞争条件,您仍然应该能够提交偏移量然后关闭。
requestShutdown
返回一个 ShutdownFuture
,然后回调 worker.shutdown
您必须在 RecordProcessor
上实现以下方法才能在 requestShutdown
上收到通知,
class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware {
private String partitionId;
// few implementations
@Override
public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
logger.debug("Shutdown requested for {}", partitionId);
}
}
但是如果您在通知之前松开租约,那么它可能不会被调用。
The new library provides a new interface which record processors needs to be implemented. But how does it get invoked?
IRecordProcessorFactory
和 IRecordProcessor
。RecordProcessorFactory
连接到您的 Worker
。Tried invoking first the worker.requestShutdown() then worker.shutdown() and it works. But is it any intended way to use it?
对于 graceful shutdown,您应该使用 requestShutdown()
,这将处理竞争条件。它是在 kinesis-client-1.7.1 中介绍的
关于java - 对于 AWS Kinesis 的 KCL Java 库,如何使用 requestShutdown 和 shutdown 进行正常关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42381253/
Java 库和 android 库有什么区别,各自有什么优点/缺点? 最佳答案 您可以在 Android 应用程序中包含标准 Java .jar 文件库。它们在 .apk 构建时被翻译成 Dalvik
所以,我现在的代码就像从 Java 层加载库(比如 liba.so),并在内部 liba.so 加载 libb.so。因此,如果我必须将所有库打包到 APK 中并将其安装在没有 root 访问权限的设
我想在我的系统中设置 LEDA 库。 我已经从以下链接下载了 LEDA 库 http://www.algorithmic-solutions.info/free/d5.php Instruct
我想用 autoconf 创建一个共享库。但是,我希望共享库具有“.so”扩展名,而不是以“lib”开头。基本上,我想制作一个加载 dlopen 的插件。 .是否有捷径可寻? 当我尝试使用 autoc
我需要在 Apps 脚本应用程序上修改 PDF。为此,我想使用 JS 库:PDF-LIB 我的代码: eval(UrlFetchApp.fetch("https://unpkg.com/pdf-lib
我正在构建一个使用以下 Boost header 的程序(我使用的是 Microsoft Visual C++ 10), #include #include #include #include
当我通过 cygwin 在 hadoop 上运行此命令时: $bin/hadoop jar hadoop-examples-*.jar grep input output 'dfs[a-z.]+' 我
我已经通过 vcpgk 成功安装了一个 C++ 库,名为:lmdb:x64-windows 我还安装了lmdb通过 Cabal 安装的 Haskell 绑定(bind)包 在尝试测试 lmdb 包时:
我该如何解决这个问题? 我刚刚将 javacv jar 文件复制到我的项目 Lib 文件夹下,但出现了这个错误! 我可以找到这个thread来自谷歌,但不幸的是,由于我国的谷歌限制政策,该页面无法打开
我有一个 Android 库项目 FooLib。 FooLib 引用 Android Context 之类的东西,但不需要任何资源文件(res/ 中的东西)所以我目前将其打包为供我的应用使用的 JAR
我正在开发一个 Android 应用程序(使用 Android Studio),它能够通过手势识别算法了解您正在进行的 Activity 。对于我使用 nickgillian ithub 帐户上可用的
关于从 .NET Framework 项目中引用 .NET Standard 类库的问题有很多类似的问题,其中 netstandard 库中的 NuGet 包依赖项不会流向 netframework
我已经从互联网上下载了 jna-4.2.2.jar,现在想将这个 jar 导入到我的项目中。但是当我试图将这个 jar 导入我的项目时,出现以下错误。 [2016-06-20 09:35:01 - F
我正在尝试通过编译在 Mac 上安装 rsync 3.2.3。但是,我想安装所有功能。为此,它需要一些库,此处 ( https://download.samba.org/pub/rsync/INSTA
进入 Web 开发有点困难。过去 5 年我一直致力于 winforms 工作。所以我正在努力从一种切换到另一种。前段时间,我使用过 JavaScript,但现在还没有大量的 JavaScript 库
很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visit the help center . 关闭 1
我正在寻找一个用Python编写的与logstash(ruby + java)类似的工具/库。 我的目标是: 从 syslog 中解析所有系统日志 解析应用程序特定日志(apache、django、m
就目前情况而言,这个问题不太适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、民意调查或扩展讨论。如果您觉得这个问题可以改进并可能重新开放,visit
我花了几天时间试图寻找用于 JavaPOS 实现的 .jar 库,但我找不到任何可以工作的东西。我找到了很多像这样的文档:http://jpos.1045706.n5.nabble.com/file/
这个问题在这里已经有了答案: Merge multiple .so shared libraries (2 个答案) 关闭 9 年前。 我有我在代码中使用的第三方库的源代码和对象。该库附带有关如何使
我是一名优秀的程序员,十分优秀!