gpt4 book ai didi

java - Spring 的卡夫卡。未启动 EmbeddedKafkaBroker

转载 作者:行者123 更新时间:2023-12-05 03:54:33 24 4
gpt4 key购买 nike

我正在编写 Kafka Broker 和 Consumer 代码来捕获来自应用程序的消息。尝试从Consumer获取消息时,发生错误

java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:540)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:303)
at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:280)

在应用端(Producer),同样出现连接错误

2020-03-25 12:29:33.689  WARN 25786 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1, transactionalId=tx0] Connection to node -1 (<here broker hostname>:9092) could not be established. Broker may not be available.

我的项目有以下依赖:

compile "org.springframework.kafka:spring-kafka-test:2.4.4.RELEASE"
compile "org.springframework.kafka:spring-kafka:2.4.4.RELEASE"

我的Kafka Broker代码

public class KafkaServer {

private static final String BROKERPORT = "9092";
private static final String BROKERHOST = "localhost";
public static final String TOPIC1 = "fss-fsstransdata";
public static final String TOPIC2 = "fss-fsstransscores";
public static final String TOPIC3 = "fss-fsstranstimings";
public static final String TOPIC4 = "fss-fssdevicedata";
@Getter
private Consumer<String, String> consumer;

private EmbeddedKafkaBroker embeddedKafkaBroker;

public void run() {

String[] topics = {TOPIC1, TOPIC2, TOPIC3, TOPIC4};

this.embeddedKafkaBroker = new EmbeddedKafkaBroker(
1,
false,
1,
topics
).kafkaPorts(BROKERPORT);

Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", this.embeddedKafkaBroker));
this.consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();

this.consumer.subscribe(Arrays.asList(topics));
}
}

请帮助处理这种情况。我不擅长 kafka 架构以及如何在 Spring 上实现它。

最佳答案

EmbeddedKafkaBroker 旨在从 Spring 应用程序上下文或 JUnit4 @Rule@ClassRule 或 JUnit5 条件

要在这些环境之外使用它,您必须调用 afterPropertiesSet() 对其进行初始化,并调用 destroy() 将其关闭。

关于java - Spring 的卡夫卡。未启动 EmbeddedKafkaBroker,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60850710/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com