
java - Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Reposted. Author: 行者123. Updated: 2023-12-02 06:25:32

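I am new to Kafka. I am trying to run a Kafka consumer from my Spring Boot application and a Kafka producer from my terminal.

The code used to work fine, but recently I keep getting this in the logs on the consumer side: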

2019-04-22 11:56:39.415  INFO 10253 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-04-22 11:56:39.416 INFO 10253 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
2019-04-22 11:56:39.424 INFO 10253 --- [ main] org.apache.kafka.clients.Metadata : Cluster ID: iR8AmeB9RC-eqS7rrFyYGw
2019-04-22 11:56:39.425 INFO 10253 --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=com.stellapps.rtc] Discovered group coordinator akash-Lenovo-ideapad-330-15IKB:9092 (id: 2147483647 rack: null)
2019-04-22 11:56:39.426 INFO 10253 --- [ main] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=com.stellapps.rtc] Revoking previously assigned partitions []
2019-04-22 11:56:39.427 INFO 10253 --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=com.stellapps.rtc] (Re-)joining group
2019-04-22 11:56:42.164 INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:56:45.183 INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:56:48.201 INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:56:51.218 INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:56:54.235 INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:56:57.256 INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:57:00.276 INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:57:03.294 INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:57:06.315 INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing
2019-04-22 11:57:09.334 INFO 10253 --- [m.stellapps.rtc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=com.stellapps.rtc] Attempt to heartbeat failed since group is rebalancing

And this is from the producer side:

[2019-04-22 11:57:47,755] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-04-22 12:00:38,119] INFO [GroupCoordinator 0]: Member consumer-1-c8b9d4fd-d2e3-45a3-8f9a-2e825a9a87bd in group com.stellapps.rtc has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-04-22 12:00:38,123] INFO [GroupCoordinator 0]: Stabilized group com.stellapps.rtc generation 11 (__consumer_offsets-30) (kafka.coordinator.group.GroupCoordinator)
[2019-04-22 12:00:48,124] INFO [GroupCoordinator 0]: Member consumer-2-8f068bb6-d78c-458f-9775-d6b13ca54b57 in group com.stellapps.rtc has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-04-22 12:00:48,125] INFO [GroupCoordinator 0]: Preparing to rebalance group com.stellapps.rtc in state PreparingRebalance with old generation 11 (__consumer_offsets-30) (reason: removing member consumer-2-8f068bb6-d78c-458f-9775-d6b13ca54b57 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2019-04-22 12:00:48,125] INFO [GroupCoordinator 0]: Group com.stellapps.rtc with generation 12 is now empty (__consumer_offsets-30) (kafka.coordinator.group.GroupCoordinator)
[2019-04-22 12:07:47,755] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

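The consumer does not receive any input from the producer. I tried changing the poll value, but it did not help: either these messages keep appearing, or the consumer simply does not take any input from the producer.

Here is my Spring Boot code.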

import com.stellapps.rtc.reset.RTCInterpreter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.stellapps.rtc.reset.json.DataParser;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import javax.annotation.PostConstruct;

@Component
public class DataConsumer {

    @Autowired
    private RTCInterpreter interpret;

    private Consumer<Long, String> createConsumer(String topic) {
        final String BOOTSTRAP_SERVERS = "localhost:9092";
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Group id of the consumer group (if a consumer group exists).
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "com.stellapps.rtc");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Create the consumer using props.
        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    public void runConsumer(String topic) throws InterruptedException {
        try {
            final Consumer<Long, String> consumer = createConsumer(topic);

            // Stop once more than this many polls have come back empty
            // (the counter is never reset).
            final int giveUp = 1000;
            int noRecordsCount = 0;
            while (true) {
                final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                if (consumerRecords.count() == 0) {
                    noRecordsCount++;
                    if (noRecordsCount > giveUp) {
                        break;
                    }
                    continue;
                }
                consumerRecords.forEach(record -> {
                    // System.out.printf(" %s\n", record.value().getClass().getName());
                    interpret.call(record.value());
                });
                consumer.commitAsync();
            }
            consumer.close();
            System.out.println("Kafka is closed");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @PostConstruct
    public void init() {
        try {
            Runnable r = new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());

                    try {
                        runConsumer("consume");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            // Run the consumer loop on its own thread so that bean
            // initialization is not blocked by the polling loop.
            Thread run = new Thread(r);
            run.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Please help me.

Best answer

The problem was that I was also calling the consumer from Application.java, so I had 2 Kafka consumer instances, while I had limited the group size to 1.
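One way to guard against this kind of accidental double start is sketched below. It is only an illustration, not the code from the question: `SingleConsumerStarter` and `startOnce` are hypothetical names, and the sketch assumes that both call sites (the `@PostConstruct` method and the call in Application.java) share the same starter instance.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical helper (not part of the original code): launches the
 * consumer loop at most once, no matter how many callers ask for it.
 */
final class SingleConsumerStarter {

    // Flips from false to true exactly once, even under concurrent calls.
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Runnable consumerLoop;

    SingleConsumerStarter(Runnable consumerLoop) {
        this.consumerLoop = consumerLoop;
    }

    /**
     * Starts the loop on a daemon thread the first time it is called.
     *
     * @return true if this call started the thread, false if an earlier call already did
     */
    boolean startOnce() {
        if (!started.compareAndSet(false, true)) {
            return false; // someone else already started the loop
        }
        Thread t = new Thread(consumerLoop, "kafka-consumer-loop");
        t.setDaemon(true);
        t.start();
        return true;
    }
}
```

With something like `new SingleConsumerStarter(() -> runConsumer("consume"))` (wrapped to handle the checked exception), a second call to `startOnce()` becomes a no-op instead of creating a second consumer in the same group. The simpler fix, and the one this answer implies, is to remove the extra call from Application.java.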

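The producer problem was caused by the properties I had set for the producer.

Sorry for the trouble.

Regarding "java - Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55790274/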
