gpt4 book ai didi

java - 单元测试spring-kafka消费者时如何在KafkaEmbedded中设置端口

转载 作者:行者123 更新时间:2023-11-30 12:05:30 32 4
gpt4 key购买 nike

我正在使用 spring-boot-starter-parent 版本 1.5.0.RELEASEspring-kafka 版本 1.0。 0.RELEASEspring-kafka-test 版本 1.0.0.RELEASE 在使用来自 Kakfa 0.9 的消息的应用程序中簇。我为使用 KafkaEmbedded 的消费者进行了单元测试,但它失败了,因为代理端口是随机选择的。有没有一种方法可以在不更改版本的情况下设置此代理属性?或者我应该使用哪个版本才不会破坏任何东西?

这是 KafkaListenerKafkaConsumerTest 的代码。

Listener.java

@Service
public class Listener {

private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private CountDownLatch latch = new CountDownLatch(1);

@KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
logger.info(msg);
latch.countDown();
ack.acknowledge();
}

public CountDownLatch getLatch() {
return latch;
}
}

KafkaConsumerTest.java(编辑)

@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
private static String TEST_TOPIC = "topic";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

public KafkaTemplate<String, String> template;

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Autowired
private Listener listener;

@Before
public void init(){
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
senderProps.put("key.serializer", StringSerializer.class);
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps);
template = new KafkaTemplate<>(producerFactory);
template.setDefaultTopic(TEST_TOPIC);
}

@Test
public void testConsume() throws Exception {
String record = "message";
template.sendDefault(TEST_TOPIC, record);
logger.debug("test-consume sent record {}", record);
listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
Assert.assertEquals(listener.getLatch().getCount(), 0);
}
}

最佳答案

请使用 spring-kafka 1.3.9 和 boot 1.5;不再支持早期版本。当前引导 1.5.x 版本为 1.5.21。

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

static {
embeddedKafka.setKafkaPorts(1234);
}

setKafkaPorts 从 1.3 开始可用。

但是,您在测试中正确使用了分配的随机端口

Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());

要让 kafka 监听器连接到嵌入式代理,您可以使用。

    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString()); 

关于java - 单元测试spring-kafka消费者时如何在KafkaEmbedded中设置端口,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56225440/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com