gpt4 book ai didi

apache-kafka - Kafka 消费者订阅与分配的分区

转载 作者:行者123 更新时间:2023-12-04 23:41:49 25 4
gpt4 key购买 nike

卡夫卡让我很困惑。我正在使用标准值在本地运行它。
仅打开自动创建主题。 1 个分区,1 个节点,一切都是本地的和简单的。
如果写

consumer.subscribe("test_topic");
consumer.poll(10);

它根本行不通,也永远找不到任何数据。
如果我改为分配一个分区
consumer.assign(new TopicPartition("test_topic",0));

并检查我坐在 995 的位置。现在可以轮询和接收我的生产者输入的所有数据。

我对订阅有什么不了解?我不需要多个消费者,每个消费者只处理一部分数据。我的消费者需要获取某个主题的所有数据。为什么所有教程中显示的标准订阅方法对我不起作用?
我确实理解分区用于负载平衡消费者。我不明白订阅有什么问题。
consumer config properties
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "postproc-" + EnvUtils.getAppInst()); // jeder ist eine eigene gruppe -> kriegt alles
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<Long, byte[]>(props);

producer config
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 2);
props.put("batch.size", 16384);
props.put("linger.ms", 5000);
props.put("buffer.memory", 1024 * 1024 * 10); // 10mb
props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer(props);

producer execution
try (ByteArrayOutputStream out = new ByteArrayOutputStream()){
event.writeDelimitedTo(out);
for (long a = 10; a<20;a++){
long rand=new Random(a).nextLong();
producer.send(new ProducerRecord<>("test_topic",rand ,out.toByteArray()));
}
producer.flush();
}catch (IOException e){

消费者执行
consumer.subscribe(Arrays.asList("test_topic"));
ConsumerRecords<Long,byte[]> records = consumer.poll(10);
for (ConsumerRecord<Long,byte[]> r :records){ ...

最佳答案

我设法解决了这个问题。问题是超时。打桩时我没有给它足够的时间来完成。我认为分配分区要快得多,因此可以及时完成。标准订阅轮询需要更长的时间。从未真正完成,也没有提交。
至少我认为那是问题所在。超时时间更长,它的工作原理。

关于apache-kafka - Kafka 消费者订阅与分配的分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35154675/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com