gpt4 book ai didi

apache-spark - 在 Spark 结构化流中执行单独的流查询

转载 作者:行者123 更新时间:2023-12-04 03:15:38 24 4
gpt4 key购买 nike

我正在尝试使用两个不同的窗口聚合流并将其打印到控制台中。但是,仅打印第一个流式查询。 tenSecsQ没有打印到控制台中。

SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCountWindowed")
.config("spark.master", "local[*]")
.getOrCreate();

Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
.load();

Dataset<Row> words = lines
.as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
.toDF("word", "timestamp");

// 5 second window
Dataset<Row> fiveSecs = words
.groupBy(
functions.window(words.col("timestamp"), "5 seconds"),
words.col("word")
).count().orderBy("window");

// 10 second window
Dataset<Row> tenSecs = words
.groupBy(
functions.window(words.col("timestamp"), "10 seconds"),
words.col("word")
).count().orderBy("window");

为 5s 和 10s 聚合流触发流式查询。不打印 10s 流的输出。只有 5s 打印到控制台

// Start writeStream() for 5s window
StreamingQuery fiveSecQ = fiveSecs.writeStream()
.queryName("5_secs")
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start();

// Start writeStream() for 10s window
StreamingQuery tenSecsQ = tenSecs.writeStream()
.queryName("10_secs")
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start();

tenSecsQ.awaitTermination();

最佳答案

我一直在研究这个问题。

总结:Structured Streaming 中的每个查询都会消耗 source数据。套接字源为定义的每个查询创建一个新连接。在这种情况下看到的行为是因为 nc仅将输入数据传送到第一个连接。

从今以后,不可能在套接字连接上定义多个聚合,除非我们可以确保连接的套接字源向每个打开的连接传递相同的数据。

我在 Spark 邮件列表上讨论了这个问题。
Databricks 开发者朱世雄回答:

Spark creates one connection for each query. The behavior you observed is because how "nc -lk" works. If you use netstat to check the tcp connections, you will see there are two connections when starting two queries. However, "nc" forwards the input to only one connection.



我通过定义一个小实验来验证这种行为:
首先,我创建了一个 SimpleTCPWordServer 它向每个打开的连接提供随机词和一个声明两个查询的基本结构化流作业。它们之间的唯一区别是第二个查询定义了一个额外的常量列来区分其输出:
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", "9999")
.option("includeTimestamp", true)
.load()

val q1 = lines.writeStream
.outputMode("append")
.format("console")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()

val q2 = lines.withColumn("foo", lit("foo")).writeStream
.outputMode("append")
.format("console")
.trigger(Trigger.ProcessingTime("7 seconds"))
.start()

如果 StructuredStreaming 只消耗一个流,那么我们应该看到两个查询传递的相同词。如果每个查询都使用单独的流,那么每个查询将报告不同的词。

这是观察到的输出:
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-------------------+
| value| timestamp|
+--------+-------------------+
|champion|2017-08-14 13:54:51|
+--------+-------------------+

+------+-------------------+---+
| value| timestamp|foo|
+------+-------------------+---+
|belong|2017-08-14 13:54:51|foo|
+------+-------------------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-------------------+---+
| value| timestamp|foo|
+-------+-------------------+---+
| agenda|2017-08-14 13:54:52|foo|
|ceiling|2017-08-14 13:54:52|foo|
| bear|2017-08-14 13:54:53|foo|
+-------+-------------------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------------------+
| value| timestamp|
+----------+-------------------+
| breath|2017-08-14 13:54:52|
|anticipate|2017-08-14 13:54:52|
| amazing|2017-08-14 13:54:52|
| bottle|2017-08-14 13:54:53|
| calculate|2017-08-14 13:54:53|
| asset|2017-08-14 13:54:54|
| cell|2017-08-14 13:54:54|
+----------+-------------------+

我们可以清楚地看到每个查询的流是不同的。看起来不可能对 socket source 提供的数据定义多个聚合。除非我们可以保证 TCP 后端服务器向每个打开的连接传递完全相同的数据。

关于apache-spark - 在 Spark 结构化流中执行单独的流查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45618489/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com