
apache-spark - How to compute a lag difference in Spark Structured Streaming?

Reposted · Author: 行者123 · Updated: 2023-12-04 15:50:17

I am writing a Spark Structured Streaming program, and I need to create an additional column containing the lag difference.

To reproduce my problem, I provide a code snippet. This code consumes a data.json file stored in the data folder:

[
{"id": 77,"type": "person","timestamp": 1532609003},
{"id": 77,"type": "person","timestamp": 1532609005},
{"id": 78,"type": "crane","timestamp": 1532609005}
]

The code:

from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[2]") \
    .getOrCreate()

schema = StructType([
    StructField("id", IntegerType()),
    StructField("type", StringType()),
    StructField("timestamp", LongType())
])

ds = spark \
    .readStream \
    .format("json") \
    .schema(schema) \
    .load("data/")

diff_window = Window.partitionBy("id").orderBy("timestamp")
ds = ds.withColumn("prev_timestamp", func.lag(ds.timestamp).over(diff_window))

query = ds \
    .writeStream \
    .format('console') \
    .start()

query.awaitTermination()

I get this error:

pyspark.sql.utils.AnalysisException: u'Non-time-based windows are not supported on streaming DataFrames/Datasets;;\nWindow [lag(timestamp#71L, 1, null) windowspecdefinition(host_id#68, timestamp#71L ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS prev_timestamp#129L]

Best Answer

pyspark.sql.utils.AnalysisException: u'Non-time-based windows are not supported on streaming DataFrames/Datasets



This means your window must be based on the timestamp column. So if you have one data point per second and you build a 30s window with a 10s stride, the result will contain a new window column with start and end fields holding timestamps that differ by 30s.

You should use the window like this (note: the original snippet was missing the functions import and passed 'date_format' to withWatermark, which is not a column in this DataFrame; the watermark must be set on the event-time column, date_time):

import pyspark.sql.functions as F

words = words.withColumn('date_time', F.col('date_time').cast('timestamp'))

w = F.window('date_time', '30 seconds', '10 seconds')
words = words \
    .withWatermark('date_time', '1 minutes') \
    .groupBy(w).agg(F.mean('value'))

Regarding apache-spark - How to compute a lag difference in Spark Structured Streaming?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53450514/
