gpt4 book ai didi

java - 将 Spark SQL 与 Spark Streaming 结合使用

转载 作者:太空宇宙 更新时间:2023-11-04 10:09:14 24 4
gpt4 key购买 nike

尝试理解 SparkSql 相对于 Spark 结构化流的意义。
Spark Session 从 kafka 主题读取事件,将数据聚合到按不同列名称分组的计数,并将其打印到控制台。
原始输入数据结构如下:

+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
|. sourceTypes| Guid| platform|datacenter|pagesId| eventTimestamp| Id1234| Id567890|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
| Notififcation|....................| ANDROID| dev| aa|2018-09-27 09:41:29|fce81f05-a085-392...|{"id":...|
| Notififcation|....................| ANDROID| dev| ab|2018-09-27 09:41:29|fce81f05-a085-392...|{"id":...|
| Notififcation|....................| WEBOS| dev| aa|2018-09-27 09:42:46|0ee089c1-d5da-3b3...|{"id":...|
| Notififcation|....................| WEBOS| dev| aa|2018-09-27 09:42:48|57c18964-40c9-311...|{"id":...|
| Notififcation|....................| WEBOS| dev| aa|2018-09-27 09:42:48|5ecf1d77-321a-379...|{"id":...|
| Notififcation|....................| WEBOS| dev| aa|2018-09-27 09:42:48|5ecf1d77-321a-379...|{"id":...|
| Notififcation|....................| WEBOS| dev| aa|2018-09-27 09:42:52|d9fc4cfa-0934-3e9...|{"id":...|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+---------+

sourceTypesplatformdatacenterpageId 需要计数。

使用以下代码聚合数据:

Dataset<Row> query = sourceDataset
.withWatermark("eventTimestamp", watermarkInterval)
.select(
col("eventTimestamp"),
col("datacenter"),
col("platform"),
col("pageId")
)
.groupBy(
window(col("eventTimestamp"), windowInterval),
col("datacenter"),
col("platform"),
col("pageId")
)
.agg(
max(col("eventTimestamp"))
);

这里watermarkInterval=45秒windowInterval=15秒triggerInterval=15秒

使用新的聚合数据集:

aggregatedDataset
.writeStream()
.outputMode(OutputMode.Append())
.format("console")
.trigger(Trigger.ProcessingTime(triggerInterval))
.start();

有几个问题:

  1. 输出数据不会打印每个groupBy(如平台、pageId 等)的计数。

  2. 如何以json格式打印输出?我尝试在控制台上输出数据时使用 select(to_json(struct("*")).as("value")) 但它不起作用。

最佳答案

您可以使用以下代码片段解决您的问题:

.outputMode("complete")

关于java - 将 Spark SQL 与 Spark Streaming 结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52538968/

24 4 0