作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想使用 Kafka 绑定(bind)的 testcontainers 集成测试我的应用程序。
在以下场景中,测试设置永远不会超过 waitForMessagesToBeDelivered
它会永远等待收到消息后创建实体。根据我的理解,它应该通过 Kafka 传递两条消息,这些消息应该由 MyConsumer
接收。然后应该创建两个 MyEntity
s 应该增加 myEntityRepository.count()
返回的值.
在我创建此复制设置的类似场景中,收到两条消息中的一条并为其创建一个实体。
@Service
public class MyConsumer {
private final static Logger LOGGER = LoggerFactory.getLogger(MyConsumer.class);
private final MyEntityRepository myEntityRepository;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String TOPIC_ENTITIES = "entities";
@Autowired
public MyConsumer(MyEntityRepository myEntityRepository) {
this.myEntityRepository = myEntityRepository;
}
@KafkaListener(topics = TOPIC_ENTITIES, groupId = "MyEntity")
public void consume(String message) throws IOException {
LOGGER.info(String.format("#### -> Consumed message -> %s", message));
MyEntityCreatedMessage deserializedMessage = OBJECT_MAPPER.readValue(message, MyEntityCreatedMessage.class);
myEntityRepository.save(new MyEntity(deserializedMessage.getUuid()));
LOGGER.info("persisted created entity with uuid {}",
deserializedMessage.getUuid());
}
}
和一个
@Configuration
public class MySpringKafkaConfiguration {
@Bean
public NewTopic topicEntities() {
return new NewTopic(TOPIC_ENTITIES, 10, (short) 2);
}
}
我想用它来测试
@ExtendWith(SpringExtension.class)
@WebAppConfiguration
@SpringBootTest(classes = {MySpringKafkaApplication.class})
@ContextConfiguration(initializers = MyConsumerIT.TestcontainersInitializer.class)
@Testcontainers
public class MyConsumerIT {
private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumerIT.class);
private static KafkaContainer kafkaContainer = new KafkaContainer();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final UUID ENTITY1_UUID = UUID.randomUUID();
private static final UUID ENTITY2_UUID = UUID.randomUUID();
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private MyEntityRepository myEntityRepository;
@BeforeAll
public static void setUpClass() {
kafkaContainer.start();
}
@AfterAll
public static void tearDownClass() {
kafkaContainer.stop();
}
@BeforeEach
public void setUp() throws JsonProcessingException, InterruptedException {
final String serializedMessage = OBJECT_MAPPER.writeValueAsString(new MyEntityCreatedMessage(ENTITY1_UUID));
kafkaTemplate.send(TOPIC_ENTITIES, serializedMessage);
final String serializedMessageAdmin = OBJECT_MAPPER.writeValueAsString(new MyEntityCreatedMessage(ENTITY2_UUID));
kafkaTemplate.send(TOPIC_ENTITIES, serializedMessageAdmin);
waitForMessagesToBeDelivered();
}
private void waitForMessagesToBeDelivered() throws InterruptedException {
while(myEntityRepository.count() != 2) {
LOGGER.info(String.format("userRepository.count: %d",
myEntityRepository.count()));
Thread.sleep(500);
}
}
@Test
public void testSomethingWhichRequiresTwoMyEntities() {
}
/* default */ static class TestcontainersInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
String kafkaContainerBootstrapServers = kafkaContainer.getBootstrapServers();
LOGGER.info("kafkaContainerBootstrapServers: {}",
kafkaContainerBootstrapServers);
TestPropertyValues
.of("spring.kafka.consumer.bootstrap-servers=" + kafkaContainerBootstrapServers)
.applyTo(configurableApplicationContext.getEnvironment());
TestPropertyValues
.of("spring.kafka.producer.bootstrap-servers=" + kafkaContainerBootstrapServers)
.applyTo(configurableApplicationContext.getEnvironment());
}
}
}
在 logs我看到了
2019-11-15 17:01:44.575 ERROR 30056 --- [ main] o.springframework.kafka.core.KafkaAdmin : Could not configure topics
org.springframework.kafka.KafkaException: Timed out waiting to get existing topics; nested exception is java.util.concurrent.TimeoutException
但是连接似乎成功了,但我不确定。
我知道嵌入式 kafka 等替代方案,但我想使用 testcontainers 来提高集成的真实性。
最佳答案
您在发送消息和消费者启动之间进行了一场竞赛。默认情况下,新的 Kafka 消费者从主题末尾开始消费。
添加:
spring.kafka.consumer.auto-offset-reset=earliest
将确保消费者获取主题中的任何现有记录。
关于java - 如何在 Kafka 集成测试中通过发送消息来设置应用程序状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58880766/
我是一名优秀的程序员,十分优秀!