gpt4 book ai didi

apache-kafka - 为什么 Kafka Consumer 不断收到相同的消息(offset)

转载 作者:行者123 更新时间:2023-12-04 02:13:38 25 4
gpt4 key购买 nike

我有一个 SOAP Web 服务,它发送 kafka 请求消息并等待 kafka 响应消息(例如 consumer.poll(10000))。

每次调用 Web 服务时,它都会创建一个新的 Kafka 生产者和一个新的 Kafka 消费者。

每次我调用网络服务时,消费者都会收到相同的消息(例如具有相同偏移量的消息)。

我正在使用 Kafka 0.9 并且启用了自动提交并且自动提交频率为 100 毫秒。

对于我在其自己的 Callable 中处理的 poll() 方法返回的每个 ConsumerRecord,例如

ConsumerRecords<String, String> records = consumer.poll(200);

for (ConsumerRecord<String, String> record : records) {

final Handler handler = new Handler(consumerRecord);
executor.submit(handler);

}

为什么我总是一遍又一遍地收到相同的消息?

更新 0001

metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class com.kafka.MDCDeserializer
group.id = group-A.group
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [machine1:6667, machine2:6667, machine3:6667, machine0:6667]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class com.kafka.UUIDDerializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXTSASL
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = IbmX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest

最佳答案

基于您显示的代码。我认为你的问题是新的 Consumer 是单线程的。如果您进行一次投票,然后不再进行另一次投票,则 auto.commit.offset 将无法正常工作。

尝试将您的代码放在一个 while 循环中,看看您何时再次轮询偏移量将被提交。

关于apache-kafka - 为什么 Kafka Consumer 不断收到相同的消息(offset),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36007141/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com