gpt4 book ai didi

java - Confluence Cloud Apache Kafka Consumer - 主题 [test-1] 不存在且 MissingTopicsFatal 为 true

转载 作者:行者123 更新时间:2023-12-01 19:38:19 25 4
gpt4 key购买 nike

我是一名新手,试图使用 Confluence Cloud Apache Kafka 在两个 Spring Boot 微服务之间进行通信。

在 Confluence Cloud 上使用 Kafka 时,在 ServiceA 将消息发布到主题后,我的消费者 (ServiceB) 上出现以下错误。但是,当我登录 Confluence Cloud 时,我看到该消息已成功发布到该主题。

 org.springframework.context.ApplicationContextException: Failed to start bean 
'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is
java.lang.IllegalStateException: Topic(s) [topic-1] is/are not present and
missingTopicsFatal is true

当我在本地服务器上运行 Kafka 时,我不会遇到此问题。 ServiceA 能够将消息发布到我的本地 Kafka 服务器上的主题,并且 ServiceB 能够成功使用该消息。

我在 application.properties 中提到了我的本地 Kafka 服务器配置(如注释掉的代码)

服务A:生产者

application.properties

app.topic=test-1
#Remote
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
requiredusername="*******"
password="****"

#Local
#ssl.endpoint.identification.algorithm=https
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN
#request.timeout.ms=20000
#bootstrap.servers=localhost:9092
#retry.backoff.ms=500
#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule

Sender.java

public class Sender {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Value("${app.topic}")
private String topic;

public void send(String data){
Message<String> message = MessageBuilder
.withPayload(data)
.setHeader(KafkaHeaders.TOPIC, topic)
.build();
kafkaTemplate.send(message);
}
}

KafkaProducerConfig.java

@Configuration
@EnableKafka
public class KafkaProducerConfig {

@Value("${bootstrap.servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory(producerConfigs());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}

}

服务 B:消费者

application.properties

app.topic=test-1
#Remote
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
requiredusername="*******"
password="****"

#Local
#ssl.endpoint.identification.algorithm=https
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN
#request.timeout.ms=20000
#bootstrap.servers=localhost:9092
#retry.backoff.ms=500
#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule

KafkaConsumerConfig.java

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${bootstrap.servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "confluent_cli_consumer_040e5c14-0c18-4ae6-a10f-8c3ff69cbc1a"); // confluent cloud consumer group-id
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory(
consumerConfigs(),
new StringDeserializer(), new StringDeserializer());
}

@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

KafkaConsumer.java

@Service
public class KafkaConsumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaListener.class);

@Value("{app.topic}")
private String kafkaTopic;

@KafkaListener(topics = "${app.topic}", containerFactory = "kafkaListenerContainerFactory")
public void receive(@Payload String data) {
LOG.info("received data='{}'", data);
}
}

最佳答案

用户名和密码是 JAAS 配置的一部分,因此请将它们放在一行

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkaclient1" password="kafkaclient1-secret";

我还建议您验证您的属性文件是否已正确加载到客户端

关于java - Confluence Cloud Apache Kafka Consumer - 主题 [test-1] 不存在且 MissingTopicsFatal 为 true,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59192303/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com