gpt4 book ai didi

spring-boot - kafka嵌入: java. io.FileNotFoundException :/tmp/kafka-7785736914220873149/replication-offset-checkpoint. tmp

转载 作者:行者123 更新时间:2023-12-03 14:18:24 25 4
gpt4 key购买 nike

我在集成测试中使用 kafkaEmbedded 并得到 FileNotFoundException :

java.io.FileNotFoundException: /tmp/kafka-7785736914220873149/replication-offset-checkpoint.tmp 
at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_141]
at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[na:1.8.0_141]
at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[na:1.8.0_141]
at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[na:1.8.0_141]
at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:43) ~[kafka_2.11-0.11.0.0.jar:na]
at kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:58) ~[kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:1118) [kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:1115) [kafka_2.11-0.11.0.0.jar:na]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) [scala-library-2.11.11.jar:na]
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) [scala-library-2.11.11.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) [scala-library-2.11.11.jar:na]
at kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1115) [kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$1.apply$mcV$sp(ReplicaManager.scala:211) [kafka_2.11-0.11.0.0.jar:na]
at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110) [kafka_2.11-0.11.0.0.jar:na]
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57) [kafka_2.11-0.11.0.0.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_141]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_141]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_141]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_141]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_141]

我的测试成功通过,但在构建结束时出现此错误

经过数小时的研究,我发现了这一点:
  • kafka TestUtils.tempDirectory 方法用于为嵌入式 kafka 代理创建临时目录。它还注册了关闭钩子(Hook),当 JVM 退出时删除这个目录。
  • 当单元测试完成执行时,它会调用 System.exit,它会依次执行所有已注册的关闭 Hook

  • 如果 kafka 代理在单元测试结束时运行,它将尝试在已删除的目录中写入/读取数据并产生不同的 FileNotFound 异常。

    我的配置类:
    @Configuration
    public class KafkaEmbeddedConfiguration {

    private final KafkaEmbedded kafkaEmbedded;

    public KafkaEmbeddedListenerConfigurationIT() throws Exception {
    kafkaEmbedded = new KafkaEmbedded(1, true, "topic1");
    kafkaEmbedded.before();
    }

    @Bean
    public KafkaTemplate<String, Message> sender(ProtobufSerializer protobufSerializer,
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) throws Exception {
    KafkaTemplate<String, Message> sender = KafkaTestUtils.newTemplate(kafkaEmbedded, new StringSerializer(),
    protobufSerializer);
    for (MessageListenerContainer listenerContainer :
    registry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(listenerContainer,
    kafkaEmbedded.getPartitionsPerTopic());
    }

    return sender;
    }

    测试类:
    @RunWith(SpringRunner.class)
    public class DeviceEnergyKafkaListenerIT {
    ...
    @Autowired
    private KafkaTemplate<String, Message> sender;

    @Test
    public void test (){
    ...
    sender.send(topic, msg);
    sender.flush();
    }

    有什么想法可以解决这个问题吗?

    最佳答案

    @ClassRule经纪人,添加 @AfterClass方法...

    @AfterClass
    public static void tearDown() {
    embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
    embeddedKafka.getKafkaServers().forEach(b -> b.awaitShutdown());
    }

    对于 @Rule或 bean,使用 @After方法。

    关于spring-boot - kafka嵌入: java. io.FileNotFoundException :/tmp/kafka-7785736914220873149/replication-offset-checkpoint. tmp,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49006479/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com