gpt4 book ai didi

scala - 如何在结构化流中创建列的所有值的列表?

转载 作者:行者123 更新时间:2023-12-02 22:35:16 24 4
gpt4 key购买 nike

我有一个 spark 结构化的流作业,它从 Kafka 获取记录(10,000 作为 maxOffsetsPerTrigger)。我通过 spark 的 readStream 方法获得了所有这些记录。此数据框有一个名为“key”的列。

我需要 string(set(all values in that column 'key')) 在 ElasticSearch 的查询中使用这个字符串。

我已经试过了 df.select("key").collect().distinct()但它抛出异常:

 collect() is not supported with structured streaming.

谢谢。

编辑:
数据帧:
+-------+-------------------+----------+
| key| ex|new column|
+-------+-------------------+----------+
| fruits| [mango, apple]| |
|animals| [cat, dog, horse]| |
| human|[ram, shyam, karun]| |
+-------+-------------------+----------+

架构:
root
|-- key: string (nullable = true)
|-- ex: array (nullable = true)
| |-- element: string (containsNull = true)
|-- new column: string (nullable = true)

我需要的字符串:
'["fruits", "animals", "human"]'

最佳答案

您不能在流数据帧上应用收集。这里的streamingDf指的是从Kafka读取。

val query = streamingDf
.select(col("Key").cast(StringType))
.writeStream
.format("console")
.start()

query.awaitTermination()

它将在控制台中打印您的数据。要在外部源中写入数据,您必须提供 foreachWriter 的实现。供引用, refer

在给定的链接中,数据使用 Kafka 进行流式传输,由 Spark 读取并最终写入 Cassandra。

希望,它会有所帮助。

关于scala - 如何在结构化流中创建列的所有值的列表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57724861/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com