gpt4 book ai didi

apache-kafka - 如何对 Kafka Streams 进行单元测试

转载 作者:行者123 更新时间:2023-12-02 11:30:40 24 4
gpt4 key购买 nike

在探索如何对 Kafka Stream 进行单元测试时,我遇到了 ProcessorTopologyTestDriver,不幸的是,这个类似乎在 0.10.1.0 版本中被破坏了 ( KAFKA-4408 )

是否有解决 KTable 问题的方法?

我看到了“Mocked Streams”项目,但首先它使用版本 0.10.2.0,而我使用的是 0.10.1.1,其次它是 Scala,而我的测试是 Java/Groovy。

这里有关如何对流进行单元测试而无需引导 Zookeeper/kafka 的任何帮助都会很棒。

注意:我确实有使用嵌入式服务器的集成测试,这是用于单元测试,又名快速、简单的测试。

编辑

谢谢拉蒙·加西亚

For people arriving here in Google searches, please note that the test driver class is now org.apache.kafka.streams.TopologyTestDriver

This class is in the maven package groupId org.apache.kafka, artifactId kafka-streams-test-utils

最佳答案

我找到了解决这个问题的方法,我不确定这是答案,尤其是在https://stackoverflow.com/users/4953079/matthias-j-sax之后评论。无论如何,分享我迄今为止所拥有的......

我完全复制了ProcessorTopologyTestDriver from the 0.10.1 branch (这是我正在使用的版本)。

发送至地址KAFKA-4408我做了private final MockConsumer<byte[], byte[]> restoreStateConsumer可访问并移动 block task = new StreamTask(...到一个单独的方法,例如bootstrap .

在测试的设置阶段,我执行以下操作

driver = new ProcessorTopologyTestDriver(config, builder)
ArrayList partitionInfos = new ArrayList();
partitionInfos.add(new PartitionInfo('my_ktable', 1, (Node) null, (Node[]) null, (Node[]) null));
driver.restoreStateConsumer.updatePartitions('my_ktable', partitionInfos);
driver.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition('my_ktable', 1), Long.valueOf(0L)));
driver.bootstrap()

就是这样......

奖金

我还遇到了KAFKA-4461 ,幸运的是,自从我复制了整个类(class)后,我能够“挑选”accepted fix稍作调整。

一如既往地欢迎反馈。虽然显然不是官方测试类,但该驱动程序被证明非常有用!

关于apache-kafka - 如何对 Kafka Streams 进行单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43220110/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com