gpt4 book ai didi

apache-kafka - 通过 Java KafkaServer API 删除和重新创建 Kafka 主题

转载 作者:行者123 更新时间:2023-12-04 05:16:02 25 4
gpt4 key购买 nike

我对连接到本地 Kafka 实例的应用程序进行了一些集成测试。当测试以类似于此问题的已接受答案的方式运行时,我正在使用 Java KafkaServer API 按需创建本地实例:

How can I instanciate a Mock Kafka Topic for junit tests?

我的每个测试在单独运行时都通过了。我遇到的问题是我的测试使用相同的 Kafka 主题,我希望主题开始每个不包含消息的测试。但是,当我连续运行测试时,在第一次运行所有测试并尝试重新创建它们需要的主题时,我会收到此错误:

kafka.common.TopicExistsException: Topic "test_topic" already exists.
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:187)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93)

每个测试都会创建和关闭自己的 EmbeddedZookeeper 和 KafkaServer。我还尝试在每次测试结束时从 ZK 以及 KafkaServer 的 logDirs 中删除“brokers/topics”路径。不知何故,第一次测试的主题仍然存在到第二次测试中。

在每个测试结束时我可以做什么来确保它使用的主题不会干扰在它之后运行的测试?

最佳答案

我最终能够让它工作。

我没有在每次测试后进行清理,而是将测试更改为在运行前进行清理。

我需要执行两个清理步骤。

首先是在启动 KafkaServer 之前删除代理的数据目录。

    String dataDirectory = 'tmp/kafka'
FileUtils.deleteDirectory(FileUtils.getFile(dataDirectory))

Properties props = TestUtils.createBrokerConfig(BROKER_ID, port, true)
props.put('log.dir', dataDirectory)
props.put('delete.topic.enable', 'true')

KafkaConfig config = new KafkaConfig(props)
Time mock = new MockTime()
kafkaServer = TestUtils.createServer(config, mock)

第二个是在发送createTopic命令之前在Zookeeper中递归删除主题路径。

    zkClient.deleteRecursive(ZkUtils.getTopicPath(topicName))

List<String> arguments = ['--topic', topicName, '--partitions', '1', '--replication-factor', '1']
TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments as String[]))

我尝试了很多类似的方法,但除了这个之外无法让它与任何东西一起工作。

请注意,代码是 Groovy 而不是 Java。

关于apache-kafka - 通过 Java KafkaServer API 删除和重新创建 Kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35022695/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com