
java - Kafka Streaming does not work with multiple instances

Reposted. Author: 塔克拉玛干. Updated: 2023-11-01 22:21:53

When I run multiple instances of my Kafka Streams application, only the first instance receives messages. Any instances I start after that receive no messages at all.

Any suggestions on how to fix this?

Here is my Kafka Streams application:

package test.kafkastream;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class Main {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Every instance started with this application.id joins the same consumer
        // group, so the running instances split topic6's partitions among themselves.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-wordcount-processor");

        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.38:45983,192.168.2.112:45635,192.168.2.116:39571");
        //props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);

        // setting offset reset to earliest so that we can re-run the demo code
        // with the same pre-loaded data
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        TopologyBuilder builder = new TopologyBuilder();

        builder.addSource("Source", "topic6");

        // ProcessMessage (not shown) is the ProcessorSupplier that handles each record
        builder.addProcessor("Process", new ProcessMessage(), "Source");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }

}

Here is my producer:

package test.kafkamesos;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class Producer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<String, Object>();
        producerConfig.put("bootstrap.servers", "192.168.2.38:45983,192.168.2.112:45635,192.168.2.116:39571");
        //producerConfig.put("bootstrap.servers", "localhost:9092");

        // optional:
        producerConfig.put("metadata.fetch.timeout.ms", "3000");
        producerConfig.put("request.timeout.ms", "3000");
        // ... other options:
        // http://kafka.apache.org/documentation.html#producerconfigs
        ByteArraySerializer serializer = new ByteArraySerializer();
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[], byte[]>(producerConfig, serializer,
                serializer);

        int i = 0;
        while (true) {
            String message = "{data:success,g:" + i + "}";
            // No key is set, so the producer's partitioner chooses the partition of topic6
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("topic6", message.getBytes());
            kafkaProducer.send(record).get();
            System.out.println("sending " + message);
            Thread.sleep(1000);
            i++;
        }
    }
}
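Because the producer sends records without a key, its partitioner decides which partition of `topic6` each record lands in. The sketch below is a simplified, stdlib-only model of that choice, assuming plain round-robin for un-keyed records (as Kafka's `DefaultPartitioner` did before client 2.4; newer clients fill a batch on one partition before moving on, but still spread records over time). Keyed records are hashed with `String.hashCode` here purely to stay self-contained; Kafka itself uses a murmur2 hash of the serialized key. The class `PartitionChoiceModel` is hypothetical and not part of any Kafka API.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical, simplified model of producer-side partition selection.
 * Un-keyed records are spread round-robin; keyed records always map to the
 * same partition. String.hashCode stands in for Kafka's murmur2 hash.
 */
public class PartitionChoiceModel {
    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger(0);

    public PartitionChoiceModel(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    /** Partition for a record; key may be null, as in the producer above. */
    public int partitionFor(String key) {
        if (key == null) {
            // Round-robin: consecutive un-keyed records go to consecutive partitions.
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        // Same key -> same partition, as long as the partition count is unchanged.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        PartitionChoiceModel onePartition = new PartitionChoiceModel(1);
        PartitionChoiceModel threePartitions = new PartitionChoiceModel(3);
        StringBuilder one = new StringBuilder();
        StringBuilder three = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            one.append(onePartition.partitionFor(null));
            three.append(threePartitions.partitionFor(null));
        }
        System.out.println(one);   // 000000
        System.out.println(three); // 012012
    }
}
```

With a single partition every record goes to partition 0, so only the one instance that owns it can ever see data; with three partitions the same un-keyed stream is spread so that three instances can share the work.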

And my Dockerfile:

FROM openjdk:8-jre
COPY ./target/*-with-dependencies.jar /jars/service-jar.jar
CMD java -cp /jars/service-jar.jar test.kafkastream.Main

Best answer

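I think you are running into this because the topic you are consuming (topic6) is configured with only one partition on the Kafka broker. Within one consumer group (all instances sharing the same application.id), each partition is processed by only one instance at a time, so the first instance takes the single partition and every instance started after it stays idle. From the Confluent blog: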

For example, if your application reads from a single topic that has 10 partitions, then you can run up to 10 instances of your applications (note that you can run further instances but these will be idle). In summary, the number of topic partitions is the upper limit for the parallelism of your Streams API application and thus for the number of running instances of your application.

Source: https://www.confluent.io/blog/elastic-scaling-in-kafka-streams/
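To make the quoted limit concrete, here is a simplified, stdlib-only model of how instances in one group divide a topic's partitions: each partition goes to exactly one instance, round-robin, so any instance beyond the partition count ends up with nothing. This is an illustration under that assumption, not Kafka's actual Streams partition assignor (which also balances tasks, threads and state); the class `PartitionAssignmentModel` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of how instances sharing one application.id split a
 * topic's partitions. Not the Kafka API; the real assignor may pick
 * different owners, but the "at most one owner per partition" rule holds.
 */
public class PartitionAssignmentModel {

    /** Returns, for each instance, the list of partition numbers it owns. */
    public static List<List<Integer>> assign(int numPartitions, int numInstances) {
        if (numInstances <= 0) {
            throw new IllegalArgumentException("numInstances must be positive");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) {
            assignment.add(new ArrayList<>());
        }
        // Each partition is owned by exactly one instance; round-robin spreads them evenly.
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numInstances).add(p);
        }
        return assignment;
    }

    /** Number of instances that own at least one partition (and so receive data). */
    public static int activeInstances(int numPartitions, int numInstances) {
        int active = 0;
        for (List<Integer> owned : assign(numPartitions, numInstances)) {
            if (!owned.isEmpty()) {
                active++;
            }
        }
        return active;
    }

    public static void main(String[] args) {
        // topic6 with one partition and three instances: only instance 0 gets data.
        System.out.println(assign(1, 3)); // [[0], [], []]
        // After increasing topic6 to three partitions, every instance has work.
        System.out.println(assign(3, 3)); // [[0], [1], [2]]
    }
}
```

On the real cluster, the corresponding fix is to give `topic6` at least as many partitions as the number of instances you plan to run, for example with the `kafka-topics` tool's `--alter` and `--partitions` options. Partition counts can only be increased, and increasing them changes which partition a given key maps to.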

A similar question, "java - Kafka Streaming does not work with multiple instances", can be found on Stack Overflow: https://stackoverflow.com/questions/43440611/
