
scala - How to add a new column with random values to an existing DataFrame in Scala


I have a DataFrame backed by a Parquet file, and I have to add a new column containing some random data, but those random values need to differ from one another. This is my actual code; the current Spark version is 1.5.1-cdh-5.5.2:

val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686
mydf.cache

val r = scala.util.Random
import org.apache.spark.sql.functions.{lit, udf}
def myNextPositiveNumber: String = (r.nextInt(Integer.MAX_VALUE) + 1).toString.concat("D")
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn", lit(myNextPositiveNumber))

With this code I get the following data:
scala> myNewDF.select("myNewColumn").show(10,false)
+-----------+
|myNewColumn|
+-----------+
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
+-----------+

It looks like the udf myNextPositiveNumber is only evaluated once, isn't it?

Update
Confirmed, there is only one distinct value:
scala> myNewDF.select("myNewColumn").distinct.show(50,false)
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
...

+-----------+
|myNewColumn|
+-----------+
|889488717D |
+-----------+

What am I doing wrong?

Update 2: Finally, with the help of @user6910411, I ended up with the following code:
val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686
mydf.cache

val r = scala.util.Random

import org.apache.spark.sql.functions.{lit, udf}

val accum = sc.accumulator(1)

def myNextPositiveNumber(): String = {
  accum += 1
  accum.value.toString.concat("D")
}

val myFunction = udf(myNextPositiveNumber _)

val myNewDF = mydf.withColumn("myNewColumn", lit(myNextPositiveNumber))

myNewDF.select("myNewColumn").count

// 63385686

Update 3

The actual code generates the following data:
scala> mydf.select("myNewColumn").show(5,false)
17/02/22 11:01:57 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+-----------+
|myNewColumn|
+-----------+
|2D |
|2D |
|2D |
|2D |
|2D |
+-----------+
only showing top 5 rows

It looks like the udf function is only evaluated once, isn't it? I need a new random element in every row of that column.

Update 4 (@user6910411)

I now have this code, which produces an increasing id, but it does not concatenate the trailing character, which is strange. This is my code:
import org.apache.spark.sql.functions.{expr, monotonically_increasing_id, udf}


val mydf = sqlContext.read.parquet("some.parquet")

mydf.cache

def myNextPositiveNumber():String = monotonically_increasing_id().toString().concat("D")

val myFunction = udf(myNextPositiveNumber _)

val myNewDF = mydf.withColumn("myNewColumn",expr(myNextPositiveNumber))

scala> myNewDF.select("myNewColumn").show(5,false)
17/02/22 12:00:02 WARN Executor: 1 block locks were not released by TID = 1:
[rdd_4_0]
+-----------+
|myNewColumn|
+-----------+
|0 |
|1 |
|2 |
|3 |
|4 |
+-----------+

I need something like this:
+-----------+
|myNewColumn|
+-----------+
|1D |
|2D |
|3D |
|4D |
+-----------+

Best Answer

Spark >= 2.3

Some optimizations can be disabled by using the asNondeterministic method:

import org.apache.spark.sql.expressions.UserDefinedFunction

val f: UserDefinedFunction = ???
val fNonDeterministic: UserDefinedFunction = f.asNondeterministic

Make sure you understand the guarantees before using this option.
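
For example, a minimal sketch (assuming Spark >= 2.3; the function and variable names below are illustrative, not from the original code) of the question's random-string UDF marked as nondeterministic:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Marking the UDF nondeterministic keeps the optimizer from folding
// the call into a single constant value.
val randomSuffixed: UserDefinedFunction = udf {
  () => (scala.util.Random.nextInt(Integer.MAX_VALUE) + 1).toString + "D"
}.asNondeterministic()

// Note that the UDF is applied as a column expression, not wrapped in lit():
val myNewDF = mydf.withColumn("myNewColumn", randomSuffixed())

Each row then triggers its own invocation of the UDF, so every value is drawn independently.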

Spark < 2.3

A function passed to udf should be deterministic (with a possible exception of SPARK-20586), and nullary function calls can be replaced by constants. If you want to generate random numbers, use the built-in functions:
  • rand - generates a random column with independent and identically distributed (i.i.d.) samples from U[0.0, 1.0].
  • randn - generates a column with i.i.d. samples from the standard normal distribution.

  • and transform the output to obtain the required distribution, for example (see the sketch after this list):
    (rand * Integer.MAX_VALUE).cast("bigint").cast("string")
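
Putting the pieces together, a sketch (assuming the trailing "D" from the question is still wanted; the column name is taken from the question) of how the built-in rand function can produce a random value per row without any UDF:

import org.apache.spark.sql.functions.{concat, lit, rand}

// rand() yields an i.i.d. U[0.0, 1.0] sample per row; scaling and casting
// turns it into a random positive integer, and concat appends the "D".
val myNewDF = mydf.withColumn(
  "myNewColumn",
  concat((rand() * Integer.MAX_VALUE).cast("bigint").cast("string"), lit("D"))
)

Because rand is evaluated by the engine for every row, each row gets its own value and no accumulator or driver-side state is needed.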

About scala - How to add a new column with random values to an existing DataFrame in Scala: a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/42367464/
