I am trying to read a Kafka topic with Apache Spark Streaming, but I cannot figure out how to convert the data in the DStream into a DataFrame and then store it in a temp table. The messages in Kafka are in Avro format and were produced from a database by Kafka JDBC Connect. I have the code below, and it works fine until it reaches spark.read.json to read the JSON into a DataFrame.
package consumerTest

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._

import scala.util.parsing.json.{JSON, JSONObject}

object Consumer {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder
      .master("local")
      .appName("my-spark-app")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate();

    import spark.implicits._

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "<kafka-server>:9092",
      "key.deserializer" -> classOf[KafkaAvroDeserializer],
      "value.deserializer" -> classOf[KafkaAvroDeserializer],
      "group.id" -> "sakwq",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> "false",
      "schema.registry.url" -> "http://<schema-registry>:8181"
    )

    val topics = Array("cdcemployee")

    val stream = KafkaUtils.createDirectStream[String, Object](
      ssc,
      PreferConsistent,
      Subscribe[String, Object](topics, kafkaParams)
    )

    val data = stream.map(record => {
      println(record.value.toString())
      record.value
      val df = spark.read.json(record.value.toString())
    })

    data.print();

    ssc.start()
    ssc.awaitTermination()
  }
}
I get a null pointer exception when the line val df = spark.read.json(record.value.toString()) executes:
18/05/10 09:49:11 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:135)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:133)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:689)
at org.apache.spark.sql.SparkSession.read(SparkSession.scala:645)
at consumerTest.Consumer$.$anonfun$main$1(Consumer.scala:63)
at consumerTest.Consumer$.$anonfun$main$1$adapted(Consumer.scala:60)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/05/10 09:49:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:135)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:133)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:689)
at org.apache.spark.sql.SparkSession.read(SparkSession.scala:645)
at consumerTest.Consumer$.$anonfun$main$1(Consumer.scala:63)
at consumerTest.Consumer$.$anonfun$main$1$adapted(Consumer.scala:60)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Also, if I remove spark.read.json, here is the sample data that gets printed when the println(record.value.toString()) statement executes:
{"CDCTRANSACTIONID": 182241, "CDCTIMESTAMP": "2018-03-26 18:04:44:776 - 04:00", "CDCCHANGESEQ": 14, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Francisco", "State": null, "PostalCode": null, "DeptCode": "300", "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 182241, "CDCTIMESTAMP": "2018-03-26 18:04:44:776 - 04:00", "CDCCHANGESEQ": 14, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "Raleigh", "State": null, "PostalCode": null, "DeptCode": "", "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 197086, "CDCTIMESTAMP": "2018-03-27 11:18:48:022 - 04:00", "CDCCHANGESEQ": 15, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "New York", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 197086, "CDCTIMESTAMP": "2018-03-27 11:18:48:022 - 04:00", "CDCCHANGESEQ": 15, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Francisco", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 363712, "CDCTIMESTAMP": "2018-04-04 15:30:46:551 - 04:00", "CDCCHANGESEQ": 16, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Diego", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 363712, "CDCTIMESTAMP": "2018-04-04 15:30:46:551 - 04:00", "CDCCHANGESEQ": 16, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "New York", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 363785, "CDCTIMESTAMP": "2018-04-04 15:35:11:492 - 04:00", "CDCCHANGESEQ": 17, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 2, "EmpNum": 57, "LastName": "bobba2s", "FirstName": "Saikrishna Teja", "Address": "9220 Bothwell St", "Address2": "", "City": "San Diego", "State": "NC", "PostalCode": "27617", "DeptCode": "300", "Position": "", "HomePhone": "919 931-5737", "WorkPhone": "919 931-5737", "VacationDaysLeft": 10, "SickDaysLeft": 5, "StartDate": 16979, "Birthdate": 7270}
{"CDCTRANSACTIONID": 364688, "CDCTIMESTAMP": "2018-04-04 16:39:05:602 - 04:00", "CDCCHANGESEQ": 18, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 1, "EmpNum": 59, "LastName": "Bobba", "FirstName": "Saikrishna Teja", "Address": "9220 Bothwell St", "Address2": "", "City": "Raleigh", "State": "NC", "PostalCode": "27617", "DeptCode": "300", "Position": "", "HomePhone": "919 931-5737", "WorkPhone": "919 931-5737", "VacationDaysLeft": 10, "SickDaysLeft": 5, "StartDate": 16979, "Birthdate": 7270}
{"CDCTRANSACTIONID": 384368, "CDCTIMESTAMP": "2018-04-05 15:43:15:478 - 04:00", "CDCCHANGESEQ": 19, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 59, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Francisco", "State": "CA", "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 384368, "CDCTIMESTAMP": "2018-04-05 15:43:15:478 - 04:00", "CDCCHANGESEQ": 19, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 59, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "Raleigh", "State": "NC", "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 650254, "CDCTIMESTAMP": "2018-04-18 16:19:35:669 - 04:00", "CDCCHANGESEQ": 20, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 59, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Diego", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
Can anyone help me with how to convert this into a DataFrame and store it temporarily in a table?
Edit:
Best Answer
stream contains one RDD for each batch interval, so for each interval you can convert the rdd into a DataFrame like this:
stream.foreachRDD(rddRaw => {
  // The foreachRDD body runs on the driver once per batch interval,
  // so the SparkSession can safely be used here
  val rdd = rddRaw.map(_.value.toString) // or rddRaw.map(_._2) with the older (key, value) API
  val df = spark.read.json(rdd)
})
This should give you the DataFrame as expected.
Hope this helps!
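Since the question also asks about storing the result in a temporary table, here is a minimal sketch of that step, building on the foreachRDD snippet above. The view name cdc_employee_updates and the selected columns are only illustrative (the columns come from the sample records shown earlier), and note that newer Spark releases prefer the json(Dataset[String]) overload over json(RDD[String]):

stream.foreachRDD { rddRaw =>
  val jsonRdd = rddRaw.map(_.value.toString)
  if (!jsonRdd.isEmpty()) {
    // Parse the batch of JSON strings into a DataFrame on the driver
    val df = spark.read.json(jsonRdd)
    // Register a temporary view so this batch can be queried with Spark SQL
    df.createOrReplaceTempView("cdc_employee_updates") // hypothetical view name
    spark.sql("SELECT EmpNum, City, CDCOPERATION FROM cdc_employee_updates").show()
  }
}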
On the topic of apache-spark - Spark Streaming - converting a DStream of JSON-formatted messages to a DataFrame, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50274793/