gpt4 book ai didi

java - 如何测试是否正在调用带有 @KafkaListener 的方法

转载 作者:太空宇宙 更新时间:2023-11-04 09:37:04 27 4
gpt4 key购买 nike

我真的很难编写一个测试来检查当消息发送到指定主题时我的 Kafka Consumer 是否被正确调用。

我的消费者:

@Service
@Slf4j
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class ProcessingConsumer {

private AppService appService;

@KafkaListener(
topics = "${topic}",
containerFactory = "processingConsumerContainerFactory")
public void listen(ConsumerRecord<Key, Value> message, Acknowledgment ack) {
try {
appService.processMessage(message);
ack.acknowledge();
} catch (Throwable t) {
log.error("error while processing message!", t);
}
}
}

我的消费者配置:

@EnableKafka
@Configuration
public class ProcessingCosumerConfig {

@Value("${spring.kafka.schema-registry-url}")
private String schemaRegistryUrl;

private KafkaProperties props;

public ProcessingCosumerConfig(KafkaProperties kafkaProperties) {
this.props = kafkaProperties;
}

public Map<String, Object> deserializerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

return props;
}

private KafkaAvroDeserializer getKafkaAvroDeserializer(Boolean isKey) {
KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(deserializerConfigs(), isKey);
return kafkaAvroDeserializer;
}

private DefaultKafkaConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(
props.buildConsumerProperties(),
getKafkaAvroDeserializer(true),
getKafkaAvroDeserializer(false));
}

@Bean(name = "processingConsumerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Key, Value>>
kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<Key, Value>
factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
}

最后,我的(想要的)测试:

@DirtiesContext
public class ProcessingConsumerTest extends BaseIntegrationTest{

@Autowired private ProcessingProducerFixture processingProducer;
@Autowired private ProcessingConsumer processingConsumer;
@org.springframework.beans.factory.annotation.Value("${topic}")
String topic;

@Test
public void consumer_shouldConsumeMessages_whenMessagesAreSent() throws Exception{
Thread.sleep(1000);
ProducerRecord<Key, Value> message = new ProducerRecord<>(topic, new Key("b"), new Value("a", "b", "c", "d"));
processingProducer.send(message);
}
}

到目前为止我所掌握的就是这些。我尝试过使用调试手动检查这种方法是否到达消费者手中,甚至只是将简单的打印放在那里,但执行似乎根本没有到达那里。另外,如果我的测试可以以某种方式正确调用它,我不知道如何在实际测试中实际断言它。

最佳答案

将模拟 AppService 注入(inject)监听器并验证其 processMessage() 是否被调用。

关于java - 如何测试是否正在调用带有 @KafkaListener 的方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56386509/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com