
scala - Sort operation on a Spark Structured Streaming DataFrame

Reposted. Author: 行者123  Updated: 2023-12-05 00:43:35

I am trying a very simple sort operation on a Spark Structured Streaming DataFrame, but it fails with the exception "Exception in thread "main" org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode". Can you help me resolve this?

Code:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._ // for from_json
import spark.implicits._                // for the $"col" syntax

val df: DataFrame = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokerList)
  .option("kafka.security.protocol", security)
  .option("startingOffsets", "latest")
  .option("subscribe", srcTopic)
  .option("group.id", groupID)
  .option("failOnDataLoss", false)
  .load

val uDF = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .select($"value")
  .select(from_json($"value", uSchema).as("events")) // uSchema: StructType for the JSON payload, defined elsewhere
  .select($"events.*")

val uDF2 = uDF
  .select($"COL1", $"COL2", $"COL3", $"COL4", $"COL5", $"COL6", $"COL7", $"COL8")
  .sort($"COL5", $"COL3", $"COL8") // this sort triggers the AnalysisException below

val kDF = uDF2
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.security.protocol", "PLAINTEXT")
  .option("topic", "r_topic")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()

kDF.awaitTermination()

Exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;

Data:

I want to sort the DataFrame by "COL5", "COL3", "COL8":
+------------+--------------------------------------+-------------+-----+-----------+-------------+----------+----------+
|COL1 |COL2 |COL3 |COL4 |COL5 |COL6 |COL7 |COL8 |
+------------+--------------------------------------+-------------+-----+-----------+-------------+----------+----------+
|RunKafkaTest|DUMMY VALUE |1528326884394|52.0 |Analog |0 |1528326880|67 |
|RunKafkaTest|DUMMY VALUE |1528326884388|53.0 |Analog |0 |1528326880|68 |
|RunKafkaTest|DUMMY VALUE |1528326886400|54.0 |Analog |0 |1528326880|69 |
|RunKafkaTest|DUMMY VALUE |1528326887412|55.0 |Analog |0 |1528326880|70 |
|RunKafkaTest|DUMMY VALUE |1528326887406|56.0 |Analog |0 |1528326880|71 |
|RunKafkaTest|DUMMY VALUE |1528326889418|57.0 |Analog |0 |1528326880|72 |
|RunKafkaTest|DUMMY VALUE |1528326890423|58.0 |Analog |0 |1528326880|73 |
|RunKafkaTest|DUMMY VALUE |1528326891429|59.0 |Analog |0 |1528326880|74 |
|RunKafkaTest|DUMMY VALUE |1528326892435|1.0 |Analog |0 |1528326880|76 |
|RunKafkaTest|DUMMY VALUE |1528326893449|2.0 |Analog |0 |1528326880|77 |
|RunKafkaTest|DUMMY VALUE |1528326894447|3.0 |Analog |0 |1528326880|78 |
|RunKafkaTest|DUMMY VALUE |1528326895459|4.0 |Analog |0 |1528326880|79 |
|RunKafkaTest|DUMMY VALUE |1528326896458|5.0 |Analog |0 |1528326880|80 |
|RunKafkaTest|DUMMY VALUE |1528326897464|6.0 |Analog |0 |1528326880|81 |
|RunKafkaTest|DUMMY VALUE |1528326898370|7.0 |Analog |0 |1528326880|82 |
|RunKafkaTest|DUMMY VALUE |1528326899476|8.0 |Analog |0 |1528326880|83 |
|RunKafkaTest|DUMMY VALUE |1528326900482|9.0 |Analog |0 |1528326880|84 |
|RunKafkaTest|DUMMY VALUE |1528326901488|10.0 |Analog |0 |1528326880|85 |
|RunKafkaTest|DUMMY VALUE |1528326902493|11.0 |Analog |0 |1528326880|86 |
+------------+--------------------------------------+-------------+-----+-----------+-------------+----------+----------+

Best answer

You may need to rethink what the output of a sort over a stream would even be. In true streaming you would never get any output, because in theory you can never be sure you have seen the last event of the stream. Although Spark actually performs micro-batching, it tries to keep the semantics close to true streaming. You will probably end up reformulating the problem, for example by using windowing or a stateful operation such as flatMapGroupsWithState. You could also split the data into ranges manually and process them as batch jobs.
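
As a concrete illustration of the windowing route, here is a minimal sketch of the one case the error message does permit: sorting a streaming DataFrame after an aggregation, emitted in Complete output mode. It reuses uDF and the column names from the question, assumes COL3 holds epoch milliseconds (as the sample data suggests), and the 1-minute window and console sink are arbitrary choices for illustration.

// Minimal sketch: sorting is legal on an aggregated stream in Complete output mode.
import org.apache.spark.sql.functions._

val aggregated = uDF
  .withColumn("eventTime", (col("COL3") / 1000).cast("timestamp")) // derive an event-time column from epoch millis
  .groupBy(window(col("eventTime"), "1 minute"), col("COL5"))      // aggregate per 1-minute event-time window
  .count()
  .orderBy(col("window"), col("COL5"))                             // allowed because the stream is now aggregated

val query = aggregated.writeStream
  .outputMode("complete") // Complete mode is required when sorting a streaming aggregate
  .format("console")      // console sink used only for illustration
  .start()

query.awaitTermination()

If ordering records within each micro-batch would be enough, another option on Spark 2.4+ is writeStream.foreachBatch { (batchDF, batchId) => ... }, since each micro-batch arrives as a static DataFrame on which an ordinary orderBy is permitted.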

Regarding "scala - Sort operation on a Spark Structured Streaming DataFrame", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50982620/
