- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
下面是我的 Spark Dataframe,我想进行插值并为此编写 Spark UDF,我不知道如何编写更好的逻辑并从上面创建 UDF
这用于转换 Position_float 并将其插值为整数,以将 Position 转换为适当的整数值
def dirty_fill(df, id_col, y_cols):
from pyspark.sql import types as T
df = df.withColumn('position_plus', (df.position_float + 0.5).cast(T.IntegerType()))
df = df.withColumn('position_minus', (df.position_float - 0.5).cast(T.IntegerType()))
df = df.withColumn('position', df.position_float.cast(T.IntegerType()))
df1 = df.select([id_col, 'position_plus'] + y_cols).withColumnRenamed('position_plus', 'position')
df2 = df.select([id_col, 'position_minus'] + y_cols).withColumnRenamed('position_minus', 'position')
df3 = df.select([id_col, 'position'] + y_cols)
df123 = df1.union(df2).union(df3).sort([id_col, 'position']).dropDuplicates([id_col, 'position'])
return df123
y_cols = ['entry_temperature']
finish_mill_entry_filled = dirty_fill(finish_mill_entry, 'finish_mill_id', y_cols)
这是我的数据框示例
| Finishing_mill_id | Sample | Position_float | Entry_Temp |
|--------------------|---------|----------------|------------|
| 2015418529 | 1 | 0.000000 | 1986.0 |
| 2015418529 | 2 | 2.192982 | 1997.0 |
| 2015418529 | 3 | 4.385965 | 2003.0 |
| 2018171498 | 445 | 495.535714 | 1643.0 |
| 2018171498 | 446 | 496.651786 | 1734.0 |
| 2018171498 | 447 | 497.767857 | 1748.0 |
| 2018171498 | 448 | 498.883929 | 1755.0 |
我需要将 float 插值到整数
我想要的是
| Finishing_mill_id | Sample | Position_float | Entry_Temp |
|--------------------|---------|----------------|------------|
| 2015418529 | 1 | 0 | 1986.0 |
| 2015418529 | 2 | 1 | 1986 |
| 2015418529 | 3 | 2 | 1997.0 |
| 2015418529 | 4 | 3 | 1997 |
| 2015418529 | 5 | 4 | 2003.0 |
| 2018171498 | 445 | 496 | 1643.0 |
| 2018171498 | 446 | 497 | 1734.0 |
| 2018171498 | 447 | 498 | 1748.0 |
| 2018171498 | 448 | 499 | 1755.0 |
我需要一个 Spark 用户定义的函数来执行此操作,并且不应丢失任何数据点,因为我的 Position_float 范围在 0-500 之间,我还需要注意每个点都不会丢失任何点。需要以适当的方式修改我的插值逻辑
为了不太清楚地说我有立场0.0002.19 但我没有 datapaoint,但是当我需要时我需要什么我需要 1.00 的位置..即使数据不存在线性插值,我也需要位置 1.00 的值。我希望它有帮助
最佳答案
<强>1。窗口函数
您可以使用窗口函数来填充间隙并对值进行插值。
让我们从示例数据框开始:
import pyspark.sql.functions as psf
import pyspark.sql.types as pst
from pyspark.sql import Window
import numpy as np
df = spark.createDataFrame(
[[float(t)/10., float(v)] for t, v in zip(np.random.randint(0, 1000, 20), np.random.randint(100, 200, 20))],
schema=pst.StructType([pst.StructField(c, pst.FloatType()) for c in ['position', 'value']])) \
.withColumn('position_round', psf.round('position'))
+--------+-----+--------------+
|position|value|position_round|
+--------+-----+--------------+
| 68.5|121.0| 69.0|
| 76.3|126.0| 76.0|
| 88.3|150.0| 88.0|
| 59.0|197.0| 59.0|
| 20.7|119.0| 21.0|
| 0.1|167.0| 0.0|
| 20.1|177.0| 20.0|
| 81.9|199.0| 82.0|
| 63.6|163.0| 64.0|
| 32.4|115.0| 32.0|
| 43.6|130.0| 44.0|
| 11.9|175.0| 12.0|
| 68.2|176.0| 68.0|
| 28.9|184.0| 29.0|
| 46.3|199.0| 46.0|
| 9.7|155.0| 10.0|
| 57.8|163.0| 58.0|
| 83.6|173.0| 84.0|
| 16.2|169.0| 16.0|
| 87.1|127.0| 87.0|
+--------+-----+--------------+
为了填补空白,我们将创建一系列整数:
start, end = list(df.agg(psf.min('position_round'), psf.max('position_round')).collect()[0])
pos_df = spark.range(start=start, end=end, step=1) \
.withColumnRenamed('id', 'position_round')
现在我们可以连接两个数据框:
w1 = Window.orderBy('position_round')
w2 = Window.partitionBy('group').orderBy('position_round')
df_resample = df \
.select(
'*',
psf.lead('position_round', 1).over(w1).alias('next_position'),
psf.lead('value', 1).over(w1).alias('next_value')) \
.join(pos_df, on='position_round', how='right') \
.withColumn('group', psf.sum((~psf.isnull('position')).cast('int')).over(w1)) \
.select(
'*',
(psf.row_number().over(w2) - 1).alias('i'),
psf.first(psf.col('next_position') - psf.col('position_round')).over(w2).alias('dx'),
psf.first('value').over(w2).alias('value0'),
psf.first(psf.col('next_value') - psf.col('value')).over(w2).alias('dy')) \
.withColumn(
'value_round',
psf.when((psf.col('dx') > 0) | psf.isnull('next_value'), psf.col('value0') + psf.col('i') * psf.col('dy') / psf.col('dx')) \
.otherwise(psf.col('value')))
next_value
和 next_position
以便稍后能够计算我们的 dx
和 dy
>组
ID 来识别每个间隙,以便我们可以为每个不同的线性段插入值dx
dy
i
我们现在可以计算 value_round
,即位置 position_round
处 value
的插值
+--------------+--------+-----+-------------+----------+-----+---+----+------+-----+-----------+
|position_round|position|value|next_position|next_value|group| i| dx|value0| dy|value_round|
+--------------+--------+-----+-------------+----------+-----+---+----+------+-----+-----------+
| 0| 0.1|167.0| 10.0| 155.0| 1| 0|10.0| 167.0|-12.0| 167.0|
| 1| null| null| null| null| 1| 1|10.0| 167.0|-12.0| 165.8|
| 2| null| null| null| null| 1| 2|10.0| 167.0|-12.0| 164.6|
| 3| null| null| null| null| 1| 3|10.0| 167.0|-12.0| 163.4|
| 4| null| null| null| null| 1| 4|10.0| 167.0|-12.0| 162.2|
| 5| null| null| null| null| 1| 5|10.0| 167.0|-12.0| 161.0|
| 6| null| null| null| null| 1| 6|10.0| 167.0|-12.0| 159.8|
| 7| null| null| null| null| 1| 7|10.0| 167.0|-12.0| 158.6|
| 8| null| null| null| null| 1| 8|10.0| 167.0|-12.0| 157.4|
| 9| null| null| null| null| 1| 9|10.0| 167.0|-12.0| 156.2|
| 10| 9.7|155.0| 12.0| 175.0| 2| 0| 2.0| 155.0| 20.0| 155.0|
| 11| null| null| null| null| 2| 1| 2.0| 155.0| 20.0| 165.0|
| 12| 11.9|175.0| 16.0| 169.0| 3| 0| 4.0| 175.0| -6.0| 175.0|
| 13| null| null| null| null| 3| 1| 4.0| 175.0| -6.0| 173.5|
| 14| null| null| null| null| 3| 2| 4.0| 175.0| -6.0| 172.0|
| 15| null| null| null| null| 3| 3| 4.0| 175.0| -6.0| 170.5|
| 16| 16.2|169.0| 20.0| 177.0| 4| 0| 4.0| 169.0| 8.0| 169.0|
| 17| null| null| null| null| 4| 1| 4.0| 169.0| 8.0| 171.0|
| 18| null| null| null| null| 4| 2| 4.0| 169.0| 8.0| 173.0|
| 19| null| null| null| null| 4| 3| 4.0| 169.0| 8.0| 175.0|
+--------------+--------+-----+-------------+----------+-----+---+----+------+-----+-----------+
<强>2。 UDF
如果您不想使用窗口函数,您可以编写一个 UDF
来在 python
中进行插值,然后返回一个(位置,值)元组数组:
def interpolate(pos, next_pos, value, next_value):
if pos == next_pos or next_value is None:
return [(pos, value)]
return [[pos + i, value + i * (next_value - value) / (next_pos - pos)] for i in range(int(next_pos - pos))]
interpolate_udf = psf.udf(interpolate, pst.ArrayType(pst.StructType([pst.StructField(c, pst.FloatType()) for c in ['position_round', 'value_round']])))
请注意,元组的类型为 StructType
,以便更轻松地将元组“展平”为列。
w1 = Window.orderBy('position_round')
df_udf = df \
.select(
'*',
psf.lead('position_round', 1).over(w1).alias('next_position'),
psf.lead('value', 1).over(w1).alias('next_value')) \
.withColumn('tmp', psf.explode(interpolate_udf('position_round', 'next_position', 'value', 'next_value'))) \
.select('*', 'tmp.*').drop('tmp')
这是我们得到的:
+--------+-----+--------------+-------------+----------+--------------+----------+
|position|value|position_round|next_position|next_value|position_round|value_round|
+--------+-----+--------------+-------------+----------+--------------+----------+
| 0.1|167.0| 0.0| 10.0| 155.0| 0.0| 167.0|
| 0.1|167.0| 0.0| 10.0| 155.0| 1.0| 165.8|
| 0.1|167.0| 0.0| 10.0| 155.0| 2.0| 164.6|
| 0.1|167.0| 0.0| 10.0| 155.0| 3.0| 163.4|
| 0.1|167.0| 0.0| 10.0| 155.0| 4.0| 162.2|
| 0.1|167.0| 0.0| 10.0| 155.0| 5.0| 161.0|
| 0.1|167.0| 0.0| 10.0| 155.0| 6.0| 159.8|
| 0.1|167.0| 0.0| 10.0| 155.0| 7.0| 158.6|
| 0.1|167.0| 0.0| 10.0| 155.0| 8.0| 157.4|
| 0.1|167.0| 0.0| 10.0| 155.0| 9.0| 156.2|
| 9.7|155.0| 10.0| 12.0| 175.0| 10.0| 155.0|
| 9.7|155.0| 10.0| 12.0| 175.0| 11.0| 165.0|
| 11.9|175.0| 12.0| 16.0| 169.0| 12.0| 175.0|
| 11.9|175.0| 12.0| 16.0| 169.0| 13.0| 173.5|
| 11.9|175.0| 12.0| 16.0| 169.0| 14.0| 172.0|
| 11.9|175.0| 12.0| 16.0| 169.0| 15.0| 170.5|
| 16.2|169.0| 16.0| 20.0| 177.0| 16.0| 169.0|
| 16.2|169.0| 16.0| 20.0| 177.0| 17.0| 171.0|
| 16.2|169.0| 16.0| 20.0| 177.0| 18.0| 173.0|
| 16.2|169.0| 16.0| 20.0| 177.0| 19.0| 175.0|
+--------+-----+--------------+-------------+----------+--------------+----------+
关于python - 如何创建 Spark udf 将 float 插值到 INT 以及如何编写比我所做的更好的逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55771749/
我在弄清楚如何从另一个 UDF 返回 UDF 中的数组时遇到了一些麻烦。这里的一个是简单的指数移动平均 UDF,我试图将数组返回到另一个 UDF,但我收到 #value 错误。我觉得有一个我没有看到的
我需要从另一个模块(在同一个工作簿中)调用以前制作的 UDF 来构建另一个 UDF。如何调用第一个函数? 这适用于 Excel VBA。我已经构建了我的第一个函数,它计算产品在特定时间的账面值(val
这个问题可能对许多 VBA 程序员有用。它涉及实现两个有用的独立任务并使它们同时工作。 第一个任务是为 UDF 制作 Excel 函数工具提示。虽然似乎还没有找到明确的解决方案,但目前我对自定义插入函
我正在将 Spark 与 Scala 一起使用,并希望将整行传递给 udf 并选择 udf 中的每个列名和列值。我怎样才能做到这一点? 我正在尝试以下 - inputDataDF.withColumn
这个问题在这里已经有了答案: Spark functions vs UDF performance? (3 个答案) 关闭2 年前。 我从 Pyspark 网站获取了以下 UDF,因为我试图了解是否
我已经使用 Spark 2.4 一段时间了,最近几天才开始切换到 Spark 3.0。切换到 Spark 3.0 运行后出现此错误 udf((x: Int) => x, IntegerType)
这个问题源自 SQLServer: Why avoid Table-Valued User Defined Functions? 。我开始在一些评论中提出问题,而对我评论的回复却偏离了主题。 这样您就
这是我的 hive 表 CREATE TABLE `dum`(`val` map>); insert into dum select map('A',array('1','2','3'),'B',ar
我想知道编写 spark udf 是否会降低性能。一般来说,我更喜欢组合做一件事的小函数…… 这是一个简单的例子,给定一个 DataFrame df: def inc = udf( (i: Doubl
我正在尝试根据另一列的值在 Spark 数据集中创建一个新列。另一列的值作为键在 json 文件中搜索,返回的值是用于新列的值。 这是我尝试过的代码,但它不起作用,而且我不确定 UDF 是如何工作的。
SPARK_VERSION = 2.2.0 我在尝试做 filter 时遇到了一个有趣的问题。在具有使用 UDF 添加的列的数据帧上。我能够用较小的数据集复制问题。 鉴于虚拟案例类: case cla
我正在 Java 中使用 Spark 来处理 XML 文件。来自databricks的spark-xml包用于将xml文件读入dataframe。 示例 xml 文件是: 1 joh
我正在尝试创建一个 MySQL UDF getFile(),它应该从磁盘上的某个目录返回文本文件的内容。问题是调用一次或两次有效,但在第二次或第三次调用 UDF 时,MySQL 服务器崩溃。 我无法通
我听说 Microsoft SQL Server 中有多种方法可以查找“最差”存储过程:按执行次数、按 CPU 工作时间、按队列等待时间等。 我正在寻找一种方法来查找最差(最慢/最常用)的 UDF -
我已经为一个项目构建了一个包含多个公式的 Excel 工作表。然后,我添加了一个用于折叠/展开某些单元组的命令按钮。 命令按钮代码是: Private Sub CommandButton1_Click
MySQL版本:5.1.73数据库客户端版本:libmysql - 5.1.73 我试图检查 NEW.src 在过去一小时内是否存在,如果不存在则执行 sys_exec udf。 我在 mysql 中
我正在尝试将元组列表传递给 scala 中的 udf。我不确定如何为此准确定义数据类型。我试图将它作为一整行传递,但它无法真正解决它。我需要根据元组的第一个元素对列表进行排序,然后返回 n 个元素。我
关闭。这个问题是not reproducible or was caused by typos .它目前不接受答案。 此问题是由拼写错误或无法再重现的问题引起的。虽然类似的问题可能是 on-topic
我正在尝试创建一个类似 =Extractinfo("A2","Name") 的函数,它可以从原始数据中提取姓名、电话和电子邮件 ID,一个用于所有 3 次提取的函数,我已经有一个提取电子邮件 ID 的
我正在编写一个用户定义函数(UDF),它以一些单元格作为参数。 这些单元格包含相同的数据,但精度不同;该功能显示可用的最佳精度。 函数的参数按精度升序编写。 这是一个例子: +---+--------
我是一名优秀的程序员,十分优秀!