gpt4 book ai didi

java - 如何将 DataSet 转换为 JSON 消息的 DataSet 以写入 Kafka?

转载 作者:行者123 更新时间:2023-11-30 06:48:26 25 4
gpt4 key购买 nike

我使用 Spark 2.1.1。

我有以下DataSet<Row> ds1;

 name   | ratio | count  // column names
"hello" | 1.56 | 34

( ds1.isStreaming 给出 true )

我正在尝试生成 DataSet<String> ds2。换句话说,当我写入卡夫卡接收器时,我想写这样的东西

{"name": "hello", "ratio": 1.56, "count": 34}

我尝试过这样的事情df2.toJSON().writeStream().foreach(new KafkaSink()).start()但随后出现以下错误

Queries with streaming sources must be executed with writeStream.start()

to_jsonjson_tuple但是我不确定如何在这里利用它们?


我使用 json_tuple() 尝试了以下操作功能

 Dataset<String> df4 = df3.select(json_tuple(new Column("result"), " name", "ratio", "count")).as(Encoders.STRING());

我收到以下错误:

cannot resolve 'result' given input columns: [name, ratio, count];;

最佳答案

tl;dr 使用 struct 函数,然后使用 to_json (因为 toJSON 对于流数据集来说已损坏,因为SPARK-17029 刚刚修复了 20 days ago )。


引用struct的scaladoc :

struct(colName: String, colNames: String*): Column Creates a new struct column that composes multiple input columns.

假设您使用 Java API,您有 4 个不同的 struct 变体功能也:

public static Column struct(Column... cols) Creates a new struct column.

to_json您的案例涵盖的功能:

public static Column to_json(Column e) Converts a column containing a StructType into a JSON string with the specified schema.

以下是 Scala 代码(将其翻译为 Java 是您的家庭练习):

val ds1 = Seq(("hello", 1.56, 34)).toDF("name", "ratio", "count")
val recordCol = to_json(struct("name", "ratio", "count")) as "record"
scala> ds1.select(recordCol).show(truncate = false)
+----------------------------------------+
|record |
+----------------------------------------+
|{"name":"hello","ratio":1.56,"count":34}|
+----------------------------------------+

我也尝试了您的解决方案(今天构建了 Spark 2.3.0-SNAPSHOT),看起来它工作得很好。

val fromKafka = spark.
readStream.
format("kafka").
option("subscribe", "topic1").
option("kafka.bootstrap.servers", "localhost:9092").
load.
select('value cast "string")
fromKafka.
toJSON. // <-- JSON conversion
writeStream.
format("console"). // using console sink
start

format("kafka") 已添加到 SPARK-19719并且在 2.1.0 中不可用。

关于java - 如何将 DataSet<Row> 转换为 JSON 消息的 DataSet 以写入 Kafka?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44280360/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com