gpt4 book ai didi

java - 流启动后 Spark Stream 新作业

转载 作者:行者123 更新时间:2023-12-02 11:00:50 24 4
gpt4 key购买 nike

我遇到了一种情况,我尝试使用来自 kafka 的 Spark Streaming 进行流式传输。该流是直接流。我能够创建一个流,然后开始流式传输,还能够通过流式传输获取 kafka 上的任何更新(如果有)。

当我有新的流式传输新主题的请求时,就会出现问题。由于每个 jvm SparkStreaming 上下文只能有 1 个,因此我无法为每个新请求创建一个新流。

我想到的方法是

  1. 创建 DStream 并且 Spark Streaming 已经在进行中后,只需向其附加一个新流即可。这似乎不起作用,createDStream(对于新的 topic2)不会返回流并且进一步的处理被停止。流式传输继续处理第一个请求(例如主题1)。

  2. 其次,我想停止流,创建 DStream,然后再次开始流。我无法使用相同的流上下文(它会引发流停止后无法添加作业的预期),并且如果我为新主题(topic2)创建新流,则旧流主题(topic1)将丢失并且它会流只有新的。

这是代码,你看一下

 JavaStreamingContext javaStreamingContext;
if(null == javaStreamingContext) {
javaStreamingContext = JavaStreamingContext(sparkContext, Durations.seconds(duration));
} else {
StreamingContextState streamingContextState = javaStreamingContext.getState();
if(streamingContextState == StreamingContextState.STOPPED) {
javaStreamingContext = JavaStreamingContext(sparkContext, Durations.seconds(duration));
}


}
Collection<String> topics = Arrays.asList(getTopicName(schemaName));
SparkVoidFunctionImpl impl = new SparkVoidFunctionImpl(getSparkSession());

KafkaUtils.createDirectStream(javaStreamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, getKafkaParamMap()))
.map((stringStringConsumerRecord) -> stringStringConsumerRecord.value())
.foreachRDD(impl);
if (javaStreamingContext.getState() == StreamingContextState.ACTIVE) {

javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}

不用担心 SparkVoidFunctionImpl,这是一个自定义类,它是 VoidFunction 的实现。

以上是方法1,我不停止现有的流媒体。当新请求进入此方法时,它不会获取新的流对象,而是尝试创建 dstream。问题是 DStream 对象永远不会返回。

KafkaUtils.createDirectStream(javaStreamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, getKafkaParamMap()))

这不会返回 dstream,控件只是在没有错误的情况下终止。不会执行进一步的步骤。

我尝试了很多方法并阅读了多篇文章,但我相信这是一个非常常见的生产级别问题。任何流式传输都是在多个不同主题上完成的,并且每个主题的处理方式都不同。

请帮忙

最佳答案

事情是 Spark Master 向工作人员发送代码,尽管数据是流式传输,但底层代码和变量值保持静态,除非重新启 Action 业。

我能想到的几个选项:

  1. Spark 作业服务器:每次您想要从不同的主题订阅/流式传输而不是接触已在运行的作业时,请启动一个新作业。您可以从 API 主体中提供参数或主题名称。如果您想停止特定主题的流式传输,只需停止相应的作业即可。它将为您提供很大的灵 active 和对资源的控制。

  2. [理论]主题过滤:订阅您认为想要的所有主题,当拉取记录一段时间后,根据主题列表过滤掉记录。通过 API 操作此主题列表以增加或减少主题范围,它也可以是广播变量。这只是一个想法,我根本没有尝试过这个选项。

  3. 另一种解决方法是在需要时使用微服务将 Topic-2 数据转发到 Topic-1,如果不需要则停止。

关于java - 流启动后 Spark Stream 新作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51341461/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com