gpt4 book ai didi

apache-spark - 将数据从 Spark-Streaming 存储到 Cassandra 时出现问题

转载 作者:行者123 更新时间:2023-12-04 05:13:42 25 4
gpt4 key购买 nike

SparkStreaming 上下文以 30 秒的间隔从 RabbitMQ 读取流。我想修改 cassandra 中存在的相应行的几列的值,然后想将数据存储回 Cassandra。为此,我需要检查特定主键的行是否存在于 Cassandra 中,如果是,则获取它并执行必要的操作。但问题是,我在驱动程序上创建了 StreamingContext 并在 Worker 上执行了操作。因此,他们无法获得 StreamingContext 对象,原因是它未序列化并发送给工作人员,我收到此错误:java.io.NotSerializableException:org.apache.spark.streaming.StreamingContext。我也知道我们无法访问 foreachRDD 中的 StreamingContext。但是,如何在不出现序列化错误的情况下实现相同的功能?

我看过几个例子here但它没有帮助。

这是代码片段:

   val ssc = new StreamingContext(sparkConf,30)
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
receiverStream.start()
val lines = receiverStream.map(EventData.fromString(_))
lines.foreachRDD{ x => if (x.toLocalIterator.nonEmpty) {
x.foreachPartition { it => for (tuple <- it) {
val cookieid = tuple.cookieid
val sessionid = tuple.sessionid
val logdate = tuple.logdate
val EventRows = ssc.cassandraTable("SparkTest", CassandraTable).select("*")
.where("cookieid = '" + cookieid + "' and logdate = '" + logdate+ "' and sessionid = '" + sessionid + "')

Somelogic Whether row exist or not for Cookieid

} } }

最佳答案

SparkContext 无法序列化并传递给可能位于不同节点中的多个工作程序。如果你需要做这样的事情,你可以使用 forEachPartiion, mapPartitons。否则使用您传递的函数执行此操作

 CassandraConnector(SparkWriter.conf).withSessionDo { session =>
....
session.executeAsync(<CQL Statement>)

在 SparkConf 中,您需要提供 Cassandra 的详细信息

  val conf = new SparkConf()
.setAppName("test")
.set("spark.ui.enabled", "true")
.set("spark.executor.memory", "8g")
// .set("spark.executor.core", "4")
.set("spark.eventLog.enabled", "true")
.set("spark.eventLog.dir", "/ephemeral/spark-events")
//to avoid disk space issues - default is /tmp
.set("spark.local.dir", "/ephemeral/spark-scratch")
.set("spark.cleaner.ttl", "10000")
.set("spark.cassandra.connection.host", cassandraip)
.setMaster("spark://10.255.49.238:7077")

Java CSCParser 是一个不可序列化的库。因此,如果您在 RDD 上调用 map 或 forEach,Spark 无法向它发送可能不同的节点。一种解决方法是使用 mapPartion,在这种情况下,将在一个 SparkNode 中执行一个完整的分区。因此它不需要为每个调用序列化。示例

val rdd_inital_parse = rdd.mapPartitions(pLines).

def pLines(lines: Iterator[String]) = {
val parser = new CSVParser() ---> Cannot be serialized, will fail if using rdd.map(pLines)
lines.map(x => parseCSVLine(x, parser.parseLine))
}

关于apache-spark - 将数据从 Spark-Streaming 存储到 Cassandra 时出现问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39363586/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com