gpt4 book ai didi

cassandra - 将批处理RDD中的结果与Apache Spark中的流式RDD合并

转载 作者:行者123 更新时间:2023-12-04 04:17:05 25 4
gpt4 key购买 nike

上下文:
我正在使用Apache Spark汇总来自日志的不同事件类型的运行计数。日志既存储在Cassandra中以进行历史分析,又存储在Kafka中以进行实时分析。每个日志都有一个日期和事件类型。为了简单起见,假设我想跟踪每天一种类型的日志数量。

我们有两个RDD,一个是来自Cassandra的批处理数据的RDD,另一个是来自Kafka的流式RDD。
伪代码:

CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");

JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
@Override
public Tuple2<String, Integer> call(CassandraRow row) {
return new Tuple2<String, Integer>(row.getString("date"), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
});

save(batchRDD) // Assume this saves the batch RDD somewhere

...

// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
@Override
public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) {
String jsonString = data._2;
JSON jsonObj = JSON.parse(jsonString);
Date eventDate = ... // get date from json object
// Assume startTime is broadcast variable that is set to the time when the job started.
if (eventDate.after(startTime.value())) {
ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
return pairs;
} else {
return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
}
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
Integer previousValue = state.or(0l);
Integer currentValue = ... // Sum of counts
return Optional.of(previousValue + currentValue);
}
});
save(streamRDD); // Assume this saves the stream RDD somewhere

sc.start();
sc.awaitTermination();

问题:
如何将streamRDD的结果与batchRDD合并?
假设 batchRDD具有以下数据,并且此作业在2014-10-16上运行:
("2014-10-15", 1000000)
("2014-10-16", 2000000)

由于Cassandra查询仅包含批处理查询开始之前的所有数据,因此我们必须在查询完成后从Kafka读取数据,仅考虑作业开始时间之后的日志。我们假设查询需要很长时间。这意味着我需要将历史结果与流式结果相结合。

例如:
    |------------------------|-------------|--------------|--------->
tBatchStart tStreamStart streamBatch1 streamBatch2

然后假设在第一批流中我们获得了以下数据:
("2014-10-19", 1000)

然后,我想将批处理RDD与该流RDD组合在一起,以便流RDD现在具有以下值:
("2014-10-19", 2001000)

然后假设在第二个流批处理中,我们获得了以下数据:
("2014-10-19", 4000)

然后应将流RDD更新为具有以下值:
("2014-10-19", 2005000)

等等...

可以使用 streamRDD.transformToPair(...)通过 join将streamRDD数据与batchRDD数据组合在一起,但是如果我们对每个流块执行此操作,那么我们将为每个流块添加来自batchRDD的计数,从而使状态值“重复计数” ,仅应将其添加到第一个流块中。

最佳答案

为了解决这种情况,我将基rdd与保留了流数据总数的StateDStream聚合结果合并。这有效地为每个流传输间隔上报告的数据提供了基准,而无需计算所述基准x次。

我使用示例WordCount尝试了该想法,并且该方法有效。将其放在REPL上作为一个实时示例:

(在单独的 shell 上使用nc -lk 9876socketTextStream提供输入)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

@transient val defaults = List("magic" -> 2, "face" -> 5, "dust" -> 7 )
val defaultRdd = sc.parallelize(defaults)

@transient val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/tmp/spark")

val lines = ssc.socketTextStream("localhost", 9876, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _)
val historicCount = wordCount.updateStateByKey[Int]{(newValues: Seq[Int], runningCount: Option[Int]) =>
Some(newValues.sum + runningCount.getOrElse(0))
}
val runningTotal = historicCount.transform{ rdd => rdd.union(defaultRdd)}.reduceByKey( _+_ )

wordCount.print()
historicCount.print()
runningTotal.print()
ssc.start()

关于cassandra - 将批处理RDD中的结果与Apache Spark中的流式RDD合并,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26520595/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com