- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们有 1 个 Kafka 主题和 1 个分区:
从 spring boot kafka 消费者那里看到一个相当奇怪的行为。 Spring kafka消费者在重新启动时总是从主题的开头开始消费。 我已经配置了spring kafka监听器如下
卡夫卡监听器:
@KafkaListener(topics = "${application.kafkaInputTopic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(String message) {
log.debug("SG message received. Parsing...");
TransmissionMessage transmissionMessage;
SGTransmission transmission = parseMessage(message);
//Porcess Transmission......
}
消费者配置和Spring消费者容器 Autowiring bean
@Resource
public Environment env;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
// I know this isnt right, should be run in 1 thread as there isonly
//partition in the topic
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap();
propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty(Constants.SPRING_KAFKA_SECURITY_PROTOCOL));
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty(Constants.SPRING_KAFKA_BOOTSTRAP_SERVERS));
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty(Constants.SPRING_KAFKA_GROUP_ID));
return propsMap;
}
Spring 应用yaml
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
properties:
consumer:
# If this consumer does not have an offset yet, start at latest offset.
# Be careful with `earliest`, this will use the first (available) offset in the topic, which is most likely not what you want.
auto-offset-reset: latest
group-id: ${KAFKA_GROUP_ID}
每次消费者崩溃并重新启动时,所有消息都会从头开始读取。正如您在 application.yaml 中看到的那样,情况不应该是这样的
auto-offset-reset: latest
代理端或消费者端是否可能存在一些我可能忽略的其他配置,这导致消费者每次重新启动时都从头开始读取?
最佳答案
您必须以某种方式提交初始偏移量,也许是在完成此配置之前。
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
这意味着您有责任提交偏移量。
使用AckMode.BATCH
(默认)或AckMode.RECORD
。
或者,使用 kafka-consumer-groups CLI 工具删除当前提交的偏移量(您也可以使用相同的工具列出当前偏移量)。
或者使用组的 UUID 来每次获取一个新组。
编辑
您还可以让监听器类实现 ConsumerSeekAware
并在 onPartitionsAssigned()
中调用 callback.seekToEnd(partitions)
。
关于java - Spring kafka消费者不尊重自动偏移重置=最新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59438744/
我在 R Markdown 文档中设置全局选项时遇到问题。下面是一个简单的例子。在这种情况下,我试图设置 global.par = TRUE .期望是任何 par()我在一个夹头中制作的规范被带到后续
Cloudflare 正在缓存我的请求,但它忽略了语言 header 。 示例请求: URL : https://api.example.com/v1/places/?param=1¶m=2
我正在尝试创建一个分面条形图,每个分面的百分比加起来为 100。对此的解决方案似乎是 group 的组合和 ..density.. .如何 - 在我看来 group与 fill 冲突. 数据: tes
我正在开发一个 C# 应用程序,该应用程序试图遵守其运行系统的时间格式。如果 Windows 控制面板更改为 24 小时格式,这就是应用程序显示时间的格式。无论如何,它成功地做到了这一点,除了只有在应
我用过 Vundle安装 editorconfig-vim插件。它正确加载并列在 :scriptnames 中.但是当我创建一个新文件时,比如 x.js ,缩进设置不是从 ~/.editorconfi
我曾尝试使用不同的方法自行解决这个问题,但没有一个给我预期的结果。 当我在我的项目的数据库中保存一个文本类型的变量时,问题就出现了。它用换行符保存它,事实上,当我尝试从我的一个 View 中编辑它时,
让我头疼的代码是这样的: $('#timeline .selected').removeClass('selected'); 它在 IE8 中无法正常运行。这些类确实被正确删除,但不知何故该元素仍然具
在处理 java 中的 Swing 对象(还有 JFX,但我稍后会担心这个问题)时,我遇到了一个让我摸不着头脑的问题。 这是我用来在程序中打开字体的代码。这是相当标准的。 public static
我正在为电子商务购物车使用 SOAP API,但我似乎无法让 session 在不同的页面中持续存在。 作为示例,我在下面有一些测试代码(带有一堆调试消息),它将一个项目添加到购物车,然后查看购物车。
我有一个 fieldset与 legend可能加载了很长的字符串。 我想要 legend尊重 fieldset 的宽度并且只使用了 50% 的空间。 目前,如果legend太长它仍然只占fieldse
我有一个完整城市的 3D 模型,想展示一个 这些建筑物的等距 View 。我为此使用 gnuplot 多边形, 因为我不认为我可以将 pm3d 用于具有坐标的多边形 不在一个明确定义的网格上。多边形以
我理解 Clojure 的 *assert*变量可用于关闭断言,但我所做的一切似乎都不起作用。 (defn foo [a] {:pre [(pos? a)]} (assert (even? a
我有一个带有 DependencyProperty 和 CoerceValueCallback 的控件。此属性绑定(bind)到模型对象上的属性。 当将控件属性设置为导致强制的值时,绑定(bind)会
可以通过将它们放置在 smcs.rsp 中来创建全局定义,当您点击播放时 - 您会注意到代码的这些部分被点击,并且一切都表现得好像它应该的那样。 但是,在 MonoDevelop 中编辑源代码时,它无
总的来说,我非常努力尊重模块的隐私(如果变量以下划线为前缀,我不会使用它)。然而,我有一个极端的情况,它看起来相当“安全”。 这是演示 ( my previous question ) parser=
我有一个悬停动画的 div(我正在使用 jquery 的 .hover() 方法)。 div 包含一个带有选择的表单。打开选择并悬停在选项上会使 IE9 将其解释为“取消悬停”父 div,导致第二个悬
如果 max_user_connections 连接已打开,是否有方法告诉 Entity Framework 等待? 我想我可以捕获异常并重试或保留一个计数器,但这最多感觉很糟糕。 我的 Azure
在我的测试中,BitmapFactory.decodeFile() 创建的 Bitmap 不遵循 EXIF header 。 例如,当我调用 Bitmap.getWidth() 时,设备拍摄的肖像图像
请帮助我了解如何解决这个问题。 这是我的路由文件 (auth-routes.js) const userControllers = require('../controllers/user') mod
我的应用程序有时会注入(inject) 标记到网站中,然后创建一个新的 带有亲戚的标签 src 例如设置并注入(inject) 导致浏览器从 http://localhost:8080/chapter
我是一名优秀的程序员,十分优秀!