- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们一直在尝试创建一个 kafka 消费者,它尝试在来自其他 kafka 集群的 60 个分区中以大约 2.7tb/小时的速度使用数据。
到目前为止,我们已经设法消耗了大约 2tb 的数据/小时并且无法 catch 目标(2.7)。
我们正在消费的集群具有存储问题的数据保留/删除率,因此我们需要在 3 分钟内使用该数据量。
细节,
我们在 6 台机器上使用 60 个分区的数据。
import java.io.*;
import java.net.InetSocketAddress;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.*;
import javax.json.*;
import java.sql.Timestamp;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.protobuf.util.JsonFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.hadoop.security.UserGroupInformation;
public class NotificationConsumerThread implements Runnable {
private final KafkaConsumer<byte[], byte[]> consumer;
private final String topic;
public NotificationConsumerThread(String brokers, String groupId, String topic) {
Properties prop = createConsumerConfig(brokers, groupId);
this.consumer = new KafkaConsumer<>(prop);
this.topic = topic;
this.consumer.subscribe(Arrays.asList(this.topic));
}
private static Properties createConsumerConfig(String brokers, String groupId) {
Properties props = new Properties();
props.put("bootstrap.servers", brokers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "120000");
props.put("request.timeout.ms", "120001");
props.put("max.poll.records", "280000");
props.put("fetch.min.bytes", "1");
props.put("max.partition.fetch.bytes", "10000000");
props.put("auto.offset.reset", "latest");
props.put("receive.buffer.bytes", "15000000");
props.put("send.buffer.bytes", "1500000");
props.put("heartbeat.interval.ms", "40000");
// props.put("max.poll.interval.ms", "420000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
return props;
}
@Override
public void run() {
try {
Configuration confHadoop = new Configuration();
confHadoop.addResource(new Path("redacted"));
confHadoop.addResource(new Path("redacted"));
confHadoop.setBoolean("dfs.support.append" ,true);
confHadoop.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
confHadoop.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
confHadoop.set("hadoop.security.authentication","kerberos");
confHadoop.set("dfs.namenode.kerberos.principal.pattern", "redacted");
UserGroupInformation.setConfiguration(confHadoop); UserGroupInformation.loginUserFromKeytab("redacted", "redacted");
FileSystem fileHadoop1 = FileSystem.get(confHadoop);
StringBuffer jsonFormat3 = new StringBuffer();
while (true) {
String jsonFormat;
String jsonFormat1;
String jsonFormat2;
DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHH");
dateFormat.toString();
Date date = new Date();
ConsumerRecords<byte[], byte[]> records = consumer.poll(3000);
for (ConsumerRecord<byte[], byte[]> record : records) {
FlowOuterClass.Flow data = FlowOuterClass.Flow.parseFrom(record.value());
jsonFormat = JsonFormat.printer().print(data);
jsonFormat1 = jsonFormat.replaceAll("\\n", "");
JsonObject jsonObject1 = Json.createReader(new StringReader(jsonFormat1)).readObject();
Timestamp ts = new Timestamp(Long.parseLong(jsonObject1.getString("xxxx")));
date = new Date(ts.getTime());
jsonFormat2 = jsonFormat1.substring(0, jsonFormat1.length() - 1) + ", " + "\"xxxxx\"" + ": " + "\"" + dateFormat.format(date) + "\"" + "}\n";
jsonFormat3.append(jsonFormat2);
}
String jsonFormat4 = jsonFormat3.toString();
if(jsonFormat4.length()>100000000) {
FSDataOutputStream stream = fileHadoop1.create(new Path("redacted-xxxxx" + dateFormat.format(date) + "/" + UUID.randomUUID().toString() + ".json"));
stream.write(jsonFormat4.getBytes());
stream.close();
jsonFormat3.delete(0, jsonFormat3.length());
}
}
} catch (Exception e) {
System.out.println(e);
}
consumer.close();
}
}
这是滞后状态:
最佳答案
您可以尝试做一些事情,看看您是否能够以最小的延迟 catch 生成消息的原始速率。
max.poll.records
来增加轮询时的记录数。至280000
.请注意,此配置 只会工作当您以类似的方式调整其他两个配置时。您需要更改 max.partition.fetch.bytes
和 fetch.max.bytes
以成比例的速度。我看到你试图改变max.partition.fetch.bytes
至10000000
(10MB) 大约您还应该考虑调整此值 fetch.max.bytes
.因此,简而言之,您需要以正确的比例调整所有这些值。请仔细阅读,您可能会发现这很有用。 increase number of message in the poll 关于java - 卡夫卡消费者滞后,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62796034/
我使用 streamsBuilder.table("myTopic") 为某个主题创建了一个 Ktable,并将其具体化为状态存储,以便我可以使用交互式查询. 每小时,我都想从该状态存储(以及关联的变
我正在kafka中进行数据复制。但是,kafka 日志文件的大小增长得非常快。一天大小达到 5 GB。作为这个问题的解决方案,我想立即删除已处理的数据。我正在 AdminClient 中使用删除记录方
我实际上正在使用 SASL 纯文本设置简单的 Kafka 身份验证并添加 ACL 授权。但是当我尝试使用数据时遇到问题。 [main] INFO org.apache.kafka.common.uti
我正在一个使用 Kafka 和 Akka Streams 的项目 reactive-kafka连接器。我们发现reactive-kafka使用它自己的调度程序(akka.kafka.default-d
我试图在HDP上运行简单的kafka生产者消费者示例,但面临以下异常。 [2016-03-03 18:26:38,683] WARN Fetching topic metadata with corr
我继承了一些正在实现到另一个项目中的 Kafka 代码,并遇到了一个问题...消费者收到来自生产者的 3995 条消息后,它崩溃并给出以下错误: ERROR Error while accepting
我正在尝试测试 Flink 程序以使用此 JSONKeyValueDeserializationSchema 类读取来自 Kafka 的 JSON 数据。但是我的 Intellij 没有找到这个类。我
我有一个简单的生产者-消费者设置:1 个生产者(作为一个线程)和 2 个消费者(作为 2 个进程)。生产者的run方法: def run(self): producer = K
我正在使用“node-rdkafka”npm 模块来构建用 Nodejs 编写的分布式服务架构。我们有一个计量用例,其中我们只允许每 n 秒消耗和处理一定数量的消息。例如,“主”主题有 100 条由生
我正在学习 Kafka,我想知道当我消费来自主题的消息时如何指定然后分区。 我找到了几张这样的图片: 这意味着一个消费者可以消费来自多个分区的消息,但一个分区只能由单个消费者(在消费者组内)读取。 此
我想从flink读取一个kafka主题 package Toletum.pruebas; import org.apache.flink.api.common.functions.MapFunctio
我阅读了 Kafka 网站上的文档,但是在尝试实现一个完整的最小示例(生产者 --> kafka --> 消费者)之后,我不太清楚“消费者状态”如何处理偏移量。 一些信息 我正在使用高级 API (J
刚开始使用Spring Kafka(2.1.4.RELEASE)和Kafka(1.0.0),但是当我添加事务时,处理速度降低了很多。 代码: spring.kafka.consumer.max-pol
我尝试在安全模式下使用kafka(0.9.1)。我会使用 Spark 读取数据,因此我必须将 JAAS conf 文件传递给 JVM。我使用这个 cmd 来开始我的工作: /opt/spa
目标:读取主题中的所有消息,然后终止进程。 我能够连续阅读以下消息: props.put("bootstrap.servers", kafkaBootstrapSrv); props.put("gro
我写了一个非常简单的 Flink 流作业,它使用 FlinkKafkaConsumer082 从 Kafka 获取数据。 protected DataStream getKafkaStream(Str
我使用的是kafka 2.10-0.9.0.1当我通过命令删除主题时,主题被标记为删除。 bin/kafka-topics.sh --zookeeper localhost:2181 --delete
当我实例化一个 Kafka 消费者时 KafkaConsumer consumer = new KafkaConsumer(props); 我收到这条消息 SLF4J: Failed to load
我有一个用例,我需要 100% 的可靠性、幂等性(无重复消息)以及我的 Kafka 分区中的顺序保留。我正在尝试使用事务 API 来设置概念证明来实现这一点。有一个名为“isolation.level
我们有一个 3 主机的 Kafka 集群。我们有 136 个主题,每个主题有 100 个分区,复制因子为 3。这使得我们的集群中有 13,600 个分区。 这是我们主题的合理配置吗? 最佳答案 太多了
我是一名优秀的程序员,十分优秀!