gpt4 book ai didi

java - 如何在 Kafka 集成测试中通过发送消息来设置应用程序状态?

转载 作者:行者123 更新时间:2023-11-30 01:44:00 26 4
gpt4 key购买 nike

我想使用 Kafka 绑定(bind)的 testcontainers 集成测试我的应用程序。

在以下场景中,测试设置永远不会超过 waitForMessagesToBeDelivered它会永远等待收到消息后创建实体。根据我的理解,它应该通过 Kafka 传递两条消息,这些消息应该由 MyConsumer 接收。然后应该创建两个 MyEntity s 应该增加 myEntityRepository.count() 返回的值.

在我创建此复制设置的类似场景中,收到两条消息中的一条并为其创建一个实体。

@Service
public class MyConsumer {
private final static Logger LOGGER = LoggerFactory.getLogger(MyConsumer.class);
private final MyEntityRepository myEntityRepository;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String TOPIC_ENTITIES = "entities";

@Autowired
public MyConsumer(MyEntityRepository myEntityRepository) {
this.myEntityRepository = myEntityRepository;
}

@KafkaListener(topics = TOPIC_ENTITIES, groupId = "MyEntity")
public void consume(String message) throws IOException {
LOGGER.info(String.format("#### -> Consumed message -> %s", message));
MyEntityCreatedMessage deserializedMessage = OBJECT_MAPPER.readValue(message, MyEntityCreatedMessage.class);
myEntityRepository.save(new MyEntity(deserializedMessage.getUuid()));
LOGGER.info("persisted created entity with uuid {}",
deserializedMessage.getUuid());
}
}

和一个

@Configuration
public class MySpringKafkaConfiguration {

@Bean
public NewTopic topicEntities() {
return new NewTopic(TOPIC_ENTITIES, 10, (short) 2);
}
}

我想用它来测试

@ExtendWith(SpringExtension.class)
@WebAppConfiguration
@SpringBootTest(classes = {MySpringKafkaApplication.class})
@ContextConfiguration(initializers = MyConsumerIT.TestcontainersInitializer.class)
@Testcontainers
public class MyConsumerIT {
private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumerIT.class);
private static KafkaContainer kafkaContainer = new KafkaContainer();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final UUID ENTITY1_UUID = UUID.randomUUID();
private static final UUID ENTITY2_UUID = UUID.randomUUID();

@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private MyEntityRepository myEntityRepository;

@BeforeAll
public static void setUpClass() {
kafkaContainer.start();
}

@AfterAll
public static void tearDownClass() {
kafkaContainer.stop();
}

@BeforeEach
public void setUp() throws JsonProcessingException, InterruptedException {
final String serializedMessage = OBJECT_MAPPER.writeValueAsString(new MyEntityCreatedMessage(ENTITY1_UUID));
kafkaTemplate.send(TOPIC_ENTITIES, serializedMessage);
final String serializedMessageAdmin = OBJECT_MAPPER.writeValueAsString(new MyEntityCreatedMessage(ENTITY2_UUID));
kafkaTemplate.send(TOPIC_ENTITIES, serializedMessageAdmin);
waitForMessagesToBeDelivered();
}

private void waitForMessagesToBeDelivered() throws InterruptedException {
while(myEntityRepository.count() != 2) {
LOGGER.info(String.format("userRepository.count: %d",
myEntityRepository.count()));
Thread.sleep(500);
}
}

@Test
public void testSomethingWhichRequiresTwoMyEntities() {
}

/* default */ static class TestcontainersInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
String kafkaContainerBootstrapServers = kafkaContainer.getBootstrapServers();
LOGGER.info("kafkaContainerBootstrapServers: {}",
kafkaContainerBootstrapServers);
TestPropertyValues
.of("spring.kafka.consumer.bootstrap-servers=" + kafkaContainerBootstrapServers)
.applyTo(configurableApplicationContext.getEnvironment());
TestPropertyValues
.of("spring.kafka.producer.bootstrap-servers=" + kafkaContainerBootstrapServers)
.applyTo(configurableApplicationContext.getEnvironment());
}
}
}

logs我看到了

2019-11-15 17:01:44.575 ERROR 30056 --- [           main] o.springframework.kafka.core.KafkaAdmin  : Could not configure topics

org.springframework.kafka.KafkaException: Timed out waiting to get existing topics; nested exception is java.util.concurrent.TimeoutException

但是连接似乎成功了,但我不确定。

我知道嵌入式 kafka 等替代方案,但我想使用 testcontainers 来提高集成的真实性。

最佳答案

您在发送消息和消费者启动之间进行了一场竞赛。默认情况下,新的 Kafka 消费者从主题末尾开始消费。

添加:

spring.kafka.consumer.auto-offset-reset=earliest

将确保消费者获取主题中的任何现有记录。

关于java - 如何在 Kafka 集成测试中通过发送消息来设置应用程序状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58880766/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com