- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
有一个耗时的操作(大约10分钟),但是kafka在5分钟后重新平衡,即使我暂停了消费者。消费者方法:
@KafkaListener(topics = {TopicAppoint.EXECUTE_SCHOOL_DATA_STATICS_TASK})
public void receiveMessage(@Payload String payload, Consumer<String, String> consumer) {
Set<TopicPartition> assignment = consumer.assignment();
consumer.pause(assignment);
if (StringUtils.isNotEmpty(payload)) {
SchoolStatisticsTaskDTO staticsTaskDTO = JSONObject.parseObject(payload, SchoolStatisticsTaskDTO.class);
Optional<SchoolStatisticsTaskDO> taskOptional = schoolStatisticsTaskRepository.findById(staticsTaskDTO.getTrackId());
taskOptional.ifPresent(schoolStaticsTaskDO -> {
// handler
});
}
consumer.resume(assignment);
}
这是我的配置:
kafka:
bootstrap-servers: 192.168.0.230:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 3
properties:
max.request.size: 12582912
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: dc-fitness-data-consumer-group
properties:
max.partition.fetch.bytes: 12582912
#enable-auto-commit: false
listener:
ack-mode: record
concurrency: 6
日志
13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.records.incremental.update.task-4, ft.es.records.incremental.update.task-5, ft.es.records.incremental.update.task-2, ft.es.records.incremental.update.task-3, ft.es.records.incremental.update.task-0, ft.es.records.incremental.update.task-1]
13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.records.incremental.update.task-4, ft.es.records.incremental.update.task-5, ft.es.records.incremental.update.task-2, ft.es.records.incremental.update.task-3, ft.es.records.incremental.update.task-0, ft.es.records.incremental.update.task-1]
13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] (Re-)joining group
13:09:20.220 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.student.batch.upload.task-17, ft.student.batch.upload.task-14, ft.student.batch.upload.task-13, ft.student.batch.upload.task-16, ft.student.batch.upload.task-15, ft.student.batch.upload.task-12]
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.student.batch.upload.task-17, ft.student.batch.upload.task-14, ft.student.batch.upload.task-13, ft.student.batch.upload.task-16, ft.student.batch.upload.task-15, ft.student.batch.upload.task-12]
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] (Re-)joining group
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.record.batch.upload.task-32, ft.record.batch.upload.task-31, ft.record.batch.upload.task-34, ft.record.batch.upload.task-33, ft.record.batch.upload.task-30, ft.record.batch.upload.task-35]
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.record.batch.upload.task-32, ft.record.batch.upload.task-31, ft.record.batch.upload.task-34, ft.record.batch.upload.task-33, ft.record.batch.upload.task-30, ft.record.batch.upload.task-35]
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] (Re-)joining group
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.class.batch.upload.task-18, ft.class.batch.upload.task-19, ft.class.batch.upload.task-20, ft.class.batch.upload.task-21, ft.class.batch.upload.task-22, ft.class.batch.upload.task-23]
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.class.batch.upload.task-26, ft.class.batch.upload.task-27, ft.class.batch.upload.task-28, ft.class.batch.upload.task-29, ft.class.batch.upload.task-24, ft.class.batch.upload.task-25]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.class.batch.upload.task-18, ft.class.batch.upload.task-19, ft.class.batch.upload.task-20, ft.class.batch.upload.task-21, ft.class.batch.upload.task-22, ft.class.batch.upload.task-23]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.class.incremental.update.task-26, ft.es.class.incremental.update.task-27, ft.es.class.incremental.update.task-28, ft.es.class.incremental.update.task-29, ft.es.class.incremental.update.task-24, ft.es.class.incremental.update.task-25]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.class.batch.upload.task-26, ft.class.batch.upload.task-27, ft.class.batch.upload.task-28, ft.class.batch.upload.task-29, ft.class.batch.upload.task-24, ft.class.batch.upload.task-25]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.class.incremental.update.task-26, ft.es.class.incremental.update.task-27, ft.es.class.incremental.update.task-28, ft.es.class.incremental.update.task-29, ft.es.class.incremental.update.task-24, ft.es.class.incremental.update.task-25]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] (Re-)joining group
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] (Re-)joining group
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] (Re-)joining group
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.class.incremental.update.task-22, ft.es.class.incremental.update.task-23, ft.es.class.incremental.update.task-18, ft.es.class.incremental.update.task-19, ft.es.class.incremental.update.task-20, ft.es.class.incremental.update.task-21]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.class.incremental.update.task-22, ft.es.class.incremental.update.task-23, ft.es.class.incremental.update.task-18, ft.es.class.incremental.update.task-19, ft.es.class.incremental.update.task-20, ft.es.class.incremental.update.task-21]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#5-4-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-47, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.225 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] (Re-)joining group
最佳答案
让我们看一下文档,从 pause()
方法开始
公共(public)无效暂停(集合分区) here
Suspend fetching from the requested partitions. Future calls to poll(long) will not return any records from these partitions until they have been resumed using resume(Collection). Note that this method does not affect partition subscription. In particular, it does not cause a group rebalance when automatic assignment is used.
所以从上面的描述来看,pause()
方法会暂停分区获取消息,但不会暂停消费者线程,这意味着消费者线程将执行后续的 poll()
请求暂停的分区,但不会获取任何记录
在您的应用程序中:分区已暂停,但使用者线程正忙于执行耗时操作超过 10 分钟,而没有执行任何轮询请求。
检测消费者故障:因此当轮询停止时,心跳不会发送到集群
After subscribing to a set of topics, the consumer will automatically join the group when poll(long) is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the poll API sends periodic heartbeats to the server; when you stop calling poll (perhaps because an exception was thrown), then no heartbeats will be sent. If a period of the configured session timeout elapses before the server has received a heartbeat, then the consumer will be kicked out of the group and its partitions will be reassigned.
解决方案:增加以下属性的超时,因为默认时间为 5 分钟,您会看到每 5 分钟进行一次重新平衡(个人不支持增加超时)here
The new Java Consumer now supports heartbeating from a background thread. There is a new configuration
max.poll.interval.ms
which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configurationrequest.timeout.ms
must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value ofmax.poll.records
has been changed to 500.
解决方案2:如果您需要用更少的时间处理大量数据,请增加更多分区并用每个线程消耗每个分区(这意味着更多并发性),5 天内可以处理的数据更少分钟
关于spring-boot - Spring Kafka 即使我暂停消费者,也总是在 5 分钟后重新平衡,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53292655/
我不明白 int 63823 为何比 double 1.0 占用更少的空间。在这个特定实例中,int 中是否没有存储更多信息? 最佳答案 I don't understand how an int 6
这可能不是一个直接的代码问题,但它是一个经常出现在 SO 上的问题,我发现阅读它非常有用。 App Store - Help answering “Missing Compliance” (using
我在我们的应用程序中使用 syncfusion 寻呼机和下拉列表请打开以下链接。 https://stackblitz.com/edit/angular-nv6myv?file=src%2Fapp%2
以便解释指针和引用in this question我写了这段代码。 MyClass& MyClass::MyInstance() { static MyClass & myLoca
在 C 和 C++ 中,assert 是一个非常 重量级例程,将错误写入 stdout 并终止程序。在我们的应用程序中,我们实现了一个更强大的 assert 替代品,并为其提供了自己的宏。已尽一切努力
我已经创建了一个 MVC webApi 项目,现在我想使用身份验证和授权。我想我已经实现了这种安全措施,但由于某种原因,有些事情变糟了,当我编写我的凭据并尝试调用一些 webApi 方法时,显示消息“
我发现自己使用一种奇怪的方式向我的函数添加回调函数,我想知道是否有更通用的方式向函数添加回调函数,最好的情况是我的所有函数都检查最后给定的作为函数的参数,如果是,则将其用作回调。 我以前是这样的: v
几乎从来没有我只想获取某个 Remote 的情况;我总是想要所有的 Remote 。我认为这将是一个足够常见的用例,git 会考虑它(与他们有 pull.rebase true 的方式相同)。 那么,
我正在尝试使用 inarray 但它总是返回 true?有任何想法吗? (所有 li 均已显示) $("#select-by-color-list li").hide(); // get the se
我正在尝试为我公司的开发环境设置过期网址。我们使用 lighttpd在此环境中提供上传的文件,我发现 these docs这似乎相当有希望。 问题是我似乎根本无法让它工作,而且我有点不知所措,试图找出
我无法让“文件夹”外部变量工作。我总是得到[:]。 我正在 Windows 下的 Grails 上进行开发(这就是为什么外部配置文件看起来像 file:C:\path\to/file)。 我在另一个项
这个问题是出于对 PL 如何工作的好奇,而不是其他任何事情。 (它实际上是在查看与 Haskell 不同的 SML 时想到的,因为前者使用按值调用 - 但我的问题是关于 Haskell。) Haske
我有一个高速缓存内存模块,我希望它是可字寻址的,但有字节的写使能信号。 always @ (posedge clk) begin //stuff... if(write) begin
我正在处理一些代码,其中一个对象“foo”正在创建另一个对象对象“bar”,并向其传递一个Callable。之后 foo 将返回bar,然后我希望 foo 变得无法访问(即:可用于垃圾收集)。 我最初
我已将我的程序与此方法相关联: public static void CreateFileAssociation(string extension, string key, string descri
所以我正在进行目录遍历,但我无法让 opendir 按照我想要的方式工作。它总是无法打开我发送的目录,它给出了一些未知的错误。我通常传入 argv[1],但我放弃了,只是开始硬编码路径。 char *
这个问题在这里已经有了答案: How do I compare strings in Java? (23 个回答) 关闭 9 年前。 出于某种原因,我的(基本)程序总是打印我为 else 语句保留的
我不想冒为此提出破解的风险,因为它涉及 datetime 对象。基本上,我想按如下方式进行转换: 2010-04-21 06:37:53 -> 2010-04-21 06:40:00 2010-08-
我正在用 C 语言玩文件 I/O。我正在尝试使用 fgets 从一个文件中读取数据并将其输出到另一个文件。问题是它总是返回 NULL,因此没有任何内容被复制到输出文件中。这是我的代码: #includ
class MyClass { // empty class with no base class }; int main() { MyClass* myClass = new MyC
我是一名优秀的程序员,十分优秀!