
java - Kafka Producer - Spring Boot application - unable to produce messages

Reposted. Author: 行者123. Updated: 2023-12-01 16:53:30

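I am also learning Spring Boot and Kafka. I did some exploring and configured a sample producer application, shown below, but I am unable to publish messages. It would be great to get some help with what I am missing here. I have started the ZooKeeper service and the Kafka service and made sure the topic is available.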

Config:

import java.util.HashMap;
import java.util.Map;

import com.jpmorgan.sample.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaProducer sender() {
        return new KafkaProducer();
    }
}


Producer Class:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaProducer {

    private static final Logger LOGGER =
            LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String payload) {
        LOGGER.info("sending payload='{}'", payload);
        kafkaTemplate.send("test", payload);
    }
}

Sample Application Class:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaProducerSampleApplication {

    public static void main(final String[] args) {
        SpringApplication.run(KafkaProducerSampleApplication.class, args);
    }

}

I simply ran it from IntelliJ. I can see that Moneta Boot has started:

         main] n$WebApplicationLoggingAutoConfiguration : Enabled Moneta Request Logging with exclude-url-patterns: [], http.log-request-headers: [false], http.log-response-headers: [false], http.log-request-entity: [false], http.log-response-entity: [false] and http.max-entity-bytes: [1024]
2020-05-06 18:51:57.539 INFO 4952 --- [ main] c.j.m.b.a.s.cors.CorsAutoConfiguration : Moneta CORS has been disabled because neither [moneta.cors.allowed-origins] nor [moneta.cors.allowed-origins-regex] has been set
2020-05-06 18:51:57.628 INFO 4952 --- [ main] .m.b.a.a.MonetaActuatorAutoConfiguration : Enabled Moneta defaults for Spring Boot Actuator
2020-05-06 18:51:57.677 INFO 4952 --- [ main] o.s.boot.web.servlet.RegistrationBean : Filter corsFilter was not registered (disabled)
2020-05-06 18:51:57.860 INFO 4952 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-06 18:51:57.956 WARN 4952 --- [ main] c.j.m.b.startup.ApplicationInfoLoader : !!!! SEAL ID property [application.seal.id] should be provided as either as a system property or in the application properties file !!!!
2020-05-06 18:51:58.054 INFO 4952 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 4 endpoint(s) beneath base path '/actuator'
2020-05-06 18:51:58.111 INFO 4952 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2020-05-06 18:51:58.114 INFO 4952 --- [ main] c.j.s.KafkaProducerSampleApplication : Started KafkaProducerSampleApplication in 2.547 seconds (JVM running for 4.376)
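
None of the classes shown above contains a call to `KafkaProducer.send(...)`, so one thing worth checking is whether anything triggers a send once the application is up. The sketch below is a plain-Java illustration (no Spring or Kafka on the classpath) of a small startup hook that pushes a few test payloads through a sender. `StartupSender` and its `Consumer<String>` parameter are hypothetical names introduced here and are not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not from the original post: hands test payloads
// to whatever "send" function it is given.
public class StartupSender {

    private final Consumer<String> send;

    public StartupSender(Consumer<String> send) {
        this.send = send;
    }

    // Passes each payload to the sender in order and returns how many were handed over.
    public int sendAll(List<String> payloads) {
        int count = 0;
        for (String payload : payloads) {
            send.accept(payload);
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Stand-in for KafkaProducer.send: records payloads instead of publishing them.
        List<String> sent = new ArrayList<>();
        StartupSender hook = new StartupSender(sent::add);
        int n = hook.sendAll(Arrays.asList("hello", "world"));
        System.out.println(n + " payload(s) handed to sender: " + sent);
        // prints "2 payload(s) handed to sender: [hello, world]"
    }
}
```

In the real application the consumer would presumably be a method reference to the `KafkaProducer` bean's `send` method, and the hook could be invoked from, for example, a Spring Boot `CommandLineRunner` bean. If the `sending payload=...` log line then appears, the template is at least being reached.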

Best answer

Try this approach. You do not need to annotate the producer configuration with @EnableKafka.

Remove @Bean from producerConfigs and try again; it should work.

public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
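
Since the answer centers on the producer configuration map, it may help to check what that map contains before it is handed to the factory. The plain-Java sketch below uses the literal property names behind the `ProducerConfig` constants (`bootstrap.servers`, `key.serializer`, `value.serializer`) so that it runs without Kafka on the classpath. `ProducerConfigCheck`, `missingKeys`, and the `localhost:9092` address are assumptions made for illustration and do not come from the original post.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: reports which of the three settings configured in the
// post are absent or blank in a producer configuration map.
public class ProducerConfigCheck {

    private static final String[] EXPECTED_KEYS = {
        "bootstrap.servers", "key.serializer", "value.serializer"
    };

    public static List<String> missingKeys(Map<String, Object> props) {
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED_KEYS) {
            Object value = props.get(key);
            // A blank value (for example an empty bootstrap server list) counts as missing.
            if (value == null || value.toString().trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // value.serializer is deliberately left out to show the check firing.
        System.out.println("missing: " + missingKeys(props));
        // prints "missing: [value.serializer]"
    }
}
```

A check like this could be called on the result of `producerConfigs()` during startup to confirm that the `kafka.bootstrap-servers` property resolved to a non-empty value.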

Regarding "java - Kafka Producer - Spring Boot application - unable to produce messages", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61636349/
