I have some sensor data that is stored in a table by channel name rather than sensor name (this was done to avoid an extremely wide table, since many sensors are used on only a few devices; sparsely populated columns, I know, but I am only a consumer of the data). Something like this:
from functools import reduce
import numpy as np
import pandas as pd
np.random.seed(0)
data_df = pd.DataFrame({
    'id': ['a']*5 + ['b']*5 + ['c']*5,
    'chan1': range(15),
    'chan2': np.random.uniform(0, 10, size=15),
    'chan3': np.random.uniform(0, 100, size=15)
})
There is also a second table that tells us how to map channel names to sensor names for each specific device id:
sensor_channel_df = pd.DataFrame([
    {'id': 'a', 'channel': 'chan1', 'sensor': 'weight'},
    {'id': 'a', 'channel': 'chan2', 'sensor': 'torque'},
    {'id': 'a', 'channel': 'chan3', 'sensor': 'temp'},
    {'id': 'b', 'channel': 'chan1', 'sensor': 'weight'},
    {'id': 'b', 'channel': 'chan2', 'sensor': 'temp'},
    {'id': 'b', 'channel': 'chan3', 'sensor': 'speed'},
    {'id': 'c', 'channel': 'chan1', 'sensor': 'temp'},
    {'id': 'c', 'channel': 'chan2', 'sensor': 'weight'},
    {'id': 'c', 'channel': 'chan3', 'sensor': 'acceleration'},
])
I can create a rename dictionary like this:
channel_rename_dict = sensor_channel_df.groupby('id')\
    .apply(lambda grp: dict(zip(grp['channel'], grp['sensor'])))\
    .to_dict()
and then rename all of the columns with a further groupby/apply:
data_df.groupby('id')\
    .apply(lambda group: group.rename(columns=channel_rename_dict[group.name]))\
    .reset_index(level=0, drop=True)
which gives us a result like this:
acceleration id speed temp torque weight
0 NaN a NaN 8.712930 5.488135 0.000000
1 NaN a NaN 2.021840 7.151894 1.000000
2 NaN a NaN 83.261985 6.027634 2.000000
3 NaN a NaN 77.815675 5.448832 3.000000
4 NaN a NaN 87.001215 4.236548 4.000000
5 NaN b 97.861834 6.458941 NaN 5.000000
6 NaN b 79.915856 4.375872 NaN 6.000000
7 NaN b 46.147936 8.917730 NaN 7.000000
8 NaN b 78.052918 9.636628 NaN 8.000000
9 NaN b 11.827443 3.834415 NaN 9.000000
10 63.992102 c NaN 10.000000 NaN 7.917250
11 14.335329 c NaN 11.000000 NaN 5.288949
12 94.466892 c NaN 12.000000 NaN 5.680446
13 52.184832 c NaN 13.000000 NaN 9.255966
14 41.466194 c NaN 14.000000 NaN 0.710361
This all works fine (although I would not be surprised to learn there is a better way to do it in pandas), and I used it to demonstrate the logic of the process to some colleagues.
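For what it's worth, one possibly tidier pandas route (just a sketch, not what I actually used; long_df and wide_df are throwaway names) would be to melt to long format, join the channel-to-sensor mapping, and pivot back to wide sensor columns:
# Sketch only: melt to long format, attach sensor names, then pivot back to wide.
long_df = (data_df.reset_index()
           .melt(id_vars=['index', 'id'], var_name='channel', value_name='value')
           .merge(sensor_channel_df, on=['id', 'channel']))
wide_df = (long_df.pivot_table(index=['index', 'id'], columns='sensor', values='value')
           .reset_index('id')
           .rename_axis(columns=None))
This should produce an equivalent wide layout to the groupby/rename approach above.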
However, for the project architecture we have decided to use Spark. Is there a way to achieve the same behavior with Spark DataFrames?
My initial thought was to first cache the full data_df and then split the DataFrame on id with filter. For example, assuming data_df is now a Spark DataFrame:
data_df.cache()
unique_ids = data_df.select('id').distinct().rdd.map(lambda row: row[0]).collect()
split_dfs = {id: data_df.filter(data_df['id'] == id) for id in unique_ids}
Then, if we have the column rename dictionary as before, we can do the following:
dfs_paired_with_rename_tuple_lists = [
    (split_dfs[id], list(channel_rename_dict[id].items()))
    for id in unique_ids
]
new_dfs = [
    reduce(lambda df_i, rename_tuple: df_i.withColumnRenamed(*rename_tuple), rename_tuple_list, df)
    for df, rename_tuple_list in dfs_paired_with_rename_tuple_lists
]
After making sure they all have common columns, I could then perform a reduce with union() over this list of Spark DataFrames.
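Roughly what I had in mind is sketched below (just a sketch, not tested code; the with_all_columns helper and the use of unionByName, which needs Spark 2.3+, are illustrative assumptions): pad each renamed DataFrame to a common set of sensor columns filled with nulls, then reduce the list with a union.
from pyspark.sql.functions import col, lit

all_sensors = sorted({s for d in channel_rename_dict.values() for s in d.values()})

def with_all_columns(sdf):
    # Pad any missing sensor columns with nulls and cast everything to double
    # so the schemas line up before the union.
    for s in all_sensors:
        if s not in sdf.columns:
            sdf = sdf.withColumn(s, lit(None))
    return sdf.select([col('id')] + [col(s).cast('double').alias(s) for s in all_sensors])

combined_df = reduce(lambda a, b: a.unionByName(b), [with_all_columns(sdf) for sdf in new_dfs])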
My feeling is that this would be quite slow, and that there is probably a better way to approach the problem.
Best Answer
First, let's redefine the mapping to group by id and return a MapType Column (toolz is convenient here, but it can be replaced with itertools.chain)*:
from toolz import concat, interleave
from pyspark.sql.functions import col, create_map, lit, struct
# Create literal column from id to sensor -> channel map
channel_map = create_map(*concat((lit(k), v) for k, v in sensor_channel_df
    .groupby("id")
    # Create map Column from literal label to channel
    .apply(lambda grp: create_map(*interleave([
        map(lit, grp["sensor"]),
        map(col, grp["channel"])])))
    .to_dict()
    .items()))
Next, get the list of sensors:
sensors = sorted(sensor_channel_df["sensor"].unique().tolist())
and combine the data columns:
df = spark.createDataFrame(data_df)
data_cols = struct(*[c for c in df.columns if c != "id"])
The components defined above can now be combined:
cols = [channel_map[col("id")][sensor].alias(sensor) for sensor in sensors]
df.select(["id"] + cols)
+---+------------------+------------------+------------------+------------------+------------------+
| id| acceleration| speed| temp| torque| weight|
+---+------------------+------------------+------------------+------------------+------------------+
| a| null| null| 8.712929970154072|5.4881350392732475| 0.0|
| a| null| null| 2.021839744032572| 7.151893663724195| 1.0|
| a| null| null| 83.2619845547938| 6.027633760716439| 2.0|
| a| null| null| 77.81567509498505| 5.448831829968968| 3.0|
| a| null| null| 87.00121482468191| 4.236547993389047| 4.0|
| b| null| 97.8618342232764| 6.458941130666561| null| 5.0|
| b| null| 79.91585642167236| 4.375872112626925| null| 6.0|
| b| null|46.147936225293186| 8.917730007820797| null| 7.0|
| b| null| 78.05291762864555| 9.636627605010293| null| 8.0|
| b| null|11.827442586893323|3.8344151882577773| null| 9.0|
| c| 63.99210213275238| null| 10.0| null| 7.917250380826646|
| c| 14.33532874090464| null| 11.0| null| 5.288949197529044|
| c| 94.46689170495839| null| 12.0| null| 5.680445610939323|
| c|52.184832175007166| null| 13.0| null| 9.25596638292661|
| c| 41.46619399905236| null| 14.0| null|0.7103605819788694|
+---+------------------+------------------+------------------+------------------+------------------+
It is also possible to use a udf, although this is less efficient:
from toolz import unique
from pyspark.sql.types import *
from pyspark.sql.functions import udf

channel_dict = (sensor_channel_df
    .groupby("id")
    .apply(lambda grp: dict(zip(grp["sensor"], grp["channel"])))
    .to_dict())

def remap(d):
    fields = sorted(unique(concat(_.keys() for _ in d.values())))
    schema = StructType([StructField(f, DoubleType()) for f in fields])
    def _(row, id):
        return tuple(float(row[d[id].get(f)]) if d[id].get(f) is not None
                     else None for f in fields)
    return udf(_, schema)

(df
    .withColumn("vals", remap(channel_dict)(data_cols, "id"))
    .select("id", "vals.*"))
+---+------------------+------------------+------------------+------------------+------------------+
| id| acceleration| speed| temp| torque| weight|
+---+------------------+------------------+------------------+------------------+------------------+
| a| null| null| 8.712929970154072|5.4881350392732475| 0.0|
| a| null| null| 2.021839744032572| 7.151893663724195| 1.0|
| a| null| null| 83.2619845547938| 6.027633760716439| 2.0|
| a| null| null| 77.81567509498505| 5.448831829968968| 3.0|
| a| null| null| 87.00121482468191| 4.236547993389047| 4.0|
| b| null| 97.8618342232764| 6.458941130666561| null| 5.0|
| b| null| 79.91585642167236| 4.375872112626925| null| 6.0|
| b| null|46.147936225293186| 8.917730007820797| null| 7.0|
| b| null| 78.05291762864555| 9.636627605010293| null| 8.0|
| b| null|11.827442586893323|3.8344151882577773| null| 9.0|
| c| 63.99210213275238| null| 10.0| null| 7.917250380826646|
| c| 14.33532874090464| null| 11.0| null| 5.288949197529044|
| c| 94.46689170495839| null| 12.0| null| 5.680445610939323|
| c|52.184832175007166| null| 13.0| null| 9.25596638292661|
| c| 41.46619399905236| null| 14.0| null|0.7103605819788694|
+---+------------------+------------------+------------------+------------------+------------------+
In Spark 2.3 or later, you can apply your current code with a vectorized UDF.
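As a rough sketch of that route (not part of the approach above; the schema construction, the rename_group name, and the reuse of channel_rename_dict from the question and sensors from above are all assumptions), a grouped-map pandas UDF could apply the per-id rename directly:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

out_schema = StructType(
    [StructField("id", StringType())] +
    [StructField(s, DoubleType()) for s in sensors])

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def rename_group(pdf):
    # Rename this group's channel columns to sensor names, then align to the full schema.
    group_id = pdf["id"].iloc[0]
    renamed = pdf.rename(columns=channel_rename_dict[group_id])
    out = renamed.reindex(columns=["id"] + sensors)
    return out.astype({s: "float64" for s in sensors})

df.groupby("id").apply(rename_group)
Note that grouped-map pandas UDFs require Spark 2.3+ and pyarrow.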
* To understand what is going on here, let's look at a single group handled by apply:
grp = sensor_channel_df.groupby("id").get_group("a")
First, we convert the sensor column into a sequence of Spark literal Columns (think of constant values):
keys = list(map(lit, grp["sensor"]))
keys
[Column<b'weight'>, Column<b'torque'>, Column<b'temp'>]
and the channel column into a sequence of Spark Columns (think of pointers to the data):
values = list(map(col, grp["channel"]))
values
[Column<b'chan1'>, Column<b'chan2'>, Column<b'chan3'>]
When evaluated in context, the former yields constant output:
df_ = df.drop_duplicates(subset=["id"])
df_.select(keys).show()
+------+------+----+
|weight|torque|temp|
+------+------+----+
|weight|torque|temp|
|weight|torque|temp|
|weight|torque|temp|
+------+------+----+
while the latter echoes the data:
df_.select(values).show(3)
+-----+------------------+-----------------+
|chan1| chan2| chan3|
+-----+------------------+-----------------+
| 10| 7.917250380826646|63.99210213275238|
| 5| 6.458941130666561| 97.8618342232764|
| 0|5.4881350392732475|8.712929970154072|
+-----+------------------+-----------------+
Next we interleave these two and combine them into a single MapType column:
mapping = create_map(*interleave([keys, values]))
mapping
Column<b'map(weight, chan1, torque, chan2, temp, chan3)'>
This gives us a mapping from metric name to data column (think of a Python dict), and when evaluated:
df_.select(mapping).show(3, False)
+---------------------------------------------------------------------------+
|map(weight, chan1, torque, chan2, temp, chan3) |
+---------------------------------------------------------------------------+
|Map(weight -> 10.0, torque -> 7.917250380826646, temp -> 63.99210213275238)|
|Map(weight -> 5.0, torque -> 6.458941130666561, temp -> 97.8618342232764) |
|Map(weight -> 0.0, torque -> 5.4881350392732475, temp -> 8.712929970154072)|
+---------------------------------------------------------------------------+
Finally, the outer comprehension repeats this for all groups, so channel_map is a single Column:
Column<b'map(a, map(weight, chan1, torque, chan2, temp, chan3), b, map(weight, chan1, temp, chan2, speed, chan3), c, map(temp, chan1, weight, chan2, acceleration, chan3))'>
Evaluating it gives the following structure:
df_.select(channel_map.alias("channel_map")).show(3, False)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Map(a -> Map(weight -> 10.0, torque -> 7.917250380826646, temp -> 63.99210213275238), b -> Map(weight -> 10.0, temp -> 7.917250380826646, speed -> 63.99210213275238), c -> Map(temp -> 10.0, weight -> 7.917250380826646, acceleration -> 63.99210213275238))|
|Map(a -> Map(weight -> 5.0, torque -> 6.458941130666561, temp -> 97.8618342232764), b -> Map(weight -> 5.0, temp -> 6.458941130666561, speed -> 97.8618342232764), c -> Map(temp -> 5.0, weight -> 6.458941130666561, acceleration -> 97.8618342232764)) |
|Map(a -> Map(weight -> 0.0, torque -> 5.4881350392732475, temp -> 8.712929970154072), b -> Map(weight -> 0.0, temp -> 5.4881350392732475, speed -> 8.712929970154072), c -> Map(temp -> 0.0, weight -> 5.4881350392732475, acceleration -> 8.712929970154072))|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Finally, we use the id column to select the map of interest:
df_.select(channel_map[col("id")].alias("data_mapping")).show(3, False)
+---------------------------------------------------------------------------------+
|data_mapping |
+---------------------------------------------------------------------------------+
|Map(temp -> 10.0, weight -> 7.917250380826646, acceleration -> 63.99210213275238)|
|Map(weight -> 5.0, temp -> 6.458941130666561, speed -> 97.8618342232764) |
|Map(weight -> 0.0, torque -> 5.4881350392732475, temp -> 8.712929970154072) |
+---------------------------------------------------------------------------------+
and the column names to extract values from the map:
df_.select(channel_map[col("id")]["weight"].alias("weight")).show(3, False)
+-----------------+
|weight |
+-----------------+
|7.917250380826646|
|5.0 |
|0.0 |
+-----------------+
At the end of the day, this is just a series of simple transformations on data structures containing symbolic expressions.
Regarding python - How to rename columns by group/partition in a Spark DataFrame, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48124389/