gpt4 book ai didi

java - 卡夫卡1.0流媒体API : message consumption from partitions get delayed

转载 作者:太空宇宙 更新时间:2023-11-04 10:42:36 25 4
gpt4 key购买 nike

最近,我将我们的流应用程序从 Spark-streaming 2.1 切换为使用 kafka-streaming 新 API (1.0) 和 kafka 代理服务器 0.11.0.0

我已经实现了自己的Processor类,并且在process方法中,我只是打印了消息内容。

我有一个由 3 台机器组成的 kafka 集群,我挂接的主题有 300 个分区。

我在一台具有 32 GB RAM 和 8 个内核的机器上运行了具有 100 个线程的流应用程序。

我的问题是,在某些情况下,我在消息到达kafka主题/分区后收到消息,而在其他情况下,我在消息到达主题后10-15分钟收到消息,不知道为什么!

我使用下面的命令行来跟踪流应用程序的 group.id 的 kafka 主题上的延迟。

./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --new-consumer --describe --group kf_streaming_gp_id

但不幸的是,它并不能始终给出准确的结果,甚至根本不能给出结果,有人知道为什么吗?

流媒体应用程序是否有我错过的东西,以便我可以在到达分区后一致地读取消息?任何消费者属性都可以解决此类问题。

我的 kafka-streaming 应用程序结构如下:

        Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kf_streaming_gp_id");
config.put(StreamsConfig.CLIENT_ID_CONFIG, "kf_streaming_gp_id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, DocumentSerde.class);
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimeExtractor.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 100);

KStream<String, Document> topicStreams = builder.stream(sourceTopic);

topicStreams.process(() -> new DocumentProcessor(appName, environment, dimensions, vector, sinkTopic));

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

最佳答案

我找出了我的案例中的问题所在。

事实证明,有一些线程被困在执行高CPU密集型工作,这导致阻止其他线程消费消息,这就是为什么我看到这样的突发,当我停止这个CPU密集型逻辑时,一切都非常快,消息一旦到达kafka主题就进入流作业。

关于java - 卡夫卡1.0流媒体API : message consumption from partitions get delayed,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48789973/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com