scala - How to join 2 Spark SQL streams

Reposted. Author: 行者123. Updated: 2023-12-01 04:47:56

Environment:
Scala, Spark version: 2.1.1

Here are my streams (read from Kafka):

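import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val conf = new SparkConf()
  .setMaster("local[1]")
  .setAppName("JoinStreams")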

val spark = SparkSession.builder().config(conf).getOrCreate()

import spark.implicits._

val schema = StructType(
  List(
    StructField("t", DataTypes.StringType),
    StructField("dst", DataTypes.StringType),
    StructField("dstPort", DataTypes.IntegerType),
    StructField("src", DataTypes.StringType),
    StructField("srcPort", DataTypes.IntegerType),
    StructField("ts", DataTypes.LongType),
    StructField("len", DataTypes.IntegerType),
    StructField("cpu", DataTypes.DoubleType),
    StructField("l", DataTypes.StringType),
    StructField("headers", DataTypes.createArrayType(DataTypes.StringType))
  )
)
val baseDataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic")
  .load()
  .selectExpr("cast (value as string) as json")
  .select(from_json($"json", schema).as("data"))
  .select($"data.*")

val requestsDataFrame = baseDataFrame
  .filter("t = 'REQUEST'")
  .repartition($"dst")
  .withColumn("rowId", monotonically_increasing_id())

val responseDataFrame = baseDataFrame
  .filter("t = 'RESPONSE'")
  .repartition($"src")
  .withColumn("rowId", monotonically_increasing_id())

responseDataFrame.createOrReplaceTempView("responses")
requestsDataFrame.createOrReplaceTempView("requests")


val dataFrame = spark.sql("select * from requests left join responses ON requests.rowId = responses.rowId")

When I start the application, I get this error:
org.apache.spark.sql.AnalysisException: Left outer/semi/anti joins with a streaming DataFrame/Dataset on the right is not supported;;

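How can I join these two streams?
I also tried joining them directly (without SQL) and got the same error.
Should I first save the data to a file and then read it back?
What is the best practice?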

Best answer

It looks like you need Spark 2.3:

“In Spark 2.3, we have added support for stream-stream joins...”

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins
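The answer only links to the guide, so here is a minimal sketch of what a stream-stream left outer join of the question's data could look like. It rests on several assumptions that are not in the question: `ts` holds epoch milliseconds; a response can be matched to its request by the swapped address/port pair within 30 seconds (a hypothetical matching rule, because `monotonically_increasing_id()` values are generated independently on each side and are not a reliable way to pair rows across two streams); the 1-minute watermarks and the 30-second bound are arbitrary. The names `parse`, `joinRequestsResponses`, `eventTime`, and the `req*`/`resp*` columns are hypothetical. The Kafka source needs the `spark-sql-kafka-0-10` package on the classpath. Both sides come from the same Kafka source, which makes this a stream-stream self-join; I'd run it on Spark 2.4 or later, or confirm it works on the exact 2.3.x release you pick.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object JoinStreams {

  // Same JSON schema as in the question.
  val schema: StructType = StructType(List(
    StructField("t", StringType),
    StructField("dst", StringType),
    StructField("dstPort", IntegerType),
    StructField("src", StringType),
    StructField("srcPort", IntegerType),
    StructField("ts", LongType),
    StructField("len", IntegerType),
    StructField("cpu", DoubleType),
    StructField("l", StringType),
    StructField("headers", ArrayType(StringType))
  ))

  // Parses the Kafka `value` column into typed fields and adds an event-time column.
  // Assumption: `ts` is epoch milliseconds; casting a number to timestamp treats it as seconds.
  def parse(raw: DataFrame): DataFrame =
    raw.selectExpr("CAST(value AS STRING) AS json")
      .select(from_json(col("json"), schema).as("data"))
      .select("data.*")
      .withColumn("eventTime", (col("ts") / 1000).cast(TimestampType))

  // Left outer stream-stream join: both sides carry a watermark and the condition
  // bounds event time, so Spark can eventually drop old state and emit unmatched rows.
  def joinRequestsResponses(base: DataFrame): DataFrame = {
    val requests = base
      .filter(col("t") === "REQUEST")
      .select(
        col("src").as("reqSrc"), col("srcPort").as("reqSrcPort"),
        col("dst").as("reqDst"), col("dstPort").as("reqDstPort"),
        col("eventTime").as("reqTime"))
      .withWatermark("reqTime", "1 minute")

    val responses = base
      .filter(col("t") === "RESPONSE")
      .select(
        col("src").as("respSrc"), col("srcPort").as("respSrcPort"),
        col("dst").as("respDst"), col("dstPort").as("respDstPort"),
        col("eventTime").as("respTime"), col("len").as("respLen"))
      .withWatermark("respTime", "1 minute")

    // Hypothetical matching rule: the response comes back over the same connection
    // (addresses and ports swapped) within 30 seconds of the request.
    requests.join(
      responses,
      expr("""
        respSrc = reqDst AND respSrcPort = reqDstPort AND
        respDst = reqSrc AND respDstPort = reqSrcPort AND
        respTime >= reqTime AND respTime <= reqTime + interval 30 seconds
      """),
      "leftOuter")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("JoinStreams")
      .getOrCreate()

    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:port") // placeholder, as in the question
      .option("subscribe", "topic")                   // placeholder topic name
      .load()

    // Stream-stream joins are only supported in the append output mode.
    val query = joinRequestsResponses(parse(raw)).writeStream
      .format("console")
      .outputMode("append")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
```

Requests that never get a response are emitted with NULL response columns only after the watermark has moved past the 30-second window, so those rows show up with a delay rather than immediately.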

Regarding scala - How to join 2 Spark SQL streams, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45049878/
