gpt4 book ai didi

java - 通过两个进程访问Kafka Topic

转载 作者:行者123 更新时间:2023-12-02 09:44:33 30 4
gpt4 key购买 nike

我有一个 Kafka 生产者类,运行良好。生产者填充了 Kafka 主题。其代码如下:

public class kafka_test {
private final static String TOPIC = "flinkTopic";
private final static String BOOTSTRAP_SERVERS = "10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092";
public FlinkKafkaConsumer<String> createStringConsumerForTopic(
String topic, String kafkaAddress, String kafkaGroup) {
// ************************** KAFKA Properties ******
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id", kafkaGroup);
FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
topic, new SimpleStringSchema(), props);
myconsumer.setStartFromLatest();
return myconsumer;
}
private static Producer<Long, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "MyKafkaProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}

public void runProducer(String msg) throws Exception {
final Producer<Long, String> producer = createProducer();

try {
final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, msg );
RecordMetadata metadata = producer.send(record).get();
System.out.printf("sent record(key=%s value='%s')" + " metadata(partition=%d, offset=%d)\n",
record.key(), record.value(), metadata.partition(), metadata.offset());
} finally {
producer.flush();
producer.close();
}
}
}

public class producerTest {
public static void main(String[] args) throws Exception{
kafka_test objKafka=new kafka_test();
String pathFile="/home/cfms11/IdeaProjects/pooyaflink2/KafkaTest/quickstart/lastDay4.csv";
String delimiter="\n";
objKafka.createStringProducer("flinkTopic",
"10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");
Scanner scanner = new Scanner(new File(pathFile));
scanner.useDelimiter(delimiter);
int i=0;
while(scanner.hasNext()){
if (i==0)
TimeUnit.MINUTES.sleep(1);
objKafka.runProducer(scanner.next());
i++;
}
scanner.close();
}
}

因为,我想为我的Flink程序提供数据,所以,我使用Kafka。事实上,我有这部分代码来消费来自 Kafka 主题的数据:

    Properties props = new Properties();
props.setProperty("bootstrap.servers",
"10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");
props.setProperty("group.id", kafkaGroup);
FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
"flinkTopic", new SimpleStringSchema(), props);
DataStream<String> text = env.addSource(myconsumer).setStartFromEarliest());

我想在我的程序运行的同时运行生产者代码。我的目标是生产者向主题发送一条记录,而消费者可以同时从主题轮询该记录。

请您告诉我这如何可能以及如何管理它。

最佳答案

我认为你需要创建两个类文件,一个是生产者,另一个是消费者。先创建topic再运行consumer,或者直接运行producer。

关于java - 通过两个进程访问Kafka Topic,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56765960/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com