gpt4 book ai didi

java - 并行 KafkaStream 处理的更好方法?

转载 作者:行者123 更新时间:2023-11-30 06:03:45 27 4
gpt4 key购买 nike

下面是我的代码片段。我想并行kafka流处理。但我不想放入 Runnable,也不想多次启动这个应用程序。

有没有类似streams.parallel()的方法?

            final Serde<String> stringSerde = Serdes.String();
Consumed<String, String> types = Consumed.with(stringSerde, stringSerde);
//create StreamFactory
StreamsBuilder builder = new StreamsBuilder();
//read message from topic
KStream<String, String> xmlMessages = builder.stream("from_topic", types);

//select matched messages
KStream<String, String> matchedMessages = xmlMessages.filter((key, xmlMessageValue) -> {
//here does the filter tasks
});

//dispatch matched message to destination topic
matchedMessages.to("to_topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

最佳答案

您可以通过将 num.stream.threads 设置为大于默认值 1 的值来使用多个线程运行 Stream。

多线程将由 Kafka 内部处理,无需更改应用程序代码(启动额外的流或可运行程序)。

但请注意

  • 您使用的线程数不能多于它们所使用的主题中的分区数。在多个线程之间分配工作的方式与启动 Stream 的多个实例完全相同(即主题分区在它们之间平均分配)。
  • 在同一个 JVM 中拥有多个线程(而不是使用相同的代码启动多个 JVM)不会为您提供故障转移/重新平衡功能(所有这些线程很可能会一起生存和消亡)。

关于java - 并行 KafkaStream 处理的更好方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51767072/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com