- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
下面是我的 Spark Dataframe,我想进行插值并为此编写 Spark UDF,我不知道如何编写更好的逻辑并从上面创建 UDF
这用于转换 Position_float 并将其插值为整数,以将 Position 转换为适当的整数值
def dirty_fill(df, id_col, y_cols):
from pyspark.sql import types as T
df = df.withColumn('position_plus', (df.position_float + 0.5).cast(T.IntegerType()))
df = df.withColumn('position_minus', (df.position_float - 0.5).cast(T.IntegerType()))
df = df.withColumn('position', df.position_float.cast(T.IntegerType()))
df1 = df.select([id_col, 'position_plus'] + y_cols).withColumnRenamed('position_plus', 'position')
df2 = df.select([id_col, 'position_minus'] + y_cols).withColumnRenamed('position_minus', 'position')
df3 = df.select([id_col, 'position'] + y_cols)
df123 = df1.union(df2).union(df3).sort([id_col, 'position']).dropDuplicates([id_col, 'position'])
return df123
y_cols = ['entry_temperature']
finish_mill_entry_filled = dirty_fill(finish_mill_entry, 'finish_mill_id', y_cols)
这是我的数据框示例
| Finishing_mill_id | Sample | Position_float | Entry_Temp |
|--------------------|---------|----------------|------------|
| 2015418529 | 1 | 0.000000 | 1986.0 |
| 2015418529 | 2 | 2.192982 | 1997.0 |
| 2015418529 | 3 | 4.385965 | 2003.0 |
| 2018171498 | 445 | 495.535714 | 1643.0 |
| 2018171498 | 446 | 496.651786 | 1734.0 |
| 2018171498 | 447 | 497.767857 | 1748.0 |
| 2018171498 | 448 | 498.883929 | 1755.0 |
我需要将 float 插值到整数
我想要的是
| Finishing_mill_id | Sample | Position_float | Entry_Temp |
|--------------------|---------|----------------|------------|
| 2015418529 | 1 | 0 | 1986.0 |
| 2015418529 | 2 | 1 | 1986 |
| 2015418529 | 3 | 2 | 1997.0 |
| 2015418529 | 4 | 3 | 1997 |
| 2015418529 | 5 | 4 | 2003.0 |
| 2018171498 | 445 | 496 | 1643.0 |
| 2018171498 | 446 | 497 | 1734.0 |
| 2018171498 | 447 | 498 | 1748.0 |
| 2018171498 | 448 | 499 | 1755.0 |
我需要一个 Spark 用户定义的函数来执行此操作,并且不应丢失任何数据点,因为我的 Position_float 范围在 0-500 之间,我还需要注意每个点都不会丢失任何点。需要以适当的方式修改我的插值逻辑
为了不太清楚地说我有立场0.0002.19 但我没有 datapaoint,但是当我需要时我需要什么我需要 1.00 的位置..即使数据不存在线性插值,我也需要位置 1.00 的值。我希望它有帮助
最佳答案
<强>1。窗口函数
您可以使用窗口函数来填充间隙并对值进行插值。
让我们从示例数据框开始:
import pyspark.sql.functions as psf
import pyspark.sql.types as pst
from pyspark.sql import Window
import numpy as np
df = spark.createDataFrame(
[[float(t)/10., float(v)] for t, v in zip(np.random.randint(0, 1000, 20), np.random.randint(100, 200, 20))],
schema=pst.StructType([pst.StructField(c, pst.FloatType()) for c in ['position', 'value']])) \
.withColumn('position_round', psf.round('position'))
+--------+-----+--------------+
|position|value|position_round|
+--------+-----+--------------+
| 68.5|121.0| 69.0|
| 76.3|126.0| 76.0|
| 88.3|150.0| 88.0|
| 59.0|197.0| 59.0|
| 20.7|119.0| 21.0|
| 0.1|167.0| 0.0|
| 20.1|177.0| 20.0|
| 81.9|199.0| 82.0|
| 63.6|163.0| 64.0|
| 32.4|115.0| 32.0|
| 43.6|130.0| 44.0|
| 11.9|175.0| 12.0|
| 68.2|176.0| 68.0|
| 28.9|184.0| 29.0|
| 46.3|199.0| 46.0|
| 9.7|155.0| 10.0|
| 57.8|163.0| 58.0|
| 83.6|173.0| 84.0|
| 16.2|169.0| 16.0|
| 87.1|127.0| 87.0|
+--------+-----+--------------+
为了填补空白,我们将创建一系列整数:
start, end = list(df.agg(psf.min('position_round'), psf.max('position_round')).collect()[0])
pos_df = spark.range(start=start, end=end, step=1) \
.withColumnRenamed('id', 'position_round')
现在我们可以连接两个数据框:
w1 = Window.orderBy('position_round')
w2 = Window.partitionBy('group').orderBy('position_round')
df_resample = df \
.select(
'*',
psf.lead('position_round', 1).over(w1).alias('next_position'),
psf.lead('value', 1).over(w1).alias('next_value')) \
.join(pos_df, on='position_round', how='right') \
.withColumn('group', psf.sum((~psf.isnull('position')).cast('int')).over(w1)) \
.select(
'*',
(psf.row_number().over(w2) - 1).alias('i'),
psf.first(psf.col('next_position') - psf.col('position_round')).over(w2).alias('dx'),
psf.first('value').over(w2).alias('value0'),
psf.first(psf.col('next_value') - psf.col('value')).over(w2).alias('dy')) \
.withColumn(
'value_round',
psf.when((psf.col('dx') > 0) | psf.isnull('next_value'), psf.col('value0') + psf.col('i') * psf.col('dy') / psf.col('dx')) \
.otherwise(psf.col('value')))
next_value
和 next_position
以便稍后能够计算我们的 dx
和 dy
>组
ID 来识别每个间隙,以便我们可以为每个不同的线性段插入值dx
dy
i
我们现在可以计算 value_round
,即位置 position_round
处 value
的插值
+--------------+--------+-----+-------------+----------+-----+---+----+------+-----+-----------+
|position_round|position|value|next_position|next_value|group| i| dx|value0| dy|value_round|
+--------------+--------+-----+-------------+----------+-----+---+----+------+-----+-----------+
| 0| 0.1|167.0| 10.0| 155.0| 1| 0|10.0| 167.0|-12.0| 167.0|
| 1| null| null| null| null| 1| 1|10.0| 167.0|-12.0| 165.8|
| 2| null| null| null| null| 1| 2|10.0| 167.0|-12.0| 164.6|
| 3| null| null| null| null| 1| 3|10.0| 167.0|-12.0| 163.4|
| 4| null| null| null| null| 1| 4|10.0| 167.0|-12.0| 162.2|
| 5| null| null| null| null| 1| 5|10.0| 167.0|-12.0| 161.0|
| 6| null| null| null| null| 1| 6|10.0| 167.0|-12.0| 159.8|
| 7| null| null| null| null| 1| 7|10.0| 167.0|-12.0| 158.6|
| 8| null| null| null| null| 1| 8|10.0| 167.0|-12.0| 157.4|
| 9| null| null| null| null| 1| 9|10.0| 167.0|-12.0| 156.2|
| 10| 9.7|155.0| 12.0| 175.0| 2| 0| 2.0| 155.0| 20.0| 155.0|
| 11| null| null| null| null| 2| 1| 2.0| 155.0| 20.0| 165.0|
| 12| 11.9|175.0| 16.0| 169.0| 3| 0| 4.0| 175.0| -6.0| 175.0|
| 13| null| null| null| null| 3| 1| 4.0| 175.0| -6.0| 173.5|
| 14| null| null| null| null| 3| 2| 4.0| 175.0| -6.0| 172.0|
| 15| null| null| null| null| 3| 3| 4.0| 175.0| -6.0| 170.5|
| 16| 16.2|169.0| 20.0| 177.0| 4| 0| 4.0| 169.0| 8.0| 169.0|
| 17| null| null| null| null| 4| 1| 4.0| 169.0| 8.0| 171.0|
| 18| null| null| null| null| 4| 2| 4.0| 169.0| 8.0| 173.0|
| 19| null| null| null| null| 4| 3| 4.0| 169.0| 8.0| 175.0|
+--------------+--------+-----+-------------+----------+-----+---+----+------+-----+-----------+
<强>2。 UDF
如果您不想使用窗口函数,您可以编写一个 UDF
来在 python
中进行插值,然后返回一个(位置,值)元组数组:
def interpolate(pos, next_pos, value, next_value):
if pos == next_pos or next_value is None:
return [(pos, value)]
return [[pos + i, value + i * (next_value - value) / (next_pos - pos)] for i in range(int(next_pos - pos))]
interpolate_udf = psf.udf(interpolate, pst.ArrayType(pst.StructType([pst.StructField(c, pst.FloatType()) for c in ['position_round', 'value_round']])))
请注意,元组的类型为 StructType
,以便更轻松地将元组“展平”为列。
w1 = Window.orderBy('position_round')
df_udf = df \
.select(
'*',
psf.lead('position_round', 1).over(w1).alias('next_position'),
psf.lead('value', 1).over(w1).alias('next_value')) \
.withColumn('tmp', psf.explode(interpolate_udf('position_round', 'next_position', 'value', 'next_value'))) \
.select('*', 'tmp.*').drop('tmp')
这是我们得到的:
+--------+-----+--------------+-------------+----------+--------------+----------+
|position|value|position_round|next_position|next_value|position_round|value_round|
+--------+-----+--------------+-------------+----------+--------------+----------+
| 0.1|167.0| 0.0| 10.0| 155.0| 0.0| 167.0|
| 0.1|167.0| 0.0| 10.0| 155.0| 1.0| 165.8|
| 0.1|167.0| 0.0| 10.0| 155.0| 2.0| 164.6|
| 0.1|167.0| 0.0| 10.0| 155.0| 3.0| 163.4|
| 0.1|167.0| 0.0| 10.0| 155.0| 4.0| 162.2|
| 0.1|167.0| 0.0| 10.0| 155.0| 5.0| 161.0|
| 0.1|167.0| 0.0| 10.0| 155.0| 6.0| 159.8|
| 0.1|167.0| 0.0| 10.0| 155.0| 7.0| 158.6|
| 0.1|167.0| 0.0| 10.0| 155.0| 8.0| 157.4|
| 0.1|167.0| 0.0| 10.0| 155.0| 9.0| 156.2|
| 9.7|155.0| 10.0| 12.0| 175.0| 10.0| 155.0|
| 9.7|155.0| 10.0| 12.0| 175.0| 11.0| 165.0|
| 11.9|175.0| 12.0| 16.0| 169.0| 12.0| 175.0|
| 11.9|175.0| 12.0| 16.0| 169.0| 13.0| 173.5|
| 11.9|175.0| 12.0| 16.0| 169.0| 14.0| 172.0|
| 11.9|175.0| 12.0| 16.0| 169.0| 15.0| 170.5|
| 16.2|169.0| 16.0| 20.0| 177.0| 16.0| 169.0|
| 16.2|169.0| 16.0| 20.0| 177.0| 17.0| 171.0|
| 16.2|169.0| 16.0| 20.0| 177.0| 18.0| 173.0|
| 16.2|169.0| 16.0| 20.0| 177.0| 19.0| 175.0|
+--------+-----+--------------+-------------+----------+--------------+----------+
关于python - 如何创建 Spark udf 将 float 插值到 INT 以及如何编写比我所做的更好的逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55771749/
我正在尝试使用 y 组合器在 Scala 中定义 gcd: object Main { def y[A,B]( f : (A => B) => A => B ) : A => B = f(y(f)
我正在尝试了解返回指向函数的指针的函数,在我尝试编译代码后,它给了我这种错误: cannot convert int (*(int))(int) to int (*(int))(int) in ass
所以我一直在关注 youtube 上的游戏编程教程,然后弹出了这段代码:bufferedImageObject.getRGB(int, int, int, int, int[], int, int);
我正在将时间现在 与存储在数据库某处的时间进行比较。数据库中存储的时间格式为“yyyyMMddHHmmss”。例如,数据库可能会为存储的时间值返回 201106203354。然后我使用一个函数将时间现
例如 Maze0.bmp (0,0) (319,239) 65 120 Maze0.bmp (0,0) (319,239) 65 120 (254,243,90) Maze0.bmp (0,0) (
评论 Steve Yegge的post关于 server-side Javascript开始讨论语言中类型系统的优点和这个 comment描述: ... examples from H-M style
我正在研究 C 的指针,从 Deitel 的书中我不明白 int(*function)(int,int) 和 int*function(int, int) 表示函数时。 最佳答案 C 中读取类型的经验
您好,我使用 weblogic 11g 创建 war 应用程序,我对 joda time 的方法有疑问 new DateTime(int, int, int, int, int, int); 这抛出了
Create a method called average that calculates the average of the numbers passed as parameters. The
var a11: Int = 0 var a12: Int = 0 var a21: Int = 0 var a22: Int = 0 var valueDeterminant = a11 * a12
我正在为一个项目设置 LED 阵列。我得到了一个 LED 阵列,可以根据引脚变化电压进行更改,但我无法添加更多引脚。 当我尝试时,编译失败并显示错误:函数“int getMode(int, int,
除了创建对列表执行简单操作的函数之外,我对 haskell 还是很陌生。我想创建一个列表,其中包含 Int 类型的内容, 和 Int -> Int -> Int 类型的函数. 这是我尝试过的: dat
这个问题已经有答案了: Java add buttons dynamically as an array [duplicate] (4 个回答) 已关闭 7 年前。 StackOverFlow问题今天
我有几个 EditText View ,我想在其中设置左侧的图像,而 setCompoundDrawablesWithIntrinsicBounds 似乎不起作用。图形似乎没有改变。 有人知道为什么会
#include using namespace std; int main() { static_assert(is_constructible, int(*)(int,int)>::val
fun sum(a: Int, b: Int) = a + b val x = 1.to(2) 我在找: sum.tupled(x),或者 sum(*x) 当然,以上都不能用 Kotlin 1.1.3
有一个函数: func (first: Int) -> Int -> Bool -> String { return ? } 返回值怎么写?我对上面 func 的返回类型感到很困惑。 最
type foo = A of int * int | B of (int * int) int * int 和 (int * int) 有什么区别?我看到的唯一区别在于模式匹配: let test_
我正在尝试制作一个 slider 游戏。在这个类中,我使用 Graphics 对象 g2 的 drawImage 方法来显示“拼图”的 block 。但在绘制类方法中,我收到此错误:找不到符号方法dr
我试着理解这个表达: static Func isOdd = i => (i & 1) == 1; 但是这是什么意思呢? 例如我有 i = 3。然后 (3 & 1) == 1 或 i = 4。然后
我是一名优秀的程序员,十分优秀!