- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我尝试在 scala 中实现一个非常简单的 Kafka (0.9.0.1) 消费者(代码如下)。
据我了解,Kafka(或者更确切地说是 Zookeeper)为每个 groupId 存储给定主题的最后一条消费消息的偏移量。所以给定以下场景:
groupId1
昨天消费了唯一的 5主题中的消息。现在最后消费的消息有偏移量 4(考虑到偏移量为 0) 的第一条消息groupId1
,会有有两种选择:选项 1:如果我将以下属性设置为 "latest"
,消费者将阅读夜间到达的最后 2 条新消息:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
选项 2:如果我将以下属性设置为 "earliest"
,消费者将阅读主题中的所有 7 条消息:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
问题:出于某种原因,如果我将消费者的 groupId 更改为 groupId2
,这是给定主题的新 groupId,因此它从未消费过任何消息之前,它的最新偏移量应该是 0。我期待通过设置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
消费者将在第一次执行期间读取主题中存储的所有消息(相当于最早)。然后对于接下来的执行,它将只消耗新的。然而,事实并非如此。
如果我设置一个新的 groupId
并将 AUTO_OFFSET_RESET_CONFIG
保持为 latest
,消费者将无法阅读任何消息。然后我需要做的是在第一次运行时将 AUTO_OFFSET_RESET_CONFIG
设置为 earliest
,一旦 groupID 已经有一个不同于 0 的偏移量,我就可以移动到 最新的
。
我的消费者应该是这样的吗?有没有比在我第一次运行消费者后切换 AUTO_OFFSET_RESET_CONFIG
更好的解决方案?
下面是我作为一个简单的消费者使用的代码:
class KafkaTestings {
val brokers = "listOfBrokers"
val groupId = "anyGroupId"
val topic = "anyTopic"
val props = createConsumerConfig(brokers, groupId)
def createConsumerConfig(brokers: String, groupId: String): Properties = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "12321")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props
}
def run() = {
consumer.subscribe(Collections.singletonList(this.topic))
Executors.newSingleThreadExecutor.execute( new Runnable {
override def run(): Unit = {
while (true) {
val records = consumer.poll(1000)
for (record <- records) {
println("Record: "+record.value)
}
}
}
})
}
}
object ScalaConsumer extends App {
val testConsumer = new KafkaTestings()
testConsumer.run()
}
This被用作编写这个简单消费者的引用
最佳答案
这是按照记录工作的。
如果你开始一个新的消费者组(即 Kafka 中没有存储现有偏移量的消费者组),你必须选择消费者是否应该从最早的可能消息开始(主题中仍然可用的最旧消息)或来自最新的(仅从现在开始产生的消息)。
Is there a better solution than switching the AUTO_OFFSET_RESET_CONFIG after the first time I run the consumer?
您可以将它保持在 EARLIEST,因为当您第二次运行消费者时,它已经存储了偏移量,只需从那里获取。重置策略仅在创建新的消费者组时使用。
Today I restart the consumer, with the same groupId1, there will be two options:
不是真的。由于消费者组在前一天运行,它将找到其提交的偏移量并从中断的地方继续。因此,无论您将重置策略设置为什么,它都会收到这两条新消息。
虽然知道,Kafka 不会永远存储这些偏移量,但我相信默认值只是一周。因此,如果您关闭消费者的时间超过这个时间,偏移量可能会过时,并且您可能会意外重置为 EARLIEST(这对于大型主题来说可能代价高昂)。鉴于此,无论如何将其更改为 LATEST 可能是谨慎的做法。
关于scala - Kafka - 为什么在将 AUTO_OFFSET_RESET_CONFIG 设置为 "latest"时,新的 groupId 不返回主题中的所有消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57195132/
我遇到了一个奇怪的问题。我的应用程序的 Release 版本似乎运行良好,但最近当我切换到 Debug 版本时,我在启动时立即遇到访问冲突。当释放分配的内存块时,就会发生访问冲突。所有这些都发生在静态
我在 C# 中偶然发现了这种奇怪的语法形式,并试图弄清楚它的含义以及如何使用它。网络上似乎没有关于此的任何文档。 object data = new { var1 = someValue, var2
我正在尝试使用浏览器的内置类型 CSSStyleDeclaration 以编程方式传递和修改样式(由于 .cssText 属性,这很方便)。 但是,new CSSStyleDeclaration()
我有现成的代码: internal bool firstAsSymbol(out Symbol s) { return (s = first as Symbol) !=
在新的 Eclipse 版本 2022-03 中,一些(但不是全部)java 项目在 Project Explorer View 中的外观发生了变化。尽管 Package Presentation 设
我正在尝试使用 FormData 通过获取 API 在 POST 请求中发送用户输入的数据。问题是,当我用我创建的表单创建一个新的 FormData 对象时,它一直在创建一个空对象——没有条目/键/值
我有一个用一些 intel-intrinsincs 编写的 C 代码。在我先用 avx 然后用 ssse3 标志编译后,我得到了两个完全不同的汇编代码。例如: AVX: vpunpckhbw %xm
最近,discord 为您自己的应用程序添加了对斜杠命令的支持。我通读了它的文档,并尝试搜索一些视频(但是该功能刚刚出现),但我不明白我实际上需要做什么才能使其正常工作。我正在使用 WebStorm(
我想使用 JRI 从 Java 调用 R。 我在 eclipse 下在主类中运行它: Rengine c = new Rengine(new String[] { "--vanilla" },
我正在使用新的 Place Autocomplete那是来自新的静态Google Places SDK 客户端库 (here)。所以它真的很容易使用,我刚得到this tutorial它按预期工作。
我刚刚更新到 flutter 版本 1.25.0-5.0.pre.92,我的代码中出现了很多与空安全相关的错误,这些错误以前运行良好。我没有以任何方式选择空安全,我所做的只是运行 flutter 升级
我已经使用 React Native 有一段时间了,但我想我会在网络上试用 React。所以我遵循了这个指南:https://reactjs.org/docs/create-a-new-react-a
周六早上在这里。尝试学习新的 Scala 编译器 dotty。 安装在我的 Mac 上使用 brew install lampepfl/brew/dotty 安装成功。我有版本 dotr -versi
我使用了谷歌地方的新依赖。单击自动完成 View 时应用程序崩溃。错误如下。, java.lang.NullPointerException: Place Fields must be set.
我关注了这个博客-> https://medium.com/@teyou21/training-your-object-detection-model-on-tensorflow-part-2-e9e
在哪里可以找到用于在此架构上进行组装的新寄存器的名称? 我指的是 X86 中的寄存器,如 EAX、ESP、EBX 等。但我希望它们是 64 位的。 我认为它们与我反汇编 C 代码时不同,我得到的是 r
新的服务总线库 Azure.Messaging.ServiceBus 使用 ServiceBusReceivedMessage 来接收消息 https://learn.microsoft.com/en
需要使用实时流媒体 channel 的实时编码类型在新的 Azure 门户中配置广告插入和石板图像。请帮忙解决这个问题,因为我找不到该功能。 最佳答案 此处描述了 Azure 媒体服务的广告插入选项
我正在使用新的 GitHub 操作,下面的工作流程的想法是在打开或同步 pr 时运行,它应该首先检查并安装依赖项,然后运行一些 yarn 脚本 name: PR to Master on: pul
我听说 DMD 2.058 中将有一个用于匿名函数的新语法,但我找不到任何相关信息。新语法是什么?旧语法是否会被弃用? 最佳答案 我相信它就像 C#'s . 以下内容是等效的: delegate(i,
我是一名优秀的程序员,十分优秀!