gpt4 book ai didi

junit - 如何从程序停止flink流作业

转载 作者:行者123 更新时间:2023-12-04 07:16:51 26 4
gpt4 key购买 nike

我正在尝试为Flink流作业创建JUnit测试,该作业将数据写入kafka主题,并分别使用FlinkKafkaProducer09FlinkKafkaConsumer09从同一kafka主题读取数据。我正在生产中传递测试数据:

DataStream<String> stream = env.fromElements("tom", "jerry", "bill");

并检查是否来自使用者的数据相同:
List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result = resultSink.getResult();
assertEquals(expected, result);

使用 TestListResultSink

通过打印流,我可以看到来自使用者的数据。但是无法获得Junit测试结果,因为即使消息完成后,使用者仍将继续运行。因此,它并没有成为测试的一部分。

FlinkFlinkKafkaConsumer09中是否有任何方法可以停止进程或在特定时间运行?

最佳答案

潜在的问题是流式程序通常不是有限的并且可以无限期地运行。

至少就目前而言,最好的方法是在流中插入一条特殊的控制消息,以使源正确终止(只需退出读取循环即可停止读取更多数据)。这样,Flink将告诉所有下游运营商,他们在使用完所有数据后可以停止。

或者,您可以在源中抛出一个特殊的异常(例如,一段时间后),以便您可以将“正确的”终止与故障情况区分开(通过检查错误原因)。在源中引发异常将使程序失败。

关于junit - 如何从程序停止flink流作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44441153/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com