gpt4 book ai didi

spring-kafka - 如何针对服务器上运行的真实 kafka 代理测试 kafka 消费者?

转载 作者:行者123 更新时间:2023-12-04 17:37:40 33 4
gpt4 key购买 nike

我很难理解 Java Spring Boot 中的一些 Kafka 概念。我想针对在服务器上运行的真实 Kafka 代理测试消费者,该服务器有一些生产者已将数据写入/已经将数据写入各种主题。我想与服务器建立连接,消费数据,并在测试中验证或处理其内容。

互联网上的绝大多数示例(实际上是我目前看到的所有示例)都涉及嵌入式 kafka、EmbeddedKafkaBroker,并展示了在本地一台机器上实现的生产者和消费者。我还没有找到任何可以解释如何与远程 kafka 服务器建立连接并从特定主题读取数据的示例。
我写了一些代码,并打印了经纪人地址:

System.out.println(embeddedKafkaBroker.getBrokerAddress(0));

我得到的是127.0.0.1:9092,这意味着它是本地的,所以还没有建立与远程服务器的连接。

另一方面,当我运行 SpringBootApplication 时,我从远程代理获取有效负载。

接收者:
@Component
public class Receiver {

private static final String TOPIC_NAME = "X";

private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

private CountDownLatch latch = new CountDownLatch(1);

public CountDownLatch getLatch() {
return latch;
}

@KafkaListener(topics = TOPIC_NAME)
public void receive(final byte[] payload) {
LOGGER.info("received the following payload: '{}'", payload);
latch.countDown();
}
}

配置:
    @EnableKafka
@Configuration
public class ByteReceiverConfig {

@Autowired
EmbeddedKafkaBroker kafkaEmbeded;

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.consumer.group-id}")
private String groupIdConfig;

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}

@Bean
ConsumerFactory<Object, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}

@Bean
Map<String, Object> consumerProperties() {
final Map<String, Object> properties =
KafkaTestUtils.consumerProps("junit-test", "true", this.kafkaEmbeded);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
return properties;
}

测试:
        @EnableAutoConfiguration
@EnableKafka
@SpringBootTest(classes = {ByteReceiverConfig.class, Receiver.class})
@EmbeddedKafka
@ContextConfiguration(classes = ByteReceiverConfig.class)
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.group-id=EmbeddedKafkaTest"})
public class KafkaTest {


@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Autowired
EmbeddedKafkaBroker embeddedKafkaBroker;


@Autowired
Receiver receiver;

@BeforeEach
void waitForAssignment() {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
System.out.println(messageListenerContainer.getAssignedPartitions().isEmpty());
System.out.println(messageListenerContainer.toString());
System.out.println(embeddedKafkaBroker.getTopics().size());
System.out.println(embeddedKafkaBroker.getPartitionsPerTopic());
System.out.println(embeddedKafkaBroker.getBrokerAddress(0));
System.out.println(embeddedKafkaBroker.getBrokersAsString());

ContainerTestUtils.waitForAssignment(messageListenerContainer,
embeddedKafkaBroker.getPartitionsPerTopic());
}

@Test
public void testReceive() {

}
}

我希望有人对以下问题有所了解:

1.EmbeddedKafkaBroker 类的实例是否可以用于测试来自远程代理的数据,还是仅用于本地测试,在本地测试中,我会产生,即将数据发送到我创建的主题并自己使用数据?

2.是否可以为真正的kafka服务器编写测试类?例如,验证是否已建立连接,或者是否已从特定主题读取数据。在这种情况下需要哪些注释、配置和类?

3.如果我只想消费数据,我是否必须在一个配置文件中提供生产者配置(这会很奇怪,但到目前为止我遇到的所有例子都这样做了)?

4.您是否知道任何显示使用 kafka 的真实示例的资源(书籍、网站等),即仅使用远程 kafka 服务器、生产者或消费者?

最佳答案

  • 如果您只想与外部经纪人交谈,则根本不需要嵌入式经纪人。
  • 是的,只需适当设置引导服务器属性。
  • 不,您不需要生产者配置。

  • 编辑

    @SpringBootApplication
    public class So56044105Application {

    public static void main(String[] args) {
    SpringApplication.run(So56044105Application.class, args);
    }

    @Bean
    public NewTopic topic() {
    return new NewTopic("so56044105", 1, (short) 1);
    }

    }
    spring.kafka.bootstrap-servers=10.0.0.8:9092
    spring.kafka.consumer.enable-auto-commit=false

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = { So56044105Application.class, So56044105ApplicationTests.Config.class })
    public class So56044105ApplicationTests {

    @Autowired
    public Config config;

    @Test
    public void test() throws InterruptedException {
    assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
    assertThat(config.received.get(0)).isEqualTo("foo");
    }

    @Configuration
    public static class Config implements ConsumerSeekAware {

    List<String> received = new ArrayList<>();

    CountDownLatch latch = new CountDownLatch(3);

    @KafkaListener(id = "so56044105", topics = "so56044105")
    public void listen(String in) {
    System.out.println(in);
    this.received.add(in);
    this.latch.countDown();
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    System.out.println("Seeking to beginning");
    assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    }

    }

    关于spring-kafka - 如何针对服务器上运行的真实 kafka 代理测试 kafka 消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56044105/

    33 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com