
Pyspark: PicklingError: Could not serialize object:


I have the following two dataframes: df_whitelist and df_text

+-------+--------------------+
|keyword|     whitelist_terms|
+-------+--------------------+
|     LA|             LA city|
|     LA|         US LA in da|
| client|this client has i...|
| client|our client has do...|
+-------+--------------------+
+--------------------+----------+
|                Text|  Keywords|
+--------------------+----------+
|the client as ada...|client;ada|
|this client has l...| client;LA|
+--------------------+----------+

In df_whitelist, each keyword corresponds to a set of terms, e.g. the keyword LA corresponds to "LA city" and "US LA in da".
In df_text, I have texts and the keywords found in each text.
What I want to do is: for each text, e.g. "the client as ada...", and for each of its keywords, "client" and "ada", look up all the whitelist terms for that keyword and count how many times each term appears in the text.
What I have tried is the following:
import pyspark.sql.functions as F
import pyspark.sql.types as T
import re

def whitelisting(text, listOfKeyword, df_whitelist):
    keywords = listOfKeyword.split(";")
    found_whiteterms_count = 0
    for k in keywords:
        if df_whitelist.filter(df_whitelist.keyword == k).count() == 0:
            found_whiteterms_count = found_whiteterms_count + 0
        else:
            df = df_whitelist.filter(df_whitelist.keyword == k).select("whitelist_terms")
            n = df.rdd.map(lambda x: len(re.findall(x["whitelist_terms"], text))).reduce(lambda x, y: x + y)
            found_whiteterms_count = found_whiteterms_count + n
    return found_whiteterms_count

whitelisting_udf = F.udf(lambda text, listOfKeyword: whitelisting(text, listOfKeyword, df_whitelist), T.IntegerType())
df_text.withColumn("whitelist_counts", whitelisting_udf(df_text.Text, df_text.Keywords))

I got the following error:
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o1153.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.base/java.lang.Thread.run(Thread.java:844)

After trying for a while, I am stuck. Can anyone help point out the problem and how to fix it? Thanks.

Best Answer

You are passing a pyspark dataframe, df_whitelist, to a UDF; pyspark dataframes cannot be pickled. You are also doing computations on a dataframe inside a UDF, which is not acceptable (not possible). Keep in mind that your function is going to be called as many times as there are rows in your dataframe, so you should keep computations simple, and only use a UDF when it cannot be done with pyspark sql functions.
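
As an aside, if you ever do need lookup data inside a UDF, one workaround (a minimal sketch, not part of the approach below; names are illustrative) is to collect the small df_whitelist to the driver as a plain Python dict and let the UDF close over that instead of the dataframe, since ordinary Python objects can be pickled and shipped to the executors. This only works when the whitelist fits comfortably on the driver; the join shown below scales better:

from collections import defaultdict

# Collect the (small) whitelist to the driver once, as a plain dict;
# the UDF then closes over picklable Python data, not a pyspark dataframe.
whitelist = defaultdict(list)
for row in df_whitelist.collect():
    whitelist[row["keyword"]].append(row["whitelist_terms"])

def count_whitelisted(text, listOfKeyword):
    # Pure Python inside the UDF: no dataframe operations here.
    return sum(text.count(term)
               for k in listOfKeyword.split(";")
               for term in whitelist.get(k, []))

whitelisting_udf = F.udf(count_whitelisted, T.IntegerType())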

What you need to do instead is join the two dataframes on keyword.
Let's start from the two sample dataframes you provided:

df_whitelist = spark.createDataFrame(
    [["LA", "LA city"], ["LA", "US LA in da"], ["client", "this client has i"], ["client", "our client"]],
    ["keyword", "whitelist_terms"])
df_text = spark.createDataFrame(
    [["the client as ada", "client;ada"], ["this client has l", "client;LA"]],
    ["Text", "Keywords"])
The Keywords column of df_text needs some processing: we have to turn the string into an array and then explode it, so that we only have one item per line:

import pyspark.sql.functions as F
df_text = df_text.select("Text", F.explode(F.split("Keywords", ";")).alias("keyword"))

+-----------------+-------+
|             Text|keyword|
+-----------------+-------+
|the client as ada| client|
|the client as ada|    ada|
|this client has l| client|
|this client has l|     LA|
+-----------------+-------+

Now we can join the two dataframes on keyword:

df = df_text.join(df_whitelist, "keyword", "leftouter")

+-------+-----------------+-----------------+
|keyword|             Text|  whitelist_terms|
+-------+-----------------+-----------------+
|     LA|this client has l|          LA city|
|     LA|this client has l|      US LA in da|
|    ada|the client as ada|             null|
| client|the client as ada|this client has i|
| client|the client as ada|       our client|
| client|this client has l|this client has i|
| client|this client has l|       our client|
+-------+-----------------+-----------------+
  • The first condition in your UDF can be translated as follows: if a keyword from df_text is not present in df_whitelist, then 0. After the left join this is equivalent to saying that the values of the df_whitelist columns will be NULL, since those keywords only appear in the left dataframe.
  • The second condition: you count how many times whitelist_terms occurs in Text: Text.count(whitelist_terms). (Note that count does a literal substring count, whereas your re.findall treated the term as a regular expression.)

  • We'll write a UDF to do this:

from pyspark.sql.types import IntegerType
count_terms = F.udf(lambda Text, term: Text.count(term) if term is not None else 0, IntegerType())
df = df.select(
    "Text",
    "keyword",
    F.when(F.isnull("whitelist_terms"), 0).otherwise(count_terms("Text", "whitelist_terms")).alias("whitelist_counts"))

+-----------------+-------+----------------+
|             Text|keyword|whitelist_counts|
+-----------------+-------+----------------+
|this client has l|     LA|               0|
|this client has l|     LA|               0|
|the client as ada|    ada|               0|
|the client as ada| client|               0|
|the client as ada| client|               0|
|this client has l| client|               0|
|this client has l| client|               0|
+-----------------+-------+----------------+
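
As an aside, and in line with the advice above to prefer pyspark sql functions over UDFs, the same per-row count can also be computed without a Python UDF. Here is a sketch using Spark SQL's built-in replace function (available since Spark 2.3); it assumes whitelist_terms is matched as a plain substring, just like Text.count, and df_no_udf is an illustrative name:

# Occurrences of whitelist_terms in Text, computed without a UDF:
# (length of Text minus length of Text with the term removed) / length of term.
# A NULL whitelist_terms propagates NULL through the arithmetic, so coalesce gives 0.
df_no_udf = df_text.join(df_whitelist, "keyword", "leftouter").select(
    "Text",
    "keyword",
    F.coalesce(
        ((F.length("Text") - F.length(F.expr("replace(Text, whitelist_terms, '')")))
         / F.length("whitelist_terms")).cast("int"),
        F.lit(0)).alias("whitelist_counts"))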

Finally, we can aggregate to get back to a dataframe with only the distinct Text values:

res = df.groupBy("Text").agg(
    F.collect_set("keyword").alias("Keywords"),
    F.sum("whitelist_counts").alias("whitelist_counts"))
res.show()

+-----------------+-------------+----------------+
|             Text|     Keywords|whitelist_counts|
+-----------------+-------------+----------------+
|this client has l| [client, LA]|               0|
|the client as ada|[ada, client]|               0|
+-----------------+-------------+----------------+

Regarding Pyspark: PicklingError: Could not serialize object:, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47249292/
