gpt4 book ai didi

apache-spark - 如何在 Spark Streaming 中跨多个批处理间隔传输数据流

转载 作者:行者123 更新时间:2023-12-04 21:36:09 25 4
gpt4 key购买 nike

我正在使用 Apache Spark Streaming 1.6.1 编写一个 Java 应用程序,该应用程序连接两个键/值数据流并将输出写入 HDFS。这两个数据流包含 K/V 字符串,并使用 textFileStream() 从 HDFS 定期在 Spark 中摄取。

两个数据流不同步,这意味着在时间 t0 时在流 1 中的某些键可能会在时间 t1 时出现在流 2 中,反之亦然。因此,我的目标是连接两个流并计算“剩余”键,在下一个批处理间隔中的连接操作应该考虑这些键。

为了更好地阐明这一点,请查看以下算法:

variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0

operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)

我试图用 Spark Streaming 实现这个算法没有成功。最初,我以这种方式为剩余的键创建了两个空流(这只是一个流,但生成第二个流的代码类似):
JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String> () {
@Override
public scala.Tuple2<String, String> call(String s) {
return new scala.Tuple2(s, s);
}
});

稍后,这个空流与 stream1 统一(即,union()),最后,在加入之后,我添加来自 stream1 的剩余键并调用 window()。流 2 也会发生同样的情况。

问题在于生成 left_keys_s1 和 left_keys_s2 的操作是没有 Action 的转换,这意味着 Spark 不会创建任何 RDD 流图,因此它们永远不会被执行。我现在得到的是一个连接,它只输出键在同一时间间隔内的流 1 和流 2 的记录。

你们有什么建议可以用 Spark 正确实现这个吗?

谢谢,
马可

最佳答案

通过保留对我们保存这些值的 RDD 的引用,应该可以将值从一个批次转移到下一个批次。

不要尝试使用 queueDStream 合并流,而是声明一个可变的 RDD 引用,可以在每个流间隔更新。

这是一个例子:

在这个流工作中,我们从一个 RDD 携带 100 开始整数。每个区间,10生成并减去那些最初的 100 个整数的随机数。这个过程一直持续到初始的 100 个元素的 RDD 为空。这个例子展示了如何将元素从一个区间转移到下一个区间。

  import scala.util.Random
import org.apache.spark.streaming.dstream._

val ssc = new StreamingContext(sparkContext, Seconds(2))

var targetInts:RDD[Int] = sc.parallelize(0 until 100)

var loops = 0

// we create an rdd of functions that generate random data.
// evaluating this RDD at each interval will generate new random data points.
val randomDataRdd = sc.parallelize(1 to 10).map(_ => () => Random.nextInt(100))

val dstream = new ConstantInputDStream(ssc, randomDataRdd)

// create values from the random func rdd

dataDStream.foreachRDD{rdd =>
loops += 1
targetInts = targetInts.subtract(rdd)
if (targetInts.isEmpty) {println(loops); ssc.stop(false)}
}


ssc.start()

运行此示例并绘制 loops反对 targetInts.count给出以下图表:

Removing 100 ints by generating random numbers

我希望这能给你足够的指导来实现完整的用例。

关于apache-spark - 如何在 Spark Streaming 中跨多个批处理间隔传输数据流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37356691/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com