
apache-spark - Error when using collect_list in Spark Structured Streaming


I have two versions of Spark code. The first uses Structured Streaming with a Kafka source:

dfStream.printSchema()
//root
//|-- dt: string (nullable = true)
//|-- ip: string (nullable = true)
//|-- device: string (nullable = true)

val dfWindowed = dfStream
  .groupBy($"ip")
  .agg(concat_ws(",", collect_list($"device")).alias("devices"))
  .writeStream
  .outputMode("complete")
  .format("memory")
  .start()

The second reads from a file, but the data is exactly the same as above:

logDF.printSchema() 
//root
//|-- dt: string (nullable = true)
//|-- ip: string (nullable = true)
//|-- device: string (nullable = true)

logDF.repartition(32)
  .groupBy("ip")
  .agg(concat_ws(",", collect_list($"device")).alias("devices"))

The problem is that while the second version runs fine, the first keeps failing with the following error:

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 1.0 (TID 28, c3-hadoop-prc-st3417.bj, executor 3): java.lang.RuntimeException: Collect cannot be used in partial aggregations.

It is a long stack trace, but the error seems to boil down to:

java.lang.RuntimeException: Collect cannot be used in partial aggregations.

I have found several related SO questions, but no solution so far. Any advice on the following would be greatly appreciated:
  • what "partial aggregation" means, and why a static (non-streaming) dataset does not have this problem,
  • possible workarounds...
Best Answer

    I ended up writing a UDAF, as suggested here.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    class ConcatString extends UserDefinedAggregateFunction {
      // These are the input fields for your aggregate function.
      override def inputSchema: StructType =
        StructType(StructField("value", StringType) :: Nil)

      // These are the internal fields you keep for computing your aggregate.
      override def bufferSchema: StructType =
        StructType(StructField("concated", StringType) :: Nil)

      // This is the output type of your aggregation function.
      override def dataType: DataType = StringType

      override def deterministic: Boolean = true

      // This is the initial value for your buffer schema.
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = "-1"
      }

      // This is how to update your buffer schema given an input.
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = (buffer.getAs[String](0) + ",,," + input.getAs[String](0))
          .stripPrefix("-1,,,")
      }

      // This is how to merge two objects with the bufferSchema type.
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = (buffer1.getAs[String](0) + ",,," + buffer2.getAs[String](0))
          .stripPrefix("-1,,,")
          .stripSuffix(",,,-1")
      }

      // This is where you output the final value, given the final value of your buffer schema.
      override def evaluate(buffer: Row): Any = {
        buffer.getString(0)
      }
    }

    Note: the separator is ',,,'. The odd-looking "-1" initialization and the subsequent stripPrefix()/stripSuffix() calls are my poor fix for accidentally concatenating the buffer's initial value into the result.
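    A possibly cleaner variant (my assumption, not part of the original answer) is to initialize the buffer to an empty string and only insert the separator once something has been collected, so no sentinel value needs stripping. A minimal sketch of the three methods that would change, with the rest of the class kept as above:

    // Hypothetical alternative to the "-1" sentinel: start from an empty buffer.
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = ""
    }

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val current = buffer.getAs[String](0)
      val value = input.getAs[String](0)
      // Add the separator only when the buffer already holds a value.
      buffer(0) = if (current.isEmpty) value else current + ",,," + value
    }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      val left = buffer1.getAs[String](0)
      val right = buffer2.getAs[String](0)
      buffer1(0) = if (left.isEmpty) right
        else if (right.isEmpty) left
        else left + ",,," + right
    }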

    Usage is as follows:

    val udafConcatCol = new ConcatString
    val dfWindowed = dfStream
      .groupBy($"ip")
      .agg(udafConcatCol(col("device")).as("devices"))
      ....
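    For completeness, here is one way the elided part of the query might continue, reusing the memory sink from the question; the query name "devices_by_ip" is a hypothetical addition (so the in-memory table can be inspected), not something from the original answer:

    val query = dfWindowed
      .writeStream
      .outputMode("complete")        // complete mode, as in the original streaming query
      .format("memory")              // in-memory sink, as in the question
      .queryName("devices_by_ip")    // hypothetical table name for inspecting results
      .start()

    // The aggregated result could then be checked with, for example:
    // spark.sql("SELECT ip, devices FROM devices_by_ip").show()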

    Regarding "apache-spark - Error when using collect_list in Spark Structured Streaming", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51796690/
