- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在 Kafka 0.10.0.1 集群中有一个有 10 个分区的主题。我有一个产生多个消费者线程的应用程序。对于这个主题,我生成了 5 个线程。在我的应用程序日志中,我多次看到此条目
INFO :: AbstractCoordinator:600 - Marking the coordinator x.x.x.x:9092
(id:2147483646 rack: null) dead for group notifications-consumer
(Re-)joining group notifications-consumer.
之后我还看到一个警告说
Auto commit failed for group notifications-consumer: Commit cannot be completed since
the group has already rebalanced and assigned the partitions to another member. This means
that the time between subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is spending too much time
message processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned by poll() with max.poll.records.
props.put("max.poll.records", 200);
props.put("heartbeat.interval.ms", 20000);
props.put("session.timeout.ms", 60000);
最佳答案
与 session.timeout.ms
您只控制由于心跳导致的超时,这意味着已通过 session.timeout.ms
自上次心跳以来的毫秒数,集群将您声明为死节点并触发重新平衡。
之前 KIP-62心跳是在轮询中发送的,但现在被移动到特定的后台线程,以避免在您花费的时间超过 session.timeout.ms
时被从集群中逐出。调用另一个 poll()
.
将心跳分离到特定线程可以将处理与告诉集群您已启动并正在运行分离,但这引入了“活锁”情况的风险,其中进程处于事件状态但没有取得进展,因此除了使心跳独立的poll
引入了新的超时,以确保消费者还活着并取得进展。
文档说明了 KIP-62 之前的实现:
As long as the consumer is sending heartbeats, it basically holds a lock on the partitions it was assigned. If the process becomes defunct in such a way that it cannot make progress but is nevertheless continuing to send heartbeats, then no other member in the group will be able to take over the partitions, which causes increasing lag. The fact that heartbeating and processing is all done in the same thread, however, guarantees that consumers must make progress to keep their assignment. Any stall which affects processing also affects heartbeats.
Decoupling the processing timeout: We propose to introduce a separate locally enforced timeout for record processing and a background thread to keep the session active until this timeout expires. We call this new timeout as the "process timeout" and expose it in the consumer's configuration as max.poll.interval.ms. This config sets the maximum delay between client calls to poll()
max.poll.interval.ms
(默认为 5 分钟)处理 200 条轮询记录。
max.poll.records
或增加
max.poll.interval.ms
.
max.poll.interval.ms
出现在您的日志中的配置来自(至少)kafka 0.10.1.0,所以我认为您在那里犯了一个小错误。
org.apache.kafka.clients.consumer.KafkaConsumer
,如果您使用的是 Java)并将它们订阅到 N 个不同的主题,但使用相同的
group.id
.
KafkaConsumer
启动或停止,因为它会发送
JoinGroup
或
LeaveGroup
包含
group.id
的消息(参见相应的
kafka protocol )和
member.id
(
member.id
不是主机,因此在同一进程中创建的两个使用者仍将具有不同的 ID)。请注意,这些消息不包含主题订阅信息(尽管该信息应该在代理中,但 kafka 不会将其用于重新平衡)。
JoinGroup
或
LeaveGroup
为
group.id
X,它将为所有具有相同
group.id
的消费者触发重新平衡。 X。
group.id
启动 25 个消费者您将看到重新平衡,直到创建最后一个消费者并且相应的重新平衡结束(如果您继续看到这一点,您可能会停止消费者)。
If we have two KafkaConsumer using the same group.id (running in the same process or in two different processes) and one of them is closed, it triggers a rebalance in the other KafkaConsumer even if they were subscribed to different topics. I suppose that brokers must be taking into account only the group.id for a rebalance and not the subscribed topics corresponding to the pair (group_id,member_id) of the LeaveGroupRequest but I'm wondering if this is the expected behavior or it's something that should be improved? I guess that is probably the first option to avoid a more complex rebalance in the broker and considering that the solution is very simple, i.e. just use different group ids for different KafkaConsumer that subscribe to different topics even if they are running in the same process.
When rebalance occurs we see duplicate messages coming
I segregated into two groups, now suddenly problem has disappeared since past 2 hours.
group.id
对于每个主题。
关于apache-kafka - 卡夫卡消费者错误 : Marking coordinator dead,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50351464/
kafka的Java客户端-消费者 一、kafka消费方式 pull(拉)模式:consumer采用从broker中主动拉取数据。Kafka 采用这种方式 push(推)模式:Kafka没有采用这种方
我编写这个小应用程序是为了解决 Python 中的经典生产者/消费者问题。我知道我可以使用线程安全的队列机制来解决这个问题,但我有兴趣自己解决这个问题来学习。 from threading impor
下面是一个示例消费者/生产者模型的代码: int buffer[MAX]; int fill_ptr = 0; int use_ptr = 0; int count = 3; void put(int
我的消费者、生产者程序有问题,它似乎可以加载,但返回段错误。我已经尝试了一切来修复它,但仍然失败!将不胜感激任何帮助。笔记;代码真的很多,semaphore.h的代码都在里面,有谁想测试一下。其余代码
我正在阅读著名的操作系统概念书(Avi Silberschatz、Peter Baer Galvin、Greg Gagne)第 9 版:http://codex.cs.yale.edu/avi/os-
我正在尝试构建一个服务,为许多异步客户端提供队列以发出请求并等待响应。我需要能够通过每 Y 个持续时间的 X 个请求来限制队列处理。例如:每秒 50 个 Web 请求。它用于第 3 方 REST 服务
我正在尝试使用一组资源来实现生产者/消费者模式,因此每个线程都有一个与之关联的资源。例如,我可能有一个任务队列,其中每个任务都需要一个 StreamWriter写出它的结果。每个任务还必须有参数传
为什么我们需要 Azure 存储帐户上的 blob 容器用于 Eventhub 消费者客户端(我使用的是 python)。为什么我们不能像在 Kafka 中那样直接使用来自 Eventhub(Kafk
我有一个有趣的生产者-消费者衍生产品需要实现,但我无法理解它的算法。因此,每个生产者都会“产生”给定范围(最小值,最大值)之间的数字,这对除以给定“商”给出了相同的提醒。对于消费者来说也是如此。 额外
我需要实现一种生产者/消费者方案,出于性能原因,消费者尝试在一批中处理许多工作项(每个工作项都会耗尽工作队列)。 目前,我只是创建固定数量的相同工作人员,它们在循环中的同一队列上工作。由于其中一些可能
为什么我们需要 Azure 存储帐户上的 blob 容器用于 Eventhub 消费者客户端(我使用的是 python)。为什么我们不能像在 Kafka 中那样直接使用来自 Eventhub(Kafk
我的关系必须按如下方式运作;线程 A 向线程 B 发布一些更改,线程 B 接受该更改并将其发布到线程 C。 问题是生产者-消费者,我使用 BlockingQueue 仅用两个实体来实现它没有问题。我怎
我一直在研究 PC 问题,以了解 Java 同步和线程间通信。使用底部的代码,输出为 Producer produced-0 Producer produced-1 Producer produced
我编写了代码来实现生产者-消费者问题,它似乎工作正常,不需要同步。这可能吗? 如何测试代码并检查它是否确实正常工作?我如何知道是否会发生死锁?现在,我没有跳出循环(即生产者不断插入,消费者不断在无限循
我必须完成一项练习,我必须使用至少一个生产者线程和 x 个消费者线程的生产者/消费者模式在我的文件夹路径中查找“.java”文件。 生产者消费者级:首先,当生产者完成查找文件时,我尝试通过设置从 tr
我被分配了一项类(class)作业来实现消费者/生产者问题的解决方案,该解决方案使用单个生产者、单个消费者和循环缓冲区。这应该用 C 语言编写。 不幸的是,我们没有获得任何学习 Material ,并
有人可以检查我的代码并告诉我是否走在正确的轨道上。我似乎有点迷失了。如果您看到我的错误,请告诉我它们。 我想做的是使用我自己的信号量以及 GCD 来解决有界缓冲区问题。 提前致谢.. sema.c v
我要处理有界缓冲区、生产者消费者问题,只能修改 prod 和 cons 函数。此代码仅在一个消费者和生产者线程上运行,不会出现任何问题。但对于每个都有多个,迟早总会给我带来同样的问题: p5p1:
我有一个从多个线程访问的类的实例。此类接受此调用并将元组添加到数据库中。我需要以串行方式完成此操作,因为由于某些数据库约束,并行线程可能会导致数据库不一致。 由于我不熟悉 C# 中的并行性和并发性,所
我正在尝试编写一个批量邮件服务,它有两种方法: add(Mail mail):可以发送邮件,由Producers调用 flushMailService():刷新服务。消费者应该获取一个列表,并调用另一
我是一名优秀的程序员,十分优秀!