- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
新的 Kafka 版本 (0.11) 支持 exactly-once 语义。
我在 Java 中使用 kafka 事务代码设置了一个生产者,就像这样。
producer.initTransactions();
try {
producer.beginTransaction();
for (ProducerRecord<String, String> record : payload) {
producer.send(record);
}
Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
{
put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
}
};
producer.sendOffsetsToTransaction(groupCommit, "groupId");
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
我不太确定如何使用 sendOffsetsToTransaction 及其预期用例。 AFAIK,消费者群体是消费者端的多线程读取功能。
javadoc 说
”向消费者组协调器发送一个已消费的偏移量列表,同时将这些偏移量标记为当前事务的一部分。只有在事务提交成功时,这些偏移量才会被视为已消耗。当您需要时应该使用此方法将消费和生产消息一起批量处理,通常采用消费-转换-生产模式。”
produce 将如何维护消耗的抵消列表?这有什么意义呢?
最佳答案
这仅与您消费然后根据消费内容生成消息的工作流程相关。此函数允许您仅在下游生产成功时提交您消耗的偏移量。如果您消费数据,以某种方式处理它,然后产生结果,这将实现跨消费/生产的交易保证。
如果没有事务,您通常使用 Consumer#commitSync()
或 Consumer#commitAsync()
来提交消费者偏移量。但是,如果您在与您的生产者进行生产之前使用这些方法,您将在知道生产者是否成功发送之前已经提交了偏移量。
因此,您可以在生产者上使用 Producer#sendOffsetsToTransaction()
来提交偏移量,而不是向消费者提交偏移量。这会将偏移量发送到处理事务的事务管理器。仅当整个交易(消费和生产)成功时,它才会提交抵消。
(注意:当您发送偏移量进行提交时,您应该将上次读取的偏移量加 1,以便以后的读取从您尚未读取的偏移量恢复。无论无论您是向消费者还是向生产者 promise 。请参阅:KafkaProducer sendOffsetsToTransaction need offset+1 to successfully commit current offset)。
关于java - Kafka 0.11 中 sendOffsetsToTransaction 的含义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45195010/
这个问题在这里已经有了答案: Towards the "true" definition of JAVA_HOME (5 个答案) 关闭 4 年前。 为什么 ActiveMQ 提供者需要设置 JAV
这个问题在这里已经有了答案: What is a lambda expression in C++11? (10 个答案) 关闭 8 年前。 这是来自 boosts asio 的一个例子。这是什么意
这个问题在这里已经有了答案: What does the double colon (::) mean in CSS? (3 个答案) 关闭 7 年前。 我经常看到这种用法。特别是伪类。“::”在
嗨,另一个愚蠢的简单问题。我注意到在Apple框架中的某些typedef中使用符号"<<"谁能告诉我这是什么意思?: enum { UIViewAutoresizingNone
someObject.$() 是什么意思? 我正在浏览 sapui5 工具包中的 tilecontainer-dbg 文件,发现了这个: var oDomRef = this.$(); or some
这个问题已经有答案了: How to interpret function parameters in software and language documentation? (4 个回答) 已关闭
我遇到过这个语法。任何人都可以解释一下 getArg1ListInfo:()=>(object.freeze(arg1)) 的含义 function foo (arg1,arg2) { let
对于子类,我有以下代码: class child1 : public parent { public: static parent* function1(void) { ret
这个问题在这里已经有了答案: What does "|=" mean? (pipe equal operator) (6 个答案) 关闭 1 年前。 我有一部分代码包含以下功能: void Keyb
以下在 C++ 中是什么意思? typedef PComplex RComplex [100]; 请注意,PComplex 是我代码中的用户定义类型。 谢谢 最佳答案 RComplex 是 PComp
在我的 Lisp 代码中,我有函数 (nfa-regex-compile),它创建一个包含初始状态、转换和最终状态的 cons 列表(表示自动机的节点)从作为参数给出的正则表达式开始。 在这种情况下,
以下文字摘自 Learning Spark 第 3 章 One issue to watch out for when passing functions is inadvertently seria
PHP 文档 block 中以下内容的含义是什么: #@+ zend框架代码中的一个例子: /**#@+ * @const string Version constant numbers */ c
由于 python 的一些版本控制问题,我必须使用自定义函数来比较 HMAC (SHA512)。为此,我找到了这个函数: def compare_digest(x, y): if not (i
取自this answer here : static const qi::rule node = '{' >> *node >> '}' | +~qi::char_("{}"); 请注意,声明了名称
我正在查看 chi 包的文档。我看到类似的东西: https://github.com/pressly/chi/blob/master/_examples/rest/main.go#L154 data
我想知道如果我采用值为 8 的 INT,这是否意味着我只能从 1 到 99999999 或从 1 到 4294967295 UNSIGNED? 最佳答案 文档似乎很清楚这一点: Numeric Typ
我想知道如果我采用值为 8 的 INT,这是否意味着我只能从 1 到 99999999 或从 1 到 4294967295 UNSIGNED? 最佳答案 文档似乎很清楚这一点: Numeric Typ
这个问题在这里已经有了答案: 关闭9年前。 Possible Duplicate: Does “/* (non-javadoc)” have a well-understood meaning? 以下
在 Prolog 代码中,可以使用“ headless ”Horn 子句将指令传递给编译器,这些子句与指向左侧的物质蕴涵 ':-' (⇐) 的左侧没有头部关系。例如,导入模块或声明 Unit Test
我是一名优秀的程序员,十分优秀!