
java - Messages are not delivered to a Kafka topic

Reposted. Author: 行者123. Updated: 2023-12-01 18:22:35

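I am building a simple application with Spring Boot and Kafka that saves an object to a Kafka topic. A consumer then listens for that record and adds it to an in-memory database. With the configuration below, both Kafka and Spring Boot start, but the API call that publishes to Kafka hangs for some time, and no message ever appears in the Kafka topic.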

I followed this tutorial:

https://www.confluent.io/blog/apache-kafka-spring-boot-application/

However, I changed my service slightly.

package com.dilshan.shoppingcart.cart;

import org.springframework.stereotype.Service;

import java.io.IOException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@Service
public class ShoppingService {
    private static final String TOPIC = "users";

    @Autowired
    private KafkaTemplate<String, ShoppingCart> kafkaTemplate;

    private ShoppingCartRepository cartRepository;

    public void publish(ShoppingCart cart) {
        this.kafkaTemplate.send(TOPIC, cart);
    }

    @KafkaListener(topics = "users", groupId = "group_id")
    public void subscribe(ShoppingCart cart) throws IOException {
        System.out.println("Hello-" + cart.getId());
        cartRepository.save(cart);
    }
}

This is my application.properties file:

spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.h2.console.enabled=true
spring.h2.console.path=/h2
server.port=8080
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.listener.missing-topics-fatal=false

This is the controller I use to call the Kafka method:

package com.dilshan.shoppingcart.cart;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ShoppingCartController {

    @Autowired
    private ShoppingService shoppingService;

    @RequestMapping(method = RequestMethod.POST, value = "/shoppingcart")
    private ResponseEntity<ShoppingCart> selectProducts(@RequestBody ShoppingCart cart) {
        shoppingService.publish(cart);
        return ResponseEntity.ok().build();
    }
}

The following entries keep appearing in the application log:

2020-02-18 19:22:12.951  WARN 6986 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=group_id] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-02-18 19:22:13.958  WARN 6986 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=group_id] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-02-18 19:22:15.115  WARN 6986 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=group_id] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-02-18 19:22:16.272  WARN 6986 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=group_id] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

Best answer

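The Kafka broker, sometimes called the Kafka server, must be running on your PC or on a remote machine. The repeated "Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available." warnings mean that nothing is accepting connections on localhost:9092. With no broker to talk to, the producer's send call blocks while it waits for cluster metadata, which is why the API call hangs and no message reaches the topic.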

Please check this link: https://kafka.apache.org/quickstart

First, you need to run ZooKeeper and the Kafka broker. Then you can use your application to send messages.
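
Before sending messages, it can help to confirm that something is actually listening on the broker address. The following is a minimal sketch that uses only the JDK. The class name `BrokerCheck` and the helper `isReachable` are hypothetical names introduced here for illustration, and the host and port are assumed from the `localhost:9092` address in the question's configuration and logs. A successful TCP connection only shows that the port is open; it does not prove that a healthy Kafka broker is behind it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka or Spring: a plain TCP reachability probe.
public class BrokerCheck {

    /**
     * Returns true if a TCP connection to host:port can be opened within
     * timeoutMillis, and false if the connection is refused, times out,
     * or the host cannot be resolved.
     */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed address: the bootstrap server used in the question.
        String host = "localhost";
        int port = 9092;
        if (isReachable(host, port, 2000)) {
            System.out.println("Something is listening on " + host + ":" + port);
        } else {
            System.out.println("Nothing reachable on " + host + ":" + port
                    + "; start ZooKeeper and the Kafka broker first");
        }
    }
}
```

If the probe reports that nothing is reachable, the warnings in the question's log are expected, and the fix is to start the broker rather than to change the Spring configuration.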

Regarding "java - Messages are not delivered to a Kafka topic", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/60279775/
