- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在实现 kafka 消费者时应该采取什么更好的方法。
目标从 Kafka 读取并写回 db。数百万行
方法一:
每个分区 - 每个消费者 - 等待消息消耗(即写回数据库)然后在轮询循环中继续下一个。
方法二:
每个分区 - 每个消费者 - 将记录发送到工作线程或线程池以写回数据库,然后提交偏移量并继续轮询。需要注意抵消管理。在这种情况下,不要等待消息写回数据库。继续轮询,将消息传递给工作线程。
关于他们两个的任何见解?
谢谢
最佳答案
方法一:
该方法仅适用于您可以估计消息处理时间的情况,否则不推荐使用。
问题:在这种方法中,主要问题是让消费者保持事件状态,如果您在再次调用 poll() 之前等待消息完全处理,则必须确保您的消费者在调用 poll() 之前应该处于事件状态,因为 kafka维护一个名为“session.timeout.ms”的属性。 kafka broker/cluster 对这个属性的值采取行动,如果消费者在“session.timeout.ms”的时间段内无法再次调用 poll(),broker 会标记消费者死亡,它会被踢出.现在,当消费者完成消息处理并再次调用 poll() 时,它被视为新加入者,并将再次像以前一样从偏移量开始提供记录集。牢记这种情况,消费者将陷入无限循环,永远不会继续其偏移量。
可能的解决方案 1:要使用这种方法,您需要具有以下副作用的以下属性“session.timeout.ms”的良好值(value):
1:值太低:消费者将被标记为死亡,如上所述,并且永远不会继续其偏移量,但是消息将被处理,但每次完成消息时,它将再次获得以前的消息+新消息。
2:值太高:Broker 检测消费者的真正故障会很晚,这将导致记录重复并影响整体吞吐量。
可能的解决方案 2:(仅对版本 0.10.1.x 有效) Kafka 在发布 (0.10.1.0) 中的官方修复。
在这种方法中,引入了两个值得注意的实体:一个新属性“max.poll.interval.ms”,它设置了客户端调用 poll() 之间的最大延迟,以及一个负责保持消费者事件的后台线程。因此,在某个场景中,当消费者调用方法 poll() 然后忙于消息处理时,内部后台线程将使心跳保持事件状态,因此消费者将保持事件状态。但是,这个内部后台线程本身将保持事件状态,直到属性“max.poll.interval.ms”的超时值保持有效。因此,该线程将等待消费者在“max.poll.interval.ms”的时间段内调用 poll() ,否则,它将发送离开请求并自行死亡。”
同样,这个解决方案中棘手的部分是找到这个属性的合适值:“max.poll.interval.ms”(非常重要,这个时间将是后台线程将保持心跳事件而不需要显式调用 poll())。
方法二:使用工作线程是一个好主意,但是您必须维护一个内部队列或对接收到的消息进行验证,这可能很复杂,而且您还需要针对自动提交使用手动提交。有关提交的更多信息,请参阅 this并搜索标题“提交和抵消”。
问题:在这种方法中,主要问题是跟踪收到的消息和成功处理的消息。因为,您的消费者将收到消息,它将消息传递给相应的工作线程,并将提交偏移量并继续接收更多消息。在此过程中,您必须注意以下问题:
关于apache-kafka - 多线程 Kafka Consumer 或 PerPartition-PerConsumer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40990856/
我已经开始研究 MassTransit 并正在编写将处理消息的类。当我从 Consumes 实现接口(interface)时我有四个选项:All , Selected , For和 Context .
我正在尝试找出消费者群体级别是否也有任何抵消。在 Kafka 中,Consumer Offset 是在 Consumer group 级别还是在该 consumer group 内的单个消费者? 最佳
我有一个我不理解的 java 编译器错误。看来消费者 和 Consumer(带有 T 扩展对象)在方法签名参数中不等效。请查看以下代码: import java.util.function.Consu
我在泛型方面遇到了一些麻烦,尽管找到了解决方法,但我不明白是什么阻止了我的代码编译。 我有一个显示 TreeTableView 的 JavaFX 项目:
C++11 标准定义了一个内存模型(1.7、1.10),其中包含内存排序,大致为“顺序一致”、“获取”、“消耗”、“释放”和“放松”。同样粗略地,一个程序只有在它是无种族的情况下才是正确的,如果所有
我有一个 kafka 主题,我正在通过 Kafka Producer 发送数据。现在,在消费者方面,我有两种选择。 1。使用 KafkaConsumer - 下面是 kafkaConsumer 的代码
我有四个当前消费者在 Amazon AWS 上收听同一个队列。从队列中拉取消息时,有时会出现同一条消息被两个不同的消费者消费的情况。请看下面的日志: 18:01:46,515 [jmsContaine
我正在设计一个系统,其中将有 n 个生产者和 m 个消费者,其中 n 和 m 是数字,n != m。 我想这样设计系统, 任何生产者在生产时不得阻止其他生产者 任何消费者都不应在消费时阻止其他消费者
我们有一个系统,我们希望将记录(例如联系人、客户、机会)从我们的系统推送到 SalesForce。 为此,我们使用了 ForceToolKit for .Net .我们成功地将联系人记录从我们的系统推
我怎样才能写一个方法来组合 Stream的 Consumers成单个 Consumer使用 Consumer.andThen(Consumer) ? 我的第一个版本是: Consumer combi
我需要开始使用 kafka。我很难理解消费者应该收到什么:据我了解,我们可以通过多种方式配置消费者: 示例 1: @KafkaListener(topics = "topic_name) public
我需要开始使用 kafka。我很难理解消费者应该收到什么:据我了解,我们可以通过多种方式配置消费者: 示例 1: @KafkaListener(topics = "topic_name) public
我正在尝试在我的 scala play 应用程序中创建消费者 secret / key 对,但我似乎无法让它正常工作。我有以下代码 import org.apache.commons.codec.bi
我通过传递用户(消费者)名称使用 .NET 应用程序,我需要从 Salesforce 检索消费者 key 和消费者 key ,我该如何实现。 最佳答案 Consumer Key 和 Consumer
我想设置 至 0 .这似乎是另一个问题 ( JMS queue with multiple consumers ) 的答案,并在此 article 中进行了描述。在第 17.1.1 章中。我使用 JN
I have send message api to my users.When I send to message from my x numbers I need to wait 10-15
我有一个 java Kafka 消费者,我在其中批量获取 ConsumerRecords 进行处理。示例代码如下- while (true) { ConsumerRecords records
我正在为 iPhone 编写 Twitter/Facebook 应用程序。我有自己的 Apache/PHP 服务器。我只想把Consumer Key放在app里,然后我把Consumer Secret
Spring AMQP:比较多个消费者与每个消费者多个线程的性能 我正处于从 Spring 文档学习 Spring AMQP 的阶段。我不清楚提高异步消息消费率的首选方法:根据 Spring 文档 (
我正在制作一个需要 oAuth 1.0 身份验证的应用程序。我可以访问客户提供的消费者 key 和消费者 secret 。我曾尝试使用 AFNetworking 进行此操作,但效果不佳。有人可以建议我
我是一名优秀的程序员,十分优秀!