gpt4 book ai didi

java - 卡夫卡消费者滞后

转载 作者:行者123 更新时间:2023-12-02 19:12:31 25 4
gpt4 key购买 nike

我们一直在尝试创建一个 kafka 消费者,它尝试在来自其他 kafka 集群的 60 个分区中以大约 2.7tb/小时的速度使用数据。
到目前为止,我们已经设法消耗了大约 2tb 的数据/小时并且无法 catch 目标(2.7)。
我们正在消费的集群具有存储问题的数据保留/删除率,因此我们需要在 3 分钟内使用该数据量。
细节,
我们在 6 台机器上使用 60 个分区的数据。

import java.io.*;
import java.net.InetSocketAddress;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.*;
import javax.json.*;
import java.sql.Timestamp;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.protobuf.util.JsonFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.hadoop.security.UserGroupInformation;

public class NotificationConsumerThread implements Runnable {

private final KafkaConsumer<byte[], byte[]> consumer;
private final String topic;

public NotificationConsumerThread(String brokers, String groupId, String topic) {
Properties prop = createConsumerConfig(brokers, groupId);
this.consumer = new KafkaConsumer<>(prop);
this.topic = topic;
this.consumer.subscribe(Arrays.asList(this.topic));
}

private static Properties createConsumerConfig(String brokers, String groupId) {
Properties props = new Properties();
props.put("bootstrap.servers", brokers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "120000");
props.put("request.timeout.ms", "120001");
props.put("max.poll.records", "280000");
props.put("fetch.min.bytes", "1");
props.put("max.partition.fetch.bytes", "10000000");
props.put("auto.offset.reset", "latest");
props.put("receive.buffer.bytes", "15000000");
props.put("send.buffer.bytes", "1500000");
props.put("heartbeat.interval.ms", "40000");
// props.put("max.poll.interval.ms", "420000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

return props;
}



@Override
public void run() {
try {
Configuration confHadoop = new Configuration();

confHadoop.addResource(new Path("redacted"));
confHadoop.addResource(new Path("redacted"));
confHadoop.setBoolean("dfs.support.append" ,true);



confHadoop.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
confHadoop.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
confHadoop.set("hadoop.security.authentication","kerberos");
confHadoop.set("dfs.namenode.kerberos.principal.pattern", "redacted");
UserGroupInformation.setConfiguration(confHadoop); UserGroupInformation.loginUserFromKeytab("redacted", "redacted");

FileSystem fileHadoop1 = FileSystem.get(confHadoop);
StringBuffer jsonFormat3 = new StringBuffer();

while (true) {
String jsonFormat;
String jsonFormat1;
String jsonFormat2;


DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHH");
dateFormat.toString();

Date date = new Date();


ConsumerRecords<byte[], byte[]> records = consumer.poll(3000);


for (ConsumerRecord<byte[], byte[]> record : records) {


FlowOuterClass.Flow data = FlowOuterClass.Flow.parseFrom(record.value());
jsonFormat = JsonFormat.printer().print(data);
jsonFormat1 = jsonFormat.replaceAll("\\n", "");


JsonObject jsonObject1 = Json.createReader(new StringReader(jsonFormat1)).readObject();
Timestamp ts = new Timestamp(Long.parseLong(jsonObject1.getString("xxxx")));


date = new Date(ts.getTime());
jsonFormat2 = jsonFormat1.substring(0, jsonFormat1.length() - 1) + ", " + "\"xxxxx\"" + ": " + "\"" + dateFormat.format(date) + "\"" + "}\n";
jsonFormat3.append(jsonFormat2);

}

String jsonFormat4 = jsonFormat3.toString();

if(jsonFormat4.length()>100000000) {
FSDataOutputStream stream = fileHadoop1.create(new Path("redacted-xxxxx" + dateFormat.format(date) + "/" + UUID.randomUUID().toString() + ".json"));
stream.write(jsonFormat4.getBytes());
stream.close();



jsonFormat3.delete(0, jsonFormat3.length());
}
}



} catch (Exception e) {
System.out.println(e);
}
consumer.close();
}
}
这是滞后状态:
enter image description here
我们在互联网上找不到任何解决方案,因此我们很高兴知道如何与 kafka 消费者一起使用这些大量数据的最佳实践。
谢谢!

最佳答案

您可以尝试做一些事情,看看您是否能够以最小的延迟 catch 生成消息的原始速率。

  • 您应该增加消费者组中的消费者数量。根据您发布的图片,我可以看到有 10 个消费者在 6 台机器上运行。如果您的机器是 能够运行更多的消费者 ,那么您可能应该考虑增加消费者的数量。请注意,如果您可以将消费者数量增加到 12、15、20、30 中的任何一个,则效果会更好。这是因为我们想要 所有消费者获得相等数量的分区从话题。所以这个想法是消费者的数量应该是 60 倍。(你正在消费的主题中的分区数量)
  • 您试图通过更改 max.poll.records 来增加轮询时的记录数。至280000 .请注意,此配置 只会工作当您以类似的方式调整其他两个配置时。您需要更改 max.partition.fetch.bytesfetch.max.bytes以成比例的速度。我看到你试图改变max.partition.fetch.bytes10000000 (10MB) 大约您还应该考虑调整此值 fetch.max.bytes .因此,简而言之,您需要以正确的比例调整所有这些值。请仔细阅读,您可能会发现这很有用。 increase number of message in the poll
  • 如果上述两种方法不起作用,这是您可以考虑的最后一步。因为我们知道 Kafka 中的分区决定并行度 你可以实现的。您可以考虑增加您正在消费的主题中的分区数(将其更改为 120 或从当前 60 个分区中更大的数字)

  • 我希望这有帮助。

    关于java - 卡夫卡消费者滞后,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62796034/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com