
python - pyspark: select rows in a dataframe by IDs from a column of another dataframe

Reposted · Author: 行者123 · Updated: 2023-12-01 08:00:49

I want to:

  1. filter df1 by time_create == last_timestamp, and
  2. filter df2 by the store_product_id values selected from df1.

Here I use only df1 as an example. Selecting by time_create works fine:

df1[df1.time_create==last_timestamp].show()


However, filtering the original dataframe df1 with the selected store_product_id returns far more rows than expected:

df1[df1.store_product_id.isin(df1[df1.time_create==last_timestamp].store_product_id)].show()


I also tried collecting the list of store_product_id values matching time_create == last_timestamp:

ids = df1[df1.time_create==last_timestamp].select('store_product_id').collect()
df1[df1.store_product_id.isin(ids)].show()

But this raised an error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [01e8f3c0-3ad5-4b69-b46d-f5feb3cadd5f]
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:163)
at org.apache.spark.sql.functions$.typedLit(functions.scala:127)
at org.apache.spark.sql.functions$.lit(functions.scala:110)
at org.apache.spark.sql.functions.lit(functions.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

What is the correct way to do this?

Best Answer

The function you are looking for is join. Here is a simple example based on your data:

from pyspark.sql import SparkSession

samples = [
    {'store_product_id': 1, 'time_create': 2, 'last_timestamp': 3},
    {'store_product_id': 2, 'time_create': 2, 'last_timestamp': 2},
    {'store_product_id': 3, 'time_create': 4, 'last_timestamp': 4},
    {'store_product_id': 4, 'time_create': 2, 'last_timestamp': 5},
]

spark = SparkSession \
.builder \
.appName('test') \
.getOrCreate()

df1 = spark.createDataFrame(samples)
df1.show()

This produces:

+--------------+----------------+-----------+
|last_timestamp|store_product_id|time_create|
+--------------+----------------+-----------+
| 3| 1| 2|
| 2| 2| 2|
| 4| 3| 4|
| 5| 4| 2|
+--------------+----------------+-----------+

Let's filter by time and create another dataframe from the result:

df2 = df1.filter(df1.time_create == df1.last_timestamp)
df2.select('store_product_id').show()

+----------------+
|store_product_id|
+----------------+
| 2|
| 3|
+----------------+

This is where we join the two dataframes on store_product_id:

df3 = df1.join(df2, 'store_product_id', 'inner')
df3.show()

+----------------+--------------+-----------+--------------+-----------+
|store_product_id|last_timestamp|time_create|last_timestamp|time_create|
+----------------+--------------+-----------+--------------+-----------+
| 3| 4| 4| 4| 4|
| 2| 2| 2| 2| 2|
+----------------+--------------+-----------+--------------+-----------+

The inner join produces the intersection of df1 and df2 based on store_product_id.

Regarding python - pyspark: select rows in a dataframe by IDs from a column of another dataframe, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/55743070/
