java - Producer.send does not accept the KeyedMessage type

Reposted. Author: 行者123. Updated: 2023-12-01 09:39:29

I am trying to run this code, but it does not work because Producer.send() does not accept the KeyedMessage type.

I tried importing kafka.javaapi.producer.Producer instead of kafka.producer.Producer, but it still does not work.

The code is:

package sources;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

//import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
//import kafka.producer.Producer;

public class ProducerCode {

    private static Producer<Integer, String> producer;
    private static final String topic= "mytopic";

    public void initialize() {
        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list", "localhost:9092");
        producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
        producerProps.put("request.required.acks", "1");
        // ProducerConfig producerConfig = new ProducerConfig(producerProps);
        // have a change here **
        producer = new Producer<Integer, String>(new ProducerConfig(producerProps));
    }

    public void publishMesssage() throws Exception{
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        while (true){
            System.out.print("Enter message to send to kafka broker (Press 'Y' to close producer): ");
            String msg = null;
            msg = reader.readLine(); // Read message from console
            //Define topic name and message
            KeyedMessage<Integer, String> keyedMsg = new KeyedMessage<Integer, String>(topic, msg);

            producer.send(keyedMsg);
            // producer.send(keyedMsg); // This publishes message on given topic

            if("Y".equals(msg)){ break; }
            System.out.println("--> Message [" + msg + "] sent.Check message on Consumer's program console");
        }
        return;
    }

    public static void main(String[] args) throws Exception {

        KafkaProducer kafkaProducer = new KafkaProducer();
        // Initialize producer
        kafkaProducer.initialize();
        // Publish message
        kafkaProducer.publishMesssage();
        //Close the producer
        producer.close();
    }
}

Best answer

You must use ProducerRecord (rather than KeyedMessage), with the constructor ProducerRecord(String topic, K key, V value):

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));

See https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
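
The answer's snippet passes `props` to KafkaProducer without showing what it holds, and the property names from the question (metadata.broker.list, serializer.class, request.required.acks) belong to the old producer. Below is a minimal sketch of the equivalent settings for the newer client, assuming String keys and values and the localhost:9092 broker from the question. The class name ProducerPropsSketch and the method newProducerProps are hypothetical, chosen only for illustration. The sketch uses java.util.Properties only, so it runs without Kafka on the classpath.

```java
import java.util.Properties;

// Sketch: builds the configuration expected by the newer
// org.apache.kafka.clients.producer.KafkaProducer client.
final class ProducerPropsSketch {

    // Assumption: one broker address such as "localhost:9092", as in the question.
    static Properties newProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        // Takes the place of "metadata.broker.list" in the old producer config.
        props.put("bootstrap.servers", bootstrapServers);
        // Take the place of "serializer.class"; key and value each need a serializer.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Takes the place of "request.required.acks".
        props.put("acks", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = newProducerProps("localhost:9092");
        // Print the settings (iteration order of Properties is not guaranteed).
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        // With the kafka-clients library available, the props would be used like this:
        //   Producer<String, String> producer = new KafkaProducer<>(props);
        //   producer.send(new ProducerRecord<>("mytopic", msg));
        //   producer.close();
    }
}
```

Since the serializers here are for String, the producer would be declared as Producer<String, String> rather than the Producer<Integer, String> used in the question.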

Regarding "java - Producer.send does not accept the KeyedMessage type", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38578361/
