Spring Boot Kafka Streams - binding problem

Reposted. Author: 行者123. Updated: 2023-12-05 02:16:07

I am trying to build a simple streaming application on top of Kafka Streams, based on this example:

Word Count

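However, when I start the application I get the error shown below. Can someone point out what I am missing here? The code, configuration, and error follow.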

@SpringBootApplication
@Slf4j
@EnableScheduling
@EnableBinding(PersonBinding.class)
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    // Publishes a Person message to the output channel every 5 seconds.
    @Component
    public static class PersonSource {

        private final MessageChannel personOut;

        @Autowired
        PersonSource(PersonBinding personBinding) {
            this.personOut = personBinding.personOut();
        }

        @Scheduled(fixedDelay = 5000L)
        public void run() {
            Message<Person> message = MessageBuilder
                    .withPayload(new Person("John", "Doe", Instant.now()))
                    .build();

            try {
                personOut.send(message);
                log.info("Published message: {}", message);
            } catch (Exception e) {
                e.printStackTrace();
                throw e;
            }
        }
    }

    // Consumes the input binding as a KStream and prints every record.
    @Component
    public static class PersonProcessor {

        @StreamListener
        public void process(@Input(PersonBinding.PERSON_IN) KStream<String, Person> events) {
            events.foreach((key, value) -> System.out.println("Key: " + key + "; Value: " + value));
        }
    }
}

@Data
@AllArgsConstructor
class Person {

    String firstName;

    String lastName;

    Instant createdOn;
}

interface PersonBinding {

    String PERSON_IN = "pin";

    String PERSON_OUT = "pout";

    @Output(PERSON_OUT)
    MessageChannel personOut();

    @Input(PERSON_IN)
    KStream<String, Person> personIn();
}

Dependency management (Spring Boot 1.5.13.RELEASE)

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.1.0</version>
</dependency>

Configuration

# Default Configuration
spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
# Out Bindings Configuration
spring.cloud.stream.bindings.pout.destination=pout
spring.cloud.stream.bindings.pout.producer.header-mode=raw
# In Bindings Configuration
spring.cloud.stream.bindings.pin.destination=pout
spring.cloud.stream.bindings.pin.consumer.header-mode=raw
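
One detail of the configuration above that is easy to miss: both bindings, `pout` and `pin`, are given the destination `pout`, so the stream listener is meant to read back what the scheduled source publishes. The sketch below only illustrates that, by parsing the two destination entries with plain `java.util.Properties`. It does not involve Spring Cloud Stream at all, and the class name `BindingDestinations` and the helper `destinationOf` are made up for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch only: reads the binding destinations from the question's
// configuration with java.util.Properties. No Spring classes are used.
final class BindingDestinations {

    // The two destination entries copied from the configuration above.
    static final String CONFIG = String.join("\n",
            "spring.cloud.stream.bindings.pout.destination=pout",
            "spring.cloud.stream.bindings.pin.destination=pout");

    // Parses properties text; wraps the checked IOException for convenience.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Hypothetical helper: returns the destination configured for a binding name.
    static String destinationOf(Properties props, String binding) {
        return props.getProperty("spring.cloud.stream.bindings." + binding + ".destination");
    }

    public static void main(String[] args) {
        Properties props = load(CONFIG);
        String out = destinationOf(props, "pout");
        String in = destinationOf(props, "pin");
        System.out.println("pout -> " + out + ", pin -> " + in);
        System.out.println("same destination: " + out.equals(in));
    }
}
```

The binding names in the property keys have to match the `PERSON_OUT` and `PERSON_IN` constants in `PersonBinding`, which they do here.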

Error

Field configurationProperties in org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration required a single bean, but 2 were found:
- spring.cloud.stream.kafka.binder-org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties: a programmatically registered singleton
- binderConfigurationProperties: defined by method 'binderConfigurationProperties' in class path resource [org/springframework/cloud/stream/binder/kstream/config/KStreamBinderSupportAutoConfiguration.class]


Action:

Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed
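
To make the error message more concrete, here is a toy model of the rule it describes: an injection point that asks for one bean of a given type fails when two candidates are registered, unless exactly one of them is marked as primary. This is not Spring's implementation. `SingleBeanLookup`, `Candidate`, and `resolve` are hypothetical names, and the choice of which candidate to mark primary in `main` is arbitrary, since the error output does not say which bean should win.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only, NOT Spring's code: imitates "by-type injection needs
// exactly one candidate, unless exactly one candidate is marked primary".
final class SingleBeanLookup {

    // Hypothetical stand-in for a registered bean of the requested type.
    static final class Candidate {
        final String name;
        final boolean primary;

        Candidate(String name, boolean primary) {
            this.name = name;
            this.primary = primary;
        }
    }

    // Returns the name of the single candidate, or of the single primary one;
    // otherwise throws, mirroring the wording of the error in the question.
    static String resolve(List<Candidate> candidates) {
        if (candidates.size() == 1) {
            return candidates.get(0).name;
        }
        List<Candidate> primaries = new ArrayList<>();
        for (Candidate c : candidates) {
            if (c.primary) {
                primaries.add(c);
            }
        }
        if (primaries.size() == 1) {
            return primaries.get(0).name;
        }
        throw new IllegalStateException(
                "required a single bean, but " + candidates.size() + " were found");
    }

    public static void main(String[] args) {
        // Two candidates of the same type, as in the reported error.
        List<Candidate> ambiguous = Arrays.asList(
                new Candidate("spring.cloud.stream.kafka.binder-...KafkaBinderConfigurationProperties", false),
                new Candidate("binderConfigurationProperties", false));
        try {
            resolve(ambiguous);
        } catch (IllegalStateException e) {
            System.out.println("Ambiguous lookup failed: " + e.getMessage());
        }

        // Marking one candidate primary (an arbitrary choice here) resolves it.
        List<Candidate> withPrimary = Arrays.asList(
                new Candidate("spring.cloud.stream.kafka.binder-...KafkaBinderConfigurationProperties", true),
                new Candidate("binderConfigurationProperties", false));
        System.out.println("Resolved to: " + resolve(withPrimary));
    }
}
```

In the question both candidates come from library auto-configuration (the Kafka binder and the KStream binder) rather than from application code, so the `@Primary` suggestion in the Action text is not straightforward to apply directly.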

** Edit 1 **

I have uploaded the code to GitHub: https://github.com/tapitoe/demo-spring-cloud-streams/tree/master/src

Best answer

I ran into a similar problem, and it was fixed after adding this dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Regarding "Spring Boot Kafka Streams - binding problem", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50693858/
