任何人都可以建议我们需要设置哪些属性才能使用 java KafkaProducer 将消息发送到 SSL 保护的 kafka 主题,我是 kafka 的新手,无法将一条消息发送到受 SSL 保护的 kafka
我假设您已经知道如何为 SSL 配置 Kafka。您需要为 SSL 加密和 SSL 身份验证添加配置设置。基本上,这是一个基本的生产者结构。
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//configure the following three settings for SSL Encryption
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/var/private/ssl/kafka.client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
// configure the following three settings for SSL Authentication
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/var/private/ssl/kafka.client.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
TestCallback callback = new TestCallback();
Random rnd = new Random();
for (long i = 0; i < 100 ; i++) {
ProducerRecord<String, String> data = new ProducerRecord<String, String>(
"test-topic", "key-" + i, "message-"+i );
producer.send(data, callback);
}
producer.close();
我是一名优秀的程序员,十分优秀!